Skip to content

Commit

Permalink
Documented EVCacheLatch
Browse files Browse the repository at this point in the history
Fixed success count to expected success count
Added monitoring to ThreadPool
Set name to EVCacheClientUtil fixup thread
Reporting Write queue and Read queue metrics every time we refresh pool
  • Loading branch information
smadappa committed Nov 11, 2016
1 parent 8abc5df commit 2e76960
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 35 deletions.
18 changes: 18 additions & 0 deletions evcache-client/src/main/java/com/netflix/evcache/EVCacheLatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,24 @@

import net.spy.memcached.internal.OperationCompletionListener;


/**
* EVCacheLatch is a blocking mechanism that allows one or more threads to wait until
* a set of operations as specified by {@Link Policy} performed by evcache threads are complete.
*
* <p>The Latch is initialized with a <em>count</em> as determined by the Policy.
* The {@link #await await} methods block until the current count reaches
* zero due to completion of the operation, after which
* all waiting threads are released and any subsequent invocations of
* {@link #await await} return immediately.
*
* The latch is also released if the specified timeout is reached even though the count is greater than zero.
* In this case the {@link #await await} method returns false
*
* The various methods in latch can be queried any time and they return the state of the operations across the Futures.
*/

public interface EVCacheLatch extends OperationCompletionListener {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,23 @@
public class EVCacheLatchImpl implements EVCacheLatch {
private static final Logger log = LoggerFactory.getLogger(EVCacheLatchImpl.class);

private final int count;
private final int expectedSuccessCount;
private final CountDownLatch latch;
private final List<Future<Boolean>> futures;
private final Policy policy;
private final int totalFutureCount;

private final String appName;

public EVCacheLatchImpl(Policy policy, int _count, String appName) {
this.policy = policy;
this.futures = new ArrayList<Future<Boolean>>(_count);
this.appName = appName;
this.count = policyToCount(policy, _count);
this.latch = new CountDownLatch(count);
this.totalFutureCount = _count;
this.expectedSuccessCount = policyToCount(policy, _count);
this.latch = new CountDownLatch(expectedSuccessCount);

if (log.isDebugEnabled()) log.debug("Number of Futures = " + _count + "; Number of Futures that need to completed for Latch to be released = " + this.count);
if (log.isDebugEnabled()) log.debug("Number of Futures = " + _count + "; Number of Futures that need to completed for Latch to be released = " + this.expectedSuccessCount);
}

/*
Expand Down Expand Up @@ -99,8 +101,9 @@ public int getPendingCount() {
*/
@Override
public int getCompletedCount() {
if (log.isDebugEnabled()) log.debug("Completed Count = " + (count - (int) latch.getCount()));
return (count - (int) latch.getCount());
final int completedCount = (totalFutureCount - (int) latch.getCount());
if (log.isDebugEnabled()) log.debug("Completed Count = " + completedCount);
return completedCount;
}

/*
Expand All @@ -110,7 +113,7 @@ public int getCompletedCount() {
*/
@Override
public List<Future<Boolean>> getPendingFutures() {
final List<Future<Boolean>> returnFutures = new ArrayList<Future<Boolean>>(count);
final List<Future<Boolean>> returnFutures = new ArrayList<Future<Boolean>>(expectedSuccessCount);
for (Future<Boolean> future : futures) {
if (!future.isDone()) {
returnFutures.add(future);
Expand All @@ -136,7 +139,7 @@ public List<Future<Boolean>> getAllFutures() {
*/
@Override
public List<Future<Boolean>> getCompletedFutures() {
final List<Future<Boolean>> returnFutures = new ArrayList<Future<Boolean>>(count);
final List<Future<Boolean>> returnFutures = new ArrayList<Future<Boolean>>(expectedSuccessCount);
for (Future<Boolean> future : futures) {
if (future.isDone()) {
returnFutures.add(future);
Expand Down Expand Up @@ -212,7 +215,7 @@ public int getFailureCount() {
*/
@Override
public int getExpectedSuccessCount() {
return this.count;
return this.expectedSuccessCount;
}

/*
Expand Down Expand Up @@ -264,6 +267,8 @@ public String toString() {
builder.append(getFailureCount());
builder.append("\",\"Success Count\":\"");
builder.append(getSuccessCount());
builder.append("\",\"Excpected Success Count\":\"");
builder.append(getExpectedSuccessCount());
builder.append("\"}");
return builder.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import com.netflix.evcache.util.EVCacheConfig;
import com.netflix.servo.monitor.Counter;
import com.netflix.servo.monitor.Stopwatch;
import com.netflix.servo.tag.BasicTagList;
import com.netflix.servo.tag.TagList;

import net.spy.memcached.CASValue;
import net.spy.memcached.CachedData;
Expand Down Expand Up @@ -86,7 +88,7 @@ public ConnectionFactory getConnectionFactory() {
private final EVCacheClientPool pool;
private Counter addCounter = null;
private final ChainedDynamicProperty.BooleanProperty ignoreTouch;

protected final TagList tags;

EVCacheClient(String appName, String zone, int id, EVCacheServerGroupConfig config,
List<InetSocketAddress> memcachedNodesInZone, int maxQueueSize, DynamicIntProperty maxReadQueueSize,
Expand Down Expand Up @@ -115,6 +117,7 @@ public ConnectionFactory getConnectionFactory() {
this.evcacheMemcachedClient = new EVCacheMemcachedClient(connectionFactory, memcachedNodesInZone, readTimeout, appName, zone, id, serverGroup, this);
this.connectionObserver = new EVCacheConnectionObserver(appName, serverGroup, id);
this.evcacheMemcachedClient.addObserver(connectionObserver);
this.tags = BasicTagList.of("ServerGroup", serverGroup.getName(), "APP", appName);

this.decodingTranscoder = new SerializingTranscoder(Integer.MAX_VALUE);
decodingTranscoder.setCompressionThreshold(Integer.MAX_VALUE);
Expand Down Expand Up @@ -1432,4 +1435,30 @@ public String toString() {
return builder.toString();
}
}

public int getWriteQueueLength() {
final Collection<MemcachedNode> allNodes = evcacheMemcachedClient.getNodeLocator().getAll();
int size = 0;
for(MemcachedNode node : allNodes) {
if(node instanceof EVCacheNodeImpl) {
size += ((EVCacheNodeImpl)node).getWriteQueueSize();
}
}
return size;
}

public int getReadQueueLength() {
final Collection<MemcachedNode> allNodes = evcacheMemcachedClient.getNodeLocator().getAll();
int size = 0;
for(MemcachedNode node : allNodes) {
if(node instanceof EVCacheNodeImpl) {
size += ((EVCacheNodeImpl)node).getReadQueueSize();
}
}
return size;
}

public TagList getTagList() {
return tags;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import com.netflix.evcache.util.EVCacheConfig;
import com.netflix.evcache.util.ServerGroupCircularIterator;
import com.netflix.servo.monitor.Monitors;
import com.netflix.servo.tag.BasicTagList;
import com.netflix.servo.tag.TagList;

import net.spy.memcached.MemcachedNode;
import net.spy.memcached.protocol.binary.EVCacheNodeImpl;
Expand All @@ -58,7 +58,6 @@ public class EVCacheClientPool implements Runnable, EVCacheClientPoolMBean {
public static final String DEFAULT_PORT = "11211";

private final DynamicBooleanProperty _retryAcrossAllReplicas;
private final DynamicBooleanProperty enableDynamicWriteOnlyMode;
private long lastReconcileTime = 0;

private final DynamicIntProperty logOperations;
Expand Down Expand Up @@ -130,7 +129,6 @@ public void run() {
refreshPool();
}
});
this.enableDynamicWriteOnlyMode = config.getDynamicBooleanProperty("EVCacheClientPool.enable.dynamic.writeonly", true);

this._opQueueMaxBlockTime = config.getDynamicIntProperty(appName + ".operation.QueueMaxBlockTime", 10);
this._opQueueMaxBlockTime.addCallback(new Runnable() {
Expand Down Expand Up @@ -367,7 +365,7 @@ protected boolean haveInstancesInServerGroupChanged(ServerGroup serverGroup, Set
final int inActiveServerCount = connectionObserver.getInActiveServerCount();
final int sizeInDiscovery = discoveredHostsInServerGroup.size();
final int sizeInHashing = client.getNodeLocator().getAll().size();
final BasicTagList tags = BasicTagList.of("ServerGroup", serverGroup.getName(), "APP", _appName);
final TagList tags = client.getTagList();
if (i == 0) {
EVCacheMetricsFactory.getLongGauge("EVCacheClientPool-PoolSize", tags).set(Long.valueOf(size));
EVCacheMetricsFactory.getLongGauge("EVCacheClientPool-ActiveConnections", tags).set(Long.valueOf(activeServerCount * size));
Expand Down Expand Up @@ -563,8 +561,7 @@ private void setupNewClientsByServerGroup(ServerGroup serverGroup, List<EVCacheC
// Now since we have replace the old instances shutdown all the old
// clients
if (log.isDebugEnabled()) log.debug("Replaced an existing Pool for ServerGroup : " + serverGroup + "; and app "
+ _appName + " ;\n\tOldClients : " + currentClients
+ ";\n\tNewClients : " + newClients);
+ _appName + " ;\n\tOldClients : " + currentClients + ";\n\tNewClients : " + newClients);
for (EVCacheClient client : currentClients) {
if (!client.isShutdown()) {
if (log.isDebugEnabled()) log.debug("Shutting down in Fallback -> AppName : " + _appName
Expand Down Expand Up @@ -606,29 +603,25 @@ private void updateMemcachedReadInstancesByZone() {
}
}

if (enableDynamicWriteOnlyMode.get()) {
// if we lose over 50% of instances put that zone in writeonly
// mode.
final List<EVCacheClient> clients = memcachedReadInstancesByServerGroup.get(serverGroup);
if (clients != null && !clients.isEmpty()) {
final EVCacheClient client = clients.get(0);
if (client != null) {
final EVCacheConnectionObserver connectionObserver = client.getConnectionObserver();
if (connectionObserver != null) {
final int activeServerCount = connectionObserver.getActiveServerCount();
final int inActiveServerCount = connectionObserver.getInActiveServerCount();
if (inActiveServerCount > activeServerCount) {
memcachedReadInstancesByServerGroup.remove(serverGroup);
}
// if we lose over 50% of instances put that zone in writeonly mode.
final List<EVCacheClient> clients = memcachedReadInstancesByServerGroup.get(serverGroup);
if (clients != null && !clients.isEmpty()) {
final EVCacheClient client = clients.get(0);
if (client != null) {
final EVCacheConnectionObserver connectionObserver = client.getConnectionObserver();
if (connectionObserver != null) {
final int activeServerCount = connectionObserver.getActiveServerCount();
final int inActiveServerCount = connectionObserver.getInActiveServerCount();
if (inActiveServerCount > activeServerCount) {
memcachedReadInstancesByServerGroup.remove(serverGroup);
}
}
}
}
}

if (memcachedReadInstancesByServerGroup.size() != memcachedFallbackReadInstances.getSize()) {
memcachedFallbackReadInstances = new ServerGroupCircularIterator(memcachedReadInstancesByServerGroup
.keySet());
memcachedFallbackReadInstances = new ServerGroupCircularIterator(memcachedReadInstancesByServerGroup.keySet());

Map<String, Set<ServerGroup>> readServerGroupByZoneMap = new ConcurrentHashMap<String, Set<ServerGroup>>();
for (ServerGroup serverGroup : memcachedReadInstancesByServerGroup.keySet()) {
Expand All @@ -648,7 +641,6 @@ private void updateMemcachedReadInstancesByZone() {
this.readServerGroupByZone = _readServerGroupByZone;
localServerGroupIterator = readServerGroupByZone.get(_zone);
}

}

private void cleanupMemcachedInstances(boolean force) {
Expand Down Expand Up @@ -776,6 +768,7 @@ private synchronized void refresh(boolean force) throws IOException {
cleanupMemcachedInstances(false);
}
updateMemcachedReadInstancesByZone();
updateQueueStats();
if (_pingServers.get()) pingServers();
} catch (Throwable t) {
log.error("Exception while refreshing the Server list", t);
Expand All @@ -786,6 +779,19 @@ private synchronized void refresh(boolean force) throws IOException {
if (log.isDebugEnabled()) log.debug("refresh APP : " + _appName + "; DONE");
}

private void updateQueueStats() {
for (ServerGroup serverGroup : memcachedInstancesByServerGroup.keySet()) {
List<EVCacheClient> clients = memcachedInstancesByServerGroup.get(serverGroup);
for(EVCacheClient client : clients) {
final int wSize = client.getWriteQueueLength();
EVCacheMetricsFactory.getLongGauge("EVCacheClientPool-WriteQueueSize", client.getTagList()).set(Long.valueOf(wSize));
final int rSize = client.getReadQueueLength();
EVCacheMetricsFactory.getLongGauge("EVCacheClientPool-ReadQueueSize", client.getTagList()).set(Long.valueOf(rSize));
}
}
}


public void pingServers() {
try {
final Map<ServerGroup, List<EVCacheClient>> allServers = getAllInstancesByZone();
Expand Down Expand Up @@ -995,8 +1001,7 @@ public String toString() {
return "\nEVCacheClientPool [\n\t_appName=" + _appName + ",\n\t_zone=" + _zone
+ ",\n\tlocalServerGroupIterator=" + localServerGroupIterator + ",\n\t_poolSize=" + _poolSize
+ ",\n\t_readTimeout=" + _readTimeout + ",\n\t_bulkReadTimeout=" + _bulkReadTimeout
+ ",\n\tenableDynamicWriteOnlyMode=" + enableDynamicWriteOnlyMode + ",\n\tlogOperations="
+ logOperations + ",\n\t_opQueueMaxBlockTime="
+ ",\n\tlogOperations=" + logOperations + ",\n\t_opQueueMaxBlockTime="
+ _opQueueMaxBlockTime + ",\n\t_operationTimeout=" + _operationTimeout + ",\n\t_maxReadQueueSize="
+ _maxReadQueueSize // + ",\n\tinjectionPoint=" + injectionPoint
+ ",\n\t_pingServers=" + _pingServers + ",\n\twriteOnlyFastPropertyMap=" + writeOnlyFastPropertyMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import com.netflix.evcache.operation.EVCacheLatchImpl;
import com.netflix.evcache.operation.EVCacheOperationFuture;
import com.netflix.evcache.util.EVCacheConfig;
import com.netflix.servo.DefaultMonitorRegistry;
import com.netflix.servo.monitor.CompositeMonitor;
import com.netflix.servo.monitor.Monitors;
import com.netflix.spectator.api.DistributionSummary;

import net.spy.memcached.CachedData;
Expand Down Expand Up @@ -61,6 +64,9 @@ public Thread newThread(Runnable r) {

final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(10000);
threadPool = new ThreadPoolExecutor(fixupPoolSize.get(), fixupPoolSize.get() * 2, 30, TimeUnit.SECONDS, queue, new SimpleThreadFactory(), block);

CompositeMonitor<?> newThreadPoolMonitor = Monitors.newThreadPoolMonitor("EVCacheClientUtil-AddFixUp", threadPool);
DefaultMonitorRegistry.getInstance().register(newThreadPoolMonitor);
threadPool.prestartAllCoreThreads();

}
Expand Down

0 comments on commit 2e76960

Please sign in to comment.