Skip to content

Commit

Permalink
Improvements in BatchMessagingMessageConverter#toMessage()
Browse files Browse the repository at this point in the history
* refactor: Use Stream API and Optional to improve record processing
  • Loading branch information
esperar authored Oct 21, 2024
1 parent fad2efc commit 6f79247
Showing 1 changed file with 50 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
Expand Down Expand Up @@ -142,8 +143,7 @@ public void setRawRecordHeader(boolean rawRecordHeader) {
}

@Override // NOSONAR
public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknowledgment acknowledgment,
Consumer<?, ?> consumer, Type type) {
public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer, Type type) {

KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId,
this.generateTimestamp);
Expand All @@ -160,48 +160,64 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow
List<Headers> natives = new ArrayList<>();
List<ConsumerRecord<?, ?>> raws = new ArrayList<>();
List<ConversionException> conversionFailures = new ArrayList<>();

addToRawHeaders(rawHeaders, convertedHeaders, natives, raws, conversionFailures);
commonHeaders(acknowledgment, consumer, rawHeaders, keys, topics, partitions, offsets, timestampTypes,
timestamps);
records.forEach(record -> processRecord(record, payloads, keys, topics, partitions, offsets, timestampTypes, timestamps,
convertedHeaders, natives, raws, conversionFailures, rawHeaders, type));
return MessageBuilder.createMessage(payloads, kafkaMessageHeaders);
}

private void processRecord(ConsumerRecord<?, ?> record, List<Object> payloads, List<Object> keys,
List<String> topics, List<Integer> partitions, List<Long> offsets,
List<String> timestampTypes, List<Long> timestamps, List<Map<String, Object>> convertedHeaders,
List<Headers> natives, List<ConsumerRecord<?, ?>> raws, List<ConversionException> conversionFailures,
Map<String, Object> rawHeaders, Type type) {
payloads.add(obtainPayload(type, record, conversionFailures));
keys.add(record.key());
topics.add(record.topic());
partitions.add(record.partition());
offsets.add(record.offset());

if (record.timestampType() != null) {
timestampTypes.add(record.timestampType().name());
}
timestamps.add(record.timestamp());

boolean logged = false;
String info = null;
for (ConsumerRecord<?, ?> record : records) {
payloads.add(obtainPayload(type, record, conversionFailures));
keys.add(record.key());
topics.add(record.topic());
partitions.add(record.partition());
offsets.add(record.offset());
if (record.timestampType() != null) {
timestampTypes.add(record.timestampType().name());
}
timestamps.add(record.timestamp());
if (this.headerMapper != null && record.headers() != null) {
Map<String, Object> converted = new HashMap<>();
this.headerMapper.toHeaders(record.headers(), converted);
convertedHeaders.add(converted);
Object object = converted.get(KafkaHeaders.LISTENER_INFO);
if (object instanceof String) {
info = (String) object;
}
}
else {
if (!logged) {
this.logger.debug(() ->
"No header mapper is available; Jackson is required for the default mapper; "
+ "headers (if present) are not mapped but provided raw in "
+ KafkaHeaders.NATIVE_HEADERS);
logged = true;
}
natives.add(record.headers());
}
if (this.rawRecordHeader) {
raws.add(record);

if (this.headerMapper != null && record.headers() != null) {
Map<String, Object> converted = new HashMap<>();
this.headerMapper.toHeaders(record.headers(), converted);
convertedHeaders.add(converted);
Object object = converted.get(KafkaHeaders.LISTENER_INFO);
info = Optional.ofNullable(object)
.filter(String.class::isInstance)
.map(String.class::cast)
.orElse(null);
}
else {
if (!logged) {
logHeaderWarningOnce();
logged = true;
}
natives.add(record.headers());
}
if (this.rawRecordHeader) {
raws.add(record);
}
if (info != null) {
rawHeaders.put(KafkaHeaders.LISTENER_INFO, info);
}
return MessageBuilder.createMessage(payloads, kafkaMessageHeaders);
}

private void logHeaderWarningOnce() {
this.logger.debug(() ->
"No header mapper is available; Jackson is required for the default mapper; "
+ "headers (if present) are not mapped but provided raw in "
+ KafkaHeaders.NATIVE_HEADERS);
}

private void addToRawHeaders(Map<String, Object> rawHeaders, List<Map<String, Object>> convertedHeaders,
Expand Down

0 comments on commit 6f79247

Please sign in to comment.