Skip to content

Commit

Permalink
GH-3555: Improve docs for KafkaHeaders.CONVERSION_FAILURES
Browse files Browse the repository at this point in the history
Fixes: #3555
Issue link: #3555

The batch might be processed in the `@KafkaListener` silently, without looking into the conversion failures header.
So, that might give the impression that nothing is wrong with the batch.

* Mention `KafkaHeaders.CONVERSION_FAILURES` in the docs
* Add `warn` for the failed conversion in the `BatchMessagingMessageConverter`
* Some other code optimization clean up in the `BatchMessagingMessageConverter`
  • Loading branch information
artembilan committed Nov 1, 2024
1 parent 07cff76 commit 4b8e8a4
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,24 @@ public void listen(List<Message<Foo>> fooMessages) {
}
----

If a record in the batch cannot be converted, its payload is set to `null` in the target `payloads` list.
The conversion exception is logged as a warning for this record and is also stored in the `KafkaHeaders.CONVERSION_FAILURES` header as an item of a `List<ConversionException>`.
The target `@KafkaListener` method may use the Java `Stream` API to filter out those `null` values from the payload list, or do something with the conversion exceptions header:

[source, java]
----
@KafkaListener(id = "foo", topics = "foo", autoStartup = "false")
public void listen(List<Foo> list,
@Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> conversionFailures) {
for (int i = 0; i < list.size(); i++) {
if (conversionFailures.get(i) != null) {
throw new BatchListenerFailedException("Conversion Failed", conversionFailures.get(i), i);
}
}
}
----

[[conversionservice-customization]]
== `ConversionService` Customization

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Bytes;

import org.springframework.core.log.LogAccessor;
import org.springframework.core.log.LogMessage;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.JacksonPresent;
Expand All @@ -54,7 +56,7 @@
* <p>
* If a {@link RecordMessageConverter} is provided, and the batch type is a {@link ParameterizedType}
* with a single generic type parameter, each record will be passed to the converter, thus supporting
* a method signature {@code List<Foo> foos}.
* a method signature {@code List<MyType> myObjects}.
*
* @author Marius Bogoevici
* @author Gary Russell
Expand All @@ -63,11 +65,13 @@
* @author Sanghyeok An
* @author Hope Kim
* @author Borahm Lee
* @author Artem Bilan
*
* @since 1.1
*/
public class BatchMessagingMessageConverter implements BatchMessageConverter {

protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); // NOSONAR
protected final LogAccessor logger = new LogAccessor(getClass()); // NOSONAR

