From 63c9d5232783c76381079943e985461532e00bb7 Mon Sep 17 00:00:00 2001 From: magrable <85767620+magrable@users.noreply.github.com> Date: Mon, 2 Dec 2024 15:50:58 -0500 Subject: [PATCH] Update to accumulo 2.1.1 (#81) * Moved TServerUtils into proxy code to maintain support for other Thrift Protocols --------- Co-authored-by: magrable --- Dockerfile | 4 +- pom.xml | 2 +- .../java/org/apache/accumulo/proxy/Proxy.java | 4 +- .../apache/accumulo/proxy/TServerUtils.java | 525 ++++++++++++++++++ 4 files changed, 529 insertions(+), 6 deletions(-) create mode 100644 src/main/java/org/apache/accumulo/proxy/TServerUtils.java diff --git a/Dockerfile b/Dockerfile index 4027d8d..b40bfa9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -21,12 +21,12 @@ WORKDIR /opt/accumulo-proxy ARG HADOOP_VERSION=3.3.4 ARG ZOOKEEPER_VERSION=3.8.0 -ARG ACCUMULO_VERSION=2.1.0 +ARG ACCUMULO_VERSION=2.1.1 ARG ACCUMULO_PROXY_VERSION=2.0.0-SNAPSHOT ARG HADOOP_HASH=ca5e12625679ca95b8fd7bb7babc2a8dcb2605979b901df9ad137178718821097b67555115fafc6dbf6bb32b61864ccb6786dbc555e589694a22bf69147780b4 ARG ZOOKEEPER_HASH=d66e3a40451f840406901b2cd940992b001f92049a372ae48d8b420891605871cd1ae5f6cceb3b10665491e7abef36a4078dace158bd1e0938fcd3567b5234ca -ARG ACCUMULO_HASH=9bbc4defc114013f145e9e9fdd08683c842a29faaa01ea6e80555549a6aecef86ee6657cce23183b411a64c31123db682a6944e2825eef83c76fb5a91620235f +ARG ACCUMULO_HASH=adb23e56362c2e3e813d07791389b8ca2d5976df8b00a29b607e6ae05ea465eff80ada6d1ec9a9c596df8b4066c51078cd5a4006dc78568ac38f638a1d3895be # Download from Apache mirrors instead of archive #9 ENV APACHE_DIST_URLS \ diff --git a/pom.xml b/pom.xml index 746bf21..9f70da8 100644 --- a/pom.xml +++ b/pom.xml @@ -85,7 +85,7 @@ https://github.com/apache/accumulo-proxy/actions - 2.1.0 + 2.1.1 contrib/Eclipse-Accumulo-Codestyle.xml diff --git a/src/main/java/org/apache/accumulo/proxy/Proxy.java b/src/main/java/org/apache/accumulo/proxy/Proxy.java index 50e38a3..017b298 100644 --- a/src/main/java/org/apache/accumulo/proxy/Proxy.java +++ b/src/main/java/org/apache/accumulo/proxy/Proxy.java @@ -38,7 +38,6 @@ import org.apache.accumulo.proxy.thrift.AccumuloProxy; import org.apache.accumulo.server.rpc.SaslServerConnectionParams; import org.apache.accumulo.server.rpc.ServerAddress; -import org.apache.accumulo.server.rpc.TServerUtils; import org.apache.accumulo.server.rpc.ThriftServerType; import org.apache.accumulo.server.rpc.TimedProcessor; import org.apache.accumulo.server.rpc.UGIAssumingProcessor; @@ -251,10 +250,9 @@ public static ServerAddress createProxyServer(HostAndPort address, break; } - TimedProcessor timedProcessor = new TimedProcessor(processor, serverName, threadName); + TimedProcessor timedProcessor = new TimedProcessor(processor); // Create the thrift server with our processor and properties - return TServerUtils.startTServer(serverType, timedProcessor, protocolFactory, serverName, threadName, numThreads, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, ClientConfConverter.toAccumuloConf(props), 1000L, maxFrameSize, sslParams, saslParams, diff --git a/src/main/java/org/apache/accumulo/proxy/TServerUtils.java b/src/main/java/org/apache/accumulo/proxy/TServerUtils.java new file mode 100644 index 0000000..c378f59 --- /dev/null +++ b/src/main/java/org/apache/accumulo/proxy/TServerUtils.java @@ -0,0 +1,525 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.proxy; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import javax.net.ssl.SSLServerSocket; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.conf.PropertyType; +import org.apache.accumulo.core.rpc.SslConnectionParams; +import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.rpc.UGIAssumingTransportFactory; +import org.apache.accumulo.core.util.Halt; +import org.apache.accumulo.core.util.HostAndPort; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.threads.ThreadPools; +import org.apache.accumulo.core.util.threads.Threads; +import org.apache.accumulo.server.rpc.ClientInfoProcessorFactory; +import org.apache.accumulo.server.rpc.CustomNonBlockingServer; +import org.apache.accumulo.server.rpc.CustomThreadedSelectorServer; +import org.apache.accumulo.server.rpc.SaslServerConnectionParams; +import org.apache.accumulo.server.rpc.SaslServerDigestCallbackHandler; +import org.apache.accumulo.server.rpc.ServerAddress; +import org.apache.accumulo.server.rpc.ThriftServerType; +import org.apache.accumulo.server.rpc.TimedProcessor; +import org.apache.hadoop.security.SaslRpcServer; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.thrift.TProcessor; +import org.apache.thrift.TProcessorFactory; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.server.TServer; +import org.apache.thrift.server.TThreadPoolServer; +import org.apache.thrift.server.TThreadedSelectorServer; +import org.apache.thrift.transport.TNonblockingServerSocket; +import org.apache.thrift.transport.TSSLTransportFactory; +import org.apache.thrift.transport.TSaslServerTransport; +import org.apache.thrift.transport.TServerSocket; +import org.apache.thrift.transport.TServerTransport; +import org.apache.thrift.transport.TTransportException; +import org.apache.thrift.transport.TTransportFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.primitives.Ints; + +/** + * Factory methods for creating Thrift server objects + */ +public class TServerUtils { + private static final Logger log = LoggerFactory.getLogger(TServerUtils.class); + + /** + * Static instance, passed to {@link ClientInfoProcessorFactory}, which will contain the client + * address of any incoming RPC. + */ + public static final ThreadLocal clientAddress = new ThreadLocal<>(); + + /** + * + * @param config Accumulo configuration + * @return A Map object with reserved port numbers as keys and Property objects as values + */ + static Map getReservedPorts(AccumuloConfiguration config, + Property portProperty) { + return EnumSet.allOf(Property.class).stream() + .filter(p -> p.getType() == PropertyType.PORT && p != portProperty) + .flatMap(rp -> config.getPortStream(rp).mapToObj(portNum -> new Pair<>(portNum, rp))) + .filter(p -> p.getFirst() != 0).collect(Collectors.toMap(Pair::getFirst, Pair::getSecond)); + } + + /** + * Create a non blocking server with multiple select threads and a custom thread pool that can + * dynamically resize itself. + */ + private static ServerAddress createThreadedSelectorServer(HostAndPort address, + TProcessor processor, TProtocolFactory protocolFactory, final String serverName, + final int numThreads, final long threadTimeOut, final AccumuloConfiguration conf, + long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException { + + final TNonblockingServerSocket transport = + new TNonblockingServerSocket(new InetSocketAddress(address.getHost(), address.getPort()), 0, + Ints.saturatedCast(maxMessageSize)); + + TThreadedSelectorServer.Args options = new TThreadedSelectorServer.Args(transport); + + options.selectorThreads = Math.max(2, Runtime.getRuntime().availableProcessors() / 4); + log.info("selectorThreads : {}", options.selectorThreads); + options.protocolFactory(protocolFactory); + options.transportFactory(ThriftUtil.transportFactory(maxMessageSize)); + options.maxReadBufferBytes = maxMessageSize; + options.stopTimeoutVal(5); + + // Create our own very special thread pool. + ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, threadTimeOut, + conf, timeBetweenThreadChecks); + + options.executorService(pool); + options.processorFactory(new TProcessorFactory(processor)); + + if (address.getPort() == 0) { + address = HostAndPort.fromParts(address.getHost(), transport.getPort()); + } + + return new ServerAddress(new CustomThreadedSelectorServer(options), address); + } + + /** + * Create a NonBlockingServer with a single select threads and a custom thread pool that can + * dynamically resize itself. + */ + private static ServerAddress createNonBlockingServer(HostAndPort address, TProcessor processor, + TProtocolFactory protocolFactory, final String serverName, final int numThreads, + final long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks, + long maxMessageSize) throws TTransportException { + + final TNonblockingServerSocket transport = + new TNonblockingServerSocket(new InetSocketAddress(address.getHost(), address.getPort()), 0, + Ints.saturatedCast(maxMessageSize)); + final CustomNonBlockingServer.Args options = new CustomNonBlockingServer.Args(transport); + + options.protocolFactory(protocolFactory); + options.transportFactory(ThriftUtil.transportFactory(maxMessageSize)); + options.maxReadBufferBytes = maxMessageSize; + options.stopTimeoutVal(5); + + // Create our own very special thread pool. + ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, threadTimeOut, + conf, timeBetweenThreadChecks); + + options.executorService(pool); + options.processorFactory(new TProcessorFactory(processor)); + + if (address.getPort() == 0) { + address = HostAndPort.fromParts(address.getHost(), transport.getPort()); + } + + return new ServerAddress(new CustomNonBlockingServer(options), address); + } + + /** + * Creates a {@link ThreadPoolExecutor} which uses a ScheduledThreadPoolExecutor to inspect the + * core pool size and number of active threads of the {@link ThreadPoolExecutor} and increase or + * decrease the core pool size based on activity (excessive or lack thereof). + * + * @param serverName A name to describe the thrift server this executor will service + * @param executorThreads The minimum number of threads for the executor + * @param threadTimeOut The time after which threads are allowed to terminate including core + * threads. If set to 0, the core threads will indefinitely stay running waiting for work. + * @param conf Accumulo Configuration + * @param timeBetweenThreadChecks The amount of time, in millis, between attempts to resize the + * executor thread pool + * @return A {@link ThreadPoolExecutor} which will resize itself automatically + */ + private static ThreadPoolExecutor createSelfResizingThreadPool(final String serverName, + final int executorThreads, long threadTimeOut, final AccumuloConfiguration conf, + long timeBetweenThreadChecks) { + final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createFixedThreadPool( + executorThreads, threadTimeOut, TimeUnit.MILLISECONDS, serverName + "-ClientPool", true); + // periodically adjust the number of threads we need by checking how busy our threads are + ThreadPools.watchCriticalFixedDelay(conf, timeBetweenThreadChecks, () -> { + // there is a minor race condition between sampling the current state of the thread pool + // and adjusting it however, this isn't really an issue, since it adjusts periodically + if (pool.getCorePoolSize() <= pool.getActiveCount()) { + int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2); + ThreadPools.resizePool(pool, () -> larger, serverName + "-ClientPool"); + } else { + if (pool.getCorePoolSize() > pool.getActiveCount() + 3) { + int smaller = Math.max(executorThreads, pool.getCorePoolSize() - 1); + ThreadPools.resizePool(pool, () -> smaller, serverName + "-ClientPool"); + } + } + }); + return pool; + } + + /** + * Creates a TThreadPoolServer for normal unsecure operation. Useful for comparing performance + * against SSL or SASL transports. + * + * @param address Address to bind to + * @param processor TProcessor for the server + * @param maxMessageSize Maximum size of a Thrift message allowed + * @return A configured TThreadPoolServer and its bound address information + */ + private static ServerAddress createBlockingServer(HostAndPort address, TProcessor processor, + TProtocolFactory protocolFactory, long maxMessageSize, String serverName, int numThreads, + long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks) + throws TTransportException { + + InetSocketAddress isa = new InetSocketAddress(address.getHost(), address.getPort()); + // Must use an ISA, providing only a port would ignore the hostname given + TServerSocket transport = new TServerSocket(isa); + ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, threadTimeOut, + conf, timeBetweenThreadChecks); + TThreadPoolServer server = createTThreadPoolServer(transport, processor, + ThriftUtil.transportFactory(maxMessageSize), protocolFactory, pool); + + if (address.getPort() == 0) { + address = + HostAndPort.fromParts(address.getHost(), transport.getServerSocket().getLocalPort()); + log.info("Blocking Server bound on {}", address); + } + + return new ServerAddress(server, address); + + } + + /** + * Create a {@link TThreadPoolServer} with the provided server transport, processor and transport + * factory. + * + * @param transport TServerTransport for the server + * @param processor TProcessor for the server + * @param transportFactory TTransportFactory for the server + */ + private static TThreadPoolServer createTThreadPoolServer(TServerTransport transport, + TProcessor processor, TTransportFactory transportFactory, TProtocolFactory protocolFactory, + ExecutorService service) { + TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport); + options.protocolFactory(protocolFactory); + options.transportFactory(transportFactory); + options.processorFactory(new ClientInfoProcessorFactory(clientAddress, processor)); + if (service != null) { + options.executorService(service); + } + return new TThreadPoolServer(options); + } + + /** + * Create the Thrift server socket for RPC running over SSL. + * + * @param port Port of the server socket to bind to + * @param timeout Socket timeout + * @param address Address to bind the socket to + * @param params SSL parameters + * @return A configured TServerSocket configured to use SSL + */ + private static TServerSocket getSslServerSocket(int port, int timeout, InetAddress address, + SslConnectionParams params) throws TTransportException { + TServerSocket tServerSock; + if (params.useJsse()) { + tServerSock = + TSSLTransportFactory.getServerSocket(port, timeout, params.isClientAuth(), address); + } else { + tServerSock = TSSLTransportFactory.getServerSocket(port, timeout, address, + params.getTTransportParams()); + } + + final ServerSocket serverSock = tServerSock.getServerSocket(); + if (serverSock instanceof SSLServerSocket) { + SSLServerSocket sslServerSock = (SSLServerSocket) serverSock; + String[] protocols = params.getServerProtocols(); + + // Be nice for the user and automatically remove protocols that might not exist in their JVM. + // Keeps us from forcing config alterations too + // e.g. TLSv1.1 and TLSv1.2 don't exist in JDK6 + Set socketEnabledProtocols = + new HashSet<>(Arrays.asList(sslServerSock.getEnabledProtocols())); + // Keep only the enabled protocols that were specified by the configuration + socketEnabledProtocols.retainAll(Arrays.asList(protocols)); + if (socketEnabledProtocols.isEmpty()) { + // Bad configuration... + throw new RuntimeException( + "No available protocols available for secure socket. Available protocols: " + + Arrays.toString(sslServerSock.getEnabledProtocols()) + ", allowed protocols: " + + Arrays.toString(protocols)); + } + + // Set the protocol(s) on the server socketlong + sslServerSock.setEnabledProtocols(socketEnabledProtocols.toArray(new String[0])); + } + + return tServerSock; + } + + /** + * Create a Thrift SSL server. + * + * @param address host and port to bind to + * @param processor TProcessor for the server + * @param socketTimeout Socket timeout + * @param sslParams SSL parameters + * @return A ServerAddress with the bound-socket information and the Thrift server + */ + private static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor, + TProtocolFactory protocolFactory, long socketTimeout, SslConnectionParams sslParams, + String serverName, int numThreads, long threadTimeOut, final AccumuloConfiguration conf, + long timeBetweenThreadChecks) throws TTransportException { + TServerSocket transport; + try { + transport = getSslServerSocket(address.getPort(), (int) socketTimeout, + InetAddress.getByName(address.getHost()), sslParams); + } catch (UnknownHostException e) { + throw new TTransportException(e); + } + + if (address.getPort() == 0) { + address = + HostAndPort.fromParts(address.getHost(), transport.getServerSocket().getLocalPort()); + log.info("SSL Thread Pool Server bound on {}", address); + } + + ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, threadTimeOut, + conf, timeBetweenThreadChecks); + + return new ServerAddress(createTThreadPoolServer(transport, processor, + ThriftUtil.transportFactory(), protocolFactory, pool), address); + } + + private static ServerAddress createSaslThreadPoolServer(HostAndPort address, TProcessor processor, + TProtocolFactory protocolFactory, long socketTimeout, SaslServerConnectionParams params, + final String serverName, final int numThreads, final long threadTimeOut, + final AccumuloConfiguration conf, long timeBetweenThreadChecks) throws TTransportException { + // We'd really prefer to use THsHaServer (or similar) to avoid 1 RPC == 1 Thread that the + // TThreadPoolServer does, + // but sadly this isn't the case. Because TSaslTransport needs to issue a handshake when it + // open()'s which will fail + // when the server does an accept() to (presumably) wake up the eventing system. + log.info("Creating SASL thread pool thrift server on listening on {}:{}", address.getHost(), + address.getPort()); + InetSocketAddress isa = new InetSocketAddress(address.getHost(), address.getPort()); + // Must use an ISA, providing only a port would ignore the hostname given + TServerSocket transport = new TServerSocket(isa, (int) socketTimeout); + + String hostname, fqdn; + try { + hostname = InetAddress.getByName(address.getHost()).getCanonicalHostName(); + fqdn = InetAddress.getLocalHost().getCanonicalHostName(); + } catch (UnknownHostException e) { + transport.close(); + throw new TTransportException(e); + } + + // If we can't get a real hostname from the provided host test, use the hostname from DNS for + // localhost + if ("0.0.0.0".equals(hostname)) { + hostname = fqdn; + } + + // ACCUMULO-3497 an easy sanity check we can perform for the user when SASL is enabled. Clients + // and servers have to agree upon the FQDN + // so that the SASL handshake can occur. If the provided hostname doesn't match the FQDN for + // this host, fail quickly and inform them to update + // their configuration. + if (!hostname.equals(fqdn)) { + log.error("Expected hostname of '{}' but got '{}'. Ensure the entries in" + + " the Accumulo hosts files (e.g. managers, tservers) are the FQDN for" + + " each host when using SASL.", fqdn, hostname); + transport.close(); + throw new RuntimeException("SASL requires that the address the thrift" + + " server listens on is the same as the FQDN for this host"); + } + + final UserGroupInformation serverUser; + try { + serverUser = UserGroupInformation.getLoginUser(); + } catch (IOException e) { + transport.close(); + ThriftUtil.checkIOExceptionCause(e); + throw new TTransportException(e); + } + + log.debug("Logged in as {}, creating TSaslServerTransport factory with {}/{}", serverUser, + params.getKerberosServerPrimary(), hostname); + + // Make the SASL transport factory with the instance and primary from the kerberos server + // principal, SASL properties + // and the SASL callback handler from Hadoop to ensure authorization ID is the authentication + // ID. Despite the 'protocol' argument seeming to be useless, it + // *must* be the primary of the server. + TSaslServerTransport.Factory saslTransportFactory = new TSaslServerTransport.Factory(); + saslTransportFactory.addServerDefinition(ThriftUtil.GSSAPI, params.getKerberosServerPrimary(), + hostname, params.getSaslProperties(), new SaslRpcServer.SaslGssCallbackHandler()); + + if (params.getSecretManager() != null) { + log.info("Adding DIGEST-MD5 server definition for delegation tokens"); + saslTransportFactory.addServerDefinition(ThriftUtil.DIGEST_MD5, + params.getKerberosServerPrimary(), hostname, params.getSaslProperties(), + new SaslServerDigestCallbackHandler(params.getSecretManager())); + } else { + log.info("SecretManager is null, not adding support for delegation token authentication"); + } + + // Make sure the TTransportFactory is performing a UGI.doAs + TTransportFactory ugiTransportFactory = + new UGIAssumingTransportFactory(saslTransportFactory, serverUser); + + if (address.getPort() == 0) { + // If we chose a port dynamically, make a new use it (along with the proper hostname) + address = + HostAndPort.fromParts(address.getHost(), transport.getServerSocket().getLocalPort()); + log.info("SASL thrift server bound on {}", address); + } + + ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, threadTimeOut, + conf, timeBetweenThreadChecks); + + final TThreadPoolServer server = + createTThreadPoolServer(transport, processor, ugiTransportFactory, protocolFactory, pool); + + return new ServerAddress(server, address); + } + + /** + * Start the appropriate Thrift server (SSL or non-blocking server) for the given parameters. + * Non-null SSL parameters will cause an SSL server to be started. + * + * @return A ServerAddress encapsulating the Thrift server created and the host/port which it is + * bound to. + */ + public static ServerAddress startTServer(ThriftServerType serverType, TimedProcessor processor, + TProtocolFactory protocolFactory, String serverName, String threadName, int numThreads, + long threadTimeOut, final AccumuloConfiguration conf, long timeBetweenThreadChecks, + long maxMessageSize, SslConnectionParams sslParams, SaslServerConnectionParams saslParams, + long serverSocketTimeout, HostAndPort... addresses) throws TTransportException { + + // This is presently not supported. It's hypothetically possible, I believe, to work, but it + // would require changes in how the transports + // work at the Thrift layer to ensure that both the SSL and SASL handshakes function. SASL's + // quality of protection addresses privacy issues. + checkArgument(sslParams == null || saslParams == null, + "Cannot start a Thrift server using both SSL and SASL"); + + ServerAddress serverAddress = null; + for (HostAndPort address : addresses) { + try { + switch (serverType) { + case SSL: + log.debug("Instantiating SSL Thrift server"); + serverAddress = createSslThreadPoolServer(address, processor, protocolFactory, + serverSocketTimeout, sslParams, serverName, numThreads, threadTimeOut, conf, + timeBetweenThreadChecks); + break; + case SASL: + log.debug("Instantiating SASL Thrift server"); + serverAddress = createSaslThreadPoolServer(address, processor, protocolFactory, + serverSocketTimeout, saslParams, serverName, numThreads, threadTimeOut, conf, + timeBetweenThreadChecks); + break; + case THREADPOOL: + log.debug("Instantiating unsecure TThreadPool Thrift server"); + serverAddress = + createBlockingServer(address, processor, protocolFactory, maxMessageSize, + serverName, numThreads, threadTimeOut, conf, timeBetweenThreadChecks); + break; + case THREADED_SELECTOR: + log.debug("Instantiating default, unsecure Threaded selector Thrift server"); + serverAddress = + createThreadedSelectorServer(address, processor, protocolFactory, serverName, + numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize); + break; + case CUSTOM_HS_HA: + log.debug("Instantiating unsecure custom half-async Thrift server"); + serverAddress = createNonBlockingServer(address, processor, protocolFactory, serverName, + numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize); + break; + default: + throw new IllegalArgumentException("Unknown server type " + serverType); + } + break; + } catch (TTransportException e) { + log.warn("Error attempting to create server at {}. Error: {}", address, e.getMessage()); + } + } + if (serverAddress == null) { + throw new TTransportException( + "Unable to create server on addresses: " + Arrays.toString(addresses)); + } + + final TServer finalServer = serverAddress.server; + + Threads.createThread(threadName, () -> { + try { + finalServer.serve(); + } catch (Error e) { + Halt.halt("Unexpected error in TThreadPoolServer " + e + ", halting.", 1); + } + }).start(); + + // check for the special "bind to everything address" + if (serverAddress.address.getHost().equals("0.0.0.0")) { + // can't get the address from the bind, so we'll do our best to invent our hostname + try { + serverAddress = new ServerAddress(finalServer, HostAndPort + .fromParts(InetAddress.getLocalHost().getHostName(), serverAddress.address.getPort())); + } catch (UnknownHostException e) { + throw new TTransportException(e); + } + } + return serverAddress; + } +}