Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-3407 : Support KafkaHeaders.DELIVERY_ATTEMP for batch listeners. #3539

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,27 @@ It is disabled by default to avoid the (small) overhead of looking up the state

The `DefaultErrorHandler` and `DefaultAfterRollbackProcessor` support this feature.

[[delivery-attempts-header-for-batch-listener]]
== Delivery Attempts Header for batch listener

When processing `ConsumerRecord` with the `BatchListener`, the `KafkaHeaders.DELIVERY_ATTEMPT` header can be present in a different way compared to `SingleRecordListener`.

Starting with version 3.3, if you want to inject the `KafkaHeaders.DELIVERY_ATTEMPT` header into the `ConsumerRecord` when using the `BatchListener`, set the `DeliveryAttemptAwareRetryListener` as the `RetryListener` in the `ErrorHandler`.

Please refer to the code below.
[source, java]
----
final FixedBackOff fixedBackOff = new FixedBackOff(1, 10);
final DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff);
errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener());

ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(errorHandler);
----

Then, whenever a batch fails to complete, the `DeliveryAttemptAwareRetryListener` will inject a `KafkaHeaders.DELIVERY_ATTMPT` header into the `ConsumerRecord`.

[[li-header]]
== Listener Info Header

Expand Down Expand Up @@ -796,4 +817,3 @@ DefaultErrorHandler handler() {
----

This will retry after `1, 2, 4, 8, 10, 10` seconds, before calling the recoverer.

Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,8 @@ When extending `KafkaAdmin`, user applications may override the `createAdmin` me

When using `KafkaStreamsCustomizer` it is now possible to return a custom implementation of the `KafkaStreams` object by overriding the `initKafkaStreams` method.

[[x33-kafka-headers-for-batch-listeners]]
=== KafkaHeaders.DELIVERY_ATTEMPT for batch listeners
When using a `BatchListener`, the `ConsumerRecord` can have the `KafkaHeaders.DELIVERY_ATTMPT` header in its headers fields.
If the `DeliveryAttemptAwareRetryListener` is set to error handler as retry listener, each `ConsumerRecord` has delivery attempt header.
For more details, see xref:kafka/annotation-error-handling.adoc#delivery-attempts-header-for-batch-listener[kafka-headers-for-batch-listener].
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

import java.nio.ByteBuffer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.header.internals.RecordHeader;

import org.springframework.kafka.support.KafkaHeaders;

/**
* The DeliveryAttemptAwareRetryListener class for {@link RetryListener} implementations.
* The DeliveryAttemptAwareRetryListener adds the {@link KafkaHeaders}.DELIVERY_ATTEMPT header
* to the record's headers when batch records fail and are retried.
* Note that DeliveryAttemptAwareRetryListener modifies the headers of the original record.
*
* @author Sanghyeok An
* @since 3.3
*/

public class DeliveryAttemptAwareRetryListener implements RetryListener {

@Override
public void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt) {
// Pass
}

/**
* Invoke after delivery failure for batch records.
* If the {@link KafkaHeaders}.DELIVERY_ATTEMPT header already exists in the {@link ConsumerRecord}'s headers,
* it will be removed. Then, the provided `deliveryAttempt` is added to the {@link ConsumerRecord}'s headers.
* @param records the records.
* @param ex the exception.
* @param deliveryAttempt the delivery attempt, if available.
*/
@Override
public void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
for (ConsumerRecord<?, ?> record : records) {
record.headers().remove(KafkaHeaders.DELIVERY_ATTEMPT);

byte[] buff = new byte[4]; // NOSONAR (magic #)
ByteBuffer bb = ByteBuffer.wrap(buff);
bb.putInt(deliveryAttempt);
record.headers().add(new RecordHeader(KafkaHeaders.DELIVERY_ATTEMPT, buff));
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
/*
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.springframework.util.backoff.FixedBackOff;

/**
* @author Sanghyeok An
* @since 3.3.0
*/

@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka
class DeliveryAttemptAwareRetryListenerIntegrationTests {

static final String MAIN_TOPIC_CONTAINER_FACTORY0 = "deliveryMyTestKafkaListenerContainerFactory0";

static final String TEST_TOPIC0 = "myBatchDeliveryAttemptTopic0";

static final int MAX_ATTEMPT_COUNT0 = 3;

static final CountDownLatch latch0 = new CountDownLatch(MAX_ATTEMPT_COUNT0 + 1);

static final String MAIN_TOPIC_CONTAINER_FACTORY1 = "deliveryMyTestKafkaListenerContainerFactory1";

static final String TEST_TOPIC1 = "myBatchDeliveryAttemptTopic1";

static final int MAX_ATTEMPT_COUNT1 = 10;

static final CountDownLatch latch1 = new CountDownLatch(MAX_ATTEMPT_COUNT1 + 1);

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Test
void should_have_delivery_attempt_header_in_each_consumer_record(@Autowired TestTopicListener0 listener) {

// Given
String msg1 = "1";
String msg2 = "2";
String msg3 = "3";

// When
kafkaTemplate.send(TEST_TOPIC0, msg1);
kafkaTemplate.send(TEST_TOPIC0, msg2);
kafkaTemplate.send(TEST_TOPIC0, msg3);

// Then
assertThat(awaitLatch(latch0)).isTrue();

Map<Integer, Integer> deliveryAttemptCountMap = convertToMap(listener.receivedHeaders);

for (int attemptCnt = 1; attemptCnt <= MAX_ATTEMPT_COUNT0; attemptCnt++) {
assertThat(deliveryAttemptCountMap.get(attemptCnt)).isGreaterThan(0);
}
}

@Test
void should_have_delivery_attempt_header_in_each_consumer_record_with_more_bigger_max_attempt(@Autowired TestTopicListener1 listener) {
// Given
String msg1 = "1";
String msg2 = "2";
String msg3 = "3";

// When
kafkaTemplate.send(TEST_TOPIC1, msg1);
kafkaTemplate.send(TEST_TOPIC1, msg2);
kafkaTemplate.send(TEST_TOPIC1, msg3);

// Then
assertThat(awaitLatch(latch1)).isTrue();

Map<Integer, Integer> deliveryAttemptCountMap = convertToMap(listener.receivedHeaders);

for (int attemptCnt = 1; attemptCnt <= MAX_ATTEMPT_COUNT1; attemptCnt++) {
assertThat(deliveryAttemptCountMap.get(attemptCnt)).isGreaterThan(0);
}
}

private Map<Integer, Integer> convertToMap(List<Header> headers) {
Map<Integer, Integer> map = new HashMap<>();
for (Header header : headers) {
int attemptCount = ByteBuffer.wrap(header.value()).getInt();
Integer cnt = map.getOrDefault(attemptCount, 0);
map.put(attemptCount, cnt + 1);
}
return map;
}

private boolean awaitLatch(CountDownLatch latch) {
try {
return latch.await(60, TimeUnit.SECONDS);
}
catch (Exception e) {
fail(e.getMessage());
throw new RuntimeException(e);
}
}

private static CommonErrorHandler createErrorHandler(int interval, int maxAttemptCount) {
final FixedBackOff fixedBackOff = new FixedBackOff(interval, maxAttemptCount);
DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff);
errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener());
return errorHandler;
}

private static ConcurrentKafkaListenerContainerFactory<String, String> createListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory, CommonErrorHandler errorHandler) {

ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(errorHandler);

final ContainerProperties containerProperties = factory.getContainerProperties();
containerProperties.setDeliveryAttemptHeader(true);
return factory;
}

