Reuse Header in AggregatingReplyingKafkaTemplate
First we extract `Header correlation = record.headers().lastHeader(correlationHeaderName);` from the incoming record.

Then we add that header as-is to the aggregated record in the list, instead of converting the parsed correlation id back into a new `RecordHeader`.
Therefore there is no need for the extra object or the conversion logic.
NathanQingyangXu authored Dec 13, 2023
1 parent a98a7b8 commit 7259e5e
Showing 1 changed file with 1 addition and 6 deletions.
```diff
@@ -34,7 +34,6 @@
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.header.internals.RecordHeader;
 
 import org.springframework.kafka.core.ProducerFactory;
 import org.springframework.kafka.listener.BatchConsumerAwareMessageListener;
@@ -148,11 +147,7 @@ public void onMessage(List<ConsumerRecord<K, Collection<ConsumerRecord<K, R>>>>
 			if (this.releaseStrategy.test(list, false)) {
 				ConsumerRecord<K, Collection<ConsumerRecord<K, R>>> done =
 						new ConsumerRecord<>(AGGREGATED_RESULTS_TOPIC, 0, 0L, null, list);
-				done.headers()
-						.add(new RecordHeader(correlationHeaderName,
-								isBinaryCorrelation()
-										? ((CorrelationKey) correlationId).getCorrelationId()
-										: ((String) correlationId).getBytes(StandardCharsets.UTF_8)));
+				done.headers().add(correlation);
 				this.pending.remove(correlationId);
 				checkOffsetsAndCommitIfNecessary(list, consumer);
 				completed.add(done);
```
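To make the intent concrete, here is a minimal, self-contained sketch (not the actual Spring Kafka source) of the pattern the commit adopts: the correlation header is read once from the incoming reply record, and that same `Header` instance is attached to the synthetic aggregated record, so no new `RecordHeader` has to be rebuilt from the parsed correlation id. The `AggregationSketch` class and its `release` method are hypothetical; `correlationHeaderName` and `AGGREGATED_RESULTS_TOPIC` follow the diff above.

```java
import java.util.Collection;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

// Hypothetical stand-in for the aggregation step; not the real AggregatingReplyingKafkaTemplate.
class AggregationSketch<K, R> {

	private static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";

	private final String correlationHeaderName;

	AggregationSketch(String correlationHeaderName) {
		this.correlationHeaderName = correlationHeaderName;
	}

	ConsumerRecord<K, Collection<ConsumerRecord<K, R>>> release(
			ConsumerRecord<K, R> record, List<ConsumerRecord<K, R>> list) {

		// Read the correlation header carried by the incoming reply record once.
		Header correlation = record.headers().lastHeader(this.correlationHeaderName);

		// Build the synthetic record that carries the whole aggregated batch.
		ConsumerRecord<K, Collection<ConsumerRecord<K, R>>> done =
				new ConsumerRecord<>(AGGREGATED_RESULTS_TOPIC, 0, 0L, null, list);

		// Reuse the same Header instance instead of rebuilding a RecordHeader
		// from the parsed correlation id (binary CorrelationKey or String).
		done.headers().add(correlation);
		return done;
	}
}
```

Reusing the incoming header also side-steps the binary-versus-String correlation branching that the removed block had to perform.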
