diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java
index 9315ceac47..31ea66dc07 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java
@@ -23,6 +23,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import org.apache.commons.logging.LogFactory;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -142,8 +143,7 @@ public void setRawRecordHeader(boolean rawRecordHeader) {
 	}
 
 	@Override // NOSONAR
-	public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknowledgment acknowledgment,
-			Consumer<?, ?> consumer, Type type) {
+	public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer, Type type) {
 
 		KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId,
 				this.generateTimestamp);
@@ -160,48 +160,72 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow
 		List<Headers> natives = new ArrayList<>();
 		List<ConsumerRecord<?, ?>> raws = new ArrayList<>();
 		List<ConversionException> conversionFailures = new ArrayList<>();
+
 		addToRawHeaders(rawHeaders, convertedHeaders, natives, raws, conversionFailures);
 		commonHeaders(acknowledgment, consumer, rawHeaders, keys, topics, partitions, offsets,
 				timestampTypes, timestamps);
+		records.forEach(record -> processRecord(record, payloads, keys, topics, partitions, offsets, timestampTypes, timestamps,
+				convertedHeaders, natives, raws, conversionFailures, rawHeaders, type));
+		return MessageBuilder.createMessage(payloads, kafkaMessageHeaders);
+	}
+
+	/**
+	 * Append one record's payload and metadata to the batch-level aggregate lists and
+	 * map its headers (or collect them raw when no header mapper is available).
+	 */
+	private void processRecord(ConsumerRecord<?, ?> record, List<Object> payloads, List<Object> keys,
+			List<String> topics, List<Integer> partitions, List<Long> offsets,
+			List<String> timestampTypes, List<Long> timestamps, List<Map<String, Object>> convertedHeaders,
+			List<Headers> natives, List<ConsumerRecord<?, ?>> raws, List<ConversionException> conversionFailures,
+			Map<String, Object> rawHeaders, Type type) {
+		payloads.add(obtainPayload(type, record, conversionFailures));
+		keys.add(record.key());
+		topics.add(record.topic());
+		partitions.add(record.partition());
+		offsets.add(record.offset());
+
+		if (record.timestampType() != null) {
+			timestampTypes.add(record.timestampType().name());
+		}
+		timestamps.add(record.timestamp());
+
-		boolean logged = false;
 		String info = null;
-		for (ConsumerRecord<?, ?> record : records) {
-			payloads.add(obtainPayload(type, record, conversionFailures));
-			keys.add(record.key());
-			topics.add(record.topic());
-			partitions.add(record.partition());
-			offsets.add(record.offset());
-			if (record.timestampType() != null) {
-				timestampTypes.add(record.timestampType().name());
-			}
-			timestamps.add(record.timestamp());
-			if (this.headerMapper != null && record.headers() != null) {
-				Map<String, Object> converted = new HashMap<>();
-				this.headerMapper.toHeaders(record.headers(), converted);
-				convertedHeaders.add(converted);
-				Object object = converted.get(KafkaHeaders.LISTENER_INFO);
-				if (object instanceof String) {
-					info = (String) object;
-				}
-			}
-			else {
-				if (!logged) {
-					this.logger.debug(() ->
-							"No header mapper is available; Jackson is required for the default mapper; "
-									+ "headers (if present) are not mapped but provided raw in "
-									+ KafkaHeaders.NATIVE_HEADERS);
-					logged = true;
-				}
-				natives.add(record.headers());
-			}
-			if (this.rawRecordHeader) {
-				raws.add(record);
-			}
+
+		if (this.headerMapper != null && record.headers() != null) {
+			Map<String, Object> converted = new HashMap<>();
+			this.headerMapper.toHeaders(record.headers(), converted);
+			convertedHeaders.add(converted);
+			Object object = converted.get(KafkaHeaders.LISTENER_INFO);
+			info = Optional.ofNullable(object)
+					.filter(String.class::isInstance)
+					.map(String.class::cast)
+					.orElse(null);
+		}
+		else {
+			// natives is empty only for the first unmapped record, so the
+			// diagnostic is still emitted at most once per batch.
+			if (natives.isEmpty()) {
+				logHeaderWarningOnce();
+			}
+			natives.add(record.headers());
+		}
+		if (this.rawRecordHeader) {
+			raws.add(record);
 		}
 		if (info != null) {
 			rawHeaders.put(KafkaHeaders.LISTENER_INFO, info);
 		}
-		return MessageBuilder.createMessage(payloads, kafkaMessageHeaders);
+	}
+
+	/**
+	 * Log the absence of a header mapper; invoked only for the first unmapped record
+	 * of a batch, so the hint appears at most once per converted batch.
+	 */
+	private void logHeaderWarningOnce() {
+		this.logger.debug(() ->
+				"No header mapper is available; Jackson is required for the default mapper; "
+						+ "headers (if present) are not mapped but provided raw in "
+						+ KafkaHeaders.NATIVE_HEADERS);
 	}
 
 	private void addToRawHeaders(Map<String, Object> rawHeaders, List<Map<String, Object>> convertedHeaders,