Skip to content

Commit 2b57334

Browse files
committed
refactor: rename BatchFlushEndpointContext->AutoBatchFlushEndpointContext
1 parent 205813a commit 2b57334

6 files changed

+83
-100
lines changed

src/main/java/io/lettuce/core/context/BatchFlushEndPointContext.java src/main/java/io/lettuce/core/context/AutoBatchFlushEndPointContext.java

+27-26
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
/**
1616
* @author chenxiaofan
1717
*/
18-
public class BatchFlushEndPointContext {
18+
public class AutoBatchFlushEndPointContext {
1919

20-
private static final InternalLogger logger = InternalLoggerFactory.getInstance(BatchFlushEndPointContext.class);
20+
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AutoBatchFlushEndPointContext.class);
2121

2222
public static class HasOngoingSendLoop {
2323

@@ -51,26 +51,26 @@ public void exit() {
5151

5252
}
5353

54-
BatchFlushEndPointContext() {
54+
AutoBatchFlushEndPointContext() {
5555
}
5656

5757
/**
58-
* Tasks that failed to send (probably due to connection errors)
58+
* Commands that failed to send (probably due to connection errors)
5959
*/
6060
@Nullable
61-
Deque<RedisCommand<?, ?, ?>> retryableFailedToSendTasks = null;
61+
Deque<RedisCommand<?, ?, ?>> retryableFailedToSendCommands = null;
6262

6363
Throwable firstDiscontinueReason = null;
6464

6565
public Throwable getFirstDiscontinueReason() {
6666
return firstDiscontinueReason;
6767
}
6868

69-
private int flyingTaskNum;
69+
private int flyingCmdNum;
7070

7171
@SuppressWarnings("unused")
72-
public int getFlyingTaskNum() {
73-
return flyingTaskNum;
72+
public int getFlyingCmdNum() {
73+
return flyingCmdNum;
7474
}
7575

7676
private int total = 0;
@@ -83,47 +83,48 @@ public int getTotal() {
8383

8484
public void add(int n) {
8585
this.total += n;
86-
this.flyingTaskNum += n;
86+
this.flyingCmdNum += n;
8787
}
8888

89-
public @Nullable Deque<RedisCommand<?, ?, ?>> getAndClearRetryableFailedToSendTasks() {
90-
final Deque<RedisCommand<?, ?, ?>> old = this.retryableFailedToSendTasks;
91-
// don't set to null so give us a chance to expose potential bugs if there is addRetryableFailedToSendTask() afterwards
92-
this.retryableFailedToSendTasks = UnmodifiableDeque.emptyDeque();
89+
public @Nullable Deque<RedisCommand<?, ?, ?>> getAndClearRetryableFailedToSendCommands() {
90+
final Deque<RedisCommand<?, ?, ?>> old = this.retryableFailedToSendCommands;
91+
// don't set to null so give us a chance to expose potential bugs if there is addRetryableFailedToSendCommand()
92+
// afterwards
93+
this.retryableFailedToSendCommands = UnmodifiableDeque.emptyDeque();
9394
return old;
9495
}
9596

9697
public void done(int n) {
97-
this.flyingTaskNum -= n;
98+
this.flyingCmdNum -= n;
9899
}
99100

100101
public boolean isDone() {
101-
if (this.flyingTaskNum < 0) {
102-
logger.error("[unexpected] flyingTaskNum < 0, flyingTaskNum: {}, total: {}", this.flyingTaskNum, this.total);
102+
if (this.flyingCmdNum < 0) {
103+
logger.error("[unexpected] flyingCmdNum < 0, flyingCmdNum: {}, total: {}", this.flyingCmdNum, this.total);
103104
return true;
104105
}
105-
return this.flyingTaskNum == 0;
106+
return this.flyingCmdNum == 0;
106107
}
107108

108-
public boolean hasRetryableFailedToSendTasks() {
109-
return retryableFailedToSendTasks != null;
109+
public boolean hasRetryableFailedToSendCommands() {
110+
return retryableFailedToSendCommands != null;
110111
}
111112

112113
/**
113-
* @param retryableTask retryable task
114+
* @param retryableCommand retryable command
114115
* @param cause fail reason
115-
* @return true if this is the first retryable failed task
116+
* @return true if this is the first retryable failed command
116117
*/
117-
public boolean addRetryableFailedToSendTask(RedisCommand<?, ?, ?> retryableTask, @Nonnull Throwable cause) {
118-
if (retryableFailedToSendTasks == null) {
119-
retryableFailedToSendTasks = new ArrayDeque<>();
120-
retryableFailedToSendTasks.add(retryableTask);
118+
public boolean addRetryableFailedToSendCommand(RedisCommand<?, ?, ?> retryableCommand, @Nonnull Throwable cause) {
119+
if (retryableFailedToSendCommands == null) {
120+
retryableFailedToSendCommands = new ArrayDeque<>();
121+
retryableFailedToSendCommands.add(retryableCommand);
121122

122123
firstDiscontinueReason = cause;
123124
return true;
124125
}
125126

126-
retryableFailedToSendTasks.add(retryableTask);
127+
retryableFailedToSendCommands.add(retryableCommand);
127128
return false;
128129
}
129130

src/main/java/io/lettuce/core/context/ConnectionContext.java

+2-6
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,11 @@ public boolean isConnected() {
6767

6868
public final State initialState;
6969

70-
public final BatchFlushEndPointContext batchFlushEndPointContext;
70+
public final AutoBatchFlushEndPointContext autoBatchFlushEndPointContext;
7171

7272
public ConnectionContext(State initialState) {
7373
this.initialState = initialState;
74-
this.batchFlushEndPointContext = new BatchFlushEndPointContext();
74+
this.autoBatchFlushEndPointContext = new AutoBatchFlushEndPointContext();
7575
}
7676

7777
/* below fields must be accessed by the event loop thread only */
@@ -92,10 +92,6 @@ public boolean isChannelInactiveEventFired() {
9292

9393
private boolean channelQuiescent = false;
9494

95-
public boolean isChannelQuiescent() {
96-
return channelQuiescent;
97-
}
98-
9995
public boolean setChannelQuiescentOnce() {
10096
if (channelQuiescent) {
10197
return false;

src/main/java/io/lettuce/core/protocol/CommandHandler.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCom
9696

9797
private final Endpoint endpoint;
9898

99-
private final boolean supportsBatchFlush;
99+
private final boolean supportsAutoBatchFlush;
100100

101101
private final ArrayDeque<RedisCommand<?, ?, ?>> stack = new ArrayDeque<>();
102102

@@ -154,7 +154,7 @@ public CommandHandler(ClientOptions clientOptions, ClientResources clientResourc
154154
this.clientOptions = clientOptions;
155155
this.clientResources = clientResources;
156156
this.endpoint = endpoint;
157-
this.supportsBatchFlush = endpoint instanceof AutoBatchFlushEndpoint;
157+
this.supportsAutoBatchFlush = endpoint instanceof AutoBatchFlushEndpoint;
158158
this.commandLatencyRecorder = clientResources.commandLatencyRecorder();
159159
this.latencyMetricsEnabled = commandLatencyRecorder.isEnabled();
160160
this.boundedQueues = clientOptions.getRequestQueueSize() != Integer.MAX_VALUE;
@@ -377,9 +377,9 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
377377
setState(LifecycleState.DEACTIVATING);
378378

379379
endpoint.notifyChannelInactive(ctx.channel());
380-
Deque<RedisCommand<?, ?, ?>> batchFlushRetryableDrainQueuedCommands = UnmodifiableDeque.emptyDeque();
381-
if (supportsBatchFlush) {
382-
batchFlushRetryableDrainQueuedCommands = drainStack();
380+
Deque<RedisCommand<?, ?, ?>> autoBatchFlushRetryableDrainQueuedCommands = UnmodifiableDeque.emptyDeque();
381+
if (supportsAutoBatchFlush) {
382+
autoBatchFlushRetryableDrainQueuedCommands = drainStack();
383383
} else {
384384
endpoint.notifyDrainQueuedCommands(this);
385385
}
@@ -397,10 +397,10 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
397397

398398
super.channelInactive(ctx);
399399

400-
if (supportsBatchFlush) {
400+
if (supportsAutoBatchFlush) {
401401
// Needs decision of watchdog
402402
((AutoBatchFlushEndpoint) endpoint).notifyChannelInactiveAfterWatchdogDecision(ctx.channel(),
403-
batchFlushRetryableDrainQueuedCommands);
403+
autoBatchFlushRetryableDrainQueuedCommands);
404404
}
405405
}
406406

src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
8484

8585
private final String epid;
8686

87-
private final boolean useBatchFlushEndpoint;
87+
private final boolean useAutoBatchFlushEndpoint;
8888

8989
private final Endpoint endpoint;
9090

@@ -149,7 +149,7 @@ public ConnectionWatchdog(Delay reconnectDelay, ClientOptions clientOptions, Boo
149149
this.redisUri = (String) bootstrap.config().attrs().get(ConnectionBuilder.REDIS_URI);
150150
this.epid = endpoint.getId();
151151
this.endpoint = endpoint;
152-
this.useBatchFlushEndpoint = endpoint instanceof AutoBatchFlushEndpoint;
152+
this.useAutoBatchFlushEndpoint = endpoint instanceof AutoBatchFlushEndpoint;
153153

154154
Mono<SocketAddress> wrappedSocketAddressSupplier = socketAddressSupplier.doOnNext(addr -> remoteAddress = addr)
155155
.onErrorResume(t -> {
@@ -226,7 +226,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
226226
}
227227

228228
doReconnectOnEndpointQuiescence = this::scheduleReconnect;
229-
if (!useBatchFlushEndpoint) {
229+
if (!useAutoBatchFlushEndpoint) {
230230
doReconnectOnEndpointQuiescence.run();
231231
}
232232
// otherwise, will be called later by BatchFlushEndpoint#onEndpointQuiescence
@@ -307,7 +307,7 @@ private void notifyEndpointFailedToConnectIfNeeded() {
307307
}
308308

309309
private void notifyEndpointFailedToConnectIfNeeded(Exception e) {
310-
if (useBatchFlushEndpoint) {
310+
if (useAutoBatchFlushEndpoint) {
311311
((AutoBatchFlushEndpoint) endpoint).notifyReconnectFailed(e);
312312
}
313313
}

0 commit comments

Comments
 (0)