From 6a9cae8009dbeaae5891fc171607e33e480d64b1 Mon Sep 17 00:00:00 2001 From: Loginov Vladimir Date: Thu, 7 Dec 2023 21:05:33 +0400 Subject: [PATCH] Gh-2924: Fix send null payload with KafkaTemplate#send(Message) and JsonMessageConverter Fixes: https://github.com/spring-projects/spring-kafka/issues/2924 Add posibility to send KafkaNull message payload with JsonMessageConverter. --- .../converter/ByteArrayJsonMessageConverter.java | 5 +++-- .../support/converter/BytesJsonMessageConverter.java | 5 +++-- .../kafka/support/converter/JsonMessageConverter.java | 10 +++++++--- .../support/converter/StringJsonMessageConverter.java | 10 ++++------ 4 files changed, 17 insertions(+), 13 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/ByteArrayJsonMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/ByteArrayJsonMessageConverter.java index 6348495caf..cd0f357aed 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/ByteArrayJsonMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/ByteArrayJsonMessageConverter.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2021 the original author or authors. + * Copyright 2019-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,7 +44,8 @@ public ByteArrayJsonMessageConverter(ObjectMapper objectMapper) { @Override protected Object convertPayload(Message message) { try { - return getObjectMapper().writeValueAsBytes(message.getPayload()); + Object payload = super.convertPayload(message); + return payload == null ? null : getObjectMapper().writeValueAsBytes(payload); } catch (JsonProcessingException e) { throw new ConversionException("Failed to convert to JSON", message, e); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BytesJsonMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BytesJsonMessageConverter.java index e219b15e9f..6ea3d38c80 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BytesJsonMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BytesJsonMessageConverter.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -46,7 +46,8 @@ public BytesJsonMessageConverter(ObjectMapper objectMapper) { @Override protected Object convertPayload(Message message) { try { - return Bytes.wrap(getObjectMapper().writeValueAsBytes(message.getPayload())); + Object payload = super.convertPayload(message); + return payload == null ? null : Bytes.wrap(getObjectMapper().writeValueAsBytes(payload)); } catch (JsonProcessingException e) { throw new ConversionException("Failed to convert to JSON", message, e); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/JsonMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/JsonMessageConverter.java index 2e1df2ca9a..0de7028f84 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/JsonMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/JsonMessageConverter.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2021 the original author or authors. + * Copyright 2019-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -93,8 +93,12 @@ protected Headers initialRecordHeaders(Message message) { @Override protected Object convertPayload(Message message) { - throw new UnsupportedOperationException("Select a subclass that creates a ProducerRecord value " - + "corresponding to the configured Kafka Serializer"); + Object payload = super.convertPayload(message); + if (payload != null) { + throw new UnsupportedOperationException("Select a subclass that creates a ProducerRecord value " + + "corresponding to the configured Kafka Serializer"); + } + return payload; } @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/StringJsonMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/StringJsonMessageConverter.java index 3162904935..ba92443c2f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/StringJsonMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/StringJsonMessageConverter.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,12 +44,10 @@ public StringJsonMessageConverter(ObjectMapper objectMapper) { @Override protected Object convertPayload(Message message) { try { - return getObjectMapper() - .writeValueAsString(message.getPayload()); - } - catch (JsonProcessingException e) { + Object payload = super.convertPayload(message); + return payload == null ? null : getObjectMapper().writeValueAsString(payload); + } catch (JsonProcessingException e) { throw new ConversionException("Failed to convert to JSON", message, e); } } - }