Skip to content

Commit

Permalink
Changed rejection to increment a counter and drop the event
Browse files Browse the repository at this point in the history
Added FP for pool size
  • Loading branch information
smadappa committed Nov 9, 2016
1 parent 6e0351b commit 8abc5df
Showing 1 changed file with 15 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.config.DynamicBooleanProperty;
import com.netflix.config.DynamicIntProperty;
import com.netflix.evcache.EVCacheLatch;
import com.netflix.evcache.EVCacheLatch.Policy;
import com.netflix.evcache.metrics.EVCacheMetricsFactory;
Expand All @@ -31,6 +34,7 @@ public class EVCacheClientUtil {
private final DistributionSummary addDataSizeSummary;
private final DistributionSummary addTTLSummary;
private final DynamicBooleanProperty fixup;
private final DynamicIntProperty fixupPoolSize;
private final EVCacheClientPool _pool;
private ThreadPoolExecutor threadPool = null;

Expand All @@ -40,21 +44,23 @@ public EVCacheClientUtil(EVCacheClientPool pool) {
this.addDataSizeSummary = EVCacheMetricsFactory.getDistributionSummary(_appName + "-AddData-Size", _appName, null);
this.addTTLSummary = EVCacheMetricsFactory.getDistributionSummary(_appName + "-AddData-TTL", _appName, null);
this.fixup = EVCacheConfig.getInstance().getDynamicBooleanProperty(_appName + ".addOperation.fixup", Boolean.FALSE);
int maxThreads = 10;
this.fixupPoolSize = EVCacheConfig.getInstance().getDynamicIntProperty(_appName + ".addOperation.fixup.poolsize", 10);

RejectedExecutionHandler block = new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
executor.getQueue().put( r );
} catch (InterruptedException e) {
e.printStackTrace();
}
EVCacheMetricsFactory.increment(_appName , null, null, _appName + "-AddCall-FixUp-REJECTED");
}
};

class SimpleThreadFactory implements ThreadFactory {
private final AtomicInteger counter = new AtomicInteger();
public Thread newThread(Runnable r) {
return new Thread(r, "EVCacheClientUtil-AddFixUp-" + counter.getAndIncrement());
}
}

final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(1000);
threadPool = new ThreadPoolExecutor(maxThreads, maxThreads * 2, 30, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(block);
final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(10000);
threadPool = new ThreadPoolExecutor(fixupPoolSize.get(), fixupPoolSize.get() * 2, 30, TimeUnit.SECONDS, queue, new SimpleThreadFactory(), block);
threadPool.prestartAllCoreThreads();

}
Expand Down

0 comments on commit 8abc5df

Please sign in to comment.