Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-3407 : Support KafkaHeaders.DELIVERY_ATTEMP for batch listeners. #3539

Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[[kafka-headers-for-batch-listener]]
artembilan marked this conversation as resolved.
Show resolved Hide resolved
= Kafka Headers for batch listener

When processing `ConsumerRecord` with the `BatchListener`, the `KafkaHeaders.DELIVERY_ATTEMPT` header can be present in a different way compared to `SingleRecordListener`.

To inject the `KafkaHeaders.DELIVERY_ATTEMPT` header into `ConsumerRecord` when using the `BatchListener`, set the `DeliveryAttemptAwareRetryListener` as the `RetryListener` in the `ErrorHandler`.

Please refer to the code below.
[source, java]
----
final FixedBackOff fixedBackOff = new FixedBackOff(1, 10);
final DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff);
errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener());

ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(errorHandler);
----
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,9 @@ When extending `KafkaAdmin`, user applications may override the `createAdmin` me

When using `KafkaStreamsCustomizer` it is now possible to return a custom implementation of the `KafkaStreams` object by overriding the `initKafkaStreams` method.

[[x33-kafka-headers-for-batch-listeners]]
=== KafkaHeaders.DELIVERY_ATTEMPT for batch listeners
When using a `BatchListener`, the `ConsumerRecord` can have the `KafkaHeaders.DELIVERY_ATTMPT` header in its headers fields.
If the `DeliveryAttemptAwareRetryListener` is set to error handler as retry listener, each `ConsumerRecord` has delivery attempt header.
For more details, see xref:retrytopic/kafka-headers-for-batch-listener.adoc[kafka-headers-for-batch-listener].
artembilan marked this conversation as resolved.
Show resolved Hide resolved

Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

import java.nio.ByteBuffer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.header.internals.RecordHeader;

import org.springframework.kafka.support.KafkaHeaders;

/**
* The DeliveryAttemptAwareRetryListener class for {@link RetryListener} implementations.
* The DeliveryAttemptAwareRetryListener adds the {@link KafkaHeaders}.DELIVERY_ATTEMPT header
* to the record's headers when batch records fail and are retried.
* Note that DeliveryAttemptAwareRetryListener modifies the headers of the original record.
*
* @author Sanghyeok An
* @since 3.3
* @see KafkaConsumerBackoffManager
artembilan marked this conversation as resolved.
Show resolved Hide resolved
*/

public class DeliveryAttemptAwareRetryListener implements RetryListener {

@Override
public void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt) {
// Pass
}

/**
* Invoke after delivery failure for batch records.
* If the {@link KafkaHeaders}.DELIVERY_ATTEMPT header already exists in the {@link ConsumerRecord}'s headers,
* it will be removed. Then, the provided `deliveryAttempt` is added to the {@link ConsumerRecord}'s headers.
* @param records the records.
* @param ex the exception.
* @param deliveryAttempt the delivery attempt, if available.
*/
@Override
public void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
for (ConsumerRecord<?, ?> record : records) {
record.headers().remove(KafkaHeaders.DELIVERY_ATTEMPT);

byte[] buff = new byte[4]; // NOSONAR (magic #)
ByteBuffer bb = ByteBuffer.wrap(buff);
bb.putInt(deliveryAttempt);
record.headers().add(new RecordHeader(KafkaHeaders.DELIVERY_ATTEMPT, buff));
}
}

}
Loading