Skip to content

Commit

Permalink
GH-3114: DeserializationException propagation
Browse files Browse the repository at this point in the history
Fixes: #3114

* Since DeserializationExceptionHeader is currently porpagated as a `byte[]`,
  it encounters some issues when processing the header especially in batch
  listeners. Fixing this by providing the deserialization header without `byte[]` conversion
* Adding test to verify
* Refactoring in SerializationTestUtils

(cherry picked from commit 7cc7fc8)
  • Loading branch information
sobychacko authored and spring-builds committed Mar 14, 2024
1 parent e4d9994 commit 87a577e
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
*
* @author Gary Russell
* @author Artem Bilan
* @author Soby Chacko
*
* @since 1.3
*
Expand Down Expand Up @@ -323,6 +324,10 @@ public void toHeaders(Headers source, final Map<String, Object> headers) {
else if (headerName.equals(KafkaHeaders.LISTENER_INFO) && matchesForInbound(headerName)) {
headers.put(headerName, new String(header.value(), getCharset()));
}
else if (headerName.equals(KafkaUtils.KEY_DESERIALIZER_EXCEPTION_HEADER) ||
headerName.equals(KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)) {
headers.put(headerName, header);
}
else if (!(headerName.equals(JSON_TYPES)) && matchesForInbound(headerName)) {
if (jsonTypes.containsKey(headerName)) {
String requestedType = jsonTypes.get(headerName);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,12 +38,32 @@
*
* @author Gary Russell
* @author Wang ZhiYang
* @author Soby Chacko
*
* @since 2.2
*
*/
public final class KafkaUtils {

/**
* Header name for deserialization exceptions.
* @since 3.0.15
*/
public static final String DESERIALIZER_EXCEPTION_HEADER_PREFIX = "springDeserializerException";

/**
* Header name for deserialization exceptions.
* @since 3.0.15
*/
public static final String KEY_DESERIALIZER_EXCEPTION_HEADER = DESERIALIZER_EXCEPTION_HEADER_PREFIX + "Key";

/**
* Header name for deserialization exceptions.
* @since 3.0.15
*/
public static final String VALUE_DESERIALIZER_EXCEPTION_HEADER = DESERIALIZER_EXCEPTION_HEADER_PREFIX + "Value";


private static Function<ProducerRecord<?, ?>, String> prFormatter = ProducerRecord::toString;

private static Function<ConsumerRecord<?, ?>, String> crFormatter =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 the original author or authors.
* Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,19 +50,19 @@ public final class SerializationUtils {
* Header name for deserialization exceptions.
* @since 2.8
*/
public static final String DESERIALIZER_EXCEPTION_HEADER_PREFIX = "springDeserializerException";
public static final String DESERIALIZER_EXCEPTION_HEADER_PREFIX = KafkaUtils.DESERIALIZER_EXCEPTION_HEADER_PREFIX;

/**
* Header name for deserialization exceptions.
* @since 2.8
*/
public static final String KEY_DESERIALIZER_EXCEPTION_HEADER = DESERIALIZER_EXCEPTION_HEADER_PREFIX + "Key";
public static final String KEY_DESERIALIZER_EXCEPTION_HEADER = KafkaUtils.KEY_DESERIALIZER_EXCEPTION_HEADER;

/**
* Header name for deserialization exceptions.
* @since 2.8
*/
public static final String VALUE_DESERIALIZER_EXCEPTION_HEADER = DESERIALIZER_EXCEPTION_HEADER_PREFIX + "Value";
public static final String VALUE_DESERIALIZER_EXCEPTION_HEADER = KafkaUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER;

private SerializationUtils() {
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 the original author or authors.
* Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,10 +33,6 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -80,6 +76,7 @@
/**
* @author Gary Russell
* @author Tomaz Fernandes
* @author Soby Chacko
* @since 2.4.3
*
*/
Expand Down Expand Up @@ -174,9 +171,9 @@ void valueHeaderStripped() {
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
Headers headers = new RecordHeaders();
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER,
header(false)));
SerializationTestUtils.header(false)));
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
header(true)));
SerializationTestUtils.header(true)));
Headers custom = new RecordHeaders();
custom.add(new RecordHeader("foo", "bar".getBytes()));
recoverer.setHeadersFunction((rec, ex) -> custom);
Expand Down Expand Up @@ -206,7 +203,7 @@ void keyHeaderStripped() {
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
Headers headers = new RecordHeaders();
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
header(true)));
SerializationTestUtils.header(true)));
CompletableFuture future = new CompletableFuture();
future.complete(new Object());
willReturn(future).given(template).send(any(ProducerRecord.class));
Expand All @@ -225,9 +222,9 @@ void keyDeserOnly() {
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
Headers headers = new RecordHeaders();
DeserializationException deserEx = createDeserEx(true);
DeserializationException deserEx = SerializationTestUtils.createDeserEx(true);
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
header(true, deserEx)));
SerializationTestUtils.header(deserEx)));
CompletableFuture future = new CompletableFuture();
future.complete(new Object());
willReturn(future).given(template).send(any(ProducerRecord.class));
Expand All @@ -250,9 +247,9 @@ void headersNotStripped() {
recoverer.setRetainExceptionHeader(true);
Headers headers = new RecordHeaders();
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER,
header(false)));
SerializationTestUtils.header(false)));
headers.add(SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
header(true)));
SerializationTestUtils.header(true)));
CompletableFuture future = new CompletableFuture();
future.complete(new Object());
willReturn(future).given(template).send(any(ProducerRecord.class));
Expand Down Expand Up @@ -302,27 +299,6 @@ void tombstoneWithMultiTemplatesExplicit() {
verify(template2).send(any(ProducerRecord.class));
}

