Skip to content

Commit

Permalink
Align Record/Batch Interceptor lifecycle (#3053)
Browse files Browse the repository at this point in the history
Fixes: #2287

Resolves #2287

* Align lifecycle for earlyRecordInterceptor(intercept + failure/success + afterRecord) and earlyBatchInterceptor(intercept + failure/success).
* Fix unit test KafkaMessageListenerContainerTests, see #2722 and #2287.

(cherry picked from commit 0b321cf)
  • Loading branch information
Wzy19930507 authored and sobychacko committed Mar 1, 2024
1 parent 7077f4e commit fd150ad
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2665,6 +2665,7 @@ private ConsumerRecords<K, V> checkEarlyIntercept(ConsumerRecords<K, V> nextArg)
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
this.earlyBatchInterceptor.success(nextArg, this.consumer);
}
}
return next;
Expand All @@ -2680,6 +2681,8 @@ private ConsumerRecord<K, V> checkEarlyIntercept(ConsumerRecord<K, V> recordArg)
this.logger.debug(() -> "RecordInterceptor returned null, skipping: "
+ KafkaUtils.format(recordArg));
ackCurrent(recordArg);
this.earlyRecordInterceptor.success(recordArg, this.consumer);
this.earlyRecordInterceptor.afterRecord(recordArg, this.consumer);
}
}
return cRecord;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3901,6 +3901,9 @@ public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early
containerProps.setMessageListener((MessageListener) msg -> {
});
containerProps.setClientId("clientId");
if (early) {
containerProps.setTransactionManager(mock(PlatformTransactionManager.class));
}

RecordInterceptor<Integer, String> recordInterceptor = spy(new RecordInterceptor<Integer, String>() {

Expand All @@ -3925,17 +3928,27 @@ public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String>
inOrder.verify(recordInterceptor).setupThreadState(eq(consumer));
inOrder.verify(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
inOrder.verify(recordInterceptor).intercept(eq(firstRecord), eq(consumer));
if (ackMode.equals(AckMode.RECORD)) {
if (AckMode.RECORD.equals(ackMode)) {
inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(1L))),
any(Duration.class));
}
else {
verify(consumer, never()).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(1L))),
any(Duration.class));
}
inOrder.verify(recordInterceptor).success(eq(firstRecord), eq(consumer));
inOrder.verify(recordInterceptor).afterRecord(eq(firstRecord), eq(consumer));
inOrder.verify(recordInterceptor).intercept(eq(secondRecord), eq(consumer));
inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))),
any(Duration.class));
if (AckMode.RECORD.equals(ackMode)) {
inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))),
any(Duration.class));
}
inOrder.verify(recordInterceptor).success(eq(secondRecord), eq(consumer));
inOrder.verify(recordInterceptor).afterRecord(eq(secondRecord), eq(consumer));
if (AckMode.BATCH.equals(ackMode)) {
inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))),
any(Duration.class));
}
container.stop();
}

Expand Down Expand Up @@ -3971,7 +3984,7 @@ public void testInvokeBatchInterceptorAllSkipped(boolean early) throws Exception
containerProps.setMessageListener((BatchMessageListener) msgs -> {
});
containerProps.setClientId("clientId");
if (!early) {
if (early) {
containerProps.setTransactionManager(mock(PlatformTransactionManager.class));
}

Expand All @@ -3998,6 +4011,7 @@ public ConsumerRecords<Integer, String> intercept(ConsumerRecords<Integer, Strin
inOrder.verify(interceptor).setupThreadState(eq(consumer));
inOrder.verify(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
inOrder.verify(interceptor).intercept(any(), eq(consumer));
inOrder.verify(interceptor).success(any(), eq(consumer));
inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))),
any(Duration.class));
container.stop();
Expand Down

0 comments on commit fd150ad

Please sign in to comment.