Skip to content

Commit

Permalink
GH-3589: Refactor toMessage batch method and fix logging/header issues
Browse files Browse the repository at this point in the history
  • Loading branch information
bky373 committed Oct 25, 2024
1 parent 7a26b9b commit c0e823b
Showing 1 changed file with 47 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
Expand Down Expand Up @@ -63,6 +62,7 @@
* @author Biju Kunjummen
* @author Sanghyeok An
* @author Hope Kim
* @author Borahm Lee
* @since 1.1
*/
public class BatchMessagingMessageConverter implements BatchMessageConverter {
Expand Down Expand Up @@ -144,7 +144,8 @@ public void setRawRecordHeader(boolean rawRecordHeader) {
}

@Override // NOSONAR
public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer, Type type) {
public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknowledgment acknowledgment,
Consumer<?, ?> consumer, Type type) {

KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(this.generateMessageId,
this.generateTimestamp);
Expand All @@ -165,65 +166,39 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow
addToRawHeaders(rawHeaders, convertedHeaders, natives, raws, conversionFailures);
commonHeaders(acknowledgment, consumer, rawHeaders, keys, topics, partitions, offsets, timestampTypes,
timestamps);
records.forEach(record -> processRecord(record, payloads, keys, topics, partitions, offsets, timestampTypes, timestamps,
convertedHeaders, natives, raws, conversionFailures, rawHeaders, type));
return MessageBuilder.createMessage(payloads, kafkaMessageHeaders);
}

