Skip to content

Commit

Permalink
Add interval gauge and don’t change IFailureDetector API
Browse files Browse the repository at this point in the history
  • Loading branch information
wi11dey authored and rhuffy committed Oct 24, 2024
1 parent 78a50e9 commit a24270f
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 109 deletions.
25 changes: 12 additions & 13 deletions src/java/com/palantir/cassandra/metrics/FailureDetectorMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,29 @@
import com.google.common.net.InetAddresses;

import com.codahale.metrics.Gauge;
import org.apache.cassandra.gms.FailureDetector;
import com.codahale.metrics.Histogram;
import org.apache.cassandra.gms.FailureDetector.ArrivalWindow;
import org.apache.cassandra.metrics.CassandraMetricsRegistry;

import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;

public class FailureDetectorMetrics
{
public static void register(InetAddress ep)
public static void register(InetAddress ep, ArrivalWindow window)
{
Metrics.register(createMetricName(ep), new Gauge<Double>()
{
public Double getValue()
{
return FailureDetector.instance.getPhiValue(ep);
}
});
Metrics.register(createMetricName(ep, "phi"), (Gauge<Double>) window::getLastReportedPhi);
Metrics.register(createMetricName(ep, "interval_histogram"), new Histogram(window));
Metrics.register(createMetricName(ep, "last_interval"), (Gauge<Long>) window::getLastInterval);
}

public static void unregister(InetAddress ep)
{
Metrics.remove(createMetricName(ep));
Metrics.remove(createMetricName(ep, "phi"));
Metrics.remove(createMetricName(ep, "interval_histogram"));
Metrics.remove(createMetricName(ep, "last_interval"));
}

private static CassandraMetricsRegistry.MetricName createMetricName(InetAddress ep)
private static CassandraMetricsRegistry.MetricName createMetricName(InetAddress ep, String name)
{
String endpoint = InetAddresses.toAddrString(ep);
String groupName = FailureDetectorMetrics.class.getPackage().getName();
Expand All @@ -55,8 +54,8 @@ private static CassandraMetricsRegistry.MetricName createMetricName(InetAddress
mbeanName.append(groupName).append(":");
mbeanName.append("type=FailureDetector");
mbeanName.append(",endpoint=").append(endpoint);
mbeanName.append(",name=").append("phi");
mbeanName.append(",name=").append(name);

return new CassandraMetricsRegistry.MetricName(groupName, "FailureDetector", "phi", endpoint, mbeanName.toString());
return new CassandraMetricsRegistry.MetricName(groupName, "FailureDetector", name, endpoint, mbeanName.toString());
}
}
202 changes: 112 additions & 90 deletions src/java/org/apache/cassandra/gms/FailureDetector.java
Original file line number Diff line number Diff line change
Expand Up @@ -273,17 +273,21 @@ public void report(InetAddress ep)
// avoid adding an empty ArrivalWindow to the Map
heartbeatWindow = new ArrivalWindow(SAMPLE_SIZE);
heartbeatWindow.add(now, ep);
heartbeatWindow = arrivalSamples.putIfAbsent(ep, heartbeatWindow);
if (heartbeatWindow != null)
heartbeatWindow.add(now, ep);
FailureDetectorMetrics.register(ep);
ArrivalWindow previousHeartbeatWindow = arrivalSamples.putIfAbsent(ep, heartbeatWindow);
if (previousHeartbeatWindow != null)
{
// Another thread beat us to registering the ArrivalWindow.
previousHeartbeatWindow.add(now, ep);
heartbeatWindow = previousHeartbeatWindow;
}
FailureDetectorMetrics.register(ep, heartbeatWindow);
}
else
{
heartbeatWindow.add(now, ep);
}

if (logger.isTraceEnabled() && heartbeatWindow != null)
if (logger.isTraceEnabled())
logger.trace("Average for {} is {}", ep, heartbeatWindow.mean());
}

Expand Down Expand Up @@ -391,14 +395,110 @@ public String toString()
return sb.toString();
}

public double getPhiValue(InetAddress ep)
public static class ArrivalWindow implements Reservoir
{
ArrivalWindow arrivalWindow = arrivalSamples.get(ep);
if (arrivalWindow == null)
private static final Logger logger = LoggerFactory.getLogger(ArrivalWindow.class);
private long tLast = 0L;
private final ArrayBackedBoundedStats arrivalIntervals;
private double lastReportedPhi = Double.MIN_VALUE;

// in the event of a long partition, never record an interval longer than the rpc timeout,
// since if a host is regularly experiencing connectivity problems lasting this long we'd
// rather mark it down quickly instead of adapting
// this value defaults to the same initial value the FD is seeded with
private final long MAX_INTERVAL_IN_NANO = getMaxInterval();

ArrivalWindow(int size)
{
arrivalIntervals = new ArrayBackedBoundedStats(size);
}

private static long getMaxInterval()
{
String newvalue = System.getProperty("cassandra.fd_max_interval_ms");
if (newvalue == null)
{
return FailureDetector.INITIAL_VALUE_NANOS;
}
else
{
logger.info("Overriding FD MAX_INTERVAL to {}ms", newvalue);
return TimeUnit.NANOSECONDS.convert(Integer.parseInt(newvalue), TimeUnit.MILLISECONDS);
}
}

synchronized void add(long value, InetAddress ep)
{
assert tLast >= 0;
if (tLast > 0L)
{
long interArrivalTime = (value - tLast);
if (interArrivalTime <= MAX_INTERVAL_IN_NANO)
{
arrivalIntervals.add(interArrivalTime);
logger.trace("Reporting interval time of {} for {}", interArrivalTime, ep);
}
else
{
logger.debug("Ignoring interval time of {} for {}", interArrivalTime, ep);
}
}
else
{
// We use a very large initial interval since the "right" average depends on the cluster size
// and it's better to err high (false negatives, which will be corrected by waiting a bit longer)
// than low (false positives, which cause "flapping").
arrivalIntervals.add(FailureDetector.INITIAL_VALUE_NANOS);
}
tLast = value;
}

double mean()
{
return arrivalIntervals.mean();
}

// see CASSANDRA-2597 for an explanation of the math at work here.
double phi(long tnow)
{
assert arrivalIntervals.mean() > 0 && tLast > 0; // should not be called before any samples arrive
long t = tnow - tLast;
lastReportedPhi = t / mean();
return lastReportedPhi;
}

public double getLastReportedPhi()
{
return lastReportedPhi;
}

public String toString()
{
return Arrays.toString(arrivalIntervals.getArrivalIntervals());
}

@Override
public int size()
{
return arrivalIntervals.size();
}

@Override
public void update(long interval)
{
arrivalIntervals.update(interval);
}

@Override
public Snapshot getSnapshot()
{
return arrivalIntervals.getSnapshot();
}

public long getLastInterval()
{
return -1;
return arrivalIntervals.getLastInterval();
}
return arrivalWindow.getLastReportedPhi();
}
}

