21
21
22
22
import java .net .SocketAddress ;
23
23
import java .time .Duration ;
24
- import java .util .concurrent .CancellationException ;
25
24
import java .util .concurrent .CompletableFuture ;
26
25
import java .util .concurrent .TimeUnit ;
27
26
import java .util .concurrent .atomic .AtomicBoolean ;
27
+ import java .util .function .Consumer ;
28
+ import java .util .function .Supplier ;
28
29
29
30
import io .lettuce .core .ClientOptions ;
30
31
import io .lettuce .core .ConnectionBuilder ;
31
32
import io .lettuce .core .ConnectionEvents ;
33
+ import io .lettuce .core .RedisException ;
32
34
import io .lettuce .core .event .EventBus ;
33
35
import io .lettuce .core .event .connection .ReconnectAttemptEvent ;
34
36
import io .lettuce .core .event .connection .ReconnectFailedEvent ;
@@ -84,9 +86,9 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
84
86
85
87
private final String epid ;
86
88
87
- private final boolean useAutoBatchFlushEndpoint ;
89
+ private final boolean useAutoBatchFlush ;
88
90
89
- private final Endpoint endpoint ;
91
+ private final Consumer < Supplier < Throwable >> endpointFailedToReconnectNotifier ;
90
92
91
93
private Channel channel ;
92
94
@@ -148,8 +150,15 @@ public ConnectionWatchdog(Delay reconnectDelay, ClientOptions clientOptions, Boo
148
150
this .eventBus = eventBus ;
149
151
this .redisUri = (String ) bootstrap .config ().attrs ().get (ConnectionBuilder .REDIS_URI );
150
152
this .epid = endpoint .getId ();
151
- this .endpoint = endpoint ;
152
- this .useAutoBatchFlushEndpoint = endpoint instanceof AutoBatchFlushEndpoint ;
153
+ if (endpoint instanceof AutoBatchFlushEndpoint ) {
154
+ this .useAutoBatchFlush = true ;
155
+ endpointFailedToReconnectNotifier = throwableSupplier -> ((AutoBatchFlushEndpoint ) endpoint )
156
+ .notifyReconnectFailed (throwableSupplier .get ());
157
+ } else {
158
+ this .useAutoBatchFlush = false ;
159
+ endpointFailedToReconnectNotifier = ignoredThrowableSupplier -> {
160
+ };
161
+ }
153
162
154
163
Mono <SocketAddress > wrappedSocketAddressSupplier = socketAddressSupplier .doOnNext (addr -> remoteAddress = addr )
155
164
.onErrorResume (t -> {
@@ -215,20 +224,11 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
215
224
channel = null ;
216
225
217
226
if (listenOnChannelInactive && !reconnectionHandler .isReconnectSuspended ()) {
218
- if (!isEventLoopGroupActive ()) {
219
- logger .debug ("isEventLoopGroupActive() == false" );
220
- return ;
221
- }
222
-
223
- if (!isListenOnChannelInactive ()) {
224
- logger .debug ("Skip reconnect scheduling, listener disabled" );
225
- return ;
226
- }
227
-
228
- if (!useAutoBatchFlushEndpoint ) {
227
+ if (!useAutoBatchFlush ) {
229
228
this .scheduleReconnect ();
229
+ } else {
230
+ doReconnectOnAutoBatchFlushEndpointQuiescence = this ::scheduleReconnect ;
230
231
}
231
- doReconnectOnAutoBatchFlushEndpointQuiescence = this ::scheduleReconnect ;
232
232
// otherwise, will be called later by BatchFlushEndpoint#onEndpointQuiescence
233
233
} else {
234
234
logger .debug ("{} Reconnect scheduling disabled" , logPrefix (), ctx );
@@ -237,7 +237,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
237
237
super .channelInactive (ctx );
238
238
}
239
239
240
- boolean willReconnect () {
240
+ boolean willReconnectOnAutoBatchFlushEndpointQuiescence () {
241
241
return doReconnectOnAutoBatchFlushEndpointQuiescence != null ;
242
242
}
243
243
@@ -261,14 +261,16 @@ public void scheduleReconnect() {
261
261
logger .debug ("{} scheduleReconnect()" , logPrefix ());
262
262
263
263
if (!isEventLoopGroupActive ()) {
264
- logger .debug ("isEventLoopGroupActive() == false" );
265
- notifyEndpointFailedToConnectIfNeeded ();
264
+ final String errMsg = "isEventLoopGroupActive() == false" ;
265
+ logger .debug (errMsg );
266
+ notifyEndpointFailedToReconnect (errMsg );
266
267
return ;
267
268
}
268
269
269
270
if (!isListenOnChannelInactive ()) {
270
- logger .debug ("Skip reconnect scheduling, listener disabled" );
271
- notifyEndpointFailedToConnectIfNeeded ();
271
+ final String errMsg = "Skip reconnect scheduling, listener disabled" ;
272
+ logger .debug (errMsg );
273
+ notifyEndpointFailedToReconnect (errMsg );
272
274
return ;
273
275
}
274
276
@@ -285,8 +287,9 @@ public void scheduleReconnect() {
285
287
reconnectScheduleTimeout = null ;
286
288
287
289
if (!isEventLoopGroupActive ()) {
288
- logger .warn ("Cannot execute scheduled reconnect timer, reconnect workers are terminated" );
289
- notifyEndpointFailedToConnectIfNeeded ();
290
+ final String errMsg = "Cannot execute scheduled reconnect timer, reconnect workers are terminated" ;
291
+ logger .warn (errMsg );
292
+ notifyEndpointFailedToReconnect (errMsg );
290
293
return ;
291
294
}
292
295
@@ -302,18 +305,12 @@ public void scheduleReconnect() {
302
305
}
303
306
} else {
304
307
logger .debug ("{} Skipping scheduleReconnect() because I have an active channel" , logPrefix ());
305
- notifyEndpointFailedToConnectIfNeeded ( );
308
+ notifyEndpointFailedToReconnect ( "Skipping scheduleReconnect() because I have an active channel" );
306
309
}
307
310
}
308
311
309
- private void notifyEndpointFailedToConnectIfNeeded () {
310
- notifyEndpointFailedToConnectIfNeeded (new CancellationException ());
311
- }
312
-
313
- private void notifyEndpointFailedToConnectIfNeeded (Exception e ) {
314
- if (useAutoBatchFlushEndpoint ) {
315
- ((AutoBatchFlushEndpoint ) endpoint ).notifyReconnectFailed (e );
316
- }
312
+ void notifyEndpointFailedToReconnect (String msg ) {
313
+ endpointFailedToReconnectNotifier .accept (() -> new RedisException (msg ));
317
314
}
318
315
319
316
/**
@@ -335,26 +332,29 @@ public void run(int attempt) throws Exception {
335
332
* @param delay retry delay.
336
333
* @throws Exception when reconnection fails.
337
334
*/
338
- private void run (int attempt , Duration delay ) throws Exception {
335
+ private void run (int attempt , Duration delay ) {
339
336
340
337
reconnectSchedulerSync .set (false );
341
338
reconnectScheduleTimeout = null ;
342
339
343
340
if (!isEventLoopGroupActive ()) {
344
- logger .debug ("isEventLoopGroupActive() == false" );
345
- notifyEndpointFailedToConnectIfNeeded ();
341
+ final String errMsg = "isEventLoopGroupActive() == false" ;
342
+ logger .debug (errMsg );
343
+ notifyEndpointFailedToReconnect (errMsg );
346
344
return ;
347
345
}
348
346
349
347
if (!isListenOnChannelInactive ()) {
350
- logger .debug ("Skip reconnect scheduling, listener disabled" );
351
- notifyEndpointFailedToConnectIfNeeded ();
348
+ final String errMsg = "Skip reconnect scheduling, listener disabled" ;
349
+ logger .debug (errMsg );
350
+ notifyEndpointFailedToReconnect (errMsg );
352
351
return ;
353
352
}
354
353
355
354
if (isReconnectSuspended ()) {
356
- logger .debug ("Skip reconnect scheduling, reconnect is suspended" );
357
- notifyEndpointFailedToConnectIfNeeded ();
355
+ final String msg = "Skip reconnect scheduling, reconnect is suspended" ;
356
+ logger .debug (msg );
357
+ notifyEndpointFailedToReconnect (msg );
358
358
return ;
359
359
}
360
360
@@ -411,13 +411,14 @@ private void run(int attempt, Duration delay) throws Exception {
411
411
if (!isReconnectSuspended ()) {
412
412
scheduleReconnect ();
413
413
} else {
414
- notifyEndpointFailedToConnectIfNeeded ();
414
+ endpointFailedToReconnectNotifier
415
+ .accept (() -> new RedisException ("got error and then reconnect is suspended" , t ));
415
416
}
416
417
});
417
418
} catch (Exception e ) {
418
419
logger .log (warnLevel , "Cannot reconnect: {}" , e .toString ());
419
420
eventBus .publish (new ReconnectFailedEvent (redisUri , epid , LocalAddress .ANY , remoteAddress , e , attempt ));
420
- notifyEndpointFailedToConnectIfNeeded ( e );
421
+ endpointFailedToReconnectNotifier . accept (() -> e );
421
422
}
422
423
}
423
424
0 commit comments