private void processRecord(ConsumerRecord<?, ?> record, List<Object> payloads, List<Object> keys,
List<String> topics, List<Integer> partitions, List<Long> offsets,
List<String> timestampTypes, List<Long> timestamps, List<Map<String, Object>> convertedHeaders,
List<Headers> natives, List<ConsumerRecord<?, ?>> raws, List<ConversionException> conversionFailures,
Map<String, Object> rawHeaders, Type type) {
payloads.add(obtainPayload(type, record, conversionFailures));
keys.add(record.key());
topics.add(record.topic());
partitions.add(record.partition());
offsets.add(record.offset());

if (record.timestampType() != null) {
timestampTypes.add(record.timestampType().name());
}
timestamps.add(record.timestamp());

boolean logged = false;
String info = null;

if (this.headerMapper != null && record.headers() != null) {
Map<String, Object> converted = new HashMap<>();
this.headerMapper.toHeaders(record.headers(), converted);
convertedHeaders.add(converted);
Object object = converted.get(KafkaHeaders.LISTENER_INFO);
info = Optional.ofNullable(object)
.filter(String.class::isInstance)
.map(String.class::cast)
.orElse(null);
}
else {
if (!logged) {
logHeaderWarningOnce();
logged = true;
String listenerInfo = null;
for (ConsumerRecord<?, ?> record : records) {
addRecordInfo(record, type, payloads, keys, topics, partitions, offsets, timestampTypes, timestamps, conversionFailures);
if (this.headerMapper != null && record.headers() != null) {
addToConvertedHeaders(record.headers(), convertedHeaders);
if (!convertedHeaders.isEmpty()) {
Object obj = convertedHeaders.get(convertedHeaders.size() - 1).get(KafkaHeaders.LISTENER_INFO);
if (obj != null && obj instanceof String) {
listenerInfo = (String) obj;
}
}
} else {
natives.add(record.headers());
}
if (this.rawRecordHeader) {
raws.add(record);
}
natives.add(record.headers());
}
if (this.rawRecordHeader) {
raws.add(record);
if (this.headerMapper == null) {
this.logger.warn(() ->
"No header mapper is available; Jackson is required for the default mapper; "
+ "headers (if present) are not mapped but provided raw in "
+ KafkaHeaders.NATIVE_HEADERS);
}
if (info != null) {
rawHeaders.put(KafkaHeaders.LISTENER_INFO, info);
if (listenerInfo != null) {
rawHeaders.put(KafkaHeaders.LISTENER_INFO, listenerInfo);
}
}

private void logHeaderWarningOnce() {
this.logger.debug(() ->
"No header mapper is available; Jackson is required for the default mapper; "
+ "headers (if present) are not mapped but provided raw in "
+ KafkaHeaders.NATIVE_HEADERS);
return MessageBuilder.createMessage(payloads, kafkaMessageHeaders);
}

private void addToRawHeaders(Map<String, Object> rawHeaders, List<Map<String, Object>> convertedHeaders,
List<Headers> natives, List<ConsumerRecord<?, ?>> raws, List<ConversionException> conversionFailures) {

if (this.headerMapper != null) {
rawHeaders.put(KafkaHeaders.BATCH_CONVERTED_HEADERS, convertedHeaders);
}
Expand All @@ -236,12 +211,32 @@ private void addToRawHeaders(Map<String, Object> rawHeaders, List<Map<String, Ob
rawHeaders.put(KafkaHeaders.CONVERSION_FAILURES, conversionFailures);
}

/**
 * Append this record's converted payload and per-record metadata to the
 * parallel accumulator lists used to build the batch message headers.
 * <p>The timestamp type is only recorded when the record reports one, so the
 * {@code timestampTypes} list may end up shorter than the others.
 * @param record the consumer record being accumulated.
 * @param type the target payload type used for conversion.
 * @param payloads accumulator for converted payloads.
 * @param keys accumulator for record keys.
 * @param topics accumulator for topic names.
 * @param partitions accumulator for partition numbers.
 * @param offsets accumulator for record offsets.
 * @param timestampTypes accumulator for timestamp type names (when present).
 * @param timestamps accumulator for record timestamps.
 * @param conversionFailures collector for conversion exceptions.
 */
private void addRecordInfo(ConsumerRecord<?, ?> record, Type type, List<Object> payloads, List<Object> keys,
		List<String> topics, List<Integer> partitions, List<Long> offsets, List<String> timestampTypes,
		List<Long> timestamps, List<ConversionException> conversionFailures) {

	// Convert the payload first; metadata is only accumulated if conversion completes.
	payloads.add(obtainPayload(type, record, conversionFailures));
	keys.add(record.key());
	topics.add(record.topic());
	partitions.add(record.partition());
	offsets.add(record.offset());
	if (record.timestampType() != null) {
		timestampTypes.add(record.timestampType().name());
	}
	timestamps.add(record.timestamp());
}

/**
 * Produce the payload for a single record, delegating to the record converter
 * when one is configured and the target type is a container type; otherwise
 * fall back to plain value extraction/conversion.
 * @param type the target payload type.
 * @param record the consumer record.
 * @param conversionFailures collector for conversion exceptions.
 * @return the converted payload object.
 */
private Object obtainPayload(Type type, ConsumerRecord<?, ?> record, List<ConversionException> conversionFailures) {
	if (this.recordConverter != null && containerType(type)) {
		return convert(record, type, conversionFailures);
	}
	return extractAndConvertValue(record, type);
}

/**
 * Map the native Kafka headers of one record through the configured header
 * mapper and append the resulting map to the batch's converted-headers list.
 * <p>Callers must ensure {@code this.headerMapper} is non-null before invoking.
 * @param headers the native Kafka headers of a record.
 * @param convertedHeaders the per-record converted header maps for the batch.
 */
private void addToConvertedHeaders(Headers headers, List<Map<String, Object>> convertedHeaders) {
	Map<String, Object> mapped = new HashMap<>();
	this.headerMapper.toHeaders(headers, mapped);
	convertedHeaders.add(mapped);
}

@Override
public List<ProducerRecord<?, ?>> fromMessage(Message<?> message, String defaultTopic) {
throw new UnsupportedOperationException();
Expand Down

0 comments on commit c0e823b

Please sign in to comment.