From 5fb1562d95b4b9ff816e56fee49d99293d5a3acb Mon Sep 17 00:00:00 2001 From: Borahm Lee Date: Fri, 25 Oct 2024 12:00:42 +0900 Subject: [PATCH] GH-3589: Refactor `toMessage` batch method and fix logging/header issues Remove unnecessary check Fix checkstyle violation Change log level from WARN to DEBUG Refactor headers conversion method Add natives condition for logging when no header mapper --- .../BatchMessagingMessageConverter.java | 101 +++++++++--------- 1 file changed, 48 insertions(+), 53 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java index a22fcfd8b4..7808724d46 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java @@ -23,7 +23,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import org.apache.commons.logging.LogFactory; import org.apache.kafka.clients.consumer.Consumer; @@ -63,6 +62,7 @@ * @author Biju Kunjummen * @author Sanghyeok An * @author Hope Kim + * @author Borahm Lee * @since 1.1 */ public class BatchMessagingMessageConverter implements BatchMessageConverter { @@ -93,7 +93,7 @@ public BatchMessagingMessageConverter() { * @param recordConverter the converter. * @since 1.3.2 */ - public BatchMessagingMessageConverter(RecordMessageConverter recordConverter) { + public BatchMessagingMessageConverter(@Nullable RecordMessageConverter recordConverter) { this.recordConverter = recordConverter; if (JacksonPresent.isJackson2Present()) { this.headerMapper = new DefaultKafkaHeaderMapper(); @@ -144,7 +144,8 @@ public void setRawRecordHeader(boolean rawRecordHeader) { } @Override // NOSONAR - public Message toMessage(List> records, @Nullable Acknowledgment acknowledgment, Consumer consumer, Type type) { + public Message toMessage(List> records, @Nullable Acknowledgment acknowledgment, + Consumer consumer, Type type) { KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId, this.generateTimestamp); @@ -165,65 +166,38 @@ public Message toMessage(List> records, @Nullable Acknow addToRawHeaders(rawHeaders, convertedHeaders, natives, raws, conversionFailures); commonHeaders(acknowledgment, consumer, rawHeaders, keys, topics, partitions, offsets, timestampTypes, timestamps); - records.forEach(record -> processRecord(record, payloads, keys, topics, partitions, offsets, timestampTypes, timestamps, - convertedHeaders, natives, raws, conversionFailures, rawHeaders, type)); - return MessageBuilder.createMessage(payloads, kafkaMessageHeaders); - } - - private void processRecord(ConsumerRecord record, List payloads, List keys, - List topics, List partitions, List offsets, - List timestampTypes, List timestamps, List> convertedHeaders, - List natives, List> raws, List conversionFailures, - Map rawHeaders, Type type) { - payloads.add(obtainPayload(type, record, conversionFailures)); - keys.add(record.key()); - topics.add(record.topic()); - partitions.add(record.partition()); - offsets.add(record.offset()); - - if (record.timestampType() != null) { - timestampTypes.add(record.timestampType().name()); - } - timestamps.add(record.timestamp()); - boolean logged = false; - String info = null; - - if (this.headerMapper != null && record.headers() != null) { - Map converted = new HashMap<>(); - this.headerMapper.toHeaders(record.headers(), converted); - convertedHeaders.add(converted); - Object object = converted.get(KafkaHeaders.LISTENER_INFO); - info = Optional.ofNullable(object) - .filter(String.class::isInstance) - .map(String.class::cast) - .orElse(null); - } - else { - if (!logged) { - logHeaderWarningOnce(); - logged = true; + String listenerInfo = null; + for (ConsumerRecord record : records) { + addRecordInfo(record, type, payloads, keys, topics, partitions, offsets, timestampTypes, timestamps, conversionFailures); + if (this.headerMapper != null && record.headers() != null) { + Map converted = convertHeaders(record.headers(), convertedHeaders); + Object obj = converted.get(KafkaHeaders.LISTENER_INFO); + if (obj instanceof String) { + listenerInfo = (String) obj; + } + } + else { + natives.add(record.headers()); + } + if (this.rawRecordHeader) { + raws.add(record); } - natives.add(record.headers()); } - if (this.rawRecordHeader) { - raws.add(record); + if (this.headerMapper == null && !natives.isEmpty()) { + this.logger.debug(() -> + "No header mapper is available; Jackson is required for the default mapper; " + + "headers (if present) are not mapped but provided raw in " + + KafkaHeaders.NATIVE_HEADERS); } - if (info != null) { - rawHeaders.put(KafkaHeaders.LISTENER_INFO, info); + if (listenerInfo != null) { + rawHeaders.put(KafkaHeaders.LISTENER_INFO, listenerInfo); } - } - - private void logHeaderWarningOnce() { - this.logger.debug(() -> - "No header mapper is available; Jackson is required for the default mapper; " - + "headers (if present) are not mapped but provided raw in " - + KafkaHeaders.NATIVE_HEADERS); + return MessageBuilder.createMessage(payloads, kafkaMessageHeaders); } private void addToRawHeaders(Map rawHeaders, List> convertedHeaders, List natives, List> raws, List conversionFailures) { - if (this.headerMapper != null) { rawHeaders.put(KafkaHeaders.BATCH_CONVERTED_HEADERS, convertedHeaders); } @@ -236,12 +210,33 @@ private void addToRawHeaders(Map rawHeaders, List record, Type type, List payloads, List keys, + List topics, List partitions, List offsets, List timestampTypes, + List timestamps, List conversionFailures) { + payloads.add(obtainPayload(type, record, conversionFailures)); + keys.add(record.key()); + topics.add(record.topic()); + partitions.add(record.partition()); + offsets.add(record.offset()); + timestamps.add(record.timestamp()); + if (record.timestampType() != null) { + timestampTypes.add(record.timestampType().name()); + } + } + private Object obtainPayload(Type type, ConsumerRecord record, List conversionFailures) { return this.recordConverter == null || !containerType(type) ? extractAndConvertValue(record, type) : convert(record, type, conversionFailures); } + private Map convertHeaders(Headers headers, List> convertedHeaders) { + Map converted = new HashMap<>(); + this.headerMapper.toHeaders(headers, converted); + convertedHeaders.add(converted); + return converted; + } + @Override public List> fromMessage(Message message, String defaultTopic) { throw new UnsupportedOperationException();