Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move SeedProvider and AbstractNetworkTopologySnitch to InetAddressAndPort for forward compatibility #519

Open
wants to merge 13 commits into
base: palantir-cassandra-2.2.18
Choose a base branch
from
14 changes: 11 additions & 3 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import java.util.*;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;

Expand Down Expand Up @@ -295,6 +295,8 @@ else if (config.rpc_interface != null)
public static void applyConfig(Config config) throws ConfigurationException
{
conf = config;

InetAddressAndPort.initializeDefaultPort(getSSLStoragePort());
Comment on lines +298 to +299
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note that on trunk the default port is getStoragePort() so 7000. I'm not sure if this matters.


if (conf.commitlog_sync == null)
{
Expand Down Expand Up @@ -731,7 +733,8 @@ else if (conf.num_tokens > MAX_NUM_TOKENS)
if (Boolean.getBoolean("palantir_cassandra.single_node_cluster"))
{
try {
if (!seedProvider.getSeeds().equals(ImmutableList.of(InetAddress.getLocalHost())))
List<InetAddressAndPort> seeds = seedProvider.getSeeds();
if (seeds.size() != 1 || !Iterables.getOnlyElement(seeds).address.equals(InetAddress.getLocalHost()))
throw new ConfigurationException("Unexpected seed list when single_node_cluster flag is set to true."
+ " For a single node Cassandra cluster, the only seed should be localhost", false);
} catch (UnknownHostException e) {
Expand Down Expand Up @@ -1355,7 +1358,12 @@ public static String getPersistentSettingsLocation() {

public static Set<InetAddress> getSeeds()
{
return ImmutableSet.<InetAddress>builder().addAll(seedProvider.getSeeds()).build();
ImmutableSet.Builder<InetAddress> builder = ImmutableSet.builder();
for (InetAddressAndPort seed : seedProvider.getSeeds())
{
builder.add(seed.address);
}
return builder.build();
}

public static Set<InetAddress> getAllHosts()
Expand Down
25 changes: 24 additions & 1 deletion src/java/org/apache/cassandra/db/SystemKeyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.common.io.ByteStreams;
import com.palantir.cassandra.db.CompactionsInProgressFlusher;

import org.apache.cassandra.locator.InetAddressAndPort;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -666,7 +667,7 @@ public static InetAddress getPreferredIP(InetAddress ep)
/**
* Return a map of IP addresses containing a map of dc and rack info
*/
public static Map<InetAddress, Map<String,String>> loadDcRackInfo()
public static Map<InetAddress, Map<String, String>> loadDcRackInfoLegacy()
{
Map<InetAddress, Map<String, String>> result = new HashMap<>();
for (UntypedResultSet.Row row : executeInternal("SELECT peer, data_center, rack from system." + PEERS))
Expand All @@ -683,6 +684,28 @@ public static Map<InetAddress, Map<String,String>> loadDcRackInfo()
return result;
}

/**
* Return a map of IP addresses containing a map of dc and rack info
*
* @apiNote Shim for plugin forward compatibility. Do not use internally.
*/
public static Map<InetAddressAndPort, Map<String, String>> loadDcRackInfo()
{
Map<InetAddressAndPort, Map<String, String>> result = new HashMap<>();
for (UntypedResultSet.Row row : executeInternal("SELECT peer, data_center, rack from system." + PEERS))
{
InetAddressAndPort peer = InetAddressAndPort.getByAddress(row.getInetAddress("peer"));
if (row.has("data_center") && row.has("rack"))
{
Map<String, String> dcRack = new HashMap<>();
dcRack.put("data_center", row.getString("data_center"));
dcRack.put("rack", row.getString("rack"));
result.put(peer, dcRack);
}
}
return result;
}

/**
* Get release version for given endpoint.
* If release version is unknown, then this returns null.
Expand Down
9 changes: 9 additions & 0 deletions src/java/org/apache/cassandra/gms/Gossiper.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.common.util.concurrent.Uninterruptibles;

import com.palantir.cassandra.cvim.CrossVpcIpMappingHandshaker;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.Pair;
Expand Down Expand Up @@ -858,6 +859,14 @@ protected long getExpireTimeForEndpoint(InetAddress endpoint)
return storedTime == null ? computeExpireTime() : storedTime;
}

/**
* @apiNote Shim for plugin forward compatibility. Do not use internally.
*/
public EndpointState getEndpointStateForEndpoint(InetAddressAndPort ep)
{
return getEndpointStateForEndpoint(ep.address);
}

public EndpointState getEndpointStateForEndpoint(InetAddress ep)
{
return endpointStateMap.get(ep);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,30 @@ public abstract class AbstractNetworkTopologySnitch extends AbstractEndpointSnit
* @param endpoint a specified endpoint
* @return string of rack
*/
abstract public String getRack(InetAddress endpoint);
abstract public String getRack(InetAddressAndPort endpoint);

/**
* @apiNote Only override in tests!
*/
public String getRack(InetAddress endpoint)
{
return getRack(InetAddressAndPort.getByAddress(endpoint));
}

/**
* Return the data center for which an endpoint resides in
* @param endpoint a specified endpoint
* @return string of data center
*/
abstract public String getDatacenter(InetAddress endpoint);
abstract public String getDatacenter(InetAddressAndPort endpoint);

/**
* @apiNote Only override in tests!
*/
public String getDatacenter(InetAddress endpoint)
{
return getDatacenter(InetAddressAndPort.getByAddress(endpoint));
}

public int compareEndpoints(InetAddress address, InetAddress a1, InetAddress a2)
{
Expand Down
10 changes: 6 additions & 4 deletions src/java/org/apache/cassandra/locator/CloudstackSnitch.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,31 +83,33 @@ public CloudstackSnitch() throws IOException, ConfigurationException
csZoneRack = zone_parts[2];
}

public String getRack(InetAddress endpoint)
public String getRack(InetAddressAndPort endpointAndPort)
{
InetAddress endpoint = endpointAndPort.address;
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
return csZoneRack;
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
if (state == null || state.getApplicationState(ApplicationState.RACK) == null)
{
if (savedEndpoints == null)
savedEndpoints = SystemKeyspace.loadDcRackInfo();
savedEndpoints = SystemKeyspace.loadDcRackInfoLegacy();
if (savedEndpoints.containsKey(endpoint))
return savedEndpoints.get(endpoint).get("rack");
return DEFAULT_RACK;
}
return state.getApplicationState(ApplicationState.RACK).value;
}

public String getDatacenter(InetAddress endpoint)
public String getDatacenter(InetAddressAndPort endpointAndPort)
{
InetAddress endpoint = endpointAndPort.address;
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
return csZoneDc;
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
if (state == null || state.getApplicationState(ApplicationState.DC) == null)
{
if (savedEndpoints == null)
savedEndpoints = SystemKeyspace.loadDcRackInfo();
savedEndpoints = SystemKeyspace.loadDcRackInfoLegacy();
if (savedEndpoints.containsKey(endpoint))
return savedEndpoints.get(endpoint).get("data_center");
return DEFAULT_DC;
Expand Down
10 changes: 6 additions & 4 deletions src/java/org/apache/cassandra/locator/Ec2Snitch.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,31 +92,33 @@ String awsApiCall(String url) throws IOException, ConfigurationException
}
}

public String getRack(InetAddress endpoint)
public String getRack(InetAddressAndPort endpointAndPort)
{
InetAddress endpoint = endpointAndPort.address;
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
return ec2zone;
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
if (state == null || state.getApplicationState(ApplicationState.RACK) == null)
{
if (savedEndpoints == null)
savedEndpoints = SystemKeyspace.loadDcRackInfo();
savedEndpoints = SystemKeyspace.loadDcRackInfoLegacy();
if (savedEndpoints.containsKey(endpoint))
return savedEndpoints.get(endpoint).get("rack");
return DEFAULT_RACK;
}
return state.getApplicationState(ApplicationState.RACK).value;
}

public String getDatacenter(InetAddress endpoint)
public String getDatacenter(InetAddressAndPort endpointAndPort)
{
InetAddress endpoint = endpointAndPort.address;
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
return ec2region;
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
if (state == null || state.getApplicationState(ApplicationState.DC) == null)
{
if (savedEndpoints == null)
savedEndpoints = SystemKeyspace.loadDcRackInfo();
savedEndpoints = SystemKeyspace.loadDcRackInfoLegacy();
if (savedEndpoints.containsKey(endpoint))
return savedEndpoints.get(endpoint).get("data_center");
return DEFAULT_DC;
Expand Down
10 changes: 6 additions & 4 deletions src/java/org/apache/cassandra/locator/GoogleCloudSnitch.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,31 +94,33 @@ String gceApiCall(String url) throws IOException, ConfigurationException
}
}

public String getRack(InetAddress endpoint)
public String getRack(InetAddressAndPort endpointAndPort)
{
InetAddress endpoint = endpointAndPort.address;
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
return gceZone;
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
if (state == null || state.getApplicationState(ApplicationState.RACK) == null)
{
if (savedEndpoints == null)
savedEndpoints = SystemKeyspace.loadDcRackInfo();
savedEndpoints = SystemKeyspace.loadDcRackInfoLegacy();
if (savedEndpoints.containsKey(endpoint))
return savedEndpoints.get(endpoint).get("rack");
return DEFAULT_RACK;
}
return state.getApplicationState(ApplicationState.RACK).value;
}

public String getDatacenter(InetAddress endpoint)
public String getDatacenter(InetAddressAndPort endpointAndPort)
{
InetAddress endpoint = endpointAndPort.address;
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
return gceRegion;
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
if (state == null || state.getApplicationState(ApplicationState.DC) == null)
{
if (savedEndpoints == null)
savedEndpoints = SystemKeyspace.loadDcRackInfo();
savedEndpoints = SystemKeyspace.loadDcRackInfoLegacy();
if (savedEndpoints.containsKey(endpoint))
return savedEndpoints.get(endpoint).get("data_center");
return DEFAULT_DC;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,12 @@ private static SnitchProperties loadConfiguration() throws ConfigurationExceptio
/**
* Return the data center for which an endpoint resides in
*
* @param endpoint the endpoint to process
* @param endpointAndPort the endpoint to process
* @return string of data center
*/
public String getDatacenter(InetAddress endpoint)
public String getDatacenter(InetAddressAndPort endpointAndPort)
{
InetAddress endpoint = endpointAndPort.address;
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
return myDC;

Expand All @@ -95,7 +96,7 @@ public String getDatacenter(InetAddress endpoint)
if (psnitch == null)
{
if (savedEndpoints == null)
savedEndpoints = SystemKeyspace.loadDcRackInfo();
savedEndpoints = SystemKeyspace.loadDcRackInfoLegacy();
if (savedEndpoints.containsKey(endpoint))
return savedEndpoints.get(endpoint).get("data_center");
return DEFAULT_DC;
Expand All @@ -109,11 +110,12 @@ public String getDatacenter(InetAddress endpoint)
/**
* Return the rack for which an endpoint resides in
*
* @param endpoint the endpoint to process
* @param endpointAndPort the endpoint to process
* @return string of rack
*/
public String getRack(InetAddress endpoint)
public String getRack(InetAddressAndPort endpointAndPort)
{
InetAddress endpoint = endpointAndPort.address;
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
return myRack;

Expand All @@ -123,7 +125,7 @@ public String getRack(InetAddress endpoint)
if (psnitch == null)
{
if (savedEndpoints == null)
savedEndpoints = SystemKeyspace.loadDcRackInfo();
savedEndpoints = SystemKeyspace.loadDcRackInfoLegacy();
if (savedEndpoints.containsKey(endpoint))
return savedEndpoints.get(endpoint).get("rack");
return DEFAULT_RACK;
Expand Down
14 changes: 13 additions & 1 deletion src/java/org/apache/cassandra/locator/InetAddressAndPort.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public final class InetAddressAndPort implements Comparable<InetAddressAndPort>,
//these when it loads the config. A lot of unit tests won't end up loading DatabaseDescriptor.
//Tools that might use this class also might not load database descriptor. Those tools are expected
//to always override the defaults.
static volatile int defaultPort = 7000;
static volatile int defaultPort = 7001;

public final InetAddress address;
public final byte[] addressBytes;
Expand All @@ -65,6 +65,13 @@ private InetAddressAndPort(InetAddress address, byte[] addressBytes, int port)
this.addressBytes = addressBytes;
}

private InetAddressAndPort(InetAddress address)
{
this.address = address;
this.port = -1;
this.addressBytes = null;
}

private static void validatePortRange(int port)
{
if (port < 0 | port > 65535)
Expand Down Expand Up @@ -191,4 +198,9 @@ public static void initializeDefaultPort(int port)
{
defaultPort = port;
}

public static InetAddressAndPort getLocalHost()
{
return FBUtilities.getLocalAddressAndPort();
}
}
12 changes: 6 additions & 6 deletions src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,10 @@ private static String[] getRawEndpointInfo(InetAddress endpoint)
* @param endpoint the endpoint to process
* @return string of data center
*/
public String getDatacenter(InetAddress endpoint)
public String getDatacenter(InetAddressAndPort endpoint)
{
String[] info = getEndpointInfo(endpoint);
assert info != null : "No location defined for endpoint " + endpoint;
String[] info = getEndpointInfo(endpoint.address);
assert info != null : "No location defined for endpoint " + endpoint.address;
return info[0];
}

Expand All @@ -131,10 +131,10 @@ public String getDatacenter(InetAddress endpoint)
* @param endpoint the endpoint to process
* @return string of rack
*/
public String getRack(InetAddress endpoint)
public String getRack(InetAddressAndPort endpoint)
{
String[] info = getEndpointInfo(endpoint);
assert info != null : "No location defined for endpoint " + endpoint;
String[] info = getEndpointInfo(endpoint.address);
assert info != null : "No location defined for endpoint " + endpoint.address;
return info[1];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
*/
public class RackInferringSnitch extends AbstractNetworkTopologySnitch
{
public String getRack(InetAddress endpoint)
public String getRack(InetAddressAndPort endpoint)
{
return Integer.toString(endpoint.getAddress()[2] & 0xFF, 10);
return Integer.toString(endpoint.address.getAddress()[2] & 0xFF, 10);
}

public String getDatacenter(InetAddress endpoint)
public String getDatacenter(InetAddressAndPort endpoint)
{
return Integer.toString(endpoint.getAddress()[1] & 0xFF, 10);
return Integer.toString(endpoint.address.getAddress()[1] & 0xFF, 10);
}
}
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/locator/SeedProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@

public interface SeedProvider
{
List<InetAddress> getSeeds();
List<InetAddressAndPort> getSeeds();
}
Loading