Skip to content

Commit 6df7b23

Browse files
committed
fix: revert protectMode upon channelActive event, complete non-retryable activation command in drainStackUponChannelInactive()
1 parent 7a54c8b commit 6df7b23

File tree

2 files changed

+19
-12
lines changed

2 files changed

+19
-12
lines changed

src/main/java/io/lettuce/core/protocol/CommandHandler.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -190,13 +190,18 @@ void setBuffer(ByteBuf buffer) {
190190
return drainCommands(stack);
191191
}
192192

193-
private Deque<RedisCommand<?, ?, ?>> drainStack() {
193+
private Deque<RedisCommand<?, ?, ?>> drainStackUponChannelInactive() {
194194
final Deque<RedisCommand<?, ?, ?>> target = new ArrayDeque<>(stack.size());
195195

196196
RedisCommand<?, ?, ?> cmd;
197197
while ((cmd = stack.poll()) != null) {
198-
if (!cmd.isDone() && !ActivationCommand.isActivationCommand(cmd)) {
199-
target.add(cmd);
198+
if (!cmd.isDone()) {
199+
if (!ActivationCommand.isActivationCommand(cmd)) {
200+
target.add(cmd);
201+
} else {
202+
cmd.completeExceptionally(
203+
new RedisConnectionException("activation command won't be retried upon channel inactive"));
204+
}
200205
}
201206
}
202207

@@ -379,7 +384,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
379384
endpoint.notifyChannelInactive(ctx.channel());
380385
Deque<RedisCommand<?, ?, ?>> autoBatchFlushRetryableDrainQueuedCommands = UnmodifiableDeque.emptyDeque();
381386
if (supportsAutoBatchFlush) {
382-
autoBatchFlushRetryableDrainQueuedCommands = drainStack();
387+
autoBatchFlushRetryableDrainQueuedCommands = drainStackUponChannelInactive();
383388
} else {
384389
endpoint.notifyDrainQueuedCommands(this);
385390
}

src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java

+10-8
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,7 @@ public void notifyChannelActive(Channel channel) {
319319

320320
this.logPrefix = null;
321321
this.connectionError = null;
322+
this.inProtectMode = false;
322323

323324
if (!CHANNEL.compareAndSet(this, DummyContextualChannelInstances.CHANNEL_CONNECTING, contextualChannel)) {
324325
channel.close();
@@ -398,7 +399,7 @@ public void notifyChannelInactive(Channel channel) {
398399

399400
@Override
400401
public void notifyChannelInactiveAfterWatchdogDecision(Channel channel,
401-
Deque<RedisCommand<?, ?, ?>> retryableQueuedCommands) {
402+
Deque<RedisCommand<?, ?, ?>> retryablePendingCommands) {
402403
final ContextualChannel inactiveChan = this.channel;
403404
if (!inactiveChan.context.initialState.isConnected()) {
404405
logger.error("[unexpected][{}] notifyChannelInactive: channel initial state not connected", logPrefix());
@@ -446,7 +447,7 @@ public void notifyChannelInactiveAfterWatchdogDecision(Channel channel,
446447
CHANNEL.set(this, DummyContextualChannelInstances.CHANNEL_ENDPOINT_CLOSED);
447448
}
448449
inactiveChan.context
449-
.setCloseStatus(new ConnectionContext.CloseStatus(willReconnect, retryableQueuedCommands, exception));
450+
.setCloseStatus(new ConnectionContext.CloseStatus(willReconnect, retryablePendingCommands, exception));
450451
trySetEndpointQuiescence(inactiveChan);
451452
}
452453

@@ -945,11 +946,11 @@ private <K, V, T> RedisCommand<K, V, T> processActivationCommand(RedisCommand<K,
945946

946947
private Throwable validateWrite(ContextualChannel chan, int commands, boolean isActivationCommand) {
947948
if (isClosed()) {
948-
return new RedisException("Connection is closed");
949+
return new RedisException("Endpoint is closed");
949950
}
950951

951952
final Throwable localConnectionErr = connectionError;
952-
if (localConnectionErr != null /* different logic of DefaultEndpoint */) {
953+
if (localConnectionErr != null /* attention: different logic of DefaultEndpoint */) {
953954
return localConnectionErr;
954955
}
955956

@@ -961,18 +962,19 @@ private Throwable validateWrite(ContextualChannel chan, int commands, boolean is
961962

962963
final ConnectionContext.State initialState = chan.context.initialState;
963964
final boolean rejectCommandsWhileDisconnectedLocal = this.rejectCommandsWhileDisconnected || isActivationCommand;
965+
final String rejectDesc = isActivationCommand ? "isActivationCommand" : "rejectCommandsWhileDisconnected";
964966
switch (initialState) {
965967
case ENDPOINT_CLOSED:
966968
return new RedisException("Connection is closed");
967969
case RECONNECT_FAILED:
968-
return failedToReconnectReason;
970+
return getFailedToReconnectReason();
969971
case WILL_RECONNECT:
970972
case CONNECTING:
971-
return rejectCommandsWhileDisconnectedLocal
972-
? new RedisException("Currently not connected. Commands are rejected.")
973+
return rejectCommandsWhileDisconnectedLocal ? new RedisException("Currently not connected and " + rejectDesc)
973974
: null;
974975
case CONNECTED:
975-
return !chan.isActive() && rejectCommandsWhileDisconnectedLocal ? new RedisException("Channel is closed")
976+
return !chan.isActive() && rejectCommandsWhileDisconnectedLocal
977+
? new RedisException("Channel is inactive and " + rejectDesc)
976978
: null;
977979
default:
978980
throw new IllegalStateException("unexpected state: " + initialState);

0 commit comments

Comments
 (0)