diff --git a/src/main/java/io/lettuce/core/support/ConnectionPoolSupport.java b/src/main/java/io/lettuce/core/support/ConnectionPoolSupport.java index f3ab8cb85..1a9402586 100644 --- a/src/main/java/io/lettuce/core/support/ConnectionPoolSupport.java +++ b/src/main/java/io/lettuce/core/support/ConnectionPoolSupport.java @@ -6,6 +6,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Predicate; import java.util.function.Supplier; import org.apache.commons.pool2.BasePooledObjectFactory; @@ -60,6 +61,7 @@ * * * @author Mark Paluch + * @author dae won * @since 4.3 */ public abstract class ConnectionPoolSupport { @@ -77,8 +79,8 @@ private ConnectionPoolSupport() { * @return the connection pool. */ public static > GenericObjectPool createGenericObjectPool( - Supplier connectionSupplier, GenericObjectPoolConfig config) { - return createGenericObjectPool(connectionSupplier, config, true); + Supplier connectionSupplier, GenericObjectPoolConfig config, Predicate connectionValidator) { + return createGenericObjectPool(connectionSupplier, config, true, connectionValidator); } /** @@ -94,14 +96,17 @@ private ConnectionPoolSupport() { */ @SuppressWarnings("unchecked") public static > GenericObjectPool createGenericObjectPool( - Supplier connectionSupplier, GenericObjectPoolConfig config, boolean wrapConnections) { + Supplier connectionSupplier, GenericObjectPoolConfig config, boolean wrapConnections, + Predicate connectionValidator) { LettuceAssert.notNull(connectionSupplier, "Connection supplier must not be null"); LettuceAssert.notNull(config, "GenericObjectPoolConfig must not be null"); + LettuceAssert.notNull(connectionValidator, "Connection validator must not be null"); AtomicReference> poolRef = new AtomicReference<>(); - GenericObjectPool pool = new GenericObjectPool(new RedisPooledObjectFactory(connectionSupplier), config) { + GenericObjectPool pool = new GenericObjectPool( + new EnhancedRedisPooledObjectFactory(connectionSupplier, connectionValidator), config) { @Override public T borrowObject() throws Exception { @@ -249,4 +254,43 @@ public CompletableFuture returnObjectAsync(T o) throws Exception { } + private static class EnhancedRedisPooledObjectFactory> + extends BasePooledObjectFactory { + + private final Supplier connectionSupplier; + + private final Predicate connectionValidator; + + EnhancedRedisPooledObjectFactory(Supplier connectionSupplier, Predicate connectionValidator) { + this.connectionSupplier = connectionSupplier; + this.connectionValidator = connectionValidator; + } + + @Override + public T create() throws Exception { + return connectionSupplier.get(); + } + + @Override + public PooledObject wrap(T obj) { + return new DefaultPooledObject<>(obj); + } + + @Override + public boolean validateObject(PooledObject p) { + T connection = p.getObject(); + return connection.isOpen() && connectionValidator.test(connection); + } + + @Override + public void destroyObject(PooledObject p) throws Exception { + try { + p.getObject().close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + } + } diff --git a/src/test/java/io/lettuce/core/dynamic/RedisCommandsIntegrationTests.java b/src/test/java/io/lettuce/core/dynamic/RedisCommandsIntegrationTests.java index 21c2b1e98..e71e89eba 100644 --- a/src/test/java/io/lettuce/core/dynamic/RedisCommandsIntegrationTests.java +++ b/src/test/java/io/lettuce/core/dynamic/RedisCommandsIntegrationTests.java @@ -100,7 +100,13 @@ void verifierShouldCatchTooFewParametersDeclarations() { void shouldWorkWithPooledConnection() throws Exception { GenericObjectPool> pool = ConnectionPoolSupport - .createGenericObjectPool(client::connect, new GenericObjectPoolConfig<>()); + .createGenericObjectPool(client::connect, new GenericObjectPoolConfig<>(), connection -> { + try { + return "PONG".equals(connection.sync().ping()); + } catch (Exception e) { + return false; + } + }); try (StatefulRedisConnection connection = pool.borrowObject()) { diff --git a/src/test/java/io/lettuce/core/support/ConnectionPoolSupportIntegrationTests.java b/src/test/java/io/lettuce/core/support/ConnectionPoolSupportIntegrationTests.java index ed38435ed..a92fe2f5b 100644 --- a/src/test/java/io/lettuce/core/support/ConnectionPoolSupportIntegrationTests.java +++ b/src/test/java/io/lettuce/core/support/ConnectionPoolSupportIntegrationTests.java @@ -66,7 +66,13 @@ static void afterClass() { void genericPoolShouldWorkWithWrappedConnections() throws Exception { GenericObjectPool> pool = ConnectionPoolSupport - .createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>()); + .createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), connection -> { + try { + return "PONG".equals(connection.sync().ping()); + } catch (Exception e) { + return false; + } + }); borrowAndReturn(pool); borrowAndClose(pool); @@ -91,7 +97,13 @@ void genericPoolShouldCloseConnectionsAboveMaxIdleSize() throws Exception { poolConfig.setMaxIdle(2); GenericObjectPool> pool = ConnectionPoolSupport - .createGenericObjectPool(() -> client.connect(), poolConfig); + .createGenericObjectPool(() -> client.connect(), poolConfig, connection -> { + try { + return "PONG".equals(connection.sync().ping()); + } catch (Exception e) { + return false; + } + }); borrowAndReturn(pool); borrowAndClose(pool); @@ -120,7 +132,13 @@ void genericPoolShouldCloseConnectionsAboveMaxIdleSize() throws Exception { void genericPoolShouldWorkWithPlainConnections() throws Exception { GenericObjectPool> pool = ConnectionPoolSupport - .createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), false); + .createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), false, connection -> { + try { + return "PONG".equals(connection.sync().ping()); + } catch (Exception e) { + return false; + } + }); borrowAndReturn(pool); @@ -151,7 +169,13 @@ void softReferencePoolShouldWorkWithPlainConnections() throws Exception { void genericPoolUsingWrappingShouldPropagateExceptionsCorrectly() throws Exception { GenericObjectPool> pool = ConnectionPoolSupport - .createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>()); + .createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), connection -> { + try { + return "PONG".equals(connection.sync().ping()); + } catch (Exception e) { + return false; + } + }); StatefulRedisConnection connection = pool.borrowObject(); RedisCommands sync = connection.sync(); @@ -172,7 +196,13 @@ void genericPoolUsingWrappingShouldPropagateExceptionsCorrectly() throws Excepti void wrappedConnectionShouldUseWrappers() throws Exception { GenericObjectPool> pool = ConnectionPoolSupport - .createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>()); + .createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), connection -> { + try { + return "PONG".equals(connection.sync().ping()); + } catch (Exception e) { + return false; + } + }); StatefulRedisConnection connection = pool.borrowObject(); RedisCommands sync = connection.sync(); @@ -197,7 +227,13 @@ void wrappedMasterSlaveConnectionShouldUseWrappers() throws Exception { GenericObjectPool> pool = ConnectionPoolSupport .createGenericObjectPool(() -> MasterReplica.connect(client, new StringCodec(), RedisURI.create(host, port)), - new GenericObjectPoolConfig<>()); + new GenericObjectPoolConfig<>(), connection -> { + try { + return "PONG".equals(connection.sync().ping()); + } catch (Exception e) { + return false; + } + }); StatefulRedisMasterReplicaConnection connection = pool.borrowObject(); RedisCommands sync = connection.sync(); @@ -223,7 +259,13 @@ void wrappedClusterConnectionShouldUseWrappers() throws Exception { RedisURI.create(TestSettings.host(), 7379)); GenericObjectPool> pool = ConnectionPoolSupport - .createGenericObjectPool(redisClusterClient::connect, new GenericObjectPoolConfig<>()); + .createGenericObjectPool(redisClusterClient::connect, new GenericObjectPoolConfig<>(), connection -> { + try { + return "PONG".equals(connection.sync().ping()); + } catch (Exception e) { + return false; + } + }); StatefulRedisClusterConnection connection = pool.borrowObject(); RedisAdvancedClusterCommands sync = connection.sync(); @@ -250,7 +292,13 @@ void wrappedClusterConnectionShouldUseWrappers() throws Exception { void plainConnectionShouldNotUseWrappers() throws Exception { GenericObjectPool> pool = ConnectionPoolSupport - .createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), false); + .createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), false, connection -> { + try { + return "PONG".equals(connection.sync().ping()); + } catch (Exception e) { + return false; + } + }); StatefulRedisConnection connection = pool.borrowObject(); RedisCommands sync = connection.sync(); @@ -295,7 +343,13 @@ void softRefPoolShouldWorkWithWrappedConnections() throws Exception { void wrappedObjectClosedAfterReturn() throws Exception { GenericObjectPool> pool = ConnectionPoolSupport - .createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), true); + .createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), true, connection -> { + try { + return "PONG".equals(connection.sync().ping()); + } catch (Exception e) { + return false; + } + }); StatefulRedisConnection connection = pool.borrowObject(); RedisCommands sync = connection.sync(); @@ -317,7 +371,13 @@ void wrappedObjectClosedAfterReturn() throws Exception { void tryWithResourcesReturnsConnectionToPool() throws Exception { GenericObjectPool> pool = ConnectionPoolSupport - .createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>()); + .createGenericObjectPool(() -> client.connect(), new GenericObjectPoolConfig<>(), connection -> { + try { + return "PONG".equals(connection.sync().ping()); + } catch (Exception e) { + return false; + } + }); StatefulRedisConnection usedConnection = null; try (StatefulRedisConnection connection = pool.borrowObject()) {