Skip to content

make use of incoming Message's replyTopic header if needed #2904

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
* @author Gary Russell
* @author Artem Bilan
* @author Venil Noronha
* @author Nathan Xu
*/
public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerSeekAware {

Expand Down Expand Up @@ -466,7 +467,13 @@ private String evaluateTopic(Object request, Object source, Object result, @Null
* @since 2.1.3
*/
@SuppressWarnings("unchecked")
protected void sendResponse(Object result, String topic, @Nullable Object source, boolean returnTypeMessage) {
protected void sendResponse(Object result, @Nullable String topic, @Nullable Object source, boolean returnTypeMessage) {
if (topic == null && source instanceof Message) {
byte[] replyTopicBytes = getReplyTopic((Message<?>) source);
if (replyTopicBytes != null) {
topic = new String(replyTopicBytes, StandardCharsets.UTF_8);
}
}
if (!returnTypeMessage && topic == null) {
this.logger.debug(() -> "No replyTopic to handle the reply: " + result);
}
Expand All @@ -482,14 +489,14 @@ else if (result instanceof Message) {
iterableOfMessages = iterator.next() instanceof Message;
}
if (iterableOfMessages || this.splitIterables) {
((Iterable<V>) result).forEach(v -> {
for (V v : (Iterable<V>) result) {
if (v instanceof Message) {
this.replyTemplate.send((Message<?>) v);
this.replyTemplate.send(checkHeaders(v, topic, source));
}
else {
this.replyTemplate.send(topic, v);
}
});
}
}
else {
sendSingleResult(result, topic, source);
Expand All @@ -506,19 +513,19 @@ private Message<?> checkHeaders(Object result, String topic, @Nullable Object so
MessageHeaders headers = reply.getHeaders();
boolean needsTopic = headers.get(KafkaHeaders.TOPIC) == null;
boolean sourceIsMessage = source instanceof Message;
boolean needsCorrelation = headers.get(this.correlationHeaderName) == null && sourceIsMessage;
boolean needsCorrelation = headers.get(this.correlationHeaderName) == null && sourceIsMessage
&& getCorrelationId((Message<?>) source) != null;
boolean needsPartition = headers.get(KafkaHeaders.PARTITION) == null && sourceIsMessage
&& getReplyPartition((Message<?>) source) != null;
if (needsTopic || needsCorrelation || needsPartition) {
MessageBuilder<?> builder = MessageBuilder.fromMessage(reply);
if (needsTopic) {
builder.setHeader(KafkaHeaders.TOPIC, topic);
}
if (needsCorrelation && sourceIsMessage) {
builder.setHeader(this.correlationHeaderName,
((Message<?>) source).getHeaders().get(this.correlationHeaderName));
if (needsCorrelation) {
setCorrelationId(builder, (Message<?>) source);
}
if (sourceIsMessage && reply.getHeaders().get(KafkaHeaders.REPLY_PARTITION) == null) {
if (needsPartition) {
setPartition(builder, (Message<?>) source);
}
reply = builder.build();
Expand Down Expand Up @@ -571,6 +578,30 @@ private void sendReplyForMessageSource(Object result, String topic, Object sourc
this.replyTemplate.send(builder.build());
}

private void setTopic(MessageBuilder<?> builder, Message<?> source) {
byte[] topicBytes = getReplyTopic(source);
if (topicBytes != null) {
builder.setHeader(KafkaHeaders.TOPIC, new String(topicBytes, StandardCharsets.UTF_8));
}
}

@Nullable
private byte[] getReplyTopic(Message<?> source) {
return source.getHeaders().get(KafkaHeaders.REPLY_TOPIC, byte[].class);
}

private void setCorrelationId(MessageBuilder<?> builder, Message<?> source) {
byte[] correlationIdBytes = getCorrelationId(source);
if (correlationIdBytes != null) {
builder.setHeader(this.correlationHeaderName, correlationIdBytes);
}
}

@Nullable
private byte[] getCorrelationId(Message<?> source) {
return source.getHeaders().get(this.correlationHeaderName, byte[].class);
}

private void setPartition(MessageBuilder<?> builder, Message<?> source) {
byte[] partitionBytes = getReplyPartition(source);
if (partitionBytes != null) {
Expand Down