Skip to content

Commit

Permalink
Align Record/Batch Interceptor lifecycle (#3053)
Browse files Browse the repository at this point in the history
Fixes: #2287 

Resolves #2287

* Align lifecycle for earlyRecordInterceptor(intercept + failure/success + afterRecord) and earlyBatchInterceptor(intercept + failure/success).
* Fix unit test KafkaMessageListenerContainerTests, see #2722 and #2287.
  • Loading branch information
Wzy19930507 authored Feb 21, 2024
1 parent 6f32959 commit 0b321cf
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2662,6 +2662,7 @@ private ConsumerRecords<K, V> checkEarlyIntercept(ConsumerRecords<K, V> nextArg)
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
this.earlyBatchInterceptor.success(nextArg, this.consumer);
}
}
return next;
Expand All @@ -2677,6 +2678,8 @@ private ConsumerRecord<K, V> checkEarlyIntercept(ConsumerRecord<K, V> recordArg)
this.logger.debug(() -> "RecordInterceptor returned null, skipping: "
+ KafkaUtils.format(recordArg));
ackCurrent(recordArg);
this.earlyRecordInterceptor.success(recordArg, this.consumer);
this.earlyRecordInterceptor.afterRecord(recordArg, this.consumer);
}
}
return cRecord;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3898,6 +3898,9 @@ public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early
containerProps.setMessageListener((MessageListener<?, ?>) msg -> {
});
containerProps.setClientId("clientId");
if (early) {
containerProps.setTransactionManager(mock(PlatformTransactionManager.class));
}

RecordInterceptor<Integer, String> recordInterceptor = spy(new RecordInterceptor<Integer, String>() {

Expand All @@ -3922,17 +3925,27 @@ public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String>
inOrder.verify(recordInterceptor).setupThreadState(eq(consumer));
inOrder.verify(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
inOrder.verify(recordInterceptor).intercept(eq(firstRecord), eq(consumer));
if (ackMode.equals(AckMode.RECORD)) {
if (AckMode.RECORD.equals(ackMode)) {
inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(1L))),
any(Duration.class));
}
else {
verify(consumer, never()).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(1L))),
any(Duration.class));
}
inOrder.verify(recordInterceptor).success(eq(firstRecord), eq(consumer));
inOrder.verify(recordInterceptor).afterRecord(eq(firstRecord), eq(consumer));
inOrder.verify(recordInterceptor).intercept(eq(secondRecord), eq(consumer));
inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))),
any(Duration.class));
if (AckMode.RECORD.equals(ackMode)) {
inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))),
any(Duration.class));
}
inOrder.verify(recordInterceptor).success(eq(secondRecord), eq(consumer));
inOrder.verify(recordInterceptor).afterRecord(eq(secondRecord), eq(consumer));
if (AckMode.BATCH.equals(ackMode)) {
inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))),
any(Duration.class));
}
container.stop();
}

Expand Down Expand Up @@ -3968,7 +3981,7 @@ public void testInvokeBatchInterceptorAllSkipped(boolean early) throws Exception
containerProps.setMessageListener((BatchMessageListener<?, ?>) msgs -> {
});
containerProps.setClientId("clientId");
if (!early) {
if (early) {
containerProps.setTransactionManager(mock(PlatformTransactionManager.class));
}

Expand All @@ -3995,6 +4008,7 @@ public ConsumerRecords<Integer, String> intercept(ConsumerRecords<Integer, Strin
inOrder.verify(interceptor).setupThreadState(eq(consumer));
inOrder.verify(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
inOrder.verify(interceptor).intercept(any(), eq(consumer));
inOrder.verify(interceptor).success(any(), eq(consumer));
inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))),
any(Duration.class));
container.stop();
Expand Down

0 comments on commit 0b321cf

Please sign in to comment.