Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move SeedProvider and AbstractNetworkTopologySnitch to InetAddressAndPort for forward compatibility #519

Open
wants to merge 13 commits into
base: palantir-cassandra-2.2.18
Choose a base branch
from
14 changes: 11 additions & 3 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import java.util.*;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;

Expand Down Expand Up @@ -295,6 +295,8 @@ else if (config.rpc_interface != null)
public static void applyConfig(Config config) throws ConfigurationException
{
conf = config;

InetAddressAndPort.initializeDefaultPort(getSSLStoragePort());
Comment on lines +298 to +299
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note that on trunk the default port is getStoragePort() so 7000. I'm not sure if this matters.


if (conf.commitlog_sync == null)
{
Expand Down Expand Up @@ -731,7 +733,8 @@ else if (conf.num_tokens > MAX_NUM_TOKENS)
if (Boolean.getBoolean("palantir_cassandra.single_node_cluster"))
{
try {
if (!seedProvider.getSeeds().equals(ImmutableList.of(InetAddress.getLocalHost())))
List<InetAddressAndPort> seeds = seedProvider.getSeeds();
if (seeds.size() != 1 || !Iterables.getOnlyElement(seeds).address.equals(InetAddress.getLocalHost()))
throw new ConfigurationException("Unexpected seed list when single_node_cluster flag is set to true."
+ " For a single node Cassandra cluster, the only seed should be localhost", false);
} catch (UnknownHostException e) {
Expand Down Expand Up @@ -1355,7 +1358,12 @@ public static String getPersistentSettingsLocation() {

public static Set<InetAddress> getSeeds()
{
return ImmutableSet.<InetAddress>builder().addAll(seedProvider.getSeeds()).build();
ImmutableSet.Builder<InetAddress> builder = ImmutableSet.builder();
for (InetAddressAndPort seed : seedProvider.getSeeds())
{
builder.add(seed.address);
}
return builder.build();
}

public static Set<InetAddress> getAllHosts()
Expand Down
7 changes: 4 additions & 3 deletions src/java/org/apache/cassandra/db/SystemKeyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.common.io.ByteStreams;
import com.palantir.cassandra.db.CompactionsInProgressFlusher;

import org.apache.cassandra.locator.InetAddressAndPort;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -666,12 +667,12 @@ public static InetAddress getPreferredIP(InetAddress ep)
/**
* Return a map of IP addresses containing a map of dc and rack info
*/
public static Map<InetAddress, Map<String,String>> loadDcRackInfo()
public static Map<InetAddressAndPort, Map<String, String>> loadDcRackInfo()
{
Map<InetAddress, Map<String, String>> result = new HashMap<>();
Map<InetAddressAndPort, Map<String, String>> result = new HashMap<>();
for (UntypedResultSet.Row row : executeInternal("SELECT peer, data_center, rack from system." + PEERS))
{
InetAddress peer = row.getInetAddress("peer");
InetAddressAndPort peer = InetAddressAndPort.getByAddress(row.getInetAddress("peer"));
if (row.has("data_center") && row.has("rack"))
{
Map<String, String> dcRack = new HashMap<>();
Expand Down
7 changes: 7 additions & 0 deletions src/java/org/apache/cassandra/gms/Gossiper.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.common.util.concurrent.Uninterruptibles;

import com.palantir.cassandra.cvim.CrossVpcIpMappingHandshaker;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.utils.ExecutorUtils;
import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.Pair;
Expand Down Expand Up @@ -858,6 +859,12 @@ protected long getExpireTimeForEndpoint(InetAddress endpoint)
return storedTime == null ? computeExpireTime() : storedTime;
}

public EndpointState getEndpointStateForEndpoint(InetAddressAndPort ep)
{
return getEndpointStateForEndpoint(ep.address);
}

@Deprecated
public EndpointState getEndpointStateForEndpoint(InetAddress ep)
{
return endpointStateMap.get(ep);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,24 @@ public abstract class AbstractNetworkTopologySnitch extends AbstractEndpointSnit
* @param endpoint a specified endpoint
* @return string of rack
*/
abstract public String getRack(InetAddress endpoint);
abstract public String getRack(InetAddressAndPort endpoint);

public String getRack(InetAddress endpoint)
{
return getRack(InetAddressAndPort.getByAddress(endpoint));
}

/**
* Return the data center for which an endpoint resides in
* @param endpoint a specified endpoint
* @return string of data center
*/
abstract public String getDatacenter(InetAddress endpoint);
abstract public String getDatacenter(InetAddressAndPort endpoint);

public String getDatacenter(InetAddress endpoint)
{
return getDatacenter(InetAddressAndPort.getByAddress(endpoint));
}

public int compareEndpoints(InetAddress address, InetAddress a1, InetAddress a2)
{
Expand Down
10 changes: 5 additions & 5 deletions src/java/org/apache/cassandra/locator/CloudstackSnitch.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class CloudstackSnitch extends AbstractNetworkTopologySnitch
protected static final Logger logger = LoggerFactory.getLogger(CloudstackSnitch.class);
protected static final String ZONE_NAME_QUERY_URI = "/latest/meta-data/availability-zone";

private Map<InetAddress, Map<String, String>> savedEndpoints;
private Map<InetAddressAndPort, Map<String, String>> savedEndpoints;

private static final String DEFAULT_DC = "UNKNOWN-DC";
private static final String DEFAULT_RACK = "UNKNOWN-RACK";
Expand All @@ -83,9 +83,9 @@ public CloudstackSnitch() throws IOException, ConfigurationException
csZoneRack = zone_parts[2];
}

public String getRack(InetAddress endpoint)
public String getRack(InetAddressAndPort endpoint)
{
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
return csZoneRack;
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
if (state == null || state.getApplicationState(ApplicationState.RACK) == null)
Expand All @@ -99,9 +99,9 @@ public String getRack(InetAddress endpoint)
return state.getApplicationState(ApplicationState.RACK).value;
}

public String getDatacenter(InetAddress endpoint)
public String getDatacenter(InetAddressAndPort endpoint)
{
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
return csZoneDc;
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
if (state == null || state.getApplicationState(ApplicationState.DC) == null)
Expand Down
10 changes: 5 additions & 5 deletions src/java/org/apache/cassandra/locator/Ec2Snitch.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class Ec2Snitch extends AbstractNetworkTopologySnitch
protected static final String ZONE_NAME_QUERY_URL = "http://169.254.169.254/latest/meta-data/placement/availability-zone";
private static final String DEFAULT_DC = "UNKNOWN-DC";
private static final String DEFAULT_RACK = "UNKNOWN-RACK";
private Map<InetAddress, Map<String, String>> savedEndpoints;
private Map<InetAddressAndPort, Map<String, String>> savedEndpoints;
protected String ec2zone;
protected String ec2region;

Expand Down Expand Up @@ -92,9 +92,9 @@ String awsApiCall(String url) throws IOException, ConfigurationException
}
}

public String getRack(InetAddress endpoint)
public String getRack(InetAddressAndPort endpoint)
{
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
return ec2zone;
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
if (state == null || state.getApplicationState(ApplicationState.RACK) == null)
Expand All @@ -108,9 +108,9 @@ public String getRack(InetAddress endpoint)
return state.getApplicationState(ApplicationState.RACK).value;
}

public String getDatacenter(InetAddress endpoint)
public String getDatacenter(InetAddressAndPort endpoint)
{
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
return ec2region;
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
if (state == null || state.getApplicationState(ApplicationState.DC) == null)
Expand Down
10 changes: 5 additions & 5 deletions src/java/org/apache/cassandra/locator/GoogleCloudSnitch.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class GoogleCloudSnitch extends AbstractNetworkTopologySnitch
protected static final String ZONE_NAME_QUERY_URL = "http://metadata.google.internal/computeMetadata/v1/instance/zone";
private static final String DEFAULT_DC = "UNKNOWN-DC";
private static final String DEFAULT_RACK = "UNKNOWN-RACK";
private Map<InetAddress, Map<String, String>> savedEndpoints;
private Map<InetAddressAndPort, Map<String, String>> savedEndpoints;
protected String gceZone;
protected String gceRegion;

Expand Down Expand Up @@ -94,9 +94,9 @@ String gceApiCall(String url) throws IOException, ConfigurationException
}
}

public String getRack(InetAddress endpoint)
public String getRack(InetAddressAndPort endpoint)
{
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
return gceZone;
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
if (state == null || state.getApplicationState(ApplicationState.RACK) == null)
Expand All @@ -110,9 +110,9 @@ public String getRack(InetAddress endpoint)
return state.getApplicationState(ApplicationState.RACK).value;
}

public String getDatacenter(InetAddress endpoint)
public String getDatacenter(InetAddressAndPort endpoint)
{
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
return gceRegion;
EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
if (state == null || state.getApplicationState(ApplicationState.DC) == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class GossipingPropertyFileSnitch extends AbstractNetworkTopologySnitch//
private final boolean preferLocal;
private final AtomicReference<ReconnectableSnitchHelper> snitchHelperReference;

private Map<InetAddress, Map<String, String>> savedEndpoints;
private Map<InetAddressAndPort, Map<String, String>> savedEndpoints;
private static final String DEFAULT_DC = "UNKNOWN_DC";
private static final String DEFAULT_RACK = "UNKNOWN_RACK";

Expand Down Expand Up @@ -84,9 +84,9 @@ private static SnitchProperties loadConfiguration() throws ConfigurationExceptio
* @param endpoint the endpoint to process
* @return string of data center
*/
public String getDatacenter(InetAddress endpoint)
public String getDatacenter(InetAddressAndPort endpoint)
{
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
return myDC;

EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
Expand All @@ -112,9 +112,9 @@ public String getDatacenter(InetAddress endpoint)
* @param endpoint the endpoint to process
* @return string of rack
*/
public String getRack(InetAddress endpoint)
public String getRack(InetAddressAndPort endpoint)
{
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
if (endpoint.equals(FBUtilities.getBroadcastAddressAndPort()))
return myRack;

EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public final class InetAddressAndPort implements Comparable<InetAddressAndPort>,
//these when it loads the config. A lot of unit tests won't end up loading DatabaseDescriptor.
//Tools that might use this class also might not load database descriptor. Those tools are expected
//to always override the defaults.
static volatile int defaultPort = 7000;
static volatile int defaultPort = 7001;

public final InetAddress address;
public final byte[] addressBytes;
Expand Down Expand Up @@ -191,4 +191,9 @@ public static void initializeDefaultPort(int port)
{
defaultPort = port;
}

public static InetAddressAndPort getLocalHost()
{
return FBUtilities.getLocalAddressAndPort();
}
}
12 changes: 6 additions & 6 deletions src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,10 @@ private static String[] getRawEndpointInfo(InetAddress endpoint)
* @param endpoint the endpoint to process
* @return string of data center
*/
public String getDatacenter(InetAddress endpoint)
public String getDatacenter(InetAddressAndPort endpoint)
{
String[] info = getEndpointInfo(endpoint);
assert info != null : "No location defined for endpoint " + endpoint;
String[] info = getEndpointInfo(endpoint.address);
assert info != null : "No location defined for endpoint " + endpoint.address;
return info[0];
}

Expand All @@ -131,10 +131,10 @@ public String getDatacenter(InetAddress endpoint)
* @param endpoint the endpoint to process
* @return string of rack
*/
public String getRack(InetAddress endpoint)
public String getRack(InetAddressAndPort endpoint)
{
String[] info = getEndpointInfo(endpoint);
assert info != null : "No location defined for endpoint " + endpoint;
String[] info = getEndpointInfo(endpoint.address);
assert info != null : "No location defined for endpoint " + endpoint.address;
return info[1];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
*/
public class RackInferringSnitch extends AbstractNetworkTopologySnitch
{
public String getRack(InetAddress endpoint)
public String getRack(InetAddressAndPort endpoint)
{
return Integer.toString(endpoint.getAddress()[2] & 0xFF, 10);
return Integer.toString(endpoint.address.getAddress()[2] & 0xFF, 10);
}

public String getDatacenter(InetAddress endpoint)
public String getDatacenter(InetAddressAndPort endpoint)
{
return Integer.toString(endpoint.getAddress()[1] & 0xFF, 10);
return Integer.toString(endpoint.address.getAddress()[1] & 0xFF, 10);
}
}
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/locator/SeedProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@

public interface SeedProvider
{
List<InetAddress> getSeeds();
List<InetAddressAndPort> getSeeds();
}
6 changes: 3 additions & 3 deletions src/java/org/apache/cassandra/locator/SimpleSeedProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class SimpleSeedProvider implements SeedProvider

public SimpleSeedProvider(Map<String, String> args) {}

public List<InetAddress> getSeeds()
public List<InetAddressAndPort> getSeeds()
{
Config conf;
try
Expand All @@ -47,12 +47,12 @@ public List<InetAddress> getSeeds()
throw new AssertionError(e);
}
String[] hosts = conf.seed_provider.parameters.get("seeds").split(",", -1);
List<InetAddress> seeds = new ArrayList<InetAddress>(hosts.length);
List<InetAddressAndPort> seeds = new ArrayList<>(hosts.length);
for (String host : hosts)
{
try
{
seeds.add(InetAddress.getByName(host.trim()));
seeds.add(InetAddressAndPort.getByName(host.trim()));
}
catch (UnknownHostException ex)
{
Expand Down
19 changes: 19 additions & 0 deletions src/java/org/apache/cassandra/utils/FBUtilities.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,25 @@ public static InetAddress getLocalAddress()
return localInetAddress;
}

/**
* Get the broadcast address and port for intra-cluster storage traffic. This the address to advertise that uniquely
* identifies the node and is reachable from everywhere. This is the one you want unless you are trying to connect
* to the local address specifically.
*/
public static InetAddressAndPort getBroadcastAddressAndPort()
{
return InetAddressAndPort.getByAddress(getBroadcastAddress());
}

/**
* The address and port to listen on for intra-cluster storage traffic (not client). Use this to get the correct
* stuff to listen on for intra-cluster communication.
*/
public static InetAddressAndPort getLocalAddressAndPort()
{
return InetAddressAndPort.getByAddress(getLocalAddress());
}

public static InetAddress getBroadcastAddress()
{
if (broadcastInetAddress == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private String maybeGetFromEndpointState(String current, InetAddressAndPort endp
{
savedEndpoints = new HashMap<>();
int storage_port = Config.getOverrideLoadConfig().get().storage_port;
for (Map.Entry<InetAddress, Map<String, String>> entry : SystemKeyspace.loadDcRackInfo().entrySet())
for (Map.Entry<InetAddressAndPort, Map<String, String>> entry : SystemKeyspace.loadDcRackInfo().entrySet())
{
savedEndpoints.put(InetAddressAndPort.getByAddressOverrideDefaults(endpoint.address, storage_port),
entry.getValue());
Expand Down
Loading