67
67
import com .linecorp .decaton .protocol .Decaton .TaskMetadataProto ;
68
68
import com .linecorp .decaton .protocol .Sample .HelloTask ;
69
69
70
+ import lombok .RequiredArgsConstructor ;
71
+
70
72
@ ExtendWith (MockitoExtension .class )
71
73
public class ProcessingContextImplTest {
72
74
private static class NamedProcessor implements DecatonProcessor <HelloTask > {
@@ -90,6 +92,22 @@ public void process(ProcessingContext<HelloTask> ctx, HelloTask task)
90
92
}
91
93
}
92
94
95
+ @ RequiredArgsConstructor
96
+ private static class AsyncCompleteProcessor implements DecatonProcessor <byte []> {
97
+ private final CountDownLatch latch ;
98
+
99
+ @ Override
100
+ public void process (ProcessingContext <byte []> context , byte [] task ) throws InterruptedException {
101
+ Completion comp = context .deferCompletion ();
102
+ new Thread (() -> {
103
+ try {
104
+ latch .await ();
105
+ } catch (InterruptedException ignored ) {}
106
+ comp .complete ();
107
+ }).start ();
108
+ }
109
+ }
110
+
93
111
private static final HelloTask TASK = HelloTask .getDefaultInstance ();
94
112
95
113
private static final DecatonTaskRequest REQUEST =
@@ -354,21 +372,7 @@ public void testPush_Level2_MultiPush_SyncAndAsync() throws InterruptedException
354
372
@ Timeout (5 )
355
373
public void testRetry () throws InterruptedException {
356
374
CountDownLatch retryLatch = new CountDownLatch (1 );
357
- DecatonProcessor <byte []> retryProcessor = spy (
358
- // This can't be a lambda for mockito
359
- new DecatonProcessor <byte []>() {
360
- @ Override
361
- public void process (ProcessingContext <byte []> context , byte [] task )
362
- throws InterruptedException {
363
- Completion comp = context .deferCompletion ();
364
- new Thread (() -> {
365
- try {
366
- retryLatch .await ();
367
- } catch (InterruptedException ignored ) {}
368
- comp .complete ();
369
- }).start ();
370
- }
371
- });
375
+ DecatonProcessor <byte []> retryProcessor = spy (new AsyncCompleteProcessor (retryLatch ));
372
376
TaskRequest request = new TaskRequest (
373
377
new TopicPartition ("topic" , 1 ), 1 , null , "TEST" .getBytes (StandardCharsets .UTF_8 ), null , null , REQUEST .toByteArray (), null );
374
378
DecatonTask <byte []> task = new DecatonTask <>(
@@ -399,21 +403,7 @@ public void testRetry_NOT_CONFIGURED() throws InterruptedException {
399
403
@ Timeout (5 )
400
404
public void testRetryAtCompletionTimeout () throws InterruptedException {
401
405
CountDownLatch retryLatch = new CountDownLatch (1 );
402
- DecatonProcessor <byte []> retryProcessor = spy (
403
- // This can't be a lambda for mockito
404
- new DecatonProcessor <byte []>() {
405
- @ Override
406
- public void process (ProcessingContext <byte []> context , byte [] task )
407
- throws InterruptedException {
408
- Completion comp = context .deferCompletion ();
409
- new Thread (() -> {
410
- try {
411
- retryLatch .await ();
412
- } catch (InterruptedException ignored ) {}
413
- comp .complete ();
414
- }).start ();
415
- }
416
- });
406
+ DecatonProcessor <byte []> retryProcessor = spy (new AsyncCompleteProcessor (retryLatch ));
417
407
TaskRequest request = new TaskRequest (
418
408
new TopicPartition ("topic" , 1 ), 1 , null , "TEST" .getBytes (StandardCharsets .UTF_8 ), null , null , REQUEST .toByteArray (), null );
419
409
DecatonTask <byte []> task = new DecatonTask <>(
0 commit comments