@@ -650,13 +650,22 @@ private void scheduleSendJobIfNeeded(final ContextualChannel chan) {
650
650
}
651
651
652
652
if (chan .context .autoBatchFlushEndPointContext .hasOngoingSendLoop .tryEnter ()) {
653
- // Benchmark result of using tryEnterSafeGetVolatile() or not (1 thread, async get):
654
- // 1. uses tryEnterSafeGetVolatile() to avoid unnecessary eventLoop.execute() calls
655
- // Avg latency: 3.2956217278663s
656
- // Avg QPS: 495238.50056392356/s
657
- // 2. uses eventLoop.execute() directly
658
- // Avg latency: 3.2677197021496998s
659
- // Avg QPS: 476925.0751855796/s
653
+ // Benchmark result:
654
+ // Redis:
655
+ // engine: 7.1.0
656
+ // server: AWS elasticcache cache.r7g.large
657
+ // Client: EC2-c5n.2xlarge
658
+ // Test Model:
659
+ // multi-thread sync exists (./bench-multi-thread-exists.sh -b 32 -s 10 -n 80000 -t 64)
660
+ // Test Parameter:
661
+ // thread num: 64, loop num: 80000, batch size: 32, write spin count: 10
662
+ //
663
+ // With tryEnter():
664
+ // Avg latency: 0.64917373203125ms
665
+ // Avg QPS: 196037.67991971457/s
666
+ // Without tryEnter():
667
+ // Avg latency: 0.6618976359375001ms
668
+ // Avg QPS: 192240.1301551348/s
660
669
eventLoop .execute (() -> loopSend (chan , true ));
661
670
}
662
671
@@ -681,31 +690,32 @@ private void loopSend(final ContextualChannel chan, boolean entered) {
681
690
}
682
691
683
692
private void loopSend0 (final AutoBatchFlushEndPointContext autoBatchFlushEndPointContext , final ContextualChannel chan ,
684
- int remainingSpinnCount , boolean entered ) {
693
+ int remainingSpinnCount , final boolean entered ) {
685
694
do {
686
- final int count = pollBatch (autoBatchFlushEndPointContext , chan );
695
+ final int count = DefaultAutoBatchFlushEndpoint .this .pollBatch (autoBatchFlushEndPointContext , chan );
696
+ if (count == 0 ) {
697
+ break ;
698
+ }
687
699
if (count < 0 ) {
688
700
return ;
689
701
}
690
- if (count < batchSize ) {
691
- if (!entered ) {
692
- return ;
693
- }
694
- // queue was empty
695
- // The send loop will be triggered later when a new task is added,
696
- // // Don't setUnsafe here because loopSend0() may result in a delayed loopSend() call.
697
- autoBatchFlushEndPointContext .hasOngoingSendLoop .exit ();
698
- if (taskQueue .isEmpty ()) {
699
- return ;
700
- }
701
- entered = false ;
702
- // // Guarantee thread-safety: no dangling tasks in the queue.
703
- }
704
702
} while (--remainingSpinnCount > 0 );
705
703
706
- final boolean finalEntered = entered ;
707
- // Don't need to exitUnsafe since we still have an ongoing consume tasks in this thread.
708
- chan .eventLoop ().execute (() -> loopSend (chan , finalEntered ));
704
+ if (remainingSpinnCount <= 0 ) {
705
+ // Don't need to exitUnsafe since we still have an ongoing consume tasks in this thread.
706
+ chan .eventLoop ().execute (() -> loopSend (chan , entered ));
707
+ return ;
708
+ }
709
+
710
+ if (entered ) {
711
+ // queue was empty
712
+ // The send loop will be triggered later when a new task is added,
713
+ autoBatchFlushEndPointContext .hasOngoingSendLoop .exit ();
714
+ // Guarantee thread-safety: no dangling tasks in the queue, see scheduleSendJobIfNeeded()
715
+ if (!taskQueue .isEmpty ()) {
716
+ loopSend0 (autoBatchFlushEndPointContext , chan , remainingSpinnCount , false );
717
+ }
718
+ }
709
719
}
710
720
711
721
private int pollBatch (final AutoBatchFlushEndPointContext autoBatchFlushEndPointContext , ContextualChannel chan ) {
@@ -771,24 +781,6 @@ private void trySetEndpointQuiescence(ContextualChannel chan) {
771
781
}
772
782
}
773
783
774
- private void onEndpointQuiescence () {
775
- if (channel .context .initialState == ConnectionContext .State .ENDPOINT_CLOSED ) {
776
- return ;
777
- }
778
-
779
- this .logPrefix = null ;
780
- // Create happens-before with channelActive()
781
- if (!CHANNEL .compareAndSet (this , DummyContextualChannelInstances .CHANNEL_WILL_RECONNECT ,
782
- DummyContextualChannelInstances .CHANNEL_CONNECTING )) {
783
- onUnexpectedState ("onEndpointQuiescence" , ConnectionContext .State .WILL_RECONNECT );
784
- return ;
785
- }
786
-
787
- // neither connectionWatchdog nor doReconnectOnEndpointQuiescence could be null
788
- // noinspection DataFlowIssue
789
- connectionWatchdog .reconnectOnAutoBatchFlushEndpointQuiescence ();
790
- }
791
-
792
784
private void onWillReconnect (@ Nonnull final ConnectionContext .CloseStatus closeStatus ,
793
785
final AutoBatchFlushEndPointContext autoBatchFlushEndPointContext ) {
794
786
final @ Nullable Deque <RedisCommand <?, ?, ?>> retryableFailedToSendTasks = autoBatchFlushEndPointContext
@@ -831,6 +823,25 @@ private void onWontReconnect(@Nonnull final ConnectionContext.CloseStatus closeS
831
823
}
832
824
}
833
825
826
+ private void onEndpointQuiescence () {
827
+ if (channel .context .initialState == ConnectionContext .State .ENDPOINT_CLOSED ) {
828
+ return ;
829
+ }
830
+
831
+ this .logPrefix = null ;
832
+ // Create happens-before with channelActive()
833
+ if (!CHANNEL .compareAndSet (this , DummyContextualChannelInstances .CHANNEL_WILL_RECONNECT ,
834
+ DummyContextualChannelInstances .CHANNEL_CONNECTING )) {
835
+ onUnexpectedState ("onEndpointQuiescence" , ConnectionContext .State .WILL_RECONNECT );
836
+ return ;
837
+ }
838
+
839
+ // notify connectionWatchDog that it is safe to reconnect now.
840
+ // neither connectionWatchdog nor doReconnectOnEndpointQuiescence could be null
841
+ // noinspection DataFlowIssue
842
+ connectionWatchdog .reconnectOnAutoBatchFlushEndpointQuiescence ();
843
+ }
844
+
834
845
private void offerFirstAll (Deque <RedisCommand <?, ?, ?>> commands ) {
835
846
commands .forEach (cmd -> {
836
847
if (cmd instanceof DemandAware .Sink ) {
0 commit comments