private byte[] header(boolean isKey) {
return header(isKey, createDeserEx(isKey));
}

private DeserializationException createDeserEx(boolean isKey) {
return new DeserializationException(
isKey ? "testK" : "testV",
isKey ? "key".getBytes() : "value".getBytes(), isKey, null);
}

private byte[] header(boolean isKey, DeserializationException deserEx) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
new ObjectOutputStream(baos).writeObject(deserEx);
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
return baos.toByteArray();
}

@SuppressWarnings({"unchecked", "rawtypes"})
@Test
void allOriginalHeaders() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2022 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -34,7 +34,11 @@
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.junit.jupiter.api.Test;

import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper.NonTrustedHeaderType;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.SerializationTestUtils;
import org.springframework.kafka.support.serializer.SerializationUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
Expand All @@ -46,6 +50,7 @@
/**
* @author Gary Russell
* @author Artem Bilan
* @author Soby Chacko
*
* @since 1.3
*
Expand Down Expand Up @@ -321,6 +326,38 @@ void inboundJson() {
.containsKey("baz");
}

@Test
void deserializationExceptionHeadersAreMappedAsNonByteArray() {
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();

byte[] keyDeserExceptionBytes = SerializationTestUtils.header(true);
Header keyHeader = SerializationTestUtils.deserializationHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER,
keyDeserExceptionBytes);
byte[] valueDeserExceptionBytes = SerializationTestUtils.header(false);
Header valueHeader = SerializationTestUtils.deserializationHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER,
valueDeserExceptionBytes);
Headers headers = new RecordHeaders(
new Header[] { keyHeader, valueHeader });
Map<String, Object> springHeaders = new HashMap<>();
mapper.toHeaders(headers, springHeaders);
assertThat(springHeaders.get(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER)).isEqualTo(keyHeader);
assertThat(springHeaders.get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)).isEqualTo(valueHeader);

LogAccessor logger = new LogAccessor(this.getClass());

DeserializationException keyDeserializationException = SerializationUtils.byteArrayToDeserializationException(logger, keyHeader);
assertThat(keyDeserExceptionBytes).containsExactly(SerializationTestUtils.header(keyDeserializationException));

DeserializationException valueDeserializationException =
SerializationUtils.byteArrayToDeserializationException(logger, valueHeader);
assertThat(valueDeserExceptionBytes).containsExactly(SerializationTestUtils.header(valueDeserializationException));

headers = new RecordHeaders();
mapper.fromHeaders(new MessageHeaders(springHeaders), headers);
assertThat(headers.lastHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER)).isNull();
assertThat(headers.lastHeader(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER)).isNull();
}

public static final class Foo {

private String bar = "bar";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 the original author or authors.
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,10 +16,16 @@

package org.springframework.kafka.support.serializer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

import org.apache.kafka.common.header.Header;

/**
* @author Gary Russell
* @author Soby Chacko
* @since 2.9.11
*
*/
Expand All @@ -32,4 +38,25 @@ public static Header deserializationHeader(String key, byte[] value) {
return new DeserializationExceptionHeader(key, value);
}

public static byte[] header(boolean isKey) {
return header(createDeserEx(isKey));
}

public static DeserializationException createDeserEx(boolean isKey) {
return new DeserializationException(
isKey ? "testK" : "testV",
isKey ? "key".getBytes() : "value".getBytes(), isKey, null);
}

public static byte[] header(DeserializationException deserEx) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
new ObjectOutputStream(baos).writeObject(deserEx);
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
return baos.toByteArray();
}

}

0 comments on commit 87a577e

Please sign in to comment.