From 8a6b76d010111541f7d2dd747c3a28ebaf47d8b1 Mon Sep 17 00:00:00 2001 From: aesperer Date: Wed, 16 Oct 2024 13:23:08 +0900 Subject: [PATCH 1/4] refactor: Use Stream API and Optional to improve record processing --- .../BatchMessagingMessageConverter.java | 86 +++++++++++-------- 1 file changed, 50 insertions(+), 36 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java index 9315ceac47..d2991b41cf 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java @@ -19,10 +19,7 @@ import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import org.apache.commons.logging.LogFactory; import org.apache.kafka.clients.consumer.Consumer; @@ -160,48 +157,65 @@ public Message toMessage(List> records, @Nullable Acknow List natives = new ArrayList<>(); List> raws = new ArrayList<>(); List conversionFailures = new ArrayList<>(); + addToRawHeaders(rawHeaders, convertedHeaders, natives, raws, conversionFailures); commonHeaders(acknowledgment, consumer, rawHeaders, keys, topics, partitions, offsets, timestampTypes, timestamps); + records.forEach(record -> processRecord(record, payloads, keys, topics, partitions, offsets, timestampTypes, timestamps, + convertedHeaders, natives, raws, conversionFailures, rawHeaders, type)); + + return MessageBuilder.createMessage(payloads, kafkaMessageHeaders); + } + + private void processRecord(ConsumerRecord record, List payloads, List keys, + List topics, List partitions, List offsets, + List timestampTypes, List timestamps, List> convertedHeaders, + List natives, List> raws, List conversionFailures, + Map rawHeaders, Type type) { + payloads.add(obtainPayload(type, record, conversionFailures)); + keys.add(record.key()); + topics.add(record.topic()); + partitions.add(record.partition()); + offsets.add(record.offset()); + + if (record.timestampType() != null) { + timestampTypes.add(record.timestampType().name()); + } + timestamps.add(record.timestamp()); + boolean logged = false; String info = null; - for (ConsumerRecord record : records) { - payloads.add(obtainPayload(type, record, conversionFailures)); - keys.add(record.key()); - topics.add(record.topic()); - partitions.add(record.partition()); - offsets.add(record.offset()); - if (record.timestampType() != null) { - timestampTypes.add(record.timestampType().name()); - } - timestamps.add(record.timestamp()); - if (this.headerMapper != null && record.headers() != null) { - Map converted = new HashMap<>(); - this.headerMapper.toHeaders(record.headers(), converted); - convertedHeaders.add(converted); - Object object = converted.get(KafkaHeaders.LISTENER_INFO); - if (object instanceof String) { - info = (String) object; - } - } - else { - if (!logged) { - this.logger.debug(() -> - "No header mapper is available; Jackson is required for the default mapper; " - + "headers (if present) are not mapped but provided raw in " - + KafkaHeaders.NATIVE_HEADERS); - logged = true; - } - natives.add(record.headers()); - } - if (this.rawRecordHeader) { - raws.add(record); + + if (this.headerMapper != null && record.headers() != null) { + Map converted = new HashMap<>(); + this.headerMapper.toHeaders(record.headers(), converted); + convertedHeaders.add(converted); + Object object = converted.get(KafkaHeaders.LISTENER_INFO); + info = Optional.ofNullable(object) + .filter(String.class::isInstance) + .map(String.class::cast) + .orElse(null); + } + else { + if (!logged) { + logHeaderWarningOnce(); + logged = true; } + natives.add(record.headers()); + } + if (this.rawRecordHeader) { + raws.add(record); } if (info != null) { rawHeaders.put(KafkaHeaders.LISTENER_INFO, info); } - return MessageBuilder.createMessage(payloads, kafkaMessageHeaders); + } + + private void logHeaderWarningOnce() { + this.logger.debug(() -> + "No header mapper is available; Jackson is required for the default mapper; " + + "headers (if present) are not mapped but provided raw in " + + KafkaHeaders.NATIVE_HEADERS); } private void addToRawHeaders(Map rawHeaders, List> convertedHeaders, From 5c3050a80cff40de0793680714366795665d0e4f Mon Sep 17 00:00:00 2001 From: aesperer Date: Wed, 16 Oct 2024 13:32:00 +0900 Subject: [PATCH 2/4] refactor: Meets the standards of eckstyle --- .../converter/BatchMessagingMessageConverter.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java index d2991b41cf..6120b24902 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java @@ -19,7 +19,11 @@ import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.nio.charset.StandardCharsets; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.HashMap; import org.apache.commons.logging.LogFactory; import org.apache.kafka.clients.consumer.Consumer; @@ -28,6 +32,7 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.utils.Bytes; +import org.jetbrains.annotations.NotNull; import org.springframework.core.log.LogAccessor; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.DefaultKafkaHeaderMapper; @@ -139,8 +144,8 @@ public void setRawRecordHeader(boolean rawRecordHeader) { } @Override // NOSONAR - public Message toMessage(List> records, @Nullable Acknowledgment acknowledgment, - Consumer consumer, Type type) { + public @NotNull Message toMessage(List> records, @Nullable Acknowledgment acknowledgment, + Consumer consumer, Type type) { KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId, this.generateTimestamp); @@ -163,7 +168,6 @@ public Message toMessage(List> records, @Nullable Acknow timestamps); records.forEach(record -> processRecord(record, payloads, keys, topics, partitions, offsets, timestampTypes, timestamps, convertedHeaders, natives, raws, conversionFailures, rawHeaders, type)); - return MessageBuilder.createMessage(payloads, kafkaMessageHeaders); } From de78ab6447b72c4233e04c87abfa6e48302f4f77 Mon Sep 17 00:00:00 2001 From: aesperer Date: Wed, 16 Oct 2024 13:40:33 +0900 Subject: [PATCH 3/4] refactor: Meets the standards of eckstyle --- .../BatchMessagingMessageConverter.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java index 6120b24902..ba3d4822bd 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java @@ -19,11 +19,11 @@ import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Optional; +import java.util.ArrayList; import java.util.HashMap; +import java.util.Optional; import org.apache.commons.logging.LogFactory; import org.apache.kafka.clients.consumer.Consumer; @@ -32,7 +32,6 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.utils.Bytes; -import org.jetbrains.annotations.NotNull; import org.springframework.core.log.LogAccessor; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.DefaultKafkaHeaderMapper; @@ -144,8 +143,7 @@ public void setRawRecordHeader(boolean rawRecordHeader) { } @Override // NOSONAR - public @NotNull Message toMessage(List> records, @Nullable Acknowledgment acknowledgment, - Consumer consumer, Type type) { + public Message toMessage(List> records, @Nullable Acknowledgment acknowledgment, Consumer consumer, Type type) { KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId, this.generateTimestamp); @@ -172,10 +170,10 @@ public void setRawRecordHeader(boolean rawRecordHeader) { } private void processRecord(ConsumerRecord record, List payloads, List keys, - List topics, List partitions, List offsets, - List timestampTypes, List timestamps, List> convertedHeaders, - List natives, List> raws, List conversionFailures, - Map rawHeaders, Type type) { + List topics, List partitions, List offsets, + List timestampTypes, List timestamps, List> convertedHeaders, + List natives, List> raws, List conversionFailures, + Map rawHeaders, Type type) { payloads.add(obtainPayload(type, record, conversionFailures)); keys.add(record.key()); topics.add(record.topic()); From 2c0f911b91e27f999ade819e647e5b82fdfb5bca Mon Sep 17 00:00:00 2001 From: aesperer Date: Wed, 16 Oct 2024 13:59:06 +0900 Subject: [PATCH 4/4] refactor: Meets the standards of eckstyle --- .../support/converter/BatchMessagingMessageConverter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java index ba3d4822bd..31ea66dc07 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java @@ -19,10 +19,10 @@ import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.Map; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Optional; import org.apache.commons.logging.LogFactory;