diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc index d41fa77f61..a2616389fd 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/serdes.adoc @@ -397,6 +397,8 @@ ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs, new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate)); ---- +Starting with version `3.1.2`, a `RecoveryCallback` can be set on the `RetryingDeserializer` optionally. + Refer to the https://github.com/spring-projects/spring-retry[spring-retry] project for configuration of the `RetryTemplate` with a retry policy, back off policy, etc. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/RetryingDeserializer.java b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/RetryingDeserializer.java index 5e2e7df137..fe01c2449e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/RetryingDeserializer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/RetryingDeserializer.java @@ -22,6 +22,8 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Deserializer; +import org.springframework.lang.Nullable; +import org.springframework.retry.RecoveryCallback; import org.springframework.retry.RetryOperations; import org.springframework.util.Assert; @@ -30,12 +32,10 @@ * deserialization in case of transient errors. * * @param Type to be deserialized into. - * * @author Gary Russell * @author Wang Zhiyang - * + * @author Soby Chacko * @since 2.3 - * */ public class RetryingDeserializer implements Deserializer { @@ -43,6 +43,9 @@ public class RetryingDeserializer implements Deserializer { private final RetryOperations retryOperations; + @Nullable + private RecoveryCallback recoveryCallback; + public RetryingDeserializer(Deserializer delegate, RetryOperations retryOperations) { Assert.notNull(delegate, "the 'delegate' deserializer cannot be null"); Assert.notNull(retryOperations, "the 'retryOperations' deserializer cannot be null"); @@ -50,6 +53,15 @@ public RetryingDeserializer(Deserializer delegate, RetryOperations retryOpera this.retryOperations = retryOperations; } + /** + * Set a recovery callback to execute when the retries are exhausted. + * @param recoveryCallback {@link RecoveryCallback} to execute + * @since 3.1.2 + */ + public void setRecoveryCallback(@Nullable RecoveryCallback recoveryCallback) { + this.recoveryCallback = recoveryCallback; + } + @Override public void configure(Map configs, boolean isKey) { this.delegate.configure(configs, isKey); @@ -57,17 +69,17 @@ public void configure(Map configs, boolean isKey) { @Override public T deserialize(String topic, byte[] data) { - return this.retryOperations.execute(context -> this.delegate.deserialize(topic, data)); + return this.retryOperations.execute(context -> this.delegate.deserialize(topic, data), this.recoveryCallback); } @Override public T deserialize(String topic, Headers headers, byte[] data) { - return this.retryOperations.execute(context -> this.delegate.deserialize(topic, headers, data)); + return this.retryOperations.execute(context -> this.delegate.deserialize(topic, headers, data), this.recoveryCallback); } @Override public T deserialize(String topic, Headers headers, ByteBuffer data) { - return this.retryOperations.execute(context -> this.delegate.deserialize(topic, headers, data)); + return this.retryOperations.execute(context -> this.delegate.deserialize(topic, headers, data), this.recoveryCallback); } @Override diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/RetryingDeserializerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/RetryingDeserializerTests.java index 837471711f..fb1d3846d1 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/RetryingDeserializerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/RetryingDeserializerTests.java @@ -17,6 +17,9 @@ package org.springframework.kafka.support.serializer; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -28,19 +31,20 @@ import org.apache.kafka.common.utils.Utils; import org.junit.jupiter.api.Test; +import org.springframework.retry.RecoveryCallback; +import org.springframework.retry.RetryContext; import org.springframework.retry.support.RetryTemplate; /** * @author Gary Russell * @author Wang Zhiyang - * + * @author Soby Chacko * @since 2.3 - * */ -public class RetryingDeserializerTests { +class RetryingDeserializerTests { @Test - void testRetry() { + void basicRetryingDeserializer() { Deser delegate = new Deser(); RetryingDeserializer rdes = new RetryingDeserializer<>(delegate, new RetryTemplate()); assertThat(rdes.deserialize("foo", "bar".getBytes())).isEqualTo("bar"); @@ -54,6 +58,17 @@ void testRetry() { rdes.close(); } + @Test + void retryingDeserializerWithRecoveryCallback() throws Exception { + RetryingDeserializer rdes = new RetryingDeserializer<>((s, b) -> { + throw new RuntimeException(); + }, new RetryTemplate()); + RecoveryCallback recoveryCallback = mock(); + rdes.setRecoveryCallback(recoveryCallback); + rdes.deserialize("my-topic", "my-data".getBytes()); + verify(recoveryCallback).recover(any(RetryContext.class)); + } + public static class Deser implements Deserializer { int n;