Skip to content

Commit

Permalink
Log changes to TokenMetadata (#561)
Browse files Browse the repository at this point in the history
  • Loading branch information
nickbar01234 authored Nov 1, 2024
1 parent ebf2c40 commit c441c7c
Show file tree
Hide file tree
Showing 4 changed files with 280 additions and 7 deletions.
98 changes: 98 additions & 0 deletions src/java/com/palantir/cassandra/utils/MapUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.cassandra.utils;

import java.net.InetAddress;
import java.util.*;
import java.util.stream.Collectors;

import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;

import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.PendingRangeMaps;
import org.apache.cassandra.utils.Pair;


public final class MapUtils
{
    private MapUtils()
    {
        // Static utility holder; not meant to be instantiated.
    }

    /**
     * Returns the set of endpoints that are mapped to at least one range in {@code addressRanges} which
     * intersects any of the ranges in {@code tokenRanges}.
     *
     * @param addressRanges token range to endpoint associations to search
     * @param tokenRanges   ranges to test each entry of {@code addressRanges} against
     * @return a new set holding every matching endpoint; empty when nothing intersects
     */
    public static Set<InetAddress> intersection(Multimap<Range<Token>, InetAddress> addressRanges, Collection<Range<Token>> tokenRanges)
    {
        Set<InetAddress> intersection = new HashSet<>();

        for (Map.Entry<Range<Token>, InetAddress> entry : addressRanges.entries())
        {
            for (Range<Token> range : tokenRanges)
            {
                if (entry.getKey().intersects(range))
                {
                    intersection.add(entry.getValue());
                    // One hit is enough for this entry; further ranges cannot change the result.
                    break;
                }
            }
        }

        return intersection;
    }

    /**
     * Groups the pending ranges by endpoint and returns each endpoint's ranges as a sorted list.
     *
     * @param pendingRangeMaps pending ranges, iterated as (range, endpoints) entries
     * @return a new map from endpoint to its pending ranges in natural order
     */
    public static Map<InetAddress, List<Range<Token>>> coalesce(PendingRangeMaps pendingRangeMaps)
    {
        Map<InetAddress, List<Range<Token>>> coalesced = new HashMap<>();

        for (Map.Entry<Range<Token>, List<InetAddress>> entry : pendingRangeMaps)
        {
            for (InetAddress endpoint : entry.getValue())
            {
                coalesced.computeIfAbsent(endpoint, _k -> new ArrayList<>()).add(entry.getKey());
            }
        }
        coalesced.replaceAll((_k, tokenRanges) -> sort(tokenRanges));

        return coalesced;
    }

    /**
     * Similar to {@link MapUtils#coalesce(PendingRangeMaps)}, but for tokens: returns each endpoint's
     * tokens as a sorted list.
     *
     * @param endpointToken endpoint to token associations
     * @return a new map from endpoint to its tokens in natural order
     */
    public static Map<InetAddress, List<Token>> coalesce(Multimap<InetAddress, Token> endpointToken)
    {
        Map<InetAddress, List<Token>> coalesced = new HashMap<>();

        // asMap() yields each distinct endpoint exactly once. Multimap#keys() repeats a key once per
        // value, which would re-sort the same token collection for every token the endpoint owns.
        for (Map.Entry<InetAddress, Collection<Token>> entry : endpointToken.asMap().entrySet())
        {
            coalesced.put(entry.getKey(), sort(entry.getValue()));
        }

        return coalesced;
    }

    /**
     * Returns the elements of {@code collection} as a new list in natural order; the input is not modified.
     */
    private static <T extends Comparable<? super T>> List<T> sort(Collection<T> collection)
    {
        return collection.stream().sorted().collect(Collectors.toList());
    }
}
67 changes: 61 additions & 6 deletions src/java/org/apache/cassandra/locator/TokenMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import com.google.common.collect.*;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.palantir.cassandra.utils.MapUtils;
import com.palantir.logsafe.SafeArg;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
Expand Down Expand Up @@ -97,6 +101,8 @@ public class TokenMetadata

private final Topology topology;

private boolean shouldLogTokenChanges;

private static final Comparator<InetAddress> inetaddressCmp = new Comparator<InetAddress>()
{
public int compare(InetAddress o1, InetAddress o2)
Expand All @@ -112,13 +118,21 @@ public TokenMetadata()
{
this(SortedBiMultiValMap.<Token, InetAddress>create(null, inetaddressCmp),
HashBiMap.<InetAddress, UUID>create(),
new Topology());
new Topology(), false);
}

/**
 * Creates a TokenMetadata with an empty token map, an empty endpoint-to-host-id map and a new Topology.
 *
 * @param shouldLogTokenChanges stored on the instance; the token-change logging in this class
 *                              (e.g. in updateNormalTokens and the pending range calculation) is
 *                              only emitted when this is true
 */
public TokenMetadata(boolean shouldLogTokenChanges)
{
this(SortedBiMultiValMap.<Token, InetAddress>create(null, inetaddressCmp),
HashBiMap.<InetAddress, UUID>create(),
new Topology(), shouldLogTokenChanges);
}

private TokenMetadata(BiMultiValMap<Token, InetAddress> tokenToEndpointMap, BiMap<InetAddress, UUID> endpointsMap, Topology topology)
private TokenMetadata(BiMultiValMap<Token, InetAddress> tokenToEndpointMap, BiMap<InetAddress, UUID> endpointsMap, Topology topology, boolean shouldLogTokenChanges)
{
this.tokenToEndpointMap = tokenToEndpointMap;
this.topology = topology;
this.shouldLogTokenChanges = shouldLogTokenChanges;
endpointToHostIdMap = endpointsMap;
sortedTokens = sortTokens();
}
Expand Down Expand Up @@ -185,6 +199,11 @@ public void updateNormalTokens(Multimap<InetAddress, Token> endpointTokens)
if (endpointTokens.isEmpty())
return;

if (StorageService.instance.hasJoined() && shouldLogTokenChanges)
{
logger.info("updateNormalTokens", SafeArg.of("endpointTokens", MapUtils.coalesce(endpointTokens)));
}

publicLock.readLock().lock();
lock.writeLock().lock();
try
Expand Down Expand Up @@ -654,7 +673,7 @@ public TokenMetadata cloneOnlyTokenMap()
{
return new TokenMetadata(SortedBiMultiValMap.create(tokenToEndpointMap, null, inetaddressCmp),
HashBiMap.create(endpointToHostIdMap),
new Topology(topology));
new Topology(topology), false);
}
finally
{
Expand Down Expand Up @@ -904,7 +923,7 @@ public void calculatePendingRanges(AbstractReplicationStrategy strategy, String
/**
* @see TokenMetadata#calculatePendingRanges(AbstractReplicationStrategy, String)
*/
private static PendingRangeMaps calculatePendingRanges(AbstractReplicationStrategy strategy,
private PendingRangeMaps calculatePendingRanges(AbstractReplicationStrategy strategy,
TokenMetadata metadata,
BiMultiValMap<Token, InetAddress> bootstrapTokens,
Set<InetAddress> leavingEndpoints,
Expand Down Expand Up @@ -934,22 +953,27 @@ private static PendingRangeMaps calculatePendingRanges(AbstractReplicationStrate
}
}

logLeavingEndpointDifference(strategy.keyspaceName, leavingEndpoints, newPendingRanges);

// At this stage newPendingRanges has been updated according to leave operations. We can
// now continue the calculation by checking bootstrapping nodes.

// For each of the bootstrapping nodes, simply add and remove them one by one to
// allLeftMetadata and check in between what their ranges would be.
Multimap<InetAddress, Token> bootstrapAddresses = bootstrapTokens.inverse();
Multimap<Range<Token>, InetAddress> addressRangesSnapshot = strategy.getRangeAddresses(allLeftMetadata);
for (InetAddress endpoint : bootstrapAddresses.keySet())
{
Collection<Token> tokens = bootstrapAddresses.get(endpoint);

allLeftMetadata.updateNormalTokens(tokens, endpoint);
for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
Collection<Range<Token>> tokenRangeForEndpoint = strategy.getAddressRanges(allLeftMetadata).get(endpoint);
for (Range<Token> range : tokenRangeForEndpoint)
{
newPendingRanges.addPendingRange(range, endpoint);
}
allLeftMetadata.removeEndpoint(endpoint);

logBootstrapDifference(strategy.keyspaceName, endpoint, addressRangesSnapshot, tokenRangeForEndpoint);
}

// At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes.
Expand Down Expand Up @@ -1322,6 +1346,37 @@ public void invalidateCachedRings()
cachedTokenMap.set(null);
}

/**
 * Logs the pending ranges held in {@code pendingRangeMaps} on behalf of leaving endpoints.
 * Emits the per-endpoint range count at INFO and the full per-endpoint range lists at DEBUG.
 * Does nothing when token-change logging is disabled or there are no leaving endpoints.
 *
 * @param keyspace         keyspace the pending ranges were calculated for
 * @param leavingEndpoints endpoints currently leaving the ring
 * @param pendingRangeMaps pending ranges to summarise
 */
private void logLeavingEndpointDifference(String keyspace, Set<InetAddress> leavingEndpoints, PendingRangeMaps pendingRangeMaps)
{
    if (!shouldLogTokenChanges || leavingEndpoints.isEmpty())
    {
        return;
    }

    Map<InetAddress, List<Range<Token>>> endpointToPendingRange = MapUtils.coalesce(pendingRangeMaps);
    Map<InetAddress, Integer> endpointToPendingRangeCount = endpointToPendingRange
        .entrySet()
        .stream()
        .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().size()));

    logger.info("Pending ranges for leaving endpoints",
                SafeArg.of("keyspace", keyspace),
                SafeArg.of("leavingEndpoints", leavingEndpoints),
                SafeArg.of("pendingRangePerEndpoint", endpointToPendingRangeCount));
    // Wrap the payload in a SafeArg like every other log call here: a bare argument with no
    // placeholder in the message is not rendered by the logger, so the ranges were being lost.
    logger.debug("PendingRangeMaps for leaving endpoints",
                 SafeArg.of("pendingRangeMaps", endpointToPendingRange));
}

