Skip to content

Commit

Permalink
Some optimization by storing the various servo Metrcis in a map
Browse files Browse the repository at this point in the history
  • Loading branch information
smadappa committed May 24, 2017
1 parent 49fa643 commit 94a59ab
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class EVCacheOperationFuture<T> extends OperationFuture<T> {
private final ServerGroup serverGroup;
private final String key;

public EVCacheOperationFuture(String k, CountDownLatch l, AtomicReference<T> oref, long opTimeout, ExecutorService service, String appName, ServerGroup serverGroup, String metricName) {
public EVCacheOperationFuture(String k, CountDownLatch l, AtomicReference<T> oref, long opTimeout, ExecutorService service, String appName, ServerGroup serverGroup) {
super(k, l, oref, opTimeout, service);
this.latch = l;
this.objRef = oref;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import com.netflix.evcache.pool.ServerGroup;
import com.netflix.evcache.util.EVCacheConfig;
import com.netflix.servo.annotations.DataSourceType;
import com.netflix.servo.monitor.Counter;
import com.netflix.servo.monitor.Stopwatch;
import com.netflix.servo.monitor.Timer;
import com.netflix.spectator.api.DistributionSummary;

import net.spy.memcached.internal.GetFuture;
Expand Down Expand Up @@ -63,9 +65,12 @@ public class EVCacheMemcachedClient extends MemcachedClient {
private final ChainedDynamicProperty.IntProperty readTimeout;
private final ServerGroup serverGroup;
private final EVCacheClient client;
private final ConnectionFactory connectionFactory;
private final Map<String, Counter> counterMap = new ConcurrentHashMap<String, Counter>();
private final Map<String, Timer> timerMap = new ConcurrentHashMap<String, Timer>();

private DistributionSummary getDataSize, bulkDataSize, getAndTouchDataSize;
private DynamicLongProperty mutateOperationTimeout;
private final ConnectionFactory connectionFactory;

public EVCacheMemcachedClient(ConnectionFactory cf, List<InetSocketAddress> addrs,
ChainedDynamicProperty.IntProperty readTimeout, String appName, String zone, int id,
Expand Down Expand Up @@ -94,8 +99,8 @@ public <T> GetFuture<T> asyncGet(final String key, final Transcoder<T> tc) {

public <T> EVCacheOperationFuture<T> asyncGet(final String key, final Transcoder<T> tc, EVCacheGetOperationListener<T> listener) {
final CountDownLatch latch = new CountDownLatch(1);
final EVCacheOperationFuture<T> rv = new EVCacheOperationFuture<T>(key, latch, new AtomicReference<T>(null), readTimeout.get().intValue(), executorService, appName, serverGroup, "GetOperation");
final Stopwatch operationDuration = EVCacheMetricsFactory.getStatsTimer(appName, serverGroup, "GetOperation").start();
final EVCacheOperationFuture<T> rv = new EVCacheOperationFuture<T>(key, latch, new AtomicReference<T>(null), readTimeout.get().intValue(), executorService, appName, serverGroup);
final Stopwatch operationDuration = getTimer(GET_OPERATION_STRING).start();
Operation op = opFact.get(key, new GetOperation.Callback() {
private Future<T> val = null;

Expand Down Expand Up @@ -177,7 +182,7 @@ public <T> EVCacheBulkGetFuture<T> asyncGetBulk(Collection<String> keys, final T
final CountDownLatch latch = new CountDownLatch(initialLatchCount);
final Collection<Operation> ops = new ArrayList<Operation>(chunks.size());
final EVCacheBulkGetFuture<T> rv = new EVCacheBulkGetFuture<T>(appName, m, ops, latch, executorService, serverGroup, metricName);
final Stopwatch operationDuration = EVCacheMetricsFactory.getStatsTimer(appName, serverGroup, "BulkOperation").start();
final Stopwatch operationDuration = getTimer(BULK_OPERATION_STRING).start();
GetOperation.Callback cb = new GetOperation.Callback() {
@Override
@SuppressWarnings("synthetic-access")
Expand All @@ -192,7 +197,6 @@ public void gotData(String k, int flags, byte[] data) {
if(bulkDataSize == null) bulkDataSize = EVCacheMetricsFactory.getDistributionSummary(appName + "-BulkOperation-DataSize", appName, serverGroup.getName());
if (bulkDataSize != null) bulkDataSize.record(data.length);
}

m.put(k, tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize())));
}

Expand Down Expand Up @@ -222,8 +226,8 @@ public void complete() {

public <T> EVCacheOperationFuture<CASValue<T>> asyncGetAndTouch(final String key, final int exp, final Transcoder<T> tc) {
final CountDownLatch latch = new CountDownLatch(1);
final EVCacheOperationFuture<CASValue<T>> rv = new EVCacheOperationFuture<CASValue<T>>(key, latch, new AtomicReference<CASValue<T>>(null), connectionFactory.getOperationTimeout(), executorService, appName, serverGroup, "GetAndTouchOperation");
final Stopwatch operationDuration = EVCacheMetricsFactory.getStatsTimer(appName, serverGroup, "GetAndTouchOperation").start();
final EVCacheOperationFuture<CASValue<T>> rv = new EVCacheOperationFuture<CASValue<T>>(key, latch, new AtomicReference<CASValue<T>>(null), connectionFactory.getOperationTimeout(), executorService, appName, serverGroup);
final Stopwatch operationDuration = getTimer(GET_AND_TOUCH_OPERATION_STRING).start();
Operation op = opFact.getAndTouch(key, exp, new GetAndTouchOperation.Callback() {
private CASValue<T> val = null;

Expand Down Expand Up @@ -279,17 +283,16 @@ public <T> OperationFuture<Boolean> add(String key, int exp, T o, Transcoder<T>
public OperationFuture<Boolean> delete(String key, EVCacheLatch evcacheLatch) {
final CountDownLatch latch = new CountDownLatch(1);
final OperationFuture<Boolean> rv = new OperationFuture<Boolean>(key, latch, connectionFactory.getOperationTimeout(), executorService);
final Stopwatch operationDuration = EVCacheMetricsFactory.getStatsTimer(appName, serverGroup, "DeleteOperation").start();

final Stopwatch operationDuration = getTimer(DELETE_STRING).start();
final DeleteOperation.Callback callback = new DeleteOperation.Callback() {
@Override
public void receivedStatus(OperationStatus status) {
operationDuration.stop();
rv.set(Boolean.TRUE, status);
if (status.getStatusCode().equals(StatusCode.SUCCESS)) {
EVCacheMetricsFactory.getCounter(appName, null, serverGroup.getName(), appName + "-DeleteOperation-SUCCESS", DataSourceType.COUNTER).increment();
getCounter(DELETE_OPERATION_SUCCESS_STRING).increment();
} else {
EVCacheMetricsFactory.getCounter(appName, null, serverGroup.getName(), appName + "-DeleteOperation-"+ status.getStatusCode().name(), DataSourceType.COUNTER).increment();
getCounter("DeleteOperation-"+ status.getStatusCode().name()).increment();
}
}

Expand All @@ -315,20 +318,17 @@ public void complete() {
public <T> OperationFuture<Boolean> touch(final String key, final int exp, EVCacheLatch evcacheLatch) {
final CountDownLatch latch = new CountDownLatch(1);
final OperationFuture<Boolean> rv = new OperationFuture<Boolean>(key, latch, connectionFactory.getOperationTimeout(), executorService);
final Stopwatch operationDuration = EVCacheMetricsFactory.getStatsTimer(appName, serverGroup, "TouchOperation").start();

final Stopwatch operationDuration = getTimer(TOUCH_OPERATION_STRING).start();
Operation op = opFact.touch(key, exp, new OperationCallback() {
@Override
public void receivedStatus(OperationStatus status) {
operationDuration.stop();
rv.set(status.isSuccess(), status);

if (status.getStatusCode().equals(StatusCode.SUCCESS)) {
EVCacheMetricsFactory.getCounter(appName, null, serverGroup.getName(), appName + "-TouchOperation-SUCCESS", DataSourceType.COUNTER).increment();
} else if (status.getStatusCode().equals(StatusCode.ERR_NOT_FOUND) ) {
EVCacheMetricsFactory.getCounter(appName, null, serverGroup.getName(), appName + "-TouchOperation-"+status.getStatusCode().name(), DataSourceType.COUNTER).increment();
getCounter(TOUCH_OPERATION_SUCCESS_STRING).increment();
} else {
EVCacheMetricsFactory.getCounter(appName, null, serverGroup.getName(), appName + "-TouchOperation-"+status.getStatusCode().name(), DataSourceType.COUNTER).increment();
getCounter(TOUCH_OPERATION_STRING + "-" + status.getStatusCode().name()).increment();
}
}

Expand All @@ -347,9 +347,8 @@ public void complete() {

public <T> OperationFuture<Boolean> asyncAppendOrAdd(final String key, int exp, CachedData co, EVCacheLatch evcacheLatch) {
final CountDownLatch latch = new CountDownLatch(1);
final OperationFuture<Boolean> rv = new EVCacheOperationFuture<Boolean>(key, latch, new AtomicReference<Boolean>(null), connectionFactory.getOperationTimeout(), executorService, appName, serverGroup, "LatencyAoA" );
final Stopwatch operationDuration = EVCacheMetricsFactory.getStatsTimer(appName, serverGroup, "AoAOperation").start();

final OperationFuture<Boolean> rv = new EVCacheOperationFuture<Boolean>(key, latch, new AtomicReference<Boolean>(null), connectionFactory.getOperationTimeout(), executorService, appName, serverGroup);
final Stopwatch operationDuration = getTimer(AOA_STRING).start();
Operation op = opFact.cat(ConcatenationType.append, 0, key, co.getData(),
new OperationCallback() {
boolean appendSuccess = false;
Expand All @@ -361,11 +360,11 @@ public void receivedStatus(OperationStatus val) {
if (log.isDebugEnabled()) log.debug("AddOrAppend Key (Append Operation): " + key + "; Status : " + val.getStatusCode().name()
+ "; Message : " + val.getMessage() + "; Elapsed Time - " + (operationDuration.getDuration(TimeUnit.MILLISECONDS)));

EVCacheMetricsFactory.getCounter(appName, null, serverGroup.getName(), appName + "-AoA-AppendOperation-SUCCESS", DataSourceType.COUNTER).increment();
getCounter(AOA_APPEND_OPERATION_SUCCESS_STRING).increment();
rv.set(val.isSuccess(), val);
appendSuccess = true;
} else {
EVCacheMetricsFactory.getCounter(appName, null, serverGroup.getName(), appName + "-AoA-AppendOperation-FAIL", DataSourceType.COUNTER).increment();
getCounter("AoA-AppendOperation-FAIL").increment();
appendSuccess = false;
}
}
Expand All @@ -385,20 +384,20 @@ public void receivedStatus(OperationStatus val) {
rv.set(val.isSuccess(), val);
if(val.isSuccess()) {
appendSuccess = true;
EVCacheMetricsFactory.getCounter(appName, null, serverGroup.getName(), appName + "-AoA-AddOperation-SUCCESS", DataSourceType.COUNTER).increment();
getCounter(AOA_ADD_OPERATION_SUCCESS_STRING).increment();
} else {
EVCacheMetricsFactory.getCounter(appName, null, serverGroup.getName(), appName + "-AoA-AddOperation-FAIL", DataSourceType.COUNTER).increment();
getCounter("AoA-AddOperation-FAIL").increment();
Operation op = opFact.cat(ConcatenationType.append, 0, key, co.getData(),
new OperationCallback() {
public void receivedStatus(OperationStatus val) {
if (val.getStatusCode().equals(StatusCode.SUCCESS)) {
if (log.isDebugEnabled()) log.debug("AddOrAppend Retry append Key (Append Operation): " + key + "; Status : " + val.getStatusCode().name()
+ "; Message : " + val.getMessage() + "; Elapsed Time - " + (operationDuration.getDuration(TimeUnit.MILLISECONDS)));

EVCacheMetricsFactory.getCounter(appName, null, serverGroup.getName(), appName + "-AoA-RetryAppendOperation-SUCCESS", DataSourceType.COUNTER).increment();
getCounter("AoA-RetryAppendOperation-SUCCESS").increment();
rv.set(val.isSuccess(), val);
} else {
EVCacheMetricsFactory.getCounter(appName, null, serverGroup.getName(), appName + "-AoA-RetryAppendOperation-FAIL", DataSourceType.COUNTER).increment();
getCounter("AoA-RetryAppendOperation-FAIL").increment();
}
}
public void complete() {
Expand Down Expand Up @@ -435,7 +434,23 @@ public void complete() {
return rv;
}

private Timer getTimer(String name) {
Timer timer = timerMap.get(name);
if(timer != null) return timer;

timer = EVCacheMetricsFactory.getStatsTimer(appName, serverGroup, name);
timerMap.put(name, timer);
return timer;
}

private Counter getCounter(String counterMetric) {
Counter counter = counterMap.get(counterMetric);
if(counter != null) return counter;

counter = EVCacheMetricsFactory.getCounter(appName, null, serverGroup.getName(), appName + "-" + counterMetric, DataSourceType.COUNTER);
counterMap.put(counterMetric, counter);
return counter;
}

private <T> OperationFuture<Boolean> asyncStore(final StoreType storeType, final String key, int exp, T value, Transcoder<T> tc, EVCacheLatch evcacheLatch) {
final CachedData co;
Expand All @@ -446,31 +461,36 @@ private <T> OperationFuture<Boolean> asyncStore(final StoreType storeType, final
}
final CountDownLatch latch = new CountDownLatch(1);
final String operationStr;
final String operationSuccessStr;
if (storeType == StoreType.set) {
operationStr = "Set";
operationStr = SET_OPERATION_STRING;
operationSuccessStr = SET_OPERATION_SUCCESS_STRING;
} else if (storeType == StoreType.add) {
operationStr = "Add";
operationStr = ADD_OPERATION_STRING;
operationSuccessStr = ADD_OPERATION_SUCCESS_STRING;
} else {
operationStr = "Replace";
operationStr = REPLACE_OPERATION_STRING;
operationSuccessStr = REPLACE_OPERATION_SUCCESS_STRING;
}
final OperationFuture<Boolean> rv = new EVCacheOperationFuture<Boolean>(key, latch, new AtomicReference<Boolean>(null), connectionFactory.getOperationTimeout(), executorService, appName, serverGroup, "Latency" + operationStr);

final Timer timer = getTimer(operationStr);
final OperationFuture<Boolean> rv = new EVCacheOperationFuture<Boolean>(key, latch, new AtomicReference<Boolean>(null), connectionFactory.getOperationTimeout(), executorService, appName, serverGroup);
Operation op = opFact.store(storeType, key, co.getFlags(), exp, co.getData(), new StoreOperation.Callback() {
final Stopwatch operationDuration = EVCacheMetricsFactory.getStatsTimer(appName, serverGroup, operationStr + "Operation").start();

final Stopwatch operationDuration = timer.start();

@Override
public void receivedStatus(OperationStatus val) {
operationDuration.stop();
if (log.isDebugEnabled()) log.debug("Storing Key : " + key + "; Status : " + val.getStatusCode().name()
+ "; Message : " + val.getMessage() + "; Elapsed Time - " + operationDuration.getDuration(TimeUnit.MILLISECONDS));
if (val.getStatusCode().equals(StatusCode.SUCCESS)) {
EVCacheMetricsFactory.increment(appName, null, serverGroup.getName(), appName + "-" + operationStr + "Operation-SUCCESS");
getCounter(operationSuccessStr).increment();
} else {
if (val.getStatusCode().equals(StatusCode.TIMEDOUT)) {
EVCacheMetricsFactory.getCounter(appName, null, serverGroup.getName(), appName + "-" + operationStr + "Operation-TIMEDOUT", DataSourceType.COUNTER).increment();
} else if (val.getStatusCode().equals(StatusCode.ERR_NOT_FOUND) || val.getStatusCode().equals(StatusCode.ERR_EXISTS)) {
EVCacheMetricsFactory.increment(appName, null, serverGroup.getName(), appName + "-" + operationStr + "Operation-" + val.getStatusCode().name());
getCounter(operationStr + "-TIMEDOUT").increment();
} else {
EVCacheMetricsFactory.getCounter(appName, null, serverGroup.getName(), appName + "-" + operationStr + "Operation-" + val.getStatusCode().name(), DataSourceType.COUNTER).increment();
getCounter(operationStr + "-" + val.getStatusCode().name()).increment();
}
}
rv.set(val.isSuccess(), val);
Expand Down Expand Up @@ -504,7 +524,7 @@ public <T> OperationFuture<Boolean> add(String key, int exp, T o, final Transcod
}

public long incr(String key, long by, long def, int exp) {
final Stopwatch operationDuration = EVCacheMetricsFactory.getStatsTimer(appName, serverGroup, "IncrOperation").start();
final Stopwatch operationDuration = getTimer(INCR_OPERATION_STRING).start();
long val = 0;
try {
val = mutate(Mutator.incr, key, by, def, exp);
Expand All @@ -518,7 +538,7 @@ public long incr(String key, long by, long def, int exp) {


public long decr(String key, long by, long def, int exp) {
final Stopwatch operationDuration = EVCacheMetricsFactory.getStatsTimer(appName, serverGroup, "DecrOperation").start();
final Stopwatch operationDuration = getTimer(DECR_OPERATION_STRING).start();
long val = 0;
try {
val = super.decr(key, by, def, exp);
Expand Down Expand Up @@ -565,4 +585,23 @@ public void reconnectNode(EVCacheNodeImpl evcNode ) {
evcNode.setConnectTime(System.currentTimeMillis());
mconn.queueReconnect(evcNode);
}
private final String BULK_OPERATION_STRING = "BulkOperation";
private final String GET_OPERATION_STRING = "GetOperation";
private final String GET_AND_TOUCH_OPERATION_STRING = "GetAndTouchOperation";
private final String DELETE_STRING = "DeleteOperation";
private final String TOUCH_OPERATION_STRING = "TouchOperation";
private final String TOUCH_OPERATION_SUCCESS_STRING = "TouchOperation-SUCCESS";
private final String AOA_STRING = "AoAOperation";
private final String SET_OPERATION_STRING = "SetOperation";
private final String ADD_OPERATION_STRING = "AddOperation";
private final String REPLACE_OPERATION_STRING = "ReplaceOperation";
private final String INCR_OPERATION_STRING = "IncrOperation";
private final String DECR_OPERATION_STRING = "DecrOperation";
private final String DELETE_OPERATION_SUCCESS_STRING = "DeleteOperation-SUCCESS";
private final String SET_OPERATION_SUCCESS_STRING = "SetOperation-SUCCESS";
private final String ADD_OPERATION_SUCCESS_STRING = "AddOperation-SUCCESS";
private final String REPLACE_OPERATION_SUCCESS_STRING = "ReplaceOperation-SUCCESS";
private final String AOA_APPEND_OPERATION_SUCCESS_STRING = "AoA-AppendOperation-SUCCESS";
private final String AOA_ADD_OPERATION_SUCCESS_STRING = "AoA-AddOperation-SUCCESS";

}

0 comments on commit 94a59ab

Please sign in to comment.