diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java index 7cb5b297ee..c2bfcd3922 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/DefaultKafkaHeaderMapper.java @@ -55,6 +55,7 @@ * * @author Gary Russell * @author Artem Bilan + * @author Soby Chacko * * @since 1.3 * @@ -323,6 +324,10 @@ public void toHeaders(Headers source, final Map headers) { else if (headerName.equals(KafkaHeaders.LISTENER_INFO) && matchesForInbound(headerName)) { headers.put(headerName, new String(header.value(), getCharset())); } + else if (headerName.equals(KafkaUtils.KEY_DESERIALIZER_EXCEPTION_HEADER) || + headerName.equals(KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)) { + headers.put(headerName, header); + } else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) { if (jsonTypes.containsKey(headerName)) { String requestedType = jsonTypes.get(headerName); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaUtils.java index 616e6b224d..29438b98b4 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/KafkaUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,12 +38,32 @@ * * @author Gary Russell * @author Wang ZhiYang + * @author Soby Chacko * * @since 2.2 * */ public final class KafkaUtils { + /** + * Header name for deserialization exceptions. + * @since 3.0.15 + */ + public static final String DESERIALIZER_EXCEPTION_HEADER_PREFIX = "springDeserializerException"; + + /** + * Header name for deserialization exceptions. + * @since 3.0.15 + */ + public static final String KEY_DESERIALIZER_EXCEPTION_HEADER = DESERIALIZER_EXCEPTION_HEADER_PREFIX + "Key"; + + /** + * Header name for deserialization exceptions. + * @since 3.0.15 + */ + public static final String VALUE_DESERIALIZER_EXCEPTION_HEADER = DESERIALIZER_EXCEPTION_HEADER_PREFIX + "Value"; + + private static Function, String> prFormatter = ProducerRecord::toString; private static Function, String> crFormatter = diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/SerializationUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/SerializationUtils.java index 48c0121039..3c2259af65 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/SerializationUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/SerializationUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2023 the original author or authors. + * Copyright 2020-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -50,19 +50,19 @@ public final class SerializationUtils { * Header name for deserialization exceptions. * @since 2.8 */ - public static final String DESERIALIZER_EXCEPTION_HEADER_PREFIX = "springDeserializerException"; + public static final String DESERIALIZER_EXCEPTION_HEADER_PREFIX = KafkaUtils.DESERIALIZER_EXCEPTION_HEADER_PREFIX; /** * Header name for deserialization exceptions. * @since 2.8 */ - public static final String KEY_DESERIALIZER_EXCEPTION_HEADER = DESERIALIZER_EXCEPTION_HEADER_PREFIX + "Key"; + public static final String KEY_DESERIALIZER_EXCEPTION_HEADER = KafkaUtils.KEY_DESERIALIZER_EXCEPTION_HEADER; /** * Header name for deserialization exceptions. * @since 2.8 */ - public static final String VALUE_DESERIALIZER_EXCEPTION_HEADER = DESERIALIZER_EXCEPTION_HEADER_PREFIX + "Value"; + public static final String VALUE_DESERIALIZER_EXCEPTION_HEADER = KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER; private SerializationUtils() { } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java index d3624dfab0..488e5ef7dd 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2023 the original author or authors. + * Copyright 2020-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,10 +33,6 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.io.UncheckedIOException; import java.time.Duration; import java.util.Collections; import java.util.HashMap; @@ -80,6 +76,7 @@ /** * @author Gary Russell * @author Tomaz Fernandes + * @author Soby Chacko * @since 2.4.3 * */ @@ -174,9 +171,9 @@ void valueHeaderStripped() { DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); Headers headers = new RecordHeaders(); headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, - header(false))); + SerializationTestUtils.header(false))); headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, - header(true))); + SerializationTestUtils.header(true))); Headers custom = new RecordHeaders(); custom.add(new RecordHeader("foo", "bar".getBytes())); recoverer.setHeadersFunction((rec, ex) -> custom); @@ -206,7 +203,7 @@ void keyHeaderStripped() { DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); Headers headers = new RecordHeaders(); headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, - header(true))); + SerializationTestUtils.header(true))); CompletableFuture future = new CompletableFuture(); future.complete(new Object()); willReturn(future).given(template).send(any(ProducerRecord.class)); @@ -225,9 +222,9 @@ void keyDeserOnly() { KafkaOperations template = mock(KafkaOperations.class); DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template); Headers headers = new RecordHeaders(); - DeserializationException deserEx = createDeserEx(true); + DeserializationException deserEx = SerializationTestUtils.createDeserEx(true); headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, - header(true, deserEx))); + SerializationTestUtils.header(deserEx))); CompletableFuture future = new CompletableFuture(); future.complete(new Object()); willReturn(future).given(template).send(any(ProducerRecord.class)); @@ -250,9 +247,9 @@ void headersNotStripped() { recoverer.setRetainExceptionHeader(true); Headers headers = new RecordHeaders(); headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, - header(false))); + SerializationTestUtils.header(false))); headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, - header(true))); + SerializationTestUtils.header(true))); CompletableFuture future = new CompletableFuture(); future.complete(new Object()); willReturn(future).given(template).send(any(ProducerRecord.class)); @@ -302,27 +299,6 @@ void tombstoneWithMultiTemplatesExplicit() { verify(template2).send(any(ProducerRecord.class)); } - private byte[] header(boolean isKey) { - return header(isKey, createDeserEx(isKey)); - } - - private DeserializationException createDeserEx(boolean isKey) { - return new DeserializationException( - isKey ? "testK" : "testV", - isKey ? "key".getBytes() : "value".getBytes(), isKey, null); - } - - private byte[] header(boolean isKey, DeserializationException deserEx) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try { - new ObjectOutputStream(baos).writeObject(deserEx); - } - catch (IOException e) { - throw new UncheckedIOException(e); - } - return baos.toByteArray(); - } - @SuppressWarnings({"unchecked", "rawtypes"}) @Test void allOriginalHeaders() { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java index daf1aa6c29..afa7525d04 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/DefaultKafkaHeaderMapperTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2022 the original author or authors. + * Copyright 2017-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,7 +34,11 @@ import org.apache.kafka.common.header.internals.RecordHeaders; import org.junit.jupiter.api.Test; +import org.springframework.core.log.LogAccessor; import org.springframework.kafka.support.DefaultKafkaHeaderMapper.NonTrustedHeaderType; +import org.springframework.kafka.support.serializer.DeserializationException; +import org.springframework.kafka.support.serializer.SerializationTestUtils; +import org.springframework.kafka.support.serializer.SerializationUtils; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.ExecutorSubscribableChannel; @@ -46,6 +50,7 @@ /** * @author Gary Russell * @author Artem Bilan + * @author Soby Chacko * * @since 1.3 * @@ -321,6 +326,38 @@ void inboundJson() { .containsKey("baz"); } + @Test + void deserializationExceptionHeadersAreMappedAsNonByteArray() { + DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper(); + + byte[] keyDeserExceptionBytes = SerializationTestUtils.header(true); + Header keyHeader = SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, + keyDeserExceptionBytes); + byte[] valueDeserExceptionBytes = SerializationTestUtils.header(false); + Header valueHeader = SerializationTestUtils.deserializationHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, + valueDeserExceptionBytes); + Headers headers = new RecordHeaders( + new Header[] { keyHeader, valueHeader }); + Map springHeaders = new HashMap<>(); + mapper.toHeaders(headers, springHeaders); + assertThat(springHeaders.get(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER)).isEqualTo(keyHeader); + assertThat(springHeaders.get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)).isEqualTo(valueHeader); + + LogAccessor logger = new LogAccessor(this.getClass()); + + DeserializationException keyDeserializationException = SerializationUtils.byteArrayToDeserializationException(logger, keyHeader); + assertThat(keyDeserExceptionBytes).containsExactly(SerializationTestUtils.header(keyDeserializationException)); + + DeserializationException valueDeserializationException = + SerializationUtils.byteArrayToDeserializationException(logger, valueHeader); + assertThat(valueDeserExceptionBytes).containsExactly(SerializationTestUtils.header(valueDeserializationException)); + + headers = new RecordHeaders(); + mapper.fromHeaders(new MessageHeaders(springHeaders), headers); + assertThat(headers.lastHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER)).isNull(); + assertThat(headers.lastHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)).isNull(); + } + public static final class Foo { private String bar = "bar"; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/SerializationTestUtils.java b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/SerializationTestUtils.java index 4837c6575f..2f7194aa51 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/SerializationTestUtils.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/SerializationTestUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2023-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,10 +16,16 @@ package org.springframework.kafka.support.serializer; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.io.UncheckedIOException; + import org.apache.kafka.common.header.Header; /** * @author Gary Russell + * @author Soby Chacko * @since 2.9.11 * */ @@ -32,4 +38,25 @@ public static Header deserializationHeader(String key, byte[] value) { return new DeserializationExceptionHeader(key, value); } + public static byte[] header(boolean isKey) { + return header(createDeserEx(isKey)); + } + + public static DeserializationException createDeserEx(boolean isKey) { + return new DeserializationException( + isKey ? "testK" : "testV", + isKey ? "key".getBytes() : "value".getBytes(), isKey, null); + } + + public static byte[] header(DeserializationException deserEx) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + new ObjectOutputStream(baos).writeObject(deserEx); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + return baos.toByteArray(); + } + }