/**
 * Logs the pending ranges assigned to a bootstrapping endpoint.
 * At INFO: the endpoints in {@code snapshot} whose ranges intersect the new pending ranges, plus the
 * pending range count. At DEBUG: the sorted pending ranges themselves.
 * Does nothing when token-change logging is disabled.
 *
 * @param keyspace              keyspace the pending ranges were calculated for
 * @param endpoint              the bootstrapping endpoint
 * @param snapshot              range to endpoint associations captured before the endpoint was added
 * @param tokenRangeForEndpoint ranges the bootstrapping endpoint will own
 */
private void logBootstrapDifference(String keyspace, InetAddress endpoint, Multimap<Range<Token>, InetAddress> snapshot, Collection<Range<Token>> tokenRangeForEndpoint)
{
    if (!shouldLogTokenChanges)
    {
        return;
    }

    logger.info("Pending ranges for bootstrapping endpoint",
                SafeArg.of("keyspace", keyspace),
                SafeArg.of("bootstrapingEndpoint", endpoint),
                SafeArg.of("previousOwners", MapUtils.intersection(snapshot, tokenRangeForEndpoint)),
                SafeArg.of("pendingRangeCount", tokenRangeForEndpoint.size()));

    // Building the sorted copy is only worthwhile when the DEBUG line will actually be emitted.
    if (logger.isDebugEnabled())
    {
        List<Range<Token>> sortedPendingRanges = tokenRangeForEndpoint.stream().sorted().collect(Collectors.toList());
        logger.debug("Pending range for endpoint", SafeArg.of("pendingRange", sortedPendingRanges));
    }
}

