Skip to content

Commit e3e5e67

Browse files
authored
Introduce command replay filter to avoid command replaying after reconnect #1310 (#3118)
* Draft : filter out commands to resent after a re-connect * Wordlist addition
1 parent 6d69f45 commit e3e5e67

File tree

6 files changed

+171
-19
lines changed

6 files changed

+171
-19
lines changed

.github/wordlist.txt

+2-1
Original file line numberDiff line numberDiff line change
@@ -277,4 +277,5 @@ ACR
277277
AMR
278278
Entra
279279
authx
280-
entraid
280+
entraid
281+
autoReconnect

docs/advanced-usage.md

+59-11
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ client.setOptions(ClientOptions.builder()
327327
<tbody>
328328
<tr>
329329
<td>PING before activating connection</td>
330-
<td><code>pingBefor eActivateConnection</code></td>
330+
<td><code>pingBeforeActivateConnection</code></td>
331331
<td><code>true</code></td>
332332
</tr>
333333
<tr>
@@ -362,8 +362,21 @@ queued commands.</p>
362362
refuse commands and cancel these with an exception.</p></td>
363363
</tr>
364364
<tr>
365+
<td>Replay filter</td>
366+
<td><code>replayFilter</code></td>
367+
<td><code>(cmd) -> false</code></td>
368+
</tr>
369+
<tr>
370+
<td colspan="3"><p>Since: 6.6</p>
371+
<p>Controls which commands are to be filtered out in case the driver
372+
attempts to reconnect to the server. Returning <code>false</code> means
373+
that the command would not be filtered out.</p>
374+
<p>This flag has no effect in case the autoReconnect feature is not
375+
enabled.</p></td>
376+
</tr>
377+
<tr>
365378
<td>Cancel commands on reconnect failure</td>
366-
<td><code>cancelCommand sOnReconnectFailure</code></td>
379+
<td><code>cancelCommandsOnReconnectFailure</code></td>
367380
<td><code>false</code></td>
368381
</tr>
369382
<tr>
@@ -486,7 +499,7 @@ store/trust store.</p></td>
486499
<tr>
487500
<td>Timeout Options</td>
488501
<td><code>timeoutOptions</code></td>
489-
<td><code>Do n ot timeout commands.</code></td>
502+
<td><code>Do not timeout commands.</code></td>
490503
</tr>
491504
<tr>
492505
<td colspan="3"><p>Since: 5.1</p>
@@ -550,7 +563,7 @@ client.setOptions(ClusterClientOptions.builder()
550563
<tbody>
551564
<tr>
552565
<td>Periodic cluster topology refresh</td>
553-
<td><code>en ablePeriodicRefresh</code></td>
566+
<td><code>enablePeriodicRefresh</code></td>
554567
<td><code>false</code></td>
555568
</tr>
556569
<tr>
@@ -2405,14 +2418,14 @@ independent connections to Redis.
24052418
Lettuce provides two levels of consistency; these are the rules for
24062419
Redis command sends:
24072420

2408-
Depending on the chosen consistency level:
2421+
#### Depending on the chosen consistency level
24092422

2410-
- **at-most-once execution**, i. e. no guaranteed execution
2423+
- **at-most-once execution**, i.e. no guaranteed execution
24112424

2412-
- **at-least-once execution**, i. e. guaranteed execution (with [some
2425+
- **at-least-once execution**, i.e. guaranteed execution (with [some
24132426
exceptions](#exceptions-to-at-least-once))
24142427

2415-
Always:
2428+
#### Always
24162429

24172430
- command ordering in the order of invocations
24182431

@@ -2608,9 +2621,44 @@ re-established, queued commands are re-sent for execution. While a
26082621
connection failure persists, issued commands are buffered.
26092622

26102623
To change into *at-most-once* consistency level, disable auto-reconnect
2611-
mode. Connections cannot be longer reconnected and thus no retries are
2612-
issued. Not successfully commands are canceled. New commands are
2613-
rejected.
2624+
mode. Connections can no longer be reconnected and thus no retries are
2625+
issued. Unsuccessful commands are canceled. New commands are rejected.
2626+
2627+
#### Controlling replay of commands in *at-lease-once* mode
2628+
2629+
!!! NOTE
2630+
This feature is only available since Lettuce 6.6
2631+
2632+
One can achieve a more fine-grained control over the commands that are
2633+
replayed after a reconnection by using the option to specify a filter
2634+
predicate. This option is part of the ClientOptions configuration. See
2635+
[Client Options](advanced-usage.md#client-options) for further reference.
2636+
2637+
``` java
2638+
Predicate<RedisCommand<?, ?, ?> > filter = cmd ->
2639+
cmd.getType().toString().equalsIgnoreCase("DECR");
2640+
2641+
client.setOptions(ClientOptions.builder()
2642+
.autoReconnect(true)
2643+
.replayFilter(filter)
2644+
.build());
2645+
```
2646+
2647+
The code above would filter out all `DECR` commands from being replayed
2648+
after a reconnection. Another, perhaps more popular example, would be:
2649+
2650+
``` java
2651+
Predicate<RedisCommand<?, ?, ?> > filter = cmd -> true;
2652+
2653+
client.setOptions(ClientOptions.builder()
2654+
.autoReconnect(true)
2655+
.replayFilter(filter)
2656+
.build());
2657+
```
2658+
2659+
... which disables any command replay, but still allows the driver to
2660+
re-connect, basically providing a way to have auto-reconnect without
2661+
auto-replay of commands.
26142662

26152663
### Clustered operations
26162664

src/main/java/io/lettuce/core/ClientOptions.java

+42-7
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Iterator;
2626
import java.util.ServiceConfigurationError;
2727
import java.util.ServiceLoader;
28+
import java.util.function.Predicate;
2829
import java.util.function.Supplier;
2930

3031
import io.lettuce.core.api.StatefulConnection;
@@ -35,6 +36,7 @@
3536
import io.lettuce.core.protocol.DecodeBufferPolicy;
3637
import io.lettuce.core.protocol.ProtocolVersion;
3738
import io.lettuce.core.protocol.ReadOnlyCommands;
39+
import io.lettuce.core.protocol.RedisCommand;
3840
import io.lettuce.core.resource.ClientResources;
3941
import reactor.core.publisher.Mono;
4042

@@ -50,6 +52,8 @@ public class ClientOptions implements Serializable {
5052

5153
public static final boolean DEFAULT_AUTO_RECONNECT = true;
5254

55+
public static final Predicate<RedisCommand<?, ?, ?>> DEFAULT_REPLAY_FILTER = (cmd) -> false;
56+
5357
public static final int DEFAULT_BUFFER_USAGE_RATIO = 3;
5458

5559
public static final boolean DEFAULT_CANCEL_CMD_RECONNECT_FAIL = false;
@@ -92,6 +96,8 @@ public class ClientOptions implements Serializable {
9296

9397
private final boolean autoReconnect;
9498

99+
private final Predicate<RedisCommand<?, ?, ?>> replayFilter;
100+
95101
private final boolean cancelCommandsOnReconnectFailure;
96102

97103
private final DecodeBufferPolicy decodeBufferPolicy;
@@ -126,6 +132,7 @@ public class ClientOptions implements Serializable {
126132

127133
protected ClientOptions(Builder builder) {
128134
this.autoReconnect = builder.autoReconnect;
135+
this.replayFilter = builder.replayFilter;
129136
this.cancelCommandsOnReconnectFailure = builder.cancelCommandsOnReconnectFailure;
130137
this.decodeBufferPolicy = builder.decodeBufferPolicy;
131138
this.disconnectedBehavior = builder.disconnectedBehavior;
@@ -146,6 +153,7 @@ protected ClientOptions(Builder builder) {
146153

147154
protected ClientOptions(ClientOptions original) {
148155
this.autoReconnect = original.isAutoReconnect();
156+
this.replayFilter = original.getReplayFilter();
149157
this.cancelCommandsOnReconnectFailure = original.isCancelCommandsOnReconnectFailure();
150158
this.decodeBufferPolicy = original.getDecodeBufferPolicy();
151159
this.disconnectedBehavior = original.getDisconnectedBehavior();
@@ -199,6 +207,8 @@ public static class Builder {
199207

200208
private boolean autoReconnect = DEFAULT_AUTO_RECONNECT;
201209

210+
private Predicate<RedisCommand<?, ?, ?>> replayFilter = DEFAULT_REPLAY_FILTER;
211+
202212
private boolean cancelCommandsOnReconnectFailure = DEFAULT_CANCEL_CMD_RECONNECT_FAIL;
203213

204214
private DecodeBufferPolicy decodeBufferPolicy = DecodeBufferPolicies.ratio(DEFAULT_BUFFER_USAGE_RATIO);
@@ -246,6 +256,21 @@ public Builder autoReconnect(boolean autoReconnect) {
246256
return this;
247257
}
248258

259+
/**
260+
* When {@link #autoReconnect(boolean)} is set to true, this {@link Predicate} is used to filter commands to replay when
261+
* the connection is reestablished after a disconnect. Returning <code>false</code> means the command will not be
262+
* filtered out and will be replayed. Defaults to replaying all queued commands.
263+
*
264+
* @param replayFilter a {@link Predicate} to filter commands to replay. Must not be {@code null}.
265+
* @see #DEFAULT_REPLAY_FILTER
266+
* @return {@code this}
267+
* @since 6.6
268+
*/
269+
public Builder replayFilter(Predicate<RedisCommand<?, ?, ?>> replayFilter) {
270+
this.replayFilter = replayFilter;
271+
return this;
272+
}
273+
249274
/**
250275
* Allows cancelling queued commands in case a reconnect fails.Defaults to {@code false}. See
251276
* {@link #DEFAULT_CANCEL_CMD_RECONNECT_FAIL}. <b>This flag is deprecated and should not be used as it can lead to race
@@ -527,13 +552,13 @@ public ClientOptions.Builder mutate() {
527552
Builder builder = new Builder();
528553

529554
builder.autoReconnect(isAutoReconnect()).cancelCommandsOnReconnectFailure(isCancelCommandsOnReconnectFailure())
530-
.decodeBufferPolicy(getDecodeBufferPolicy()).disconnectedBehavior(getDisconnectedBehavior())
531-
.reauthenticateBehavior(getReauthenticateBehaviour()).readOnlyCommands(getReadOnlyCommands())
532-
.publishOnScheduler(isPublishOnScheduler()).pingBeforeActivateConnection(isPingBeforeActivateConnection())
533-
.protocolVersion(getConfiguredProtocolVersion()).requestQueueSize(getRequestQueueSize())
534-
.scriptCharset(getScriptCharset()).jsonParser(getJsonParser()).socketOptions(getSocketOptions())
535-
.sslOptions(getSslOptions()).suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure())
536-
.timeoutOptions(getTimeoutOptions());
555+
.replayFilter(getReplayFilter()).decodeBufferPolicy(getDecodeBufferPolicy())
556+
.disconnectedBehavior(getDisconnectedBehavior()).reauthenticateBehavior(getReauthenticateBehaviour())
557+
.readOnlyCommands(getReadOnlyCommands()).publishOnScheduler(isPublishOnScheduler())
558+
.pingBeforeActivateConnection(isPingBeforeActivateConnection()).protocolVersion(getConfiguredProtocolVersion())
559+
.requestQueueSize(getRequestQueueSize()).scriptCharset(getScriptCharset()).jsonParser(getJsonParser())
560+
.socketOptions(getSocketOptions()).sslOptions(getSslOptions())
561+
.suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure()).timeoutOptions(getTimeoutOptions());
537562

538563
return builder;
539564
}
@@ -551,6 +576,16 @@ public boolean isAutoReconnect() {
551576
return autoReconnect;
552577
}
553578

579+
/**
580+
* Controls which {@link RedisCommand} will be replayed after a re-connect. The {@link Predicate} returns <code>true</code>
581+
* if command should be filtered out and not replayed. Defaults to {@link #DEFAULT_REPLAY_FILTER}.
582+
*
583+
* @return the currently set {@link Predicate} used to filter out commands to replay
584+
*/
585+
public Predicate<RedisCommand<?, ?, ?>> getReplayFilter() {
586+
return replayFilter;
587+
}
588+
554589
/**
555590
* If this flag is {@code true} any queued commands will be canceled when a reconnect fails within the activation sequence.
556591
* Default is {@code false}.

src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java

+17
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
3434
import java.util.concurrent.atomic.AtomicLong;
3535
import java.util.function.Consumer;
36+
import java.util.function.Predicate;
3637
import java.util.function.Supplier;
3738

3839
import io.lettuce.core.ClientOptions;
@@ -81,6 +82,8 @@ public class DefaultEndpoint implements RedisChannelWriter, Endpoint, PushHandle
8182

8283
private final Reliability reliability;
8384

85+
private final Predicate<RedisCommand<?, ?, ?>> replayFilter;
86+
8487
private final ClientOptions clientOptions;
8588

8689
private final ClientResources clientResources;
@@ -139,6 +142,7 @@ public DefaultEndpoint(ClientOptions clientOptions, ClientResources clientResour
139142
this.clientOptions = clientOptions;
140143
this.clientResources = clientResources;
141144
this.reliability = clientOptions.isAutoReconnect() ? Reliability.AT_LEAST_ONCE : Reliability.AT_MOST_ONCE;
145+
this.replayFilter = clientOptions.getReplayFilter();
142146
this.disconnectedBuffer = LettuceFactories.newConcurrentQueue(clientOptions.getRequestQueueSize());
143147
this.commandBuffer = LettuceFactories.newConcurrentQueue(clientOptions.getRequestQueueSize());
144148
this.boundedQueues = clientOptions.getRequestQueueSize() != Integer.MAX_VALUE;
@@ -343,6 +347,13 @@ private void writeToDisconnectedBuffer(RedisCommand<?, ?, ?> command) {
343347
return;
344348
}
345349

350+
if (replayFilter.test(command)) {
351+
if (debugEnabled) {
352+
logger.debug("{} writeToDisconnectedBuffer() Filtering out command {}", logPrefix(), command);
353+
}
354+
return;
355+
}
356+
346357
if (debugEnabled) {
347358
logger.debug("{} writeToDisconnectedBuffer() buffering (disconnected) command {}", logPrefix(), command);
348359
}
@@ -1033,10 +1044,16 @@ private void doComplete(Future<Void> future) {
10331044
private void potentiallyRequeueCommands(Channel channel, RedisCommand<?, ?, ?> sentCommand,
10341045
Collection<? extends RedisCommand<?, ?, ?>> sentCommands) {
10351046

1047+
// do not requeue commands that are done
10361048
if (sentCommand != null && sentCommand.isDone()) {
10371049
return;
10381050
}
10391051

1052+
// do not requeue commands that are to be filtered out
1053+
if (this.endpoint.replayFilter.test(sentCommand)) {
1054+
return;
1055+
}
1056+
10401057
if (sentCommands != null) {
10411058

10421059
boolean foundToSend = false;

src/test/java/io/lettuce/core/cluster/ClusterNodeEndpointUnitTests.java

+1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ class ClusterNodeEndpointUnitTests {
5656
@BeforeEach
5757
void before() {
5858

59+
when(clientOptions.getReplayFilter()).thenReturn((cmd) -> false);
5960
when(clientOptions.getRequestQueueSize()).thenReturn(1000);
6061
when(clientOptions.getDisconnectedBehavior()).thenReturn(ClientOptions.DisconnectedBehavior.DEFAULT);
6162

src/test/java/io/lettuce/core/reliability/AtLeastOnceIntegrationTests.java

+50
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88
import java.util.concurrent.CountDownLatch;
99
import java.util.concurrent.ExecutionException;
1010
import java.util.concurrent.TimeUnit;
11+
import java.util.function.Predicate;
1112

1213
import io.lettuce.core.TimeoutOptions;
14+
import io.lettuce.core.protocol.RedisCommand;
1315
import org.junit.jupiter.api.BeforeEach;
1416
import org.junit.jupiter.api.Tag;
1517
import org.junit.jupiter.api.Test;
@@ -372,6 +374,54 @@ void retryAfterConnectionIsDisconnected() throws Exception {
372374
verificationConnection.getStatefulConnection().close();
373375
}
374376

377+
@Test
378+
void retryAfterConnectionIsDisconnectedButFiltered() throws Exception {
379+
// Do not replay DECR commands after reconnect for some reason
380+
Predicate<RedisCommand<?, ?, ?>> filter = cmd -> cmd.getType().toString().equalsIgnoreCase("DECR");
381+
382+
client.setOptions(ClientOptions.builder().autoReconnect(true).replayFilter(filter)
383+
.timeoutOptions(TimeoutOptions.builder().timeoutCommands(false).build()).build());
384+
385+
// needs to be increased on slow systems...perhaps...
386+
client.setDefaultTimeout(3, TimeUnit.SECONDS);
387+
388+
StatefulRedisConnection<String, String> connection = client.connect();
389+
RedisCommands<String, String> verificationConnection = client.connect().sync();
390+
391+
connection.sync().set(key, "1");
392+
393+
ConnectionWatchdog connectionWatchdog = ConnectionTestUtil.getConnectionWatchdog(connection);
394+
connectionWatchdog.setListenOnChannelInactive(false);
395+
396+
connection.async().quit();
397+
while (connection.isOpen()) {
398+
Delay.delay(Duration.ofMillis(100));
399+
}
400+
401+
assertThat(connection.async().incr(key).await(1, TimeUnit.SECONDS)).isFalse();
402+
assertThat(connection.async().decr(key).await(1, TimeUnit.SECONDS)).isFalse();
403+
assertThat(connection.async().decr(key).await(1, TimeUnit.SECONDS)).isFalse();
404+
405+
assertThat(verificationConnection.get("key")).isEqualTo("1");
406+
407+
assertThat(ConnectionTestUtil.getDisconnectedBuffer(connection).size()).isGreaterThan(0);
408+
assertThat(ConnectionTestUtil.getCommandBuffer(connection)).isEmpty();
409+
410+
connectionWatchdog.setListenOnChannelInactive(true);
411+
connectionWatchdog.scheduleReconnect();
412+
413+
while (!ConnectionTestUtil.getCommandBuffer(connection).isEmpty()
414+
|| !ConnectionTestUtil.getDisconnectedBuffer(connection).isEmpty()) {
415+
Delay.delay(Duration.ofMillis(10));
416+
}
417+
418+
assertThat(connection.sync().get(key)).isEqualTo("2");
419+
assertThat(verificationConnection.get(key)).isEqualTo("2");
420+
421+
connection.close();
422+
verificationConnection.getStatefulConnection().close();
423+
}
424+
375425
private Throwable getException(RedisFuture<?> command) {
376426
try {
377427
command.get();

0 commit comments

Comments
 (0)