diff --git a/src/main/java/io/r2dbc/postgresql/MultiHostConnectionStrategy.java b/src/main/java/io/r2dbc/postgresql/MultiHostConnectionStrategy.java index a5ab732c..edd80ea4 100644 --- a/src/main/java/io/r2dbc/postgresql/MultiHostConnectionStrategy.java +++ b/src/main/java/io/r2dbc/postgresql/MultiHostConnectionStrategy.java @@ -73,7 +73,7 @@ public final class MultiHostConnectionStrategy implements ConnectionStrategy { @Override public Mono connect() { - return connect(this.multiHostConfiguration.getTargetServerType()); + return Mono.defer(() -> connect(this.multiHostConfiguration.getTargetServerType())); } @Override @@ -85,7 +85,7 @@ public String toString() { public Mono connect(TargetServerType targetServerType) { AtomicReference exceptionRef = new AtomicReference<>(); - return attemptConnection(targetServerType) + return Mono.defer(() -> attemptConnection(targetServerType)) .onErrorResume(e -> { if (!exceptionRef.compareAndSet(null, e)) { exceptionRef.get().addSuppressed(e); diff --git a/src/test/java/io/r2dbc/postgresql/client/HighAvailabilityClusterIntegrationTests.java b/src/test/java/io/r2dbc/postgresql/client/HighAvailabilityClusterIntegrationTests.java index db6d795d..1214e3c3 100644 --- a/src/test/java/io/r2dbc/postgresql/client/HighAvailabilityClusterIntegrationTests.java +++ b/src/test/java/io/r2dbc/postgresql/client/HighAvailabilityClusterIntegrationTests.java @@ -27,7 +27,11 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.testcontainers.containers.PostgreSQLContainer; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.netty.DisposableChannel; +import reactor.netty.DisposableServer; +import reactor.netty.tcp.TcpServer; import reactor.test.StepVerifier; import static org.assertj.core.api.Assertions.assertThat; @@ -119,6 +123,53 @@ void testTargetPreferSecondaryConnectedToStandby() { .verifyComplete(); } + @Test + void testTargetPreferSecondaryConnectedToMasterOnStandbyFailure() { + DisposableServer failingServer = newServer(); + try { + isConnectedToPrimary(MultiHostConnectionStrategy.TargetServerType.PREFER_SECONDARY, SERVERS.getPrimary(), failingServer) + .as(StepVerifier::create) + .expectNext(true) + .verifyComplete(); + } finally { + failingServer.dispose(); + } + } + + @Test + void testMultipleCallsWithTargetPreferSecondaryConnectedToStandby() { + PostgresqlConnectionFactory connectionFactory = this.multiHostConnectionFactory(MultiHostConnectionStrategy.TargetServerType.PREFER_SECONDARY, SERVERS.getPrimary(), SERVERS.getStandby()); + + Mono allocator = Mono.usingWhen(connectionFactory.create(), this::isPrimary, Connection::close); + Flux connectionPool = Flux.merge(allocator, allocator); + + connectionPool + .as(StepVerifier::create) + .expectNext(false) + .expectNext(false) + .verifyComplete(); + } + + @Test + void testMultipleCallsWithTargetPreferSecondaryConnectedToMasterOnStandbyFailure() { + DisposableServer failingServer = newServer(); + try { + PostgresqlConnectionFactory connectionFactory = this.multiHostConnectionFactoryWithFailingServer(MultiHostConnectionStrategy.TargetServerType.PREFER_SECONDARY, SERVERS.getPrimary(), + failingServer); + + Mono allocator = Mono.usingWhen(connectionFactory.create(), this::isPrimary, Connection::close); + Flux connectionPool = Flux.merge(allocator, allocator); + + connectionPool + .as(StepVerifier::create) + .expectNext(true) + .expectNext(true) + .verifyComplete(); + } finally { + failingServer.dispose(); + } + } + @Test void testTargetPrimaryChoosePrimary() { isConnectedToPrimary(MultiHostConnectionStrategy.TargetServerType.PRIMARY, SERVERS.getPrimary(), SERVERS.getStandby()) @@ -181,6 +232,12 @@ private Mono isConnectedToPrimary(MultiHostConnectionStrategy.TargetSer return Mono.usingWhen(connectionFactory.create(), this::isPrimary, Connection::close); } + private Mono isConnectedToPrimary(MultiHostConnectionStrategy.TargetServerType targetServerType, PostgreSQLContainer primaryServer, DisposableServer failingServer) { + PostgresqlConnectionFactory connectionFactory = this.multiHostConnectionFactoryWithFailingServer(targetServerType, primaryServer, failingServer); + + return Mono.usingWhen(connectionFactory.create(), this::isPrimary, Connection::close); + } + private Mono isPrimary(PostgresqlConnection connection) { return connection.createStatement("SHOW TRANSACTION_READ_ONLY") .execute() @@ -203,4 +260,25 @@ private PostgresqlConnectionFactory multiHostConnectionFactory(MultiHostConnecti return new PostgresqlConnectionFactory(configuration); } + private PostgresqlConnectionFactory multiHostConnectionFactoryWithFailingServer(MultiHostConnectionStrategy.TargetServerType targetServerType, PostgreSQLContainer primaryServer, + DisposableServer failingServer) { + PostgresqlConnectionConfiguration.Builder builder = PostgresqlConnectionConfiguration.builder(); + builder.addHost(primaryServer.getHost(), primaryServer.getMappedPort(5432)); + builder.addHost(failingServer.host(), failingServer.port()); + + PostgresqlConnectionConfiguration configuration = builder + .targetServerType(targetServerType) + .username(primaryServer.getUsername()) + .password(primaryServer.getPassword()) + .build(); + return new PostgresqlConnectionFactory(configuration); + } + + // Simulate server downtime, where connections are accepted and then closed immediately + static DisposableServer newServer() { + return TcpServer.create() + .doOnConnection(DisposableChannel::dispose) + .bindNow(); + } + }