Expand Down Expand Up @@ -470,88 +570,10 @@ public Snapshot getSnapshot()
}
return new UniformSnapshot(values);
}
}

class ArrivalWindow
{
private static final Logger logger = LoggerFactory.getLogger(ArrivalWindow.class);
private long tLast = 0L;
private final ArrayBackedBoundedStats arrivalIntervals;
private double lastReportedPhi = Double.MIN_VALUE;

// in the event of a long partition, never record an interval longer than the rpc timeout,
// since if a host is regularly experiencing connectivity problems lasting this long we'd
// rather mark it down quickly instead of adapting
// this value defaults to the same initial value the FD is seeded with
private final long MAX_INTERVAL_IN_NANO = getMaxInterval();

ArrivalWindow(int size)
{
arrivalIntervals = new ArrayBackedBoundedStats(size);
}

private static long getMaxInterval()
{
String newvalue = System.getProperty("cassandra.fd_max_interval_ms");
if (newvalue == null)
{
return FailureDetector.INITIAL_VALUE_NANOS;
}
else
{
logger.info("Overriding FD MAX_INTERVAL to {}ms", newvalue);
return TimeUnit.NANOSECONDS.convert(Integer.parseInt(newvalue), TimeUnit.MILLISECONDS);
}
}

synchronized void add(long value, InetAddress ep)
{
assert tLast >= 0;
if (tLast > 0L)
{
long interArrivalTime = (value - tLast);
if (interArrivalTime <= MAX_INTERVAL_IN_NANO)
{
arrivalIntervals.add(interArrivalTime);
logger.trace("Reporting interval time of {} for {}", interArrivalTime, ep);
}
else
{
logger.debug("Ignoring interval time of {} for {}", interArrivalTime, ep);
}
}
else
{
// We use a very large initial interval since the "right" average depends on the cluster size
// and it's better to err high (false negatives, which will be corrected by waiting a bit longer)
// than low (false positives, which cause "flapping").
arrivalIntervals.add(FailureDetector.INITIAL_VALUE_NANOS);
}
tLast = value;
}

double mean()
{
return arrivalIntervals.mean();
}

// see CASSANDRA-2597 for an explanation of the math at work here.
double phi(long tnow)
{
assert arrivalIntervals.mean() > 0 && tLast > 0; // should not be called before any samples arrive
long t = tnow - tLast;
lastReportedPhi = t / mean();
return lastReportedPhi;
}

double getLastReportedPhi()
{
return lastReportedPhi;
}

public String toString()
public long getLastInterval()
{
return Arrays.toString(arrivalIntervals.getArrivalIntervals());
return arrivalIntervals[index % arrivalIntervals.length];
}
}

5 changes: 0 additions & 5 deletions src/java/org/apache/cassandra/gms/IFailureDetector.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,4 @@ public interface IFailureDetector
* @param listener implementation of an application provided IFailureDetectionEventListener
*/
public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener);

/**
* Get the current phi value for this endpoint. Intended for use by metrics.
*/
public double getPhiValue(InetAddress ep);
}
1 change: 0 additions & 1 deletion test/unit/org/apache/cassandra/dht/BootStrapperTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ public boolean isAlive(InetAddress ep)
public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener) { throw new UnsupportedOperationException(); }
public void remove(InetAddress ep) { throw new UnsupportedOperationException(); }
public void forceConviction(InetAddress ep) { throw new UnsupportedOperationException(); }
public double getPhiValue(InetAddress ep) { throw new UnsupportedOperationException(); }
};
s.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(mockFailureDetector));
s.addRanges(keyspaceName, Keyspace.open(keyspaceName).getReplicationStrategy().getPendingAddressRanges(tmd, myToken, myEndpoint));
Expand Down
1 change: 1 addition & 0 deletions test/unit/org/apache/cassandra/gms/ArrivalWindowTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import java.net.InetAddress;

import org.apache.cassandra.gms.FailureDetector.ArrivalWindow;
import org.apache.cassandra.utils.FBUtilities;

public class ArrivalWindowTest
Expand Down

0 comments on commit a24270f

Please sign in to comment.