Skip to content

fix some defects of setting default Message headers in MessagingMessageListenerAdapter #2908

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
* @author Gary Russell
* @author Artem Bilan
* @author Venil Noronha
* @author Nathan Xu
*/
public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerSeekAware {

Expand Down Expand Up @@ -470,8 +471,8 @@ protected void sendResponse(Object result, String topic, @Nullable Object source
if (!returnTypeMessage && topic == null) {
this.logger.debug(() -> "No replyTopic to handle the reply: " + result);
}
else if (result instanceof Message) {
Message<?> reply = checkHeaders(result, topic, source);
else if (result instanceof Message<?> mResult) {
Message<?> reply = checkHeaders(mResult, topic, source);
this.replyTemplate.send(reply);
}
else {
Expand All @@ -483,8 +484,9 @@ else if (result instanceof Message) {
}
if (iterableOfMessages || this.splitIterables) {
((Iterable<V>) result).forEach(v -> {
if (v instanceof Message) {
this.replyTemplate.send((Message<?>) v);
if (v instanceof Message<?> mv) {
Message<?> aReply = checkHeaders(mv, topic, source);
this.replyTemplate.send(aReply);
}
else {
this.replyTemplate.send(topic, v);
Expand All @@ -501,24 +503,23 @@ else if (result instanceof Message) {
}
}

private Message<?> checkHeaders(Object result, String topic, @Nullable Object source) { // NOSONAR (complexity)
Message<?> reply = (Message<?>) result;
private Message<?> checkHeaders(Message<?> reply, @Nullable String topic, @Nullable Object source) { // NOSONAR (complexity)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think choosing Message<?> as the first argument type makes more sense, for this method is only meant to be used with that precondition.

Copy link
Contributor Author

@NathanQingyangXu NathanQingyangXu Nov 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure whether we can use Nullable to topic, but from its invocation context, seems we could not rule out the nullness possibility?

MessageHeaders headers = reply.getHeaders();
boolean needsTopic = headers.get(KafkaHeaders.TOPIC) == null;
boolean needsTopic = topic != null && headers.get(KafkaHeaders.TOPIC) == null;
boolean sourceIsMessage = source instanceof Message;
boolean needsCorrelation = headers.get(this.correlationHeaderName) == null && sourceIsMessage;
boolean needsCorrelation = headers.get(this.correlationHeaderName) == null && sourceIsMessage
&& getCorrelation((Message<?>) source) != null;
boolean needsPartition = headers.get(KafkaHeaders.PARTITION) == null && sourceIsMessage
&& getReplyPartition((Message<?>) source) != null;
if (needsTopic || needsCorrelation || needsPartition) {
MessageBuilder<?> builder = MessageBuilder.fromMessage(reply);
if (needsTopic) {
builder.setHeader(KafkaHeaders.TOPIC, topic);
}
if (needsCorrelation && sourceIsMessage) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems the second condtion has been implied in the first condition

builder.setHeader(this.correlationHeaderName,
((Message<?>) source).getHeaders().get(this.correlationHeaderName));
if (needsCorrelation) {
setCorrelation(builder, (Message<?>) source);
}
if (sourceIsMessage && reply.getHeaders().get(KafkaHeaders.REPLY_PARTITION) == null) {
Copy link
Contributor Author

@NathanQingyangXu NathanQingyangXu Nov 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems only PARTITION header matters for reply side, which has been covered in existing needsPartition variable.

if (needsPartition) {
setPartition(builder, (Message<?>) source);
}
reply = builder.build();
Expand All @@ -531,8 +532,8 @@ private void sendSingleResult(Object result, String topic, @Nullable Object sour
byte[] correlationId = null;
boolean sourceIsMessage = source instanceof Message;
if (sourceIsMessage
&& ((Message<?>) source).getHeaders().get(this.correlationHeaderName) != null) {
correlationId = ((Message<?>) source).getHeaders().get(this.correlationHeaderName, byte[].class);
&& getCorrelation((Message<?>) source) != null) {
correlationId = getCorrelation((Message<?>) source);
}
if (sourceIsMessage) {
sendReplyForMessageSource(result, topic, source, correlationId);
Expand Down Expand Up @@ -571,6 +572,18 @@ private void sendReplyForMessageSource(Object result, String topic, Object sourc
this.replyTemplate.send(builder.build());
}

private void setCorrelation(MessageBuilder<?> builder, Message<?> source) {
byte[] correlationBytes = getCorrelation(source);
if (correlationBytes != null) {
builder.setHeader(this.correlationHeaderName, correlationBytes);
}
}

@Nullable
private byte[] getCorrelation(Message<?> source) {
return source.getHeaders().get(this.correlationHeaderName, byte[].class);
}

private void setPartition(MessageBuilder<?> builder, Message<?> source) {
byte[] partitionBytes = getReplyPartition(source);
if (partitionBytes != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@

/**
* @author Gary Russell
* @author Nathan Xu
* @since 2.1.3
*
*/
Expand All @@ -116,7 +117,8 @@
ReplyingKafkaTemplateTests.I_REPLY, ReplyingKafkaTemplateTests.I_REQUEST,
ReplyingKafkaTemplateTests.J_REPLY, ReplyingKafkaTemplateTests.J_REQUEST,
ReplyingKafkaTemplateTests.K_REPLY, ReplyingKafkaTemplateTests.K_REQUEST,
ReplyingKafkaTemplateTests.L_REPLY, ReplyingKafkaTemplateTests.L_REQUEST })
ReplyingKafkaTemplateTests.L_REPLY, ReplyingKafkaTemplateTests.L_REQUEST,
ReplyingKafkaTemplateTests.M_REPLY, ReplyingKafkaTemplateTests.M_REQUEST })
public class ReplyingKafkaTemplateTests {

public static final String A_REPLY = "aReply";
Expand Down Expand Up @@ -167,6 +169,10 @@ public class ReplyingKafkaTemplateTests {

public static final String L_REQUEST = "lRequest";

public static final String M_REPLY = "mReply";

public static final String M_REQUEST = "mRequest";

@Autowired
private EmbeddedKafkaBroker embeddedKafka;

Expand Down Expand Up @@ -845,6 +851,24 @@ void requestTimeoutWithMessage() throws Exception {
}
}

@Test
void testMessageIterableReturn() throws Exception {
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(M_REPLY);
try {
template.setDefaultReplyTimeout(Duration.ofSeconds(30));
Headers headers = new RecordHeaders();
ProducerRecord<Integer, String> record = new ProducerRecord<>(M_REQUEST, null, null, null, "foo", headers);
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, String> consumerRecord = future.get(30, TimeUnit.SECONDS);
assertThat(consumerRecord.value()).isEqualTo("FOO");
}
finally {
template.stop();
template.destroy();
}
}

@Configuration
@EnableKafka
public static class Config {
Expand Down Expand Up @@ -1011,6 +1035,15 @@ public Message<String> handleL(String in) throws InterruptedException {
.build();
}

@KafkaListener(id = M_REQUEST, topics = M_REQUEST)
@SendTo // default REPLY_TOPIC header
public List<Message<String>> handleM(String in) throws InterruptedException {
Message<String> message = MessageBuilder.withPayload(in.toUpperCase())
.setHeader("serverSentAnError", "user error")
.build();
return Collections.singletonList(message);
}

}

@KafkaListener(topics = C_REQUEST, groupId = C_REQUEST)
Expand Down