diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc index ad0724bc41..3128120512 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc @@ -724,6 +724,24 @@ public void listen(List> fooMessages) { } ---- +If record in the batch cannot be converted, its payload is set as `null` into the target `payloads` list. +The conversion exception is logged as warning for this record and also stored into a `KafkaHeaders.CONVERSION_FAILURES` header as an item of the `List`. +The target `@KafkaListener` method may perform Java `Stream` API to filter out those `null` values from the payload list or do something with the conversion exceptions header: + +[source, java] +---- +@KafkaListener(id = "foo", topics = "foo", autoStartup = "false") +public void listen(List list, + @Header(KafkaHeaders.CONVERSION_FAILURES) List conversionFailures) { + + for (int i = 0; i < list.size(); i++) { + if (conversionFailures.get(i) != null) { + throw new BatchListenerFailedException("Conversion Failed", conversionFailures.get(i), i); + } + } +} +---- + [[conversionservice-customization]] == `ConversionService` Customization diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java index 7808724d46..d33ed98b34 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java @@ -29,9 +29,11 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.Bytes; import org.springframework.core.log.LogAccessor; +import org.springframework.core.log.LogMessage; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.DefaultKafkaHeaderMapper; import org.springframework.kafka.support.JacksonPresent; @@ -54,7 +56,7 @@ *

* If a {@link RecordMessageConverter} is provided, and the batch type is a {@link ParameterizedType} * with a single generic type parameter, each record will be passed to the converter, thus supporting - * a method signature {@code List foos}. + * a method signature {@code List myObjects}. * * @author Marius Bogoevici * @author Gary Russell @@ -63,11 +65,13 @@ * @author Sanghyeok An * @author Hope Kim * @author Borahm Lee + * @author Artem Bilan + * * @since 1.1 */ public class BatchMessagingMessageConverter implements BatchMessageConverter { - protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); // NOSONAR + protected final LogAccessor logger = new LogAccessor(getClass()); // NOSONAR @Nullable private final RecordMessageConverter recordConverter; @@ -102,7 +106,7 @@ public BatchMessagingMessageConverter(@Nullable RecordMessageConverter recordCon /** * Generate {@link Message} {@code ids} for produced messages. If set to {@code false}, - * will try to use a default value. By default set to {@code false}. + * will try to use a default value. By default, set to {@code false}. * @param generateMessageId true if a message id should be generated */ public void setGenerateMessageId(boolean generateMessageId) { @@ -111,7 +115,7 @@ public void setGenerateMessageId(boolean generateMessageId) { /** * Generate {@code timestamp} for produced messages. If set to {@code false}, -1 is - * used instead. By default set to {@code false}. + * used instead. By default, set to {@code false}. * @param generateTimestamp true if a timestamp should be generated */ public void setGenerateTimestamp(boolean generateTimestamp) { @@ -147,8 +151,8 @@ public void setRawRecordHeader(boolean rawRecordHeader) { public Message toMessage(List> records, @Nullable Acknowledgment acknowledgment, Consumer consumer, Type type) { - KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId, - this.generateTimestamp); + KafkaMessageHeaders kafkaMessageHeaders = + new KafkaMessageHeaders(this.generateMessageId, this.generateTimestamp); Map rawHeaders = kafkaMessageHeaders.getRawHeaders(); List payloads = new ArrayList<>(); @@ -169,16 +173,18 @@ public Message toMessage(List> records, @Nullable Acknow String listenerInfo = null; for (ConsumerRecord record : records) { - addRecordInfo(record, type, payloads, keys, topics, partitions, offsets, timestampTypes, timestamps, conversionFailures); - if (this.headerMapper != null && record.headers() != null) { - Map converted = convertHeaders(record.headers(), convertedHeaders); + addRecordInfo(record, type, payloads, keys, topics, partitions, offsets, timestampTypes, timestamps, + conversionFailures); + Headers recordHeaders = record.headers(); + if (this.headerMapper != null && recordHeaders != null) { + Map converted = convertHeaders(recordHeaders, convertedHeaders); Object obj = converted.get(KafkaHeaders.LISTENER_INFO); - if (obj instanceof String) { - listenerInfo = (String) obj; + if (obj instanceof String info) { + listenerInfo = info; } } else { - natives.add(record.headers()); + natives.add(recordHeaders); } if (this.rawRecordHeader) { raws.add(record); @@ -198,6 +204,7 @@ public Message toMessage(List> records, @Nullable Acknow private void addToRawHeaders(Map rawHeaders, List> convertedHeaders, List natives, List> raws, List conversionFailures) { + if (this.headerMapper != null) { rawHeaders.put(KafkaHeaders.BATCH_CONVERTED_HEADERS, convertedHeaders); } @@ -211,16 +218,18 @@ private void addToRawHeaders(Map rawHeaders, List record, Type type, List payloads, List keys, - List topics, List partitions, List offsets, List timestampTypes, - List timestamps, List conversionFailures) { + List topics, List partitions, List offsets, List timestampTypes, + List timestamps, List conversionFailures) { + payloads.add(obtainPayload(type, record, conversionFailures)); keys.add(record.key()); topics.add(record.topic()); partitions.add(record.partition()); offsets.add(record.offset()); timestamps.add(record.timestamp()); - if (record.timestampType() != null) { - timestampTypes.add(record.timestampType().name()); + TimestampType timestampType = record.timestampType(); + if (timestampType != null) { + timestampTypes.add(timestampType.name()); } } @@ -264,24 +273,29 @@ protected Object extractAndConvertValue(ConsumerRecord record, Type type) protected Object convert(ConsumerRecord record, Type type, List conversionFailures) { try { Object payload = this.recordConverter - .toMessage(record, null, null, ((ParameterizedType) type).getActualTypeArguments()[0]).getPayload(); + .toMessage(record, null, null, ((ParameterizedType) type).getActualTypeArguments()[0]).getPayload(); conversionFailures.add(null); return payload; } catch (ConversionException ex) { byte[] original = null; - if (record.value() instanceof byte[]) { - original = (byte[]) record.value(); + if (record.value() instanceof byte[] bytes) { + original = bytes; } - else if (record.value() instanceof Bytes) { - original = ((Bytes) record.value()).get(); + else if (record.value() instanceof Bytes bytes) { + original = bytes.get(); } - else if (record.value() instanceof String) { - original = ((String) record.value()).getBytes(StandardCharsets.UTF_8); + else if (record.value() instanceof String string) { + original = string.getBytes(StandardCharsets.UTF_8); } if (original != null) { SerializationUtils.deserializationException(record.headers(), original, ex, false); conversionFailures.add(ex); + logger.warn(ex, + LogMessage.format("Could not convert message for topic=%s, partition=%d, offset=%d", + record.topic(), + record.partition(), + record.offset())); return null; } throw new ConversionException("The batch converter can only report conversion failures to the listener " @@ -296,8 +310,8 @@ else if (record.value() instanceof String) { * @return true if the conditions are met. */ private boolean containerType(Type type) { - return type instanceof ParameterizedType - && ((ParameterizedType) type).getActualTypeArguments().length == 1; + return type instanceof ParameterizedType parameterizedType + && parameterizedType.getActualTypeArguments().length == 1; } }