From 0c5f4b99e8ee46c349d7e7a2de25371bb9425263 Mon Sep 17 00:00:00 2001 From: "Zhiyang.Wang1" Date: Fri, 15 Dec 2023 13:11:16 +0800 Subject: [PATCH] minor adjustment at `MessagingMessageListenerAdapter` --- .../BatchMessagingMessageListenerAdapter.java | 4 ++-- .../adapter/MessagingMessageListenerAdapter.java | 16 +++++----------- .../RecordMessagingMessageListenerAdapter.java | 4 ++-- 3 files changed, 9 insertions(+), 15 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java index 5cd097b645..a2c1d87704 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -63,7 +63,7 @@ public class BatchMessagingMessageListenerAdapter extends MessagingMessage private BatchMessageConverter batchMessageConverter = new BatchMessagingMessageConverter(); - private KafkaListenerErrorHandler errorHandler; + private final KafkaListenerErrorHandler errorHandler; private BatchToRecordAdapter batchToRecordAdapter; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 7275a1b446..3c43335c45 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -529,14 +529,8 @@ private Message checkHeaders(Message reply, @Nullable String topic, @Nulla @SuppressWarnings("unchecked") private void sendSingleResult(Object result, String topic, @Nullable Object source) { - byte[] correlationId = null; - boolean sourceIsMessage = source instanceof Message; - if (sourceIsMessage - && getCorrelation((Message) source) != null) { - correlationId = getCorrelation((Message) source); - } - if (sourceIsMessage) { - sendReplyForMessageSource(result, topic, source, correlationId); + if (source instanceof Message message) { + sendReplyForMessageSource(result, topic, message, getCorrelation(message)); } else { this.replyTemplate.send(topic, result); @@ -544,11 +538,11 @@ && getCorrelation((Message) source) != null) { } @SuppressWarnings("unchecked") - private void sendReplyForMessageSource(Object result, String topic, Object source, @Nullable byte[] correlationId) { + private void sendReplyForMessageSource(Object result, String topic, Message source, @Nullable byte[] correlationId) { MessageBuilder builder = MessageBuilder.withPayload(result) .setHeader(KafkaHeaders.TOPIC, topic); if (this.replyHeadersConfigurer != null) { - Map headersToCopy = ((Message) source).getHeaders().entrySet().stream() + Map headersToCopy = source.getHeaders().entrySet().stream() .filter(e -> { String key = e.getKey(); return !key.equals(MessageHeaders.ID) && !key.equals(MessageHeaders.TIMESTAMP) @@ -568,7 +562,7 @@ private void sendReplyForMessageSource(Object result, String topic, Object sourc if (correlationId != null) { builder.setHeader(this.correlationHeaderName, correlationId); } - setPartition(builder, ((Message) source)); + setPartition(builder, source); this.replyTemplate.send(builder.build()); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java index d0b2ca5474..c0da30c278 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,7 +53,7 @@ public class RecordMessagingMessageListenerAdapter extends MessagingMessageListenerAdapter implements AcknowledgingConsumerAwareMessageListener { - private KafkaListenerErrorHandler errorHandler; + private final KafkaListenerErrorHandler errorHandler; public RecordMessagingMessageListenerAdapter(Object bean, Method method) { this(bean, method, null);