diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java index e6483db39b..78d74599e0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java @@ -43,6 +43,7 @@ * * @author Gary Russell * @author Artem Bilan + * @author Soby Chacko * * @since 2.1.3 * @@ -267,11 +268,11 @@ else if (value instanceof String) { * @return the value to add. */ protected Object headerValueToAddIn(Header header) { - Object mapped = mapRawIn(header.key(), header.value()); - if (mapped == null) { - mapped = header.value(); + if (header == null || header.value() == null) { + return null; } - return mapped; + String mapped = mapRawIn(header.key(), header.value()); + return mapped != null ? mapped : header.value(); } @Nullable diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java index 8e498d57e3..43724a9d8b 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java @@ -46,6 +46,10 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.entry; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; /** * @author Gary Russell @@ -358,6 +362,20 @@ void deserializationExceptionHeadersAreMappedAsNonByteArray() { assertThat(headers.lastHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)).isNull(); } + @Test + void ensureNullHeaderValueHandledGraciously() { + DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper(); + + Header mockHeader = mock(Header.class); + given(mockHeader.value()).willReturn(null); + + Object result = mapper.headerValueToAddIn(mockHeader); + + assertThat(result).isNull(); + verify(mockHeader).value(); + verify(mockHeader, never()).key(); + } + public static final class Foo { private String bar = "bar";