Skip to content

Commit c4dffec

Browse files
committed
fix: two bugs: 1, autoBatchFlushEndPointContext.add() should always be before autoBatchFlushEndPointContext.done(1) othewise the flyingTaskNum could be negative; 2, make sure lastEventLoop is never null
1 parent a80453e commit c4dffec

File tree

1 file changed

+32
-30
lines changed

1 file changed

+32
-30
lines changed

src/main/java/io/lettuce/core/protocol/DefaultAutoBatchFlushEndpoint.java

+32-30
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,6 @@
1515
*/
1616
package io.lettuce.core.protocol;
1717

18-
import java.util.ArrayList;
19-
import java.util.Collection;
20-
import java.util.Deque;
21-
import java.util.HashSet;
22-
import java.util.List;
23-
import java.util.Queue;
24-
import java.util.Set;
25-
import java.util.concurrent.CompletableFuture;
26-
import java.util.concurrent.CopyOnWriteArrayList;
27-
import java.util.concurrent.TimeUnit;
28-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
29-
import java.util.concurrent.atomic.AtomicLong;
30-
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
31-
import java.util.function.Consumer;
32-
33-
import javax.annotation.Nonnull;
34-
import javax.annotation.Nullable;
35-
3618
import io.lettuce.core.ClientOptions;
3719
import io.lettuce.core.ConnectionEvents;
3820
import io.lettuce.core.ContextualChannel;
@@ -55,11 +37,29 @@
5537
import io.netty.channel.EventLoop;
5638
import io.netty.handler.codec.EncoderException;
5739
import io.netty.util.Recycler;
40+
import io.netty.util.concurrent.EventExecutor;
5841
import io.netty.util.concurrent.Future;
5942
import io.netty.util.concurrent.GenericFutureListener;
6043
import io.netty.util.internal.logging.InternalLogger;
6144
import io.netty.util.internal.logging.InternalLoggerFactory;
6245