static class TestTopicListener0 {
final List<Header> receivedHeaders = new ArrayList<>();

@KafkaListener(
topics = TEST_TOPIC0,
containerFactory = MAIN_TOPIC_CONTAINER_FACTORY0,
batch = "true")
public void listen(List<ConsumerRecord<?, ?>> records) {
for (ConsumerRecord<?, ?> record : records) {
Iterable<Header> headers = record.headers().headers(KafkaHeaders.DELIVERY_ATTEMPT);
for (Header header : headers) {
receivedHeaders.add(header);
}
}
latch0.countDown();
throw new RuntimeException("Failed.");
}
}

static class TestTopicListener1 {
final List<Header> receivedHeaders = new ArrayList<>();

@KafkaListener(
topics = TEST_TOPIC1,
containerFactory = MAIN_TOPIC_CONTAINER_FACTORY1,
batch = "true")
public void listen(List<ConsumerRecord<?, ?>> records) {
for (ConsumerRecord<?, ?> record : records) {
Iterable<Header> headers = record.headers().headers(KafkaHeaders.DELIVERY_ATTEMPT);
for (Header header : headers) {
receivedHeaders.add(header);
}
}
latch1.countDown();
throw new RuntimeException("Failed.");
}
}

@Configuration
static class TestConfiguration {

@Bean
TestTopicListener0 testTopicListener0() {
return new TestTopicListener0();
}

@Bean
TestTopicListener1 testTopicListener1() {
return new TestTopicListener1();
}
}

@Configuration
static class KafkaProducerConfig {

@Autowired
EmbeddedKafkaBroker broker;

@Bean
ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = KafkaTestUtils.producerProps(
this.broker.getBrokersAsString());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
artembilan marked this conversation as resolved.
Show resolved Hide resolved
return new DefaultKafkaProducerFactory<>(props);
}

@Bean("customKafkaTemplate")
KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}

@EnableKafka
@Configuration
static class KafkaConsumerConfig {

@Autowired
EmbeddedKafkaBroker broker;

@Bean
ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = KafkaTestUtils.consumerProps(
this.broker.getBrokersAsString(),
"DeliveryAttemptAwareRetryListenerIntegrationTestsGroupId",
"true");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
artembilan marked this conversation as resolved.
Show resolved Hide resolved

return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
ConcurrentKafkaListenerContainerFactory<String, String>
deliveryMyTestKafkaListenerContainerFactory0(ConsumerFactory<String, String> consumerFactory) {
CommonErrorHandler errorHandler = createErrorHandler(1, MAX_ATTEMPT_COUNT0);
return createListenerContainerFactory(consumerFactory, errorHandler);
}

@Bean
ConcurrentKafkaListenerContainerFactory<String, String>
deliveryMyTestKafkaListenerContainerFactory1(ConsumerFactory<String, String> consumerFactory) {
CommonErrorHandler errorHandler = createErrorHandler(1, MAX_ATTEMPT_COUNT1);
return createListenerContainerFactory(consumerFactory, errorHandler);
}

}

}
Loading