Skip to content

Commit e328aa0

Browse files
committed
fix: revert protectMode upon channelActive event, complete non-retryable activation command in drainStackUponChannelInactive()
1 parent 7a54c8b commit e328aa0

File tree

2 files changed

+27
-22
lines changed

2 files changed

+27
-22
lines changed

src/main/java/io/lettuce/core/protocol/CommandHandler.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -190,13 +190,18 @@ void setBuffer(ByteBuf buffer) {
190190
return drainCommands(stack);
191191
}
192192

193-
private Deque<RedisCommand<?, ?, ?>> drainStack() {
193+
private Deque<RedisCommand<?, ?, ?>> drainStackUponChannelInactive() {
194194
final Deque<RedisCommand<?, ?, ?>> target = new ArrayDeque<>(stack.size());
195195

196196
RedisCommand<?, ?, ?> cmd;
197197
while ((cmd = stack.poll()) != null) {
198-
if (!cmd.isDone() && !ActivationCommand.isActivationCommand(cmd)) {
199-
target.add(cmd);
198+
if (!cmd.isDone()) {
199+
if (!ActivationCommand.isActivationCommand(cmd)) {
200+
target.add(cmd);
201+
} else {
202+
cmd.completeExceptionally(
203+
new RedisConnectionException("activation command won't be retried upon channel inactive"));
204+
}
200205
}
201206
}
202207

@@ -379,7 +384,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
379384
endpoint.notifyChannelInactive(ctx.channel());
380385
Deque<RedisCommand<?, ?, ?>> autoBatchFlushRetryableDrainQueuedCommands = UnmodifiableDeque.emptyDeque();
381386
if (supportsAutoBatchFlush) {
382-
autoBatchFlushRetryableDrainQueuedCommands = drainStack();
387+
autoBatchFlushRetryableDrainQueuedCommands = drainStackUponChannelInactive();
383388
} else {
384389
endpoint.notifyDrainQueuedCommands(this);
385390
}

src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java

+18-18
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ protected static void cancelCommandOnEndpointClose(RedisCommand<?, ?, ?> cmd) {
133133

134134
protected volatile @Nonnull ContextualChannel channel = DummyContextualChannelInstances.CHANNEL_CONNECTING;
135135

136+
private volatile Throwable failedToReconnectReason;
137+
136138
private final Consumer<RedisCommand<?, ?, ?>> callbackOnClose;
137139

138140
private final boolean rejectCommandsWhileDisconnected;
@@ -153,19 +155,17 @@ protected static void cancelCommandOnEndpointClose(RedisCommand<?, ?, ?> cmd) {
153155

154156
private ConnectionFacade connectionFacade;
155157

156-
private volatile Throwable connectionError;
157-
158158
private final String cachedEndpointId;
159159

160160
protected final UnboundedOfferFirstQueue<Object> taskQueue;
161161

162162
private final boolean canFire;
163163

164-
private volatile boolean inProtectMode;
164+
private volatile EventLoop lastEventLoop = null;
165165

166-
private volatile Throwable failedToReconnectReason;
166+
private volatile Throwable connectionError;
167167

168-
private volatile EventLoop lastEventLoop = null;
168+
private volatile boolean inProtectMode;
169169

170170
private final int writeSpinCount;
171171

@@ -316,17 +316,16 @@ private <V, K> void writeAndFlushActivationCommands(ContextualChannel chan,
316316
@Override
317317
public void notifyChannelActive(Channel channel) {
318318
final ContextualChannel contextualChannel = new ContextualChannel(channel, ConnectionContext.State.CONNECTED);
319-
320-
this.logPrefix = null;
321-
this.connectionError = null;
322-
323319
if (!CHANNEL.compareAndSet(this, DummyContextualChannelInstances.CHANNEL_CONNECTING, contextualChannel)) {
324320
channel.close();
325321
onUnexpectedState("notifyChannelActive", ConnectionContext.State.CONNECTING);
326322
return;
327323
}
328324

329-
lastEventLoop = channel.eventLoop();
325+
this.lastEventLoop = channel.eventLoop();
326+
this.connectionError = null;
327+
this.inProtectMode = false;
328+
this.logPrefix = null;
330329

331330
// Created a synchronize-before with set channel to CHANNEL_CONNECTING,
332331
if (isClosed()) {
@@ -398,7 +397,7 @@ public void notifyChannelInactive(Channel channel) {
398397

399398
@Override
400399
public void notifyChannelInactiveAfterWatchdogDecision(Channel channel,
401-
Deque<RedisCommand<?, ?, ?>> retryableQueuedCommands) {
400+
Deque<RedisCommand<?, ?, ?>> retryablePendingCommands) {
402401
final ContextualChannel inactiveChan = this.channel;
403402
if (!inactiveChan.context.initialState.isConnected()) {
404403
logger.error("[unexpected][{}] notifyChannelInactive: channel initial state not connected", logPrefix());
@@ -446,7 +445,7 @@ public void notifyChannelInactiveAfterWatchdogDecision(Channel channel,
446445
CHANNEL.set(this, DummyContextualChannelInstances.CHANNEL_ENDPOINT_CLOSED);
447446
}
448447
inactiveChan.context
449-
.setCloseStatus(new ConnectionContext.CloseStatus(willReconnect, retryableQueuedCommands, exception));
448+
.setCloseStatus(new ConnectionContext.CloseStatus(willReconnect, retryablePendingCommands, exception));
450449
trySetEndpointQuiescence(inactiveChan);
451450
}
452451

@@ -945,11 +944,11 @@ private <K, V, T> RedisCommand<K, V, T> processActivationCommand(RedisCommand<K,
945944

946945
private Throwable validateWrite(ContextualChannel chan, int commands, boolean isActivationCommand) {
947946
if (isClosed()) {
948-
return new RedisException("Connection is closed");
947+
return new RedisException("Endpoint is closed");
949948
}
950949

951950
final Throwable localConnectionErr = connectionError;
952-
if (localConnectionErr != null /* different logic of DefaultEndpoint */) {
951+
if (localConnectionErr != null /* attention: different logic of DefaultEndpoint */) {
953952
return localConnectionErr;
954953
}
955954

@@ -961,18 +960,19 @@ private Throwable validateWrite(ContextualChannel chan, int commands, boolean is
961960

962961
final ConnectionContext.State initialState = chan.context.initialState;
963962
final boolean rejectCommandsWhileDisconnectedLocal = this.rejectCommandsWhileDisconnected || isActivationCommand;
963+
final String rejectDesc = isActivationCommand ? "isActivationCommand" : "rejectCommandsWhileDisconnected";
964964
switch (initialState) {
965965
case ENDPOINT_CLOSED:
966966
return new RedisException("Connection is closed");
967967
case RECONNECT_FAILED:
968-
return failedToReconnectReason;
968+
return getFailedToReconnectReason();
969969
case WILL_RECONNECT:
970970
case CONNECTING:
971-
return rejectCommandsWhileDisconnectedLocal
972-
? new RedisException("Currently not connected. Commands are rejected.")
971+
return rejectCommandsWhileDisconnectedLocal ? new RedisException("Currently not connected and " + rejectDesc)
973972
: null;
974973
case CONNECTED:
975-
return !chan.isActive() && rejectCommandsWhileDisconnectedLocal ? new RedisException("Channel is closed")
974+
return !chan.isActive() && rejectCommandsWhileDisconnectedLocal
975+
? new RedisException("Channel is inactive and " + rejectDesc)
976976
: null;
977977
default:
978978
throw new IllegalStateException("unexpected state: " + initialState);

0 commit comments

Comments
 (0)