@Nullable
private final RecordMessageConverter recordConverter;
Expand Down Expand Up @@ -102,7 +106,7 @@ public BatchMessagingMessageConverter(@Nullable RecordMessageConverter recordCon

/**
* Generate {@link Message} {@code ids} for produced messages. If set to {@code false},
* will try to use a default value. By default set to {@code false}.
* will try to use a default value. By default, set to {@code false}.
* @param generateMessageId true if a message id should be generated
*/
public void setGenerateMessageId(boolean generateMessageId) {
Expand All @@ -111,7 +115,7 @@ public void setGenerateMessageId(boolean generateMessageId) {

/**
* Generate {@code timestamp} for produced messages. If set to {@code false}, -1 is
* used instead. By default set to {@code false}.
* used instead. By default, set to {@code false}.
* @param generateTimestamp true if a timestamp should be generated
*/
public void setGenerateTimestamp(boolean generateTimestamp) {
Expand Down Expand Up @@ -147,8 +151,8 @@ public void setRawRecordHeader(boolean rawRecordHeader) {
public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknowledgment acknowledgment,
Consumer<?, ?> consumer, Type type) {

KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId,
this.generateTimestamp);
KafkaMessageHeaders kafkaMessageHeaders =
new KafkaMessageHeaders(this.generateMessageId, this.generateTimestamp);

Map<String, Object> rawHeaders = kafkaMessageHeaders.getRawHeaders();
List<Object> payloads = new ArrayList<>();
Expand All @@ -169,16 +173,18 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow

String listenerInfo = null;
for (ConsumerRecord<?, ?> record : records) {
addRecordInfo(record, type, payloads, keys, topics, partitions, offsets, timestampTypes, timestamps, conversionFailures);
if (this.headerMapper != null && record.headers() != null) {
Map<String, Object> converted = convertHeaders(record.headers(), convertedHeaders);
addRecordInfo(record, type, payloads, keys, topics, partitions, offsets, timestampTypes, timestamps,
conversionFailures);
Headers recordHeaders = record.headers();
if (this.headerMapper != null && recordHeaders != null) {
Map<String, Object> converted = convertHeaders(recordHeaders, convertedHeaders);
Object obj = converted.get(KafkaHeaders.LISTENER_INFO);
if (obj instanceof String) {
listenerInfo = (String) obj;
if (obj instanceof String info) {
listenerInfo = info;
}
}
else {
natives.add(record.headers());
natives.add(recordHeaders);
}
if (this.rawRecordHeader) {
raws.add(record);
Expand All @@ -198,6 +204,7 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow

private void addToRawHeaders(Map<String, Object> rawHeaders, List<Map<String, Object>> convertedHeaders,
List<Headers> natives, List<ConsumerRecord<?, ?>> raws, List<ConversionException> conversionFailures) {

if (this.headerMapper != null) {
rawHeaders.put(KafkaHeaders.BATCH_CONVERTED_HEADERS, convertedHeaders);
}
Expand All @@ -211,16 +218,18 @@ private void addToRawHeaders(Map<String, Object> rawHeaders, List<Map<String, Ob
}

private void addRecordInfo(ConsumerRecord<?, ?> record, Type type, List<Object> payloads, List<Object> keys,
List<String> topics, List<Integer> partitions, List<Long> offsets, List<String> timestampTypes,
List<Long> timestamps, List<ConversionException> conversionFailures) {
List<String> topics, List<Integer> partitions, List<Long> offsets, List<String> timestampTypes,
List<Long> timestamps, List<ConversionException> conversionFailures) {

payloads.add(obtainPayload(type, record, conversionFailures));
keys.add(record.key());
topics.add(record.topic());
partitions.add(record.partition());
offsets.add(record.offset());
timestamps.add(record.timestamp());
if (record.timestampType() != null) {
timestampTypes.add(record.timestampType().name());
TimestampType timestampType = record.timestampType();
if (timestampType != null) {
timestampTypes.add(timestampType.name());
}
}

Expand Down Expand Up @@ -264,24 +273,29 @@ protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type)
protected Object convert(ConsumerRecord<?, ?> record, Type type, List<ConversionException> conversionFailures) {
try {
Object payload = this.recordConverter
.toMessage(record, null, null, ((ParameterizedType) type).getActualTypeArguments()[0]).getPayload();
.toMessage(record, null, null, ((ParameterizedType) type).getActualTypeArguments()[0]).getPayload();
conversionFailures.add(null);
return payload;
}
catch (ConversionException ex) {
byte[] original = null;
if (record.value() instanceof byte[]) {
original = (byte[]) record.value();
if (record.value() instanceof byte[] bytes) {
original = bytes;
}
else if (record.value() instanceof Bytes) {
original = ((Bytes) record.value()).get();
else if (record.value() instanceof Bytes bytes) {
original = bytes.get();
}
else if (record.value() instanceof String) {
original = ((String) record.value()).getBytes(StandardCharsets.UTF_8);
else if (record.value() instanceof String string) {
original = string.getBytes(StandardCharsets.UTF_8);
}
if (original != null) {
SerializationUtils.deserializationException(record.headers(), original, ex, false);
conversionFailures.add(ex);
logger.warn(ex,
LogMessage.format("Could not convert message for topic=%s, partition=%d, offset=%d",
record.topic(),
record.partition(),
record.offset()));
return null;
}
throw new ConversionException("The batch converter can only report conversion failures to the listener "
Expand All @@ -296,8 +310,8 @@ else if (record.value() instanceof String) {
* @return true if the conditions are met.
*/
private boolean containerType(Type type) {
return type instanceof ParameterizedType
&& ((ParameterizedType) type).getActualTypeArguments().length == 1;
return type instanceof ParameterizedType parameterizedType
&& parameterizedType.getActualTypeArguments().length == 1;
}

}

0 comments on commit 4b8e8a4

Please sign in to comment.