Skip to content

Commit

Permalink
minor adjustment at MessagingMessageListenerAdapter
Browse files Browse the repository at this point in the history
  • Loading branch information
Zhiyang.Wang1 committed Dec 15, 2023
1 parent 7805541 commit 0c5f4b9
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -63,7 +63,7 @@ public class BatchMessagingMessageListenerAdapter<K, V> extends MessagingMessage

private BatchMessageConverter batchMessageConverter = new BatchMessagingMessageConverter();

private KafkaListenerErrorHandler errorHandler;
private final KafkaListenerErrorHandler errorHandler;

private BatchToRecordAdapter<K, V> batchToRecordAdapter;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,26 +529,20 @@ private Message<?> checkHeaders(Message<?> reply, @Nullable String topic, @Nulla

@SuppressWarnings("unchecked")
private void sendSingleResult(Object result, String topic, @Nullable Object source) {
byte[] correlationId = null;
boolean sourceIsMessage = source instanceof Message;
if (sourceIsMessage
&& getCorrelation((Message<?>) source) != null) {
correlationId = getCorrelation((Message<?>) source);
}
if (sourceIsMessage) {
sendReplyForMessageSource(result, topic, source, correlationId);
if (source instanceof Message<?> message) {
sendReplyForMessageSource(result, topic, message, getCorrelation(message));
}
else {
this.replyTemplate.send(topic, result);
}
}

@SuppressWarnings("unchecked")
private void sendReplyForMessageSource(Object result, String topic, Object source, @Nullable byte[] correlationId) {
private void sendReplyForMessageSource(Object result, String topic, Message<?> source, @Nullable byte[] correlationId) {
MessageBuilder<Object> builder = MessageBuilder.withPayload(result)
.setHeader(KafkaHeaders.TOPIC, topic);
if (this.replyHeadersConfigurer != null) {
Map<String, Object> headersToCopy = ((Message<?>) source).getHeaders().entrySet().stream()
Map<String, Object> headersToCopy = source.getHeaders().entrySet().stream()
.filter(e -> {
String key = e.getKey();
return !key.equals(MessageHeaders.ID) && !key.equals(MessageHeaders.TIMESTAMP)
Expand All @@ -568,7 +562,7 @@ private void sendReplyForMessageSource(Object result, String topic, Object sourc
if (correlationId != null) {
builder.setHeader(this.correlationHeaderName, correlationId);
}
setPartition(builder, ((Message<?>) source));
setPartition(builder, source);
this.replyTemplate.send(builder.build());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,7 +53,7 @@
public class RecordMessagingMessageListenerAdapter<K, V> extends MessagingMessageListenerAdapter<K, V>
implements AcknowledgingConsumerAwareMessageListener<K, V> {

private KafkaListenerErrorHandler errorHandler;
private final KafkaListenerErrorHandler errorHandler;

public RecordMessagingMessageListenerAdapter(Object bean, Method method) {
this(bean, method, null);
Expand Down

0 comments on commit 0c5f4b9

Please sign in to comment.