46+
import javax.annotation.Nonnull;
47+
import javax.annotation.Nullable;
48+
import java.util.ArrayList;
49+
import java.util.Collection;
50+
import java.util.Deque;
51+
import java.util.HashSet;
52+
import java.util.List;
53+
import java.util.Queue;
54+
import java.util.Set;
55+
import java.util.concurrent.CompletableFuture;
56+
import java.util.concurrent.CopyOnWriteArrayList;
57+
import java.util.concurrent.TimeUnit;
58+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
59+
import java.util.concurrent.atomic.AtomicLong;
60+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
61+
import java.util.function.Consumer;
62+
6363
/**
6464
* Default {@link Endpoint} implementation.
6565
*
@@ -161,7 +161,7 @@ protected static void cancelCommandOnEndpointClose(RedisCommand<?, ?, ?> cmd) {
161161

162162
private final boolean canFire;
163163

164-
private volatile EventLoop lastEventLoop = null;
164+
private volatile EventExecutor lastEventExecutor;
165165

166166
private volatile Throwable connectionError;
167167

@@ -202,6 +202,7 @@ protected DefaultAutoBatchFlushEndpoint(ClientOptions clientOptions, ClientResou
202202
this.callbackOnClose = callbackOnClose;
203203
this.writeSpinCount = clientOptions.getAutoBatchFlushOptions().getWriteSpinCount();
204204
this.batchSize = clientOptions.getAutoBatchFlushOptions().getBatchSize();
205+
this.lastEventExecutor = clientResources.eventExecutorGroup().next();
205206
}
206207

207208
@Override
@@ -322,7 +323,7 @@ public void notifyChannelActive(Channel channel) {
322323
return;
323324
}
324325

325-
this.lastEventLoop = channel.eventLoop();
326+
this.lastEventExecutor = channel.eventLoop();
326327
this.connectionError = null;
327328
this.inProtectMode = false;
328329
this.logPrefix = null;
@@ -585,7 +586,7 @@ private void resetInternal() {
585586
if (chan.context.initialState.isConnected()) {
586587
chan.pipeline().fireUserEventTriggered(new ConnectionEvents.Reset());
587588
}
588-
LettuceAssert.assertState(lastEventLoop.inEventLoop(), "must be called in lastEventLoop thread");
589+
LettuceAssert.assertState(lastEventExecutor.inEventLoop(), "must be called in lastEventLoop thread");
589590
cancelCommands("resetInternal");
590591
}
591592

@@ -727,22 +728,23 @@ private int pollBatch(final AutoBatchFlushEndPointContext autoBatchFlushEndPoint
727728
}
728729

729730
if (o instanceof RedisCommand<?, ?, ?>) {
731+
autoBatchFlushEndPointContext.add(1);
730732
RedisCommand<?, ?, ?> cmd = (RedisCommand<?, ?, ?>) o;
731733
channelWrite(chan, cmd).addListener(WrittenToChannel.newInstance(this, chan, cmd, false));
732734
count++;
733735
} else {
734736
@SuppressWarnings("unchecked")
735737
Collection<? extends RedisCommand<?, ?, ?>> commands = (Collection<? extends RedisCommand<?, ?, ?>>) o;
738+
final int commandsSize = commands.size(); // size() could be expensive for some collections so cache it!
739+
autoBatchFlushEndPointContext.add(commandsSize);
736740
for (RedisCommand<?, ?, ?> cmd : commands) {
737741
channelWrite(chan, cmd).addListener(WrittenToChannel.newInstance(this, chan, cmd, false));
738742
}
739-
count += commands.size();
743+
count += commandsSize;
740744
}
741745
}
742746

743747
if (count > 0) {
744-
autoBatchFlushEndPointContext.add(count);
745-
746748
channelFlush(chan);
747749
if (autoBatchFlushEndPointContext.hasRetryableFailedToSendCommands()) {
748750
// Wait for onConnectionClose event()
@@ -755,7 +757,7 @@ private int pollBatch(final AutoBatchFlushEndPointContext autoBatchFlushEndPoint
755757
private void trySetEndpointQuiescence(ContextualChannel chan) {
756758
final EventLoop eventLoop = chan.eventLoop();
757759
LettuceAssert.isTrue(eventLoop.inEventLoop(), "unexpected: not in event loop");
758-
LettuceAssert.isTrue(eventLoop == lastEventLoop, "unexpected: lastEventLoop not match");
760+
LettuceAssert.isTrue(eventLoop == lastEventExecutor, "unexpected: lastEventLoop not match");
759761

760762
final ConnectionContext connectionContext = chan.context;
761763
final @Nullable ConnectionContext.CloseStatus closeStatus = connectionContext.getCloseStatus();
@@ -1019,14 +1021,14 @@ private ChannelFuture channelWrite(Channel channel, RedisCommand<?, ?, ?> comman
10191021
* is terminated (state is RECONNECT_FAILED/ENDPOINT_CLOSED)
10201022
*/
10211023
private void syncAfterTerminated(Runnable runnable) {
1022-
final EventLoop localLastEventLoop = lastEventLoop;
1023-
LettuceAssert.notNull(localLastEventLoop, "lastEventLoop must not be null after terminated");
1024-
if (localLastEventLoop.inEventLoop()) {
1024+
final EventExecutor localLastEventExecutor = lastEventExecutor;
1025+
if (localLastEventExecutor.inEventLoop()) {
10251026
runnable.run();
10261027
} else {
1027-
localLastEventLoop.execute(() -> {
1028+
localLastEventExecutor.execute(() -> {
10281029
runnable.run();
1029-
LettuceAssert.isTrue(lastEventLoop == localLastEventLoop, "lastEventLoop must not be changed after terminated");
1030+
LettuceAssert.isTrue(lastEventExecutor == localLastEventExecutor,
1031+
"lastEventLoop must not be changed after terminated");
10301032
});
10311033
}
10321034
}

0 commit comments

Comments
 (0)