Skip to content

Commit

Permalink
Gh-2924: Fix send null payload with KafkaTemplate#send(Message) and …
Browse files Browse the repository at this point in the history
…JsonMessageConverter

Fixes: #2924

 Add posibility to send KafkaNull message payload with JsonMessageConverter.
 **Cherry-pick to 3.0.x**
  • Loading branch information
Loginov Vladimir committed Dec 7, 2023
1 parent 6a9cae8 commit caa847f
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.kafka.support.converter;

import org.springframework.kafka.support.KafkaNull;
import org.springframework.messaging.Message;

import com.fasterxml.jackson.core.JsonProcessingException;
Expand All @@ -29,6 +30,7 @@
* {@code String<->byte[]} conversion is avoided.
*
* @author Gary Russell
* @author Vladimir Loginov
* @since 2.3
*
*/
Expand All @@ -44,8 +46,9 @@ public ByteArrayJsonMessageConverter(ObjectMapper objectMapper) {
@Override
protected Object convertPayload(Message<?> message) {
try {
Object payload = super.convertPayload(message);
return payload == null ? null : getObjectMapper().writeValueAsBytes(payload);
return message.getPayload() instanceof KafkaNull
? null
: getObjectMapper().writeValueAsBytes(message.getPayload());
}
catch (JsonProcessingException e) {
throw new ConversionException("Failed to convert to JSON", message, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.common.utils.Bytes;

import org.springframework.kafka.support.KafkaNull;
import org.springframework.messaging.Message;

import com.fasterxml.jackson.core.JsonProcessingException;
Expand All @@ -31,6 +32,7 @@
* {@code String<->byte[]} conversion is avoided.
*
* @author Gary Russell
* @author Vladimir Loginov
* @since 2.1.7
*
*/
Expand All @@ -46,8 +48,9 @@ public BytesJsonMessageConverter(ObjectMapper objectMapper) {
@Override
protected Object convertPayload(Message<?> message) {
try {
Object payload = super.convertPayload(message);
return payload == null ? null : Bytes.wrap(getObjectMapper().writeValueAsBytes(payload));
return message.getPayload() instanceof KafkaNull
? null
: Bytes.wrap(getObjectMapper().writeValueAsBytes(message.getPayload()));
}
catch (JsonProcessingException e) {
throw new ConversionException("Failed to convert to JSON", message, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,8 @@ protected Headers initialRecordHeaders(Message<?> message) {

@Override
protected Object convertPayload(Message<?> message) {
Object payload = super.convertPayload(message);
if (payload != null) {
throw new UnsupportedOperationException("Select a subclass that creates a ProducerRecord value "
+ "corresponding to the configured Kafka Serializer");
}
return payload;
throw new UnsupportedOperationException("Select a subclass that creates a ProducerRecord value "
+ "corresponding to the configured Kafka Serializer");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.kafka.support.converter;

import org.springframework.kafka.support.KafkaNull;
import org.springframework.messaging.Message;

import com.fasterxml.jackson.core.JsonProcessingException;
Expand All @@ -31,6 +32,7 @@
* @author Gary Russell
* @author Artem Bilan
* @author Dariusz Szablinski
* @author Vladimir Loginov
*/
public class StringJsonMessageConverter extends JsonMessageConverter {

Expand All @@ -44,9 +46,11 @@ public StringJsonMessageConverter(ObjectMapper objectMapper) {
@Override
protected Object convertPayload(Message<?> message) {
try {
Object payload = super.convertPayload(message);
return payload == null ? null : getObjectMapper().writeValueAsString(payload);
} catch (JsonProcessingException e) {
return message.getPayload() instanceof KafkaNull
? null
: getObjectMapper().writeValueAsString(message.getPayload());
}
catch (JsonProcessingException e) {
throw new ConversionException("Failed to convert to JSON", message, e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.kafka.annotation;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -56,6 +57,7 @@
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
import org.springframework.kafka.support.converter.BytesJsonMessageConverter;
import org.springframework.kafka.support.converter.ConversionException;
Expand All @@ -75,6 +77,7 @@
/**
* @author Gary Russell
* @author Artem Bilan
* @author Vladimir Loginov
*
* @since 1.3.2
*
Expand Down Expand Up @@ -131,6 +134,8 @@ public void testBatchOfPojoMessages(@Autowired KafkaAdmin admin) throws Exceptio
assertThat(listener.received.size()).isGreaterThan(0);
assertThat(listener.received.get(0).getPayload()).isInstanceOf(Foo.class);
assertThat(listener.received.get(0).getPayload().getBar()).isEqualTo("bar");
assertThatNoException().isThrownBy(() -> this.template.send(
new GenericMessage<>(KafkaNull.INSTANCE, Collections.singletonMap(KafkaHeaders.TOPIC, topic))));
verify(admin, never()).clusterId();
}

Expand Down

0 comments on commit caa847f

Please sign in to comment.