diff --git a/Dockerfile b/Dockerfile
index 4027d8d..b40bfa9 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -21,12 +21,12 @@ WORKDIR /opt/accumulo-proxy
ARG HADOOP_VERSION=3.3.4
ARG ZOOKEEPER_VERSION=3.8.0
-ARG ACCUMULO_VERSION=2.1.0
+ARG ACCUMULO_VERSION=2.1.1
ARG ACCUMULO_PROXY_VERSION=2.0.0-SNAPSHOT
ARG HADOOP_HASH=ca5e12625679ca95b8fd7bb7babc2a8dcb2605979b901df9ad137178718821097b67555115fafc6dbf6bb32b61864ccb6786dbc555e589694a22bf69147780b4
ARG ZOOKEEPER_HASH=d66e3a40451f840406901b2cd940992b001f92049a372ae48d8b420891605871cd1ae5f6cceb3b10665491e7abef36a4078dace158bd1e0938fcd3567b5234ca
-ARG ACCUMULO_HASH=9bbc4defc114013f145e9e9fdd08683c842a29faaa01ea6e80555549a6aecef86ee6657cce23183b411a64c31123db682a6944e2825eef83c76fb5a91620235f
+ARG ACCUMULO_HASH=adb23e56362c2e3e813d07791389b8ca2d5976df8b00a29b607e6ae05ea465eff80ada6d1ec9a9c596df8b4066c51078cd5a4006dc78568ac38f638a1d3895be
# Download from Apache mirrors instead of archive #9
ENV APACHE_DIST_URLS \
diff --git a/pom.xml b/pom.xml
index 746bf21..9f70da8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -85,7 +85,7 @@
https://github.com/apache/accumulo-proxy/actions
- 2.1.0
+ 2.1.1
contrib/Eclipse-Accumulo-Codestyle.xml
diff --git a/src/main/java/org/apache/accumulo/proxy/Proxy.java b/src/main/java/org/apache/accumulo/proxy/Proxy.java
index 50e38a3..017b298 100644
--- a/src/main/java/org/apache/accumulo/proxy/Proxy.java
+++ b/src/main/java/org/apache/accumulo/proxy/Proxy.java
@@ -38,7 +38,6 @@
import org.apache.accumulo.proxy.thrift.AccumuloProxy;
import org.apache.accumulo.server.rpc.SaslServerConnectionParams;
import org.apache.accumulo.server.rpc.ServerAddress;
-import org.apache.accumulo.server.rpc.TServerUtils;
import org.apache.accumulo.server.rpc.ThriftServerType;
import org.apache.accumulo.server.rpc.TimedProcessor;
import org.apache.accumulo.server.rpc.UGIAssumingProcessor;
@@ -251,10 +250,9 @@ public static ServerAddress createProxyServer(HostAndPort address,
break;
}
- TimedProcessor timedProcessor = new TimedProcessor(processor, serverName, threadName);
+ TimedProcessor timedProcessor = new TimedProcessor(processor);
// Create the thrift server with our processor and properties
-
return TServerUtils.startTServer(serverType, timedProcessor, protocolFactory, serverName,
threadName, numThreads, ThreadPools.DEFAULT_TIMEOUT_MILLISECS,
ClientConfConverter.toAccumuloConf(props), 1000L, maxFrameSize, sslParams, saslParams,
diff --git a/src/main/java/org/apache/accumulo/proxy/TServerUtils.java b/src/main/java/org/apache/accumulo/proxy/TServerUtils.java
new file mode 100644
index 0000000..c378f59
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/proxy/TServerUtils.java
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.proxy;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import javax.net.ssl.SSLServerSocket;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.PropertyType;
+import org.apache.accumulo.core.rpc.SslConnectionParams;
+import org.apache.accumulo.core.rpc.ThriftUtil;
+import org.apache.accumulo.core.rpc.UGIAssumingTransportFactory;
+import org.apache.accumulo.core.util.Halt;
+import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.core.util.threads.Threads;
+import org.apache.accumulo.server.rpc.ClientInfoProcessorFactory;
+import org.apache.accumulo.server.rpc.CustomNonBlockingServer;
+import org.apache.accumulo.server.rpc.CustomThreadedSelectorServer;
+import org.apache.accumulo.server.rpc.SaslServerConnectionParams;
+import org.apache.accumulo.server.rpc.SaslServerDigestCallbackHandler;
+import org.apache.accumulo.server.rpc.ServerAddress;
+import org.apache.accumulo.server.rpc.ThriftServerType;
+import org.apache.accumulo.server.rpc.TimedProcessor;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.server.TThreadedSelectorServer;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TSSLTransportFactory;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.primitives.Ints;
+
+/**
+ * Factory methods for creating Thrift server objects
+ */
+public class TServerUtils {
+ private static final Logger log = LoggerFactory.getLogger(TServerUtils.class);
+
+ /**
+ * Static instance, passed to {@link ClientInfoProcessorFactory}, which will contain the client
+ * address of any incoming RPC.
+ */
+ public static final ThreadLocal clientAddress = new ThreadLocal<>();
+
+ /**
+ *
+ * @param config Accumulo configuration
+ * @return A Map object with reserved port numbers as keys and Property objects as values
+ */
+ static Map getReservedPorts(AccumuloConfiguration config,
+ Property portProperty) {
+ return EnumSet.allOf(Property.class).stream()
+ .filter(p -> p.getType() == PropertyType.PORT && p != portProperty)
+ .flatMap(rp -> config.getPortStream(rp).mapToObj(portNum -> new Pair<>(portNum, rp)))
+ .filter(p -> p.getFirst() != 0).collect(Collectors.toMap(Pair::getFirst, Pair::getSecond));
+ }
+
+ /**
+ * Create a non blocking server with multiple select threads and a custom thread pool that can
+ * dynamically resize itself.
+ */
+ private static ServerAddress createThreadedSelectorServer(HostAndPort address,
+ TProcessor processor, TProtocolFactory protocolFactory, final String serverName,
+ final int numThreads, final long threadTimeOut, final AccumuloConfiguration conf,
+ long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException {
+
+ final TNonblockingServerSocket transport =
+ new TNonblockingServerSocket(new InetSocketAddress(address.getHost(), address.getPort()), 0,
+ Ints.saturatedCast(maxMessageSize));
+
+ TThreadedSelectorServer.Args options = new TThreadedSelectorServer.Args(transport);
+
+ options.selectorThreads = Math.max(2, Runtime.getRuntime().availableProcessors() / 4);
+ log.info("selectorThreads : {}", options.selectorThreads);
+ options.protocolFactory(protocolFactory);
+ options.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
+ options.maxReadBufferBytes = maxMessageSize;
+ options.stopTimeoutVal(5);
+
+ // Create our own very special thread pool.
+ ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, threadTimeOut,
+ conf, timeBetweenThreadChecks);
+
+ options.executorService(pool);
+ options.processorFactory(new TProcessorFactory(processor));
+
+ if (address.getPort() == 0) {
+ address = HostAndPort.fromParts(address.getHost(), transport.getPort());
+ }
+
+ return new ServerAddress(new CustomThreadedSelectorServer(options), address);
+ }
+
+ /**
+ * Create a NonBlockingServer with a single select threads and a custom thread pool that can
+ * dynamically resize itself.
+ */
+ private static ServerAddress createNonBlockingServer(HostAndPort address, TProcessor processor,
+ TProtocolFactory protocolFactory, final String serverName, final int numThreads,
+ final long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks,
+ long maxMessageSize) throws TTransportException {
+
+ final TNonblockingServerSocket transport =
+ new TNonblockingServerSocket(new InetSocketAddress(address.getHost(), address.getPort()), 0,
+ Ints.saturatedCast(maxMessageSize));
+ final CustomNonBlockingServer.Args options = new CustomNonBlockingServer.Args(transport);
+
+ options.protocolFactory(protocolFactory);
+ options.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
+ options.maxReadBufferBytes = maxMessageSize;
+ options.stopTimeoutVal(5);
+
+ // Create our own very special thread pool.
+ ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, threadTimeOut,
+ conf, timeBetweenThreadChecks);
+
+ options.executorService(pool);
+ options.processorFactory(new TProcessorFactory(processor));
+
+ if (address.getPort() == 0) {
+ address = HostAndPort.fromParts(address.getHost(), transport.getPort());
+ }
+
+ return new ServerAddress(new CustomNonBlockingServer(options), address);
+ }
+
+ /**
+ * Creates a {@link ThreadPoolExecutor} which uses a ScheduledThreadPoolExecutor to inspect the
+ * core pool size and number of active threads of the {@link ThreadPoolExecutor} and increase or
+ * decrease the core pool size based on activity (excessive or lack thereof).
+ *
+ * @param serverName A name to describe the thrift server this executor will service
+ * @param executorThreads The minimum number of threads for the executor
+ * @param threadTimeOut The time after which threads are allowed to terminate including core
+ * threads. If set to 0, the core threads will indefinitely stay running waiting for work.
+ * @param conf Accumulo Configuration
+ * @param timeBetweenThreadChecks The amount of time, in millis, between attempts to resize the
+ * executor thread pool
+ * @return A {@link ThreadPoolExecutor} which will resize itself automatically
+ */
+ private static ThreadPoolExecutor createSelfResizingThreadPool(final String serverName,
+ final int executorThreads, long threadTimeOut, final AccumuloConfiguration conf,
+ long timeBetweenThreadChecks) {
+ final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createFixedThreadPool(
+ executorThreads, threadTimeOut, TimeUnit.MILLISECONDS, serverName + "-ClientPool", true);
+ // periodically adjust the number of threads we need by checking how busy our threads are
+ ThreadPools.watchCriticalFixedDelay(conf, timeBetweenThreadChecks, () -> {
+ // there is a minor race condition between sampling the current state of the thread pool
+ // and adjusting it however, this isn't really an issue, since it adjusts periodically
+ if (pool.getCorePoolSize() <= pool.getActiveCount()) {
+ int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2);
+ ThreadPools.resizePool(pool, () -> larger, serverName + "-ClientPool");
+ } else {
+ if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
+ int smaller = Math.max(executorThreads, pool.getCorePoolSize() - 1);
+ ThreadPools.resizePool(pool, () -> smaller, serverName + "-ClientPool");
+ }
+ }
+ });
+ return pool;
+ }
+
+ /**
+ * Creates a TThreadPoolServer for normal unsecure operation. Useful for comparing performance
+ * against SSL or SASL transports.
+ *
+ * @param address Address to bind to
+ * @param processor TProcessor for the server
+ * @param maxMessageSize Maximum size of a Thrift message allowed
+ * @return A configured TThreadPoolServer and its bound address information
+ */
+ private static ServerAddress createBlockingServer(HostAndPort address, TProcessor processor,
+ TProtocolFactory protocolFactory, long maxMessageSize, String serverName, int numThreads,
+ long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks)
+ throws TTransportException {
+
+ InetSocketAddress isa = new InetSocketAddress(address.getHost(), address.getPort());
+ // Must use an ISA, providing only a port would ignore the hostname given
+ TServerSocket transport = new TServerSocket(isa);
+ ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, threadTimeOut,
+ conf, timeBetweenThreadChecks);
+ TThreadPoolServer server = createTThreadPoolServer(transport, processor,
+ ThriftUtil.transportFactory(maxMessageSize), protocolFactory, pool);
+
+ if (address.getPort() == 0) {
+ address =
+ HostAndPort.fromParts(address.getHost(), transport.getServerSocket().getLocalPort());
+ log.info("Blocking Server bound on {}", address);
+ }
+
+ return new ServerAddress(server, address);
+
+ }
+
+ /**
+ * Create a {@link TThreadPoolServer} with the provided server transport, processor and transport
+ * factory.
+ *
+ * @param transport TServerTransport for the server
+ * @param processor TProcessor for the server
+ * @param transportFactory TTransportFactory for the server
+ */
+ private static TThreadPoolServer createTThreadPoolServer(TServerTransport transport,
+ TProcessor processor, TTransportFactory transportFactory, TProtocolFactory protocolFactory,
+ ExecutorService service) {
+ TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
+ options.protocolFactory(protocolFactory);
+ options.transportFactory(transportFactory);
+ options.processorFactory(new ClientInfoProcessorFactory(clientAddress, processor));
+ if (service != null) {
+ options.executorService(service);
+ }
+ return new TThreadPoolServer(options);
+ }
+
+ /**
+ * Create the Thrift server socket for RPC running over SSL.
+ *
+ * @param port Port of the server socket to bind to
+ * @param timeout Socket timeout
+ * @param address Address to bind the socket to
+ * @param params SSL parameters
+ * @return A configured TServerSocket configured to use SSL
+ */
+ private static TServerSocket getSslServerSocket(int port, int timeout, InetAddress address,
+ SslConnectionParams params) throws TTransportException {
+ TServerSocket tServerSock;
+ if (params.useJsse()) {
+ tServerSock =
+ TSSLTransportFactory.getServerSocket(port, timeout, params.isClientAuth(), address);
+ } else {
+ tServerSock = TSSLTransportFactory.getServerSocket(port, timeout, address,
+ params.getTTransportParams());
+ }
+
+ final ServerSocket serverSock = tServerSock.getServerSocket();
+ if (serverSock instanceof SSLServerSocket) {
+ SSLServerSocket sslServerSock = (SSLServerSocket) serverSock;
+ String[] protocols = params.getServerProtocols();
+
+ // Be nice for the user and automatically remove protocols that might not exist in their JVM.
+ // Keeps us from forcing config alterations too
+ // e.g. TLSv1.1 and TLSv1.2 don't exist in JDK6
+ Set socketEnabledProtocols =
+ new HashSet<>(Arrays.asList(sslServerSock.getEnabledProtocols()));
+ // Keep only the enabled protocols that were specified by the configuration
+ socketEnabledProtocols.retainAll(Arrays.asList(protocols));
+ if (socketEnabledProtocols.isEmpty()) {
+ // Bad configuration...
+ throw new RuntimeException(
+ "No available protocols available for secure socket. Available protocols: "
+ + Arrays.toString(sslServerSock.getEnabledProtocols()) + ", allowed protocols: "
+ + Arrays.toString(protocols));
+ }
+
+ // Set the protocol(s) on the server socketlong
+ sslServerSock.setEnabledProtocols(socketEnabledProtocols.toArray(new String[0]));
+ }
+
+ return tServerSock;
+ }
+
+ /**
+ * Create a Thrift SSL server.
+ *
+ * @param address host and port to bind to
+ * @param processor TProcessor for the server
+ * @param socketTimeout Socket timeout
+ * @param sslParams SSL parameters
+ * @return A ServerAddress with the bound-socket information and the Thrift server
+ */
+ private static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor,
+ TProtocolFactory protocolFactory, long socketTimeout, SslConnectionParams sslParams,
+ String serverName, int numThreads, long threadTimeOut, final AccumuloConfiguration conf,
+ long timeBetweenThreadChecks) throws TTransportException {
+ TServerSocket transport;
+ try {
+ transport = getSslServerSocket(address.getPort(), (int) socketTimeout,
+ InetAddress.getByName(address.getHost()), sslParams);
+ } catch (UnknownHostException e) {
+ throw new TTransportException(e);
+ }
+
+ if (address.getPort() == 0) {
+ address =
+ HostAndPort.fromParts(address.getHost(), transport.getServerSocket().getLocalPort());
+ log.info("SSL Thread Pool Server bound on {}", address);
+ }
+
+ ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, threadTimeOut,
+ conf, timeBetweenThreadChecks);
+
+ return new ServerAddress(createTThreadPoolServer(transport, processor,
+ ThriftUtil.transportFactory(), protocolFactory, pool), address);
+ }
+
+ private static ServerAddress createSaslThreadPoolServer(HostAndPort address, TProcessor processor,
+ TProtocolFactory protocolFactory, long socketTimeout, SaslServerConnectionParams params,
+ final String serverName, final int numThreads, final long threadTimeOut,
+ final AccumuloConfiguration conf, long timeBetweenThreadChecks) throws TTransportException {
+ // We'd really prefer to use THsHaServer (or similar) to avoid 1 RPC == 1 Thread that the
+ // TThreadPoolServer does,
+ // but sadly this isn't the case. Because TSaslTransport needs to issue a handshake when it
+ // open()'s which will fail
+ // when the server does an accept() to (presumably) wake up the eventing system.
+ log.info("Creating SASL thread pool thrift server on listening on {}:{}", address.getHost(),
+ address.getPort());
+ InetSocketAddress isa = new InetSocketAddress(address.getHost(), address.getPort());
+ // Must use an ISA, providing only a port would ignore the hostname given
+ TServerSocket transport = new TServerSocket(isa, (int) socketTimeout);
+
+ String hostname, fqdn;
+ try {
+ hostname = InetAddress.getByName(address.getHost()).getCanonicalHostName();
+ fqdn = InetAddress.getLocalHost().getCanonicalHostName();
+ } catch (UnknownHostException e) {
+ transport.close();
+ throw new TTransportException(e);
+ }
+
+ // If we can't get a real hostname from the provided host test, use the hostname from DNS for
+ // localhost
+ if ("0.0.0.0".equals(hostname)) {
+ hostname = fqdn;
+ }
+
+ // ACCUMULO-3497 an easy sanity check we can perform for the user when SASL is enabled. Clients
+ // and servers have to agree upon the FQDN
+ // so that the SASL handshake can occur. If the provided hostname doesn't match the FQDN for
+ // this host, fail quickly and inform them to update
+ // their configuration.
+ if (!hostname.equals(fqdn)) {
+ log.error("Expected hostname of '{}' but got '{}'. Ensure the entries in"
+ + " the Accumulo hosts files (e.g. managers, tservers) are the FQDN for"
+ + " each host when using SASL.", fqdn, hostname);
+ transport.close();
+ throw new RuntimeException("SASL requires that the address the thrift"
+ + " server listens on is the same as the FQDN for this host");
+ }
+
+ final UserGroupInformation serverUser;
+ try {
+ serverUser = UserGroupInformation.getLoginUser();
+ } catch (IOException e) {
+ transport.close();
+ ThriftUtil.checkIOExceptionCause(e);
+ throw new TTransportException(e);
+ }
+
+ log.debug("Logged in as {}, creating TSaslServerTransport factory with {}/{}", serverUser,
+ params.getKerberosServerPrimary(), hostname);
+
+ // Make the SASL transport factory with the instance and primary from the kerberos server
+ // principal, SASL properties
+ // and the SASL callback handler from Hadoop to ensure authorization ID is the authentication
+ // ID. Despite the 'protocol' argument seeming to be useless, it
+ // *must* be the primary of the server.
+ TSaslServerTransport.Factory saslTransportFactory = new TSaslServerTransport.Factory();
+ saslTransportFactory.addServerDefinition(ThriftUtil.GSSAPI, params.getKerberosServerPrimary(),
+ hostname, params.getSaslProperties(), new SaslRpcServer.SaslGssCallbackHandler());
+
+ if (params.getSecretManager() != null) {
+ log.info("Adding DIGEST-MD5 server definition for delegation tokens");
+ saslTransportFactory.addServerDefinition(ThriftUtil.DIGEST_MD5,
+ params.getKerberosServerPrimary(), hostname, params.getSaslProperties(),
+ new SaslServerDigestCallbackHandler(params.getSecretManager()));
+ } else {
+ log.info("SecretManager is null, not adding support for delegation token authentication");
+ }
+
+ // Make sure the TTransportFactory is performing a UGI.doAs
+ TTransportFactory ugiTransportFactory =
+ new UGIAssumingTransportFactory(saslTransportFactory, serverUser);
+
+ if (address.getPort() == 0) {
+ // If we chose a port dynamically, make a new use it (along with the proper hostname)
+ address =
+ HostAndPort.fromParts(address.getHost(), transport.getServerSocket().getLocalPort());
+ log.info("SASL thrift server bound on {}", address);
+ }
+
+ ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, threadTimeOut,
+ conf, timeBetweenThreadChecks);
+
+ final TThreadPoolServer server =
+ createTThreadPoolServer(transport, processor, ugiTransportFactory, protocolFactory, pool);
+
+ return new ServerAddress(server, address);
+ }
+
+ /**
+ * Start the appropriate Thrift server (SSL or non-blocking server) for the given parameters.
+ * Non-null SSL parameters will cause an SSL server to be started.
+ *
+ * @return A ServerAddress encapsulating the Thrift server created and the host/port which it is
+ * bound to.
+ */
+ public static ServerAddress startTServer(ThriftServerType serverType, TimedProcessor processor,
+ TProtocolFactory protocolFactory, String serverName, String threadName, int numThreads,
+ long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks,
+ long maxMessageSize, SslConnectionParams sslParams, SaslServerConnectionParams saslParams,
+ long serverSocketTimeout, HostAndPort... addresses) throws TTransportException {
+
+ // This is presently not supported. It's hypothetically possible, I believe, to work, but it
+ // would require changes in how the transports
+ // work at the Thrift layer to ensure that both the SSL and SASL handshakes function. SASL's
+ // quality of protection addresses privacy issues.
+ checkArgument(sslParams == null || saslParams == null,
+ "Cannot start a Thrift server using both SSL and SASL");
+
+ ServerAddress serverAddress = null;
+ for (HostAndPort address : addresses) {
+ try {
+ switch (serverType) {
+ case SSL:
+ log.debug("Instantiating SSL Thrift server");
+ serverAddress = createSslThreadPoolServer(address, processor, protocolFactory,
+ serverSocketTimeout, sslParams, serverName, numThreads, threadTimeOut, conf,
+ timeBetweenThreadChecks);
+ break;
+ case SASL:
+ log.debug("Instantiating SASL Thrift server");
+ serverAddress = createSaslThreadPoolServer(address, processor, protocolFactory,
+ serverSocketTimeout, saslParams, serverName, numThreads, threadTimeOut, conf,
+ timeBetweenThreadChecks);
+ break;
+ case THREADPOOL:
+ log.debug("Instantiating unsecure TThreadPool Thrift server");
+ serverAddress =
+ createBlockingServer(address, processor, protocolFactory, maxMessageSize,
+ serverName, numThreads, threadTimeOut, conf, timeBetweenThreadChecks);
+ break;
+ case THREADED_SELECTOR:
+ log.debug("Instantiating default, unsecure Threaded selector Thrift server");
+ serverAddress =
+ createThreadedSelectorServer(address, processor, protocolFactory, serverName,
+ numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize);
+ break;
+ case CUSTOM_HS_HA:
+ log.debug("Instantiating unsecure custom half-async Thrift server");
+ serverAddress = createNonBlockingServer(address, processor, protocolFactory, serverName,
+ numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize);
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown server type " + serverType);
+ }
+ break;
+ } catch (TTransportException e) {
+ log.warn("Error attempting to create server at {}. Error: {}", address, e.getMessage());
+ }
+ }
+ if (serverAddress == null) {
+ throw new TTransportException(
+ "Unable to create server on addresses: " + Arrays.toString(addresses));
+ }
+
+ final TServer finalServer = serverAddress.server;
+
+ Threads.createThread(threadName, () -> {
+ try {
+ finalServer.serve();
+ } catch (Error e) {
+ Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.", 1);
+ }
+ }).start();
+
+ // check for the special "bind to everything address"
+ if (serverAddress.address.getHost().equals("0.0.0.0")) {
+ // can't get the address from the bind, so we'll do our best to invent our hostname
+ try {
+ serverAddress = new ServerAddress(finalServer, HostAndPort
+ .fromParts(InetAddress.getLocalHost().getHostName(), serverAddress.address.getPort()));
+ } catch (UnknownHostException e) {
+ throw new TTransportException(e);
+ }
+ }
+ return serverAddress;
+ }
+}