@@ -650,13 +650,22 @@ private void scheduleSendJobIfNeeded(final ContextualChannel chan) {
650
650
}
651
651
652
652
if (chan .context .autoBatchFlushEndPointContext .hasOngoingSendLoop .tryEnter ()) {
653
- // Benchmark result of using tryEnterSafeGetVolatile() or not (1 thread, async get):
654
- // 1. uses tryEnterSafeGetVolatile() to avoid unnecessary eventLoop.execute() calls
655
- // Avg latency: 3.2956217278663s
656
- // Avg QPS: 495238.50056392356/s
657
- // 2. uses eventLoop.execute() directly
658
- // Avg latency: 3.2677197021496998s
659
- // Avg QPS: 476925.0751855796/s
653
+ // Benchmark result:
654
+ // Redis:
655
+ // engine: 7.1.0
656
+ // server: AWS elasticcache cache.r7g.large
657
+ // Client: EC2-c5n.2xlarge
658
+ // Test Model:
659
+ // multi-thread sync exists (./bench-multi-thread-exists.sh -b 32 -s 10 -n 80000 -t 64)
660
+ // Test Parameter:
661
+ // thread num: 64, loop num: 80000, batch size: 32, write spin count: 10
662
+ //
663
+ // With tryEnter():
664
+ // Avg latency: 0.64917373203125ms
665
+ // Avg QPS: 196037.67991971457/s
666
+ // Without tryEnter():
667
+ // Avg latency: 0.6618976359375001ms
668
+ // Avg QPS: 192240.1301551348/s
660
669
eventLoop .execute (() -> loopSend (chan , true ));
661
670
}
662
671
@@ -681,31 +690,33 @@ private void loopSend(final ContextualChannel chan, boolean entered) {
681
690
}
682
691
683
692
private void loopSend0 (final AutoBatchFlushEndPointContext autoBatchFlushEndPointContext , final ContextualChannel chan ,
684
- int remainingSpinnCount , boolean entered ) {
693
+ int remainingSpinnCount , final boolean entered ) {
685
694
do {
686
- final int count = pollBatch (autoBatchFlushEndPointContext , chan );
695
+ final int count = DefaultAutoBatchFlushEndpoint . this . pollBatch (autoBatchFlushEndPointContext , chan );
687
696
if (count < 0 ) {
688
697
return ;
689
698
}
690
- if (count < batchSize ) {
691
- if (!entered ) {
692
- return ;
693
- }
694
- // queue was empty
695
- // The send loop will be triggered later when a new task is added,
696
- // // Don't setUnsafe here because loopSend0() may result in a delayed loopSend() call.
697
- autoBatchFlushEndPointContext .hasOngoingSendLoop .exit ();
698
- if (taskQueue .isEmpty ()) {
699
- return ;
700
- }
701
- entered = false ;
702
- // // Guarantee thread-safety: no dangling tasks in the queue.
699
+
700
+ if (count == 0 ) {
701
+ break ;
703
702
}
704
703
} while (--remainingSpinnCount > 0 );
705
704
706
- final boolean finalEntered = entered ;
707
- // Don't need to exitUnsafe since we still have an ongoing consume tasks in this thread.
708
- chan .eventLoop ().execute (() -> loopSend (chan , finalEntered ));
705
+ if (remainingSpinnCount <= 0 ) {
706
+ // Don't need to exitUnsafe since we still have an ongoing consume tasks in this thread.
707
+ chan .eventLoop ().execute (() -> loopSend (chan , entered ));
708
+ return ;
709
+ }
710
+
711
+ if (entered ) {
712
+ // queue was empty
713
+ // The send loop will be triggered later when a new task is added,
714
+ autoBatchFlushEndPointContext .hasOngoingSendLoop .exit ();
715
+ // Guarantee thread-safety: no dangling tasks in the queue, see scheduleSendJobIfNeeded()
716
+ if (!taskQueue .isEmpty ()) {
717
+ loopSend0 (autoBatchFlushEndPointContext , chan , remainingSpinnCount , false );
718
+ }
719
+ }
709
720
}
710
721
711
722
private int pollBatch (final AutoBatchFlushEndPointContext autoBatchFlushEndPointContext , ContextualChannel chan ) {
@@ -771,24 +782,6 @@ private void trySetEndpointQuiescence(ContextualChannel chan) {
771
782
}
772
783
}
773
784
774
- private void onEndpointQuiescence () {
775
- if (channel .context .initialState == ConnectionContext .State .ENDPOINT_CLOSED ) {
776
- return ;
777
- }
778
-
779
- this .logPrefix = null ;
780
- // Create happens-before with channelActive()
781
- if (!CHANNEL .compareAndSet (this , DummyContextualChannelInstances .CHANNEL_WILL_RECONNECT ,
782
- DummyContextualChannelInstances .CHANNEL_CONNECTING )) {
783
- onUnexpectedState ("onEndpointQuiescence" , ConnectionContext .State .WILL_RECONNECT );
784
- return ;
785
- }
786
-
787
- // neither connectionWatchdog nor doReconnectOnEndpointQuiescence could be null
788
- // noinspection DataFlowIssue
789
- connectionWatchdog .reconnectOnAutoBatchFlushEndpointQuiescence ();
790
- }
791
-
792
785
private void onWillReconnect (@ Nonnull final ConnectionContext .CloseStatus closeStatus ,
793
786
final AutoBatchFlushEndPointContext autoBatchFlushEndPointContext ) {
794
787
final @ Nullable Deque <RedisCommand <?, ?, ?>> retryableFailedToSendTasks = autoBatchFlushEndPointContext
@@ -831,6 +824,25 @@ private void onWontReconnect(@Nonnull final ConnectionContext.CloseStatus closeS
831
824
}
832
825
}
833
826
827
+ private void onEndpointQuiescence () {
828
+ if (channel .context .initialState == ConnectionContext .State .ENDPOINT_CLOSED ) {
829
+ return ;
830
+ }
831
+
832
+ this .logPrefix = null ;
833
+ // Create happens-before with channelActive()
834
+ if (!CHANNEL .compareAndSet (this , DummyContextualChannelInstances .CHANNEL_WILL_RECONNECT ,
835
+ DummyContextualChannelInstances .CHANNEL_CONNECTING )) {
836
+ onUnexpectedState ("onEndpointQuiescence" , ConnectionContext .State .WILL_RECONNECT );
837
+ return ;
838
+ }
839
+
840
+ // notify connectionWatchDog that it is safe to reconnect now.
841
+ // neither connectionWatchdog nor doReconnectOnEndpointQuiescence could be null
842
+ // noinspection DataFlowIssue
843
+ connectionWatchdog .reconnectOnAutoBatchFlushEndpointQuiescence ();
844
+ }
845
+
834
846
private void offerFirstAll (Deque <RedisCommand <?, ?, ?>> commands ) {
835
847
commands .forEach (cmd -> {
836
848
if (cmd instanceof DemandAware .Sink ) {
0 commit comments