From 769588906f0d8a5caf51976627484b50eb0536e3 Mon Sep 17 00:00:00 2001 From: Nathan Xu Date: Tue, 12 Dec 2023 23:34:19 -0500 Subject: [PATCH] reuse RecordHeader in AggregatingReplyingKafkaTemplate --- .../requestreply/AggregatingReplyingKafkaTemplate.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java index 585fe4f687..fc6a1ea9f5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java @@ -34,7 +34,6 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Header; -import org.apache.kafka.common.header.internals.RecordHeader; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.BatchConsumerAwareMessageListener; @@ -148,11 +147,7 @@ public void onMessage(List>>> if (this.releaseStrategy.test(list, false)) { ConsumerRecord>> done = new ConsumerRecord<>(AGGREGATED_RESULTS_TOPIC, 0, 0L, null, list); - done.headers() - .add(new RecordHeader(correlationHeaderName, - isBinaryCorrelation() - ? ((CorrelationKey) correlationId).getCorrelationId() - : ((String) correlationId).getBytes(StandardCharsets.UTF_8))); + done.headers().add(correlation); this.pending.remove(correlationId); checkOffsetsAndCommitIfNecessary(list, consumer); completed.add(done);