diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/ByteArrayJsonMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/ByteArrayJsonMessageConverter.java index cd0f357aed..b359ae3f95 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/ByteArrayJsonMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/ByteArrayJsonMessageConverter.java @@ -16,6 +16,7 @@ package org.springframework.kafka.support.converter; +import org.springframework.kafka.support.KafkaNull; import org.springframework.messaging.Message; import com.fasterxml.jackson.core.JsonProcessingException; @@ -29,6 +30,7 @@ * {@code String<->byte[]} conversion is avoided. * * @author Gary Russell + * @author Vladimir Loginov * @since 2.3 * */ @@ -44,8 +46,9 @@ public ByteArrayJsonMessageConverter(ObjectMapper objectMapper) { @Override protected Object convertPayload(Message message) { try { - Object payload = super.convertPayload(message); - return payload == null ? null : getObjectMapper().writeValueAsBytes(payload); + return message.getPayload() instanceof KafkaNull + ? null + : getObjectMapper().writeValueAsBytes(message.getPayload()); } catch (JsonProcessingException e) { throw new ConversionException("Failed to convert to JSON", message, e); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BytesJsonMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BytesJsonMessageConverter.java index 6ea3d38c80..65e3ca86df 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BytesJsonMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BytesJsonMessageConverter.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.utils.Bytes; +import org.springframework.kafka.support.KafkaNull; import org.springframework.messaging.Message; import com.fasterxml.jackson.core.JsonProcessingException; @@ -31,6 +32,7 @@ * {@code String<->byte[]} conversion is avoided. * * @author Gary Russell + * @author Vladimir Loginov * @since 2.1.7 * */ @@ -46,8 +48,9 @@ public BytesJsonMessageConverter(ObjectMapper objectMapper) { @Override protected Object convertPayload(Message message) { try { - Object payload = super.convertPayload(message); - return payload == null ? null : Bytes.wrap(getObjectMapper().writeValueAsBytes(payload)); + return message.getPayload() instanceof KafkaNull + ? null + : Bytes.wrap(getObjectMapper().writeValueAsBytes(message.getPayload())); } catch (JsonProcessingException e) { throw new ConversionException("Failed to convert to JSON", message, e); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/JsonMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/JsonMessageConverter.java index 0de7028f84..a32872c18c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/JsonMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/JsonMessageConverter.java @@ -93,12 +93,8 @@ protected Headers initialRecordHeaders(Message message) { @Override protected Object convertPayload(Message message) { - Object payload = super.convertPayload(message); - if (payload != null) { - throw new UnsupportedOperationException("Select a subclass that creates a ProducerRecord value " - + "corresponding to the configured Kafka Serializer"); - } - return payload; + throw new UnsupportedOperationException("Select a subclass that creates a ProducerRecord value " + + "corresponding to the configured Kafka Serializer"); } @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/StringJsonMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/StringJsonMessageConverter.java index ba92443c2f..869b8d2762 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/StringJsonMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/StringJsonMessageConverter.java @@ -16,6 +16,7 @@ package org.springframework.kafka.support.converter; +import org.springframework.kafka.support.KafkaNull; import org.springframework.messaging.Message; import com.fasterxml.jackson.core.JsonProcessingException; @@ -31,6 +32,7 @@ * @author Gary Russell * @author Artem Bilan * @author Dariusz Szablinski + * @author Vladimir Loginov */ public class StringJsonMessageConverter extends JsonMessageConverter { @@ -44,9 +46,11 @@ public StringJsonMessageConverter(ObjectMapper objectMapper) { @Override protected Object convertPayload(Message message) { try { - Object payload = super.convertPayload(message); - return payload == null ? null : getObjectMapper().writeValueAsString(payload); - } catch (JsonProcessingException e) { + return message.getPayload() instanceof KafkaNull + ? null + : getObjectMapper().writeValueAsString(message.getPayload()); + } + catch (JsonProcessingException e) { throw new ConversionException("Failed to convert to JSON", message, e); } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java index 3201621f23..d7ed4c1996 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java @@ -17,6 +17,7 @@ package org.springframework.kafka.annotation; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -56,6 +57,7 @@ import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.KafkaNull; import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; import org.springframework.kafka.support.converter.BytesJsonMessageConverter; import org.springframework.kafka.support.converter.ConversionException; @@ -75,6 +77,7 @@ /** * @author Gary Russell * @author Artem Bilan + * @author Vladimir Loginov * * @since 1.3.2 * @@ -131,6 +134,8 @@ public void testBatchOfPojoMessages(@Autowired KafkaAdmin admin) throws Exceptio assertThat(listener.received.size()).isGreaterThan(0); assertThat(listener.received.get(0).getPayload()).isInstanceOf(Foo.class); assertThat(listener.received.get(0).getPayload().getBar()).isEqualTo("bar"); + assertThatNoException().isThrownBy(() -> this.template.send( + new GenericMessage<>(KafkaNull.INSTANCE, Collections.singletonMap(KafkaHeaders.TOPIC, topic)))); verify(admin, never()).clusterId(); }