From 6a9cae8009dbeaae5891fc171607e33e480d64b1 Mon Sep 17 00:00:00 2001 From: Loginov Vladimir Date: Thu, 7 Dec 2023 21:05:33 +0400 Subject: [PATCH 1/2] Gh-2924: Fix send null payload with KafkaTemplate#send(Message) and JsonMessageConverter Fixes: https://github.com/spring-projects/spring-kafka/issues/2924 Add posibility to send KafkaNull message payload with JsonMessageConverter. --- .../converter/ByteArrayJsonMessageConverter.java | 5 +++-- .../support/converter/BytesJsonMessageConverter.java | 5 +++-- .../kafka/support/converter/JsonMessageConverter.java | 10 +++++++--- .../support/converter/StringJsonMessageConverter.java | 10 ++++------ 4 files changed, 17 insertions(+), 13 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/ByteArrayJsonMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/ByteArrayJsonMessageConverter.java index 6348495caf..cd0f357aed 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/ByteArrayJsonMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/ByteArrayJsonMessageConverter.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2021 the original author or authors. + * Copyright 2019-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,7 +44,8 @@ public ByteArrayJsonMessageConverter(ObjectMapper objectMapper) { @Override protected Object convertPayload(Message message) { try { - return getObjectMapper().writeValueAsBytes(message.getPayload()); + Object payload = super.convertPayload(message); + return payload == null ? null : getObjectMapper().writeValueAsBytes(payload); } catch (JsonProcessingException e) { throw new ConversionException("Failed to convert to JSON", message, e); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BytesJsonMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BytesJsonMessageConverter.java index e219b15e9f..6ea3d38c80 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BytesJsonMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BytesJsonMessageConverter.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -46,7 +46,8 @@ public BytesJsonMessageConverter(ObjectMapper objectMapper) { @Override protected Object convertPayload(Message message) { try { - return Bytes.wrap(getObjectMapper().writeValueAsBytes(message.getPayload())); + Object payload = super.convertPayload(message); + return payload == null ? null : Bytes.wrap(getObjectMapper().writeValueAsBytes(payload)); } catch (JsonProcessingException e) { throw new ConversionException("Failed to convert to JSON", message, e); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/JsonMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/JsonMessageConverter.java index 2e1df2ca9a..0de7028f84 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/JsonMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/JsonMessageConverter.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2021 the original author or authors. + * Copyright 2019-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -93,8 +93,12 @@ protected Headers initialRecordHeaders(Message message) { @Override protected Object convertPayload(Message message) { - throw new UnsupportedOperationException("Select a subclass that creates a ProducerRecord value " - + "corresponding to the configured Kafka Serializer"); + Object payload = super.convertPayload(message); + if (payload != null) { + throw new UnsupportedOperationException("Select a subclass that creates a ProducerRecord value " + + "corresponding to the configured Kafka Serializer"); + } + return payload; } @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/StringJsonMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/StringJsonMessageConverter.java index 3162904935..ba92443c2f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/StringJsonMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/StringJsonMessageConverter.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,12 +44,10 @@ public StringJsonMessageConverter(ObjectMapper objectMapper) { @Override protected Object convertPayload(Message message) { try { - return getObjectMapper() - .writeValueAsString(message.getPayload()); - } - catch (JsonProcessingException e) { + Object payload = super.convertPayload(message); + return payload == null ? null : getObjectMapper().writeValueAsString(payload); + } catch (JsonProcessingException e) { throw new ConversionException("Failed to convert to JSON", message, e); } } - } From caa847fe8430afb5ee421470f619b5dd738d8dd6 Mon Sep 17 00:00:00 2001 From: Loginov Vladimir Date: Fri, 8 Dec 2023 02:46:07 +0400 Subject: [PATCH 2/2] Gh-2924: Fix send null payload with KafkaTemplate#send(Message) and JsonMessageConverter Fixes: https://github.com/spring-projects/spring-kafka/issues/2924 Add posibility to send KafkaNull message payload with JsonMessageConverter. **Cherry-pick to 3.0.x** --- .../converter/ByteArrayJsonMessageConverter.java | 7 +++++-- .../support/converter/BytesJsonMessageConverter.java | 7 +++++-- .../kafka/support/converter/JsonMessageConverter.java | 8 ++------ .../support/converter/StringJsonMessageConverter.java | 10 +++++++--- .../kafka/annotation/BatchListenerConversionTests.java | 5 +++++ 5 files changed, 24 insertions(+), 13 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/ByteArrayJsonMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/ByteArrayJsonMessageConverter.java index cd0f357aed..b359ae3f95 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/ByteArrayJsonMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/ByteArrayJsonMessageConverter.java @@ -16,6 +16,7 @@ package org.springframework.kafka.support.converter; +import org.springframework.kafka.support.KafkaNull; import org.springframework.messaging.Message; import com.fasterxml.jackson.core.JsonProcessingException; @@ -29,6 +30,7 @@ * {@code String<->byte[]} conversion is avoided. * * @author Gary Russell + * @author Vladimir Loginov * @since 2.3 * */ @@ -44,8 +46,9 @@ public ByteArrayJsonMessageConverter(ObjectMapper objectMapper) { @Override protected Object convertPayload(Message message) { try { - Object payload = super.convertPayload(message); - return payload == null ? null : getObjectMapper().writeValueAsBytes(payload); + return message.getPayload() instanceof KafkaNull + ? null + : getObjectMapper().writeValueAsBytes(message.getPayload()); } catch (JsonProcessingException e) { throw new ConversionException("Failed to convert to JSON", message, e); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BytesJsonMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BytesJsonMessageConverter.java index 6ea3d38c80..65e3ca86df 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BytesJsonMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BytesJsonMessageConverter.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.utils.Bytes; +import org.springframework.kafka.support.KafkaNull; import org.springframework.messaging.Message; import com.fasterxml.jackson.core.JsonProcessingException; @@ -31,6 +32,7 @@ * {@code String<->byte[]} conversion is avoided. * * @author Gary Russell + * @author Vladimir Loginov * @since 2.1.7 * */ @@ -46,8 +48,9 @@ public BytesJsonMessageConverter(ObjectMapper objectMapper) { @Override protected Object convertPayload(Message message) { try { - Object payload = super.convertPayload(message); - return payload == null ? null : Bytes.wrap(getObjectMapper().writeValueAsBytes(payload)); + return message.getPayload() instanceof KafkaNull + ? null + : Bytes.wrap(getObjectMapper().writeValueAsBytes(message.getPayload())); } catch (JsonProcessingException e) { throw new ConversionException("Failed to convert to JSON", message, e); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/JsonMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/JsonMessageConverter.java index 0de7028f84..a32872c18c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/JsonMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/JsonMessageConverter.java @@ -93,12 +93,8 @@ protected Headers initialRecordHeaders(Message message) { @Override protected Object convertPayload(Message message) { - Object payload = super.convertPayload(message); - if (payload != null) { - throw new UnsupportedOperationException("Select a subclass that creates a ProducerRecord value " - + "corresponding to the configured Kafka Serializer"); - } - return payload; + throw new UnsupportedOperationException("Select a subclass that creates a ProducerRecord value " + + "corresponding to the configured Kafka Serializer"); } @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/StringJsonMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/StringJsonMessageConverter.java index ba92443c2f..869b8d2762 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/StringJsonMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/StringJsonMessageConverter.java @@ -16,6 +16,7 @@ package org.springframework.kafka.support.converter; +import org.springframework.kafka.support.KafkaNull; import org.springframework.messaging.Message; import com.fasterxml.jackson.core.JsonProcessingException; @@ -31,6 +32,7 @@ * @author Gary Russell * @author Artem Bilan * @author Dariusz Szablinski + * @author Vladimir Loginov */ public class StringJsonMessageConverter extends JsonMessageConverter { @@ -44,9 +46,11 @@ public StringJsonMessageConverter(ObjectMapper objectMapper) { @Override protected Object convertPayload(Message message) { try { - Object payload = super.convertPayload(message); - return payload == null ? null : getObjectMapper().writeValueAsString(payload); - } catch (JsonProcessingException e) { + return message.getPayload() instanceof KafkaNull + ? null + : getObjectMapper().writeValueAsString(message.getPayload()); + } + catch (JsonProcessingException e) { throw new ConversionException("Failed to convert to JSON", message, e); } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java index 3201621f23..d7ed4c1996 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/BatchListenerConversionTests.java @@ -17,6 +17,7 @@ package org.springframework.kafka.annotation; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -56,6 +57,7 @@ import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.KafkaNull; import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; import org.springframework.kafka.support.converter.BytesJsonMessageConverter; import org.springframework.kafka.support.converter.ConversionException; @@ -75,6 +77,7 @@ /** * @author Gary Russell * @author Artem Bilan + * @author Vladimir Loginov * * @since 1.3.2 * @@ -131,6 +134,8 @@ public void testBatchOfPojoMessages(@Autowired KafkaAdmin admin) throws Exceptio assertThat(listener.received.size()).isGreaterThan(0); assertThat(listener.received.get(0).getPayload()).isInstanceOf(Foo.class); assertThat(listener.received.get(0).getPayload().getBar()).isEqualTo("bar"); + assertThatNoException().isThrownBy(() -> this.template.send( + new GenericMessage<>(KafkaNull.INSTANCE, Collections.singletonMap(KafkaHeaders.TOPIC, topic)))); verify(admin, never()).clusterId(); }