Skip to content

Commit

Permalink
Gh-2924: Fix KafkaNull for JSONConverters
Browse files Browse the repository at this point in the history
 Fixes: gh-2924

 Add the possibility of sending KafkaNull message payload with JsonMessageConverter.

 **Cherry-pick to 3.0.x**
  • Loading branch information
pswrdf authored and sobychacko committed Dec 8, 2023
1 parent 9e87a0c commit 60501ae
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2021 the original author or authors.
* Copyright 2019-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@

package org.springframework.kafka.support.converter;

import org.springframework.kafka.support.KafkaNull;
import org.springframework.messaging.Message;

import com.fasterxml.jackson.core.JsonProcessingException;
Expand All @@ -29,6 +30,7 @@
* {@code String<->byte[]} conversion is avoided.
*
* @author Gary Russell
* @author Vladimir Loginov
* @since 2.3
*
*/
Expand All @@ -44,7 +46,9 @@ public ByteArrayJsonMessageConverter(ObjectMapper objectMapper) {
@Override
protected Object convertPayload(Message<?> message) {
try {
return getObjectMapper().writeValueAsBytes(message.getPayload());
return message.getPayload() instanceof KafkaNull
? null
: getObjectMapper().writeValueAsBytes(message.getPayload());
}
catch (JsonProcessingException e) {
throw new ConversionException("Failed to convert to JSON", message, e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2021 the original author or authors.
* Copyright 2018-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import org.apache.kafka.common.utils.Bytes;

import org.springframework.kafka.support.KafkaNull;
import org.springframework.messaging.Message;

import com.fasterxml.jackson.core.JsonProcessingException;
Expand All @@ -31,6 +32,7 @@
* {@code String<->byte[]} conversion is avoided.
*
* @author Gary Russell
* @author Vladimir Loginov
* @since 2.1.7
*
*/
Expand All @@ -46,7 +48,9 @@ public BytesJsonMessageConverter(ObjectMapper objectMapper) {
@Override
protected Object convertPayload(Message<?> message) {
try {
return Bytes.wrap(getObjectMapper().writeValueAsBytes(message.getPayload()));
return message.getPayload() instanceof KafkaNull
? null
: Bytes.wrap(getObjectMapper().writeValueAsBytes(message.getPayload()));
}
catch (JsonProcessingException e) {
throw new ConversionException("Failed to convert to JSON", message, e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2021 the original author or authors.
* Copyright 2019-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@

package org.springframework.kafka.support.converter;

import org.springframework.kafka.support.KafkaNull;
import org.springframework.messaging.Message;

import com.fasterxml.jackson.core.JsonProcessingException;
Expand All @@ -31,6 +32,7 @@
* @author Gary Russell
* @author Artem Bilan
* @author Dariusz Szablinski
* @author Vladimir Loginov
*/
public class StringJsonMessageConverter extends JsonMessageConverter {

Expand All @@ -44,12 +46,12 @@ public StringJsonMessageConverter(ObjectMapper objectMapper) {
@Override
protected Object convertPayload(Message<?> message) {
try {
return getObjectMapper()
.writeValueAsString(message.getPayload());
return message.getPayload() instanceof KafkaNull
? null
: getObjectMapper().writeValueAsString(message.getPayload());
}
catch (JsonProcessingException e) {
throw new ConversionException("Failed to convert to JSON", message, e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.kafka.annotation;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -56,6 +57,7 @@
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
import org.springframework.kafka.support.converter.BytesJsonMessageConverter;
import org.springframework.kafka.support.converter.ConversionException;
Expand All @@ -74,6 +76,7 @@
/**
* @author Gary Russell
* @author Artem Bilan
* @author Vladimir Loginov
*
* @since 1.3.2
*
Expand Down Expand Up @@ -126,6 +129,8 @@ public void testBatchOfPojoMessages(@Autowired KafkaAdmin admin) throws Exceptio
assertThat(listener.received.size()).isGreaterThan(0);
assertThat(listener.received.get(0).getPayload()).isInstanceOf(Foo.class);
assertThat(listener.received.get(0).getPayload().getBar()).isEqualTo("bar");
assertThatNoException().isThrownBy(() -> this.template.send(
new GenericMessage<>(KafkaNull.INSTANCE, Collections.singletonMap(KafkaHeaders.TOPIC, topic))));
verify(admin, never()).clusterId();
}

Expand Down

0 comments on commit 60501ae

Please sign in to comment.