From 77357948896e94bb7b3c06434af14f756de709ce Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Thu, 28 Mar 2024 13:19:36 -0400 Subject: [PATCH] GH-3166: BatchInterceptor issues with retries Fixes: #3166 * When retries are enabled, batch interceptor is not invoking the intercept methods for failures on retries and the possible eventual success method call. Addressing this issue. * Addressing PR review (cherry picked from commit c26f26e4598f40c0bf32b5294b110de438b3faf6) --- .../KafkaMessageListenerContainer.java | 34 +++++---- .../KafkaMessageListenerContainerTests.java | 75 ++++++++++++++++++- 2 files changed, 93 insertions(+), 16 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 6d8be1fab9..9b0a26d42b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -2272,12 +2272,8 @@ private List> createRecordList(final ConsumerRecords @Nullable private RuntimeException doInvokeBatchListener(final ConsumerRecords records, // NOSONAR List> recordList) { - - Object sample = startMicrometerSample(); try { invokeBatchOnMessage(records, recordList); - batchInterceptAfter(records, null); - successTimer(sample, null); if (this.batchFailed) { this.batchFailed = false; if (this.commonErrorHandler != null) { @@ -2290,9 +2286,6 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords recor } } catch (RuntimeException e) { - this.batchFailed = true; - failureTimer(sample, null); - batchInterceptAfter(records, e); if (this.commonErrorHandler == null) { throw e; } @@ -2446,15 +2439,26 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords r recordList = createRecordList(records); } } - if (this.wantsFullRecords) { - this.batchListener.onMessage(records, // NOSONAR - this.isAnyManualAck - ? new ConsumerBatchAcknowledgment(records, recordList) - : null, - this.consumer); + Object sample = startMicrometerSample(); + try { + if (this.wantsFullRecords) { + this.batchListener.onMessage(records, // NOSONAR + this.isAnyManualAck + ? new ConsumerBatchAcknowledgment(records, recordList) + : null, + this.consumer); + } + else { + doInvokeBatchOnMessage(records, recordList); // NOSONAR + } + batchInterceptAfter(records, null); + successTimer(sample, null); } - else { - doInvokeBatchOnMessage(records, recordList); // NOSONAR + catch (RuntimeException e) { + this.batchFailed = true; + failureTimer(sample, null); + batchInterceptAfter(records, e); + throw e; } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index 2abf892d41..42489e65d9 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -4021,7 +4021,6 @@ public ConsumerRecords intercept(ConsumerRecords consumer) { container.stop(); } + @Test + @SuppressWarnings("unchecked") + public void invokeBatchInterceptorSuccessFailureOnRetry() throws Exception { + ConsumerFactory cf = mock(ConsumerFactory.class); + Consumer consumer = mock(Consumer.class); + given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer); + ConsumerRecord firstRecord = new ConsumerRecord<>("test-topic", 0, 0L, 1, "data-1"); + ConsumerRecord secondRecord = new ConsumerRecord<>("test-topic", 0, 1L, 1, "data-2"); + Map>> records = new HashMap<>(); + records.put(new TopicPartition("test-topic", 0), List.of(firstRecord, secondRecord)); + ConsumerRecords consumerRecords = new ConsumerRecords<>(records); + AtomicInteger invocation = new AtomicInteger(0); + given(consumer.poll(any(Duration.class))).willAnswer(i -> { + if (invocation.getAndIncrement() == 0) { + return consumerRecords; + } + else { + // Subsequent polls after the first one returns empty records. + return new ConsumerRecords(Map.of()); + } + }); + TopicPartitionOffset[] topicPartition = new TopicPartitionOffset[] { + new TopicPartitionOffset("test-topic", 0) }; + + CountDownLatch latch = new CountDownLatch(4); // 3 failures, 1 success + BatchMessageListener batchMessageListener = spy( + new BatchMessageListener() { // Cannot be lambda: Mockito doesn't mock final classes + + @Override + public void onMessage(List> data) { + latch.countDown(); + if (latch.getCount() > 0) { + throw new IllegalArgumentException("Failed record"); + } + } + + }); + + ContainerProperties containerProps = new ContainerProperties(topicPartition); + containerProps.setGroupId("grp"); + containerProps.setAckMode(ContainerProperties.AckMode.BATCH); + containerProps.setMissingTopicsFatal(false); + containerProps.setMessageListener(batchMessageListener); + containerProps.setClientId("clientId"); + + BatchInterceptor batchInterceptor = spy(new BatchInterceptor() { + + @Override + public ConsumerRecords intercept(ConsumerRecords records, + Consumer consumer) { + return records; + } + + }); + + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(cf, containerProps); + container.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(0, 3))); + container.setBatchInterceptor(batchInterceptor); + container.start(); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + + InOrder inOrder = inOrder(batchInterceptor, batchMessageListener, consumer); + for (int i = 0; i < 3; i++) { + inOrder.verify(batchInterceptor).intercept(eq(consumerRecords), eq(consumer)); + inOrder.verify(batchMessageListener).onMessage(eq(List.of(firstRecord, secondRecord))); + inOrder.verify(batchInterceptor).failure(eq(consumerRecords), any(), eq(consumer)); + } + inOrder.verify(batchInterceptor).intercept(eq(consumerRecords), eq(consumer)); + inOrder.verify(batchMessageListener).onMessage(eq(List.of(firstRecord, secondRecord))); + inOrder.verify(batchInterceptor).success(eq(consumerRecords), eq(consumer)); + container.stop(); + } + @Test public void testOffsetAndMetadataWithoutProvider() throws InterruptedException { testOffsetAndMetadata(null, new OffsetAndMetadata(1));