/**
* Tracks the assignment of racks and endpoints in each datacenter for all the "normal" endpoints
* in this TokenMetadata. This allows faster calculation of endpoints in NetworkTopologyStrategy.
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/service/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ private static int getRingDelay()
}

/* This abstraction maintains the token/endpoint metadata information */
private TokenMetadata tokenMetadata = new TokenMetadata();
private TokenMetadata tokenMetadata = new TokenMetadata(true);

public volatile VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(getPartitioner());

Expand Down
120 changes: 120 additions & 0 deletions test/unit/com/palantir/cassandra/utils/MapUtilsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.cassandra.utils;

import java.net.InetAddress;
import java.util.*;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.locator.PendingRangeMaps;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.junit.Before;
import org.junit.Test;

import org.apache.cassandra.dht.Token;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * Unit tests for {@link MapUtils}. All ranges and tokens asserted on are built from
 * {@link RandomPartitioner.BigIntegerToken} so expected orderings do not depend on which partitioner
 * the test environment is configured with.
 */
public final class MapUtilsTest
{
    private InetAddress ep1;

    private InetAddress ep2;

    private Multimap<Range<Token>, InetAddress> addressRange;

    @Before
    public void before() throws Exception
    {
        addressRange = HashMultimap.create();

        ep1 = InetAddress.getByName("127.0.0.1");
        ep2 = InetAddress.getByName("127.0.0.2");
    }

    @Test
    public void intersection_noIntersectionReturnsEmptySet()
    {
        putAll(addressRange, rangeOf("0", "8"), ep1);
        assertThat(MapUtils.intersection(addressRange, rangesOf(rangeOf("10", "12")))).isEmpty();
    }

    @Test
    public void intersection_overlappingIntervalReturnsNonEmptySet()
    {
        putAll(addressRange, rangeOf("0", "10"), ep1);
        putAll(addressRange, rangeOf("15", "30"), ep2);
        assertThat(MapUtils.intersection(addressRange, rangesOf(rangeOf("4", "18"))))
            .containsExactlyInAnyOrder(ep1, ep2);
    }

    @Test
    public void coalesce_sortRanges()
    {
        PendingRangeMaps pendingRangeMaps = new PendingRangeMaps();
        Range<Token> range1 = rangeOf("50", "60");
        Range<Token> range2 = rangeOf("10", "20");
        Range<Token> range3 = rangeOf("30", "32");
        pendingRangeMaps.addPendingRange(range1, ep1);
        pendingRangeMaps.addPendingRange(range2, ep1);
        pendingRangeMaps.addPendingRange(range3, ep1);

        // Coalesce once and assert on the single result.
        Map<InetAddress, List<Range<Token>>> coalesced = MapUtils.coalesce(pendingRangeMaps);
        assertThat(coalesced.keySet()).containsExactly(ep1);
        assertThat(coalesced.get(ep1))
            .isNotNull()
            .containsExactly(range2, range3, range1);
    }

    @Test
    public void coalesce_sortTokens()
    {
        Multimap<InetAddress, Token> endpointToken = HashMultimap.create();
        // Numeric order (5, 30, 100) differs from lexicographic order ("100", "30", "5"), so this
        // also checks that tokens are compared as tokens rather than as strings.
        putAll(endpointToken, ep1, bigIntegerToken("30"), bigIntegerToken("5"), bigIntegerToken("100"));
        Map<InetAddress, List<Token>> sortedEndpointToken = MapUtils.coalesce(endpointToken);
        assertThat(sortedEndpointToken.keySet()).containsExactly(ep1);
        assertThat(sortedEndpointToken.get(ep1))
            .isNotNull()
            .containsExactly(bigIntegerToken("5"), bigIntegerToken("30"), bigIntegerToken("100"));
    }

    @SafeVarargs
    private final <U, V> void putAll(Multimap<U, V> map, U key, V... tokens)
    {
        map.putAll(key, Arrays.asList(tokens));
    }

    @SafeVarargs
    private final List<Range<Token>> rangesOf(Range<Token>... ranges)
    {
        return new ArrayList<>(Arrays.asList(ranges));
    }

    private Range<Token> rangeOf(String leftBound, String rightBound)
    {
        return new Range<>(bigIntegerToken(leftBound), bigIntegerToken(rightBound));
    }

    /** Builds a token whose ordering is the numeric ordering of {@code value}. */
    private static Token bigIntegerToken(String value)
    {
        return new RandomPartitioner.BigIntegerToken(value);
    }

    /**
     * Builds a token for {@code key} using the partitioner returned by StorageService.
     * NOTE(review): the relative order of tokens produced here depends on that partitioner, so avoid
     * asserting on their ordering. Retained for compatibility with any external callers.
     */
    public static Token token(String key)
    {
        return StorageService.getPartitioner().getToken(ByteBufferUtil.bytes(key));
    }
}

0 comments on commit c441c7c

Please sign in to comment.