@@ -601,15 +601,17 @@ private void scheduleSendJobOnConnected(final ContextualChannel chan) {
601
601
LettuceAssert .assertState (chan .eventLoop ().inEventLoop (), "must be called in event loop thread" );
602
602
603
603
// Schedule directly
604
- if (chan .context .batchFlushEndPointContext .hasOngoingSendLoop .tryEnter ()) {
605
- loopSend (chan );
606
- }
607
- // Otherwise:
608
- // someone will do the job for us
604
+ loopSend (chan , false );
609
605
}
610
606
611
607
private void scheduleSendJobIfNeeded (final ContextualChannel chan ) {
612
608
final EventLoop eventLoop = chan .eventLoop ();
609
+ if (eventLoop .inEventLoop ()) {
610
+ // Possible in reactive() mode.
611
+ loopSend (chan , false );
612
+ return ;
613
+ }
614
+
613
615
if (chan .context .batchFlushEndPointContext .hasOngoingSendLoop .tryEnter ()) {
614
616
// Benchmark result of using tryEnterSafeGetVolatile() or not (1 thread, async get):
615
617
// 1. uses tryEnterSafeGetVolatile() to avoid unnecessary eventLoop.execute() calls
@@ -618,7 +620,7 @@ private void scheduleSendJobIfNeeded(final ContextualChannel chan) {
618
620
// 2. uses eventLoop.execute() directly
619
621
// Avg latency: 3.2677197021496998s
620
622
// Avg QPS: 476925.0751855796/s
621
- eventLoop .execute (() -> loopSend (chan ));
623
+ eventLoop .execute (() -> loopSend (chan , true ));
622
624
}
623
625
624
626
// Otherwise:
@@ -629,19 +631,19 @@ private void scheduleSendJobIfNeeded(final ContextualChannel chan) {
629
631
// second loopSend0(), which will call poll()
630
632
}
631
633
632
- private void loopSend (final ContextualChannel chan ) {
634
+ private void loopSend (final ContextualChannel chan , boolean entered ) {
633
635
final ConnectionContext connectionContext = chan .context ;
634
636
final BatchFlushEndPointContext batchFlushEndPointContext = connectionContext .batchFlushEndPointContext ;
635
637
if (connectionContext .isChannelInactiveEventFired () || batchFlushEndPointContext .hasRetryableFailedToSendTasks ()) {
636
638
return ;
637
639
}
638
640
639
641
LettuceAssert .assertState (channel == chan , "unexpected: channel not match but closeStatus == null" );
640
- loopSend0 (batchFlushEndPointContext , chan , writeSpinCount , false );
642
+ loopSend0 (batchFlushEndPointContext , chan , writeSpinCount , entered );
641
643
}
642
644
643
645
private void loopSend0 (final BatchFlushEndPointContext batchFlushEndPointContext , final ContextualChannel chan ,
644
- int remainingSpinnCount , final boolean exited ) {
646
+ int remainingSpinnCount , final boolean entered ) {
645
647
do {
646
648
final int count = pollBatch (batchFlushEndPointContext , chan );
647
649
if (count < 0 ) {
@@ -655,16 +657,16 @@ private void loopSend0(final BatchFlushEndPointContext batchFlushEndPointContext
655
657
656
658
if (remainingSpinnCount <= 0 ) {
657
659
// Don't need to exitUnsafe since we still have an ongoing consume tasks in this thread.
658
- chan .eventLoop ().execute (() -> loopSend (chan ));
660
+ chan .eventLoop ().execute (() -> loopSend (chan , entered ));
659
661
return ;
660
662
}
661
663
662
- if (! exited ) {
664
+ if (entered ) {
663
665
// The send loop will be triggered later when a new task is added,
664
666
// // Don't setUnsafe here because loopSend0() may result in a delayed loopSend() call.
665
667
batchFlushEndPointContext .hasOngoingSendLoop .exit ();
666
668
// // Guarantee thread-safety: no dangling tasks in the queue.
667
- loopSend0 (batchFlushEndPointContext , chan , remainingSpinnCount , true );
669
+ loopSend0 (batchFlushEndPointContext , chan , remainingSpinnCount , false );
668
670
// chan.eventLoop().schedule(() -> loopSend0(batchFlushEndPointContext, chan, writeSpinCount, false), 100,
669
671
// TimeUnit.NANOSECONDS);
670
672
}
0 commit comments