From a90e1aa709e71e5012be321526ccf3a47edf0153 Mon Sep 17 00:00:00 2001 From: amcreynolds Date: Thu, 2 Nov 2017 18:47:42 -0700 Subject: [PATCH] Add Support For Alternative Transcoder Services Adds support for an alternative implementation of a Transcoder Service. The default implementation (TranscodeService) is not a good fit for all usecases, workloads, and/or environments. Note: Requires a minor version uprev due to the addition of new APIs, but is backwards compatible otherwise. --- .../net/spy/memcached/ConnectionFactory.java | 6 ++ .../memcached/ConnectionFactoryBuilder.java | 15 ++++ .../memcached/DefaultConnectionFactory.java | 13 +++- .../net/spy/memcached/MemcachedClient.java | 70 +++++++++++-------- .../transcoders/TranscodeService.java | 3 +- .../transcoders/TranscoderService.java | 14 ++++ 6 files changed, 88 insertions(+), 33 deletions(-) create mode 100644 src/main/java/net/spy/memcached/transcoders/TranscoderService.java diff --git a/src/main/java/net/spy/memcached/ConnectionFactory.java b/src/main/java/net/spy/memcached/ConnectionFactory.java index a85a6dda3..3dbf70742 100644 --- a/src/main/java/net/spy/memcached/ConnectionFactory.java +++ b/src/main/java/net/spy/memcached/ConnectionFactory.java @@ -36,6 +36,7 @@ import net.spy.memcached.metrics.MetricCollector; import net.spy.memcached.metrics.MetricType; import net.spy.memcached.ops.Operation; +import net.spy.memcached.transcoders.TranscoderService; import net.spy.memcached.transcoders.Transcoder; /** @@ -152,6 +153,11 @@ MemcachedNode createMemcachedNode(SocketAddress sa, SocketChannel c, */ Transcoder getDefaultTranscoder(); + /** + * Get the default transcoder service. + */ + TranscoderService getDefaultTranscoderService(boolean daemon); + /** * If true, low-level optimization is in effect. */ diff --git a/src/main/java/net/spy/memcached/ConnectionFactoryBuilder.java b/src/main/java/net/spy/memcached/ConnectionFactoryBuilder.java index b3ece2eec..674f1b403 100644 --- a/src/main/java/net/spy/memcached/ConnectionFactoryBuilder.java +++ b/src/main/java/net/spy/memcached/ConnectionFactoryBuilder.java @@ -36,6 +36,7 @@ import net.spy.memcached.ops.OperationQueueFactory; import net.spy.memcached.protocol.ascii.AsciiOperationFactory; import net.spy.memcached.protocol.binary.BinaryOperationFactory; +import net.spy.memcached.transcoders.TranscoderService; import net.spy.memcached.transcoders.Transcoder; /** @@ -48,6 +49,7 @@ public class ConnectionFactoryBuilder { protected OperationQueueFactory writeQueueFactory; protected Transcoder transcoder; + protected TranscoderService transcoderService; protected FailureMode failureMode; @@ -145,6 +147,14 @@ public ConnectionFactoryBuilder setTranscoder(Transcoder t) { return this; } + /** + * Set the default transcoder service. + */ + public ConnectionFactoryBuilder setTranscoderService(TranscoderService t) { + transcoderService = t; + return this; + } + /** * Set the failure mode. */ @@ -367,6 +377,11 @@ public Transcoder getDefaultTranscoder() { return transcoder == null ? super.getDefaultTranscoder() : transcoder; } + @Override + public TranscoderService getDefaultTranscoderService(boolean daemon) { + return transcoderService == null ? super.getDefaultTranscoderService(daemon) : transcoderService; + } + @Override public FailureMode getFailureMode() { return failureMode == null ? super.getFailureMode() : failureMode; diff --git a/src/main/java/net/spy/memcached/DefaultConnectionFactory.java b/src/main/java/net/spy/memcached/DefaultConnectionFactory.java index 16623cd3b..3782ca7c1 100644 --- a/src/main/java/net/spy/memcached/DefaultConnectionFactory.java +++ b/src/main/java/net/spy/memcached/DefaultConnectionFactory.java @@ -33,9 +33,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -51,7 +49,9 @@ import net.spy.memcached.protocol.ascii.AsciiOperationFactory; import net.spy.memcached.protocol.binary.BinaryMemcachedNodeImpl; import net.spy.memcached.protocol.binary.BinaryOperationFactory; +import net.spy.memcached.transcoders.TranscoderService; import net.spy.memcached.transcoders.SerializingTranscoder; +import net.spy.memcached.transcoders.TranscodeService; import net.spy.memcached.transcoders.Transcoder; /** @@ -379,6 +379,15 @@ public Transcoder getDefaultTranscoder() { return new SerializingTranscoder(); } + /* + * (non-Javadoc) + * + * @see net.spy.memcached.ConnectionFactory#getDefaultTranscoderService() + */ + public TranscoderService getDefaultTranscoderService(boolean daemon) { + return new TranscodeService(daemon); + } + /* * (non-Javadoc) * diff --git a/src/main/java/net/spy/memcached/MemcachedClient.java b/src/main/java/net/spy/memcached/MemcachedClient.java index 77d0f048e..c3922b549 100644 --- a/src/main/java/net/spy/memcached/MemcachedClient.java +++ b/src/main/java/net/spy/memcached/MemcachedClient.java @@ -23,6 +23,32 @@ package net.spy.memcached; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + import net.spy.memcached.auth.AuthDescriptor; import net.spy.memcached.auth.AuthThreadMonitor; import net.spy.memcached.compat.SpyObject; @@ -51,34 +77,9 @@ import net.spy.memcached.protocol.binary.BinaryOperationFactory; import net.spy.memcached.transcoders.TranscodeService; import net.spy.memcached.transcoders.Transcoder; +import net.spy.memcached.transcoders.TranscoderService; import net.spy.memcached.util.StringUtils; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - /** * Client to a memcached server. * @@ -149,7 +150,7 @@ public class MemcachedClient extends SpyObject implements MemcachedClientIF, protected final Transcoder transcoder; - protected final TranscodeService tcService; + protected final TranscoderService tcService; protected final AuthDescriptor authDescriptor; @@ -202,7 +203,7 @@ public MemcachedClient(ConnectionFactory cf, List addrs) throw new IllegalArgumentException("Operation timeout must be positive."); } connFactory = cf; - tcService = new TranscodeService(cf.isDaemon()); + tcService = cf.getDefaultTranscoderService(cf.isDaemon()); transcoder = cf.getDefaultTranscoder(); opFact = cf.getOperationFactory(); assert opFact != null : "Connection factory failed to make op factory"; @@ -2635,8 +2636,19 @@ public MemcachedConnection getConnection() { return mconn; } + /** + * @deprecated use {@link #getTranscodeService()} + */ public TranscodeService getTranscoderService() { - return tcService; + if (tcService instanceof TranscodeService) { + return (TranscodeService)tcService; + } + throw new RuntimeException("Configured TranscoderService is not an " + + "instance of TranscodeService!"); + } + + public TranscoderService getTranscodeService() { + return tcService; } public ExecutorService getExecutorService() { diff --git a/src/main/java/net/spy/memcached/transcoders/TranscodeService.java b/src/main/java/net/spy/memcached/transcoders/TranscodeService.java index aa9b6ed52..9d91be1ea 100644 --- a/src/main/java/net/spy/memcached/transcoders/TranscodeService.java +++ b/src/main/java/net/spy/memcached/transcoders/TranscodeService.java @@ -33,13 +33,12 @@ import java.util.concurrent.atomic.AtomicBoolean; import net.spy.memcached.CachedData; -import net.spy.memcached.compat.SpyObject; import net.spy.memcached.internal.BasicThreadFactory; /** * Asynchronous transcoder. */ -public class TranscodeService extends SpyObject { +public class TranscodeService extends TranscoderService { private final ThreadPoolExecutor pool; diff --git a/src/main/java/net/spy/memcached/transcoders/TranscoderService.java b/src/main/java/net/spy/memcached/transcoders/TranscoderService.java new file mode 100644 index 000000000..0b1d36562 --- /dev/null +++ b/src/main/java/net/spy/memcached/transcoders/TranscoderService.java @@ -0,0 +1,14 @@ +package net.spy.memcached.transcoders; + +import java.util.concurrent.Future; + +import net.spy.memcached.CachedData; +import net.spy.memcached.compat.SpyObject; + +public abstract class TranscoderService extends SpyObject { + + public abstract Future decode(final Transcoder tc, final CachedData cachedData); + public abstract void shutdown(); + public abstract boolean isShutdown(); + +}