Skip to content

Commit

Permalink
GH-3032: Add RecoveryCallback to RetryingDeserializer
Browse files Browse the repository at this point in the history
Fixes: #3032

**Auto-cherry-pick to `3.1.x`**
  • Loading branch information
sobychacko authored Feb 15, 2024
1 parent e7e77b9 commit 284a188
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,8 @@ ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));
----

Starting with version `3.1.2`, a `RecoveryCallback` can be set on the `RetryingDeserializer` optionally.

Refer to the https://github.com/spring-projects/spring-retry[spring-retry] project for configuration of the `RetryTemplate` with a retry policy, back off policy, etc.


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;

import org.springframework.lang.Nullable;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryOperations;
import org.springframework.util.Assert;

Expand All @@ -30,44 +32,54 @@
* deserialization in case of transient errors.
*
* @param <T> Type to be deserialized into.
*
* @author Gary Russell
* @author Wang Zhiyang
*
* @author Soby Chacko
* @since 2.3
*
*/
public class RetryingDeserializer<T> implements Deserializer<T> {

private final Deserializer<T> delegate;

private final RetryOperations retryOperations;

@Nullable
private RecoveryCallback<T> recoveryCallback;

public RetryingDeserializer(Deserializer<T> delegate, RetryOperations retryOperations) {
Assert.notNull(delegate, "the 'delegate' deserializer cannot be null");
Assert.notNull(retryOperations, "the 'retryOperations' deserializer cannot be null");
this.delegate = delegate;
this.retryOperations = retryOperations;
}

/**
* Set a recovery callback to execute when the retries are exhausted.
* @param recoveryCallback {@link RecoveryCallback} to execute
* @since 3.1.2
*/
public void setRecoveryCallback(@Nullable RecoveryCallback<T> recoveryCallback) {
this.recoveryCallback = recoveryCallback;
}

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
this.delegate.configure(configs, isKey);
}

@Override
public T deserialize(String topic, byte[] data) {
return this.retryOperations.execute(context -> this.delegate.deserialize(topic, data));
return this.retryOperations.execute(context -> this.delegate.deserialize(topic, data), this.recoveryCallback);
}

@Override
public T deserialize(String topic, Headers headers, byte[] data) {
return this.retryOperations.execute(context -> this.delegate.deserialize(topic, headers, data));
return this.retryOperations.execute(context -> this.delegate.deserialize(topic, headers, data), this.recoveryCallback);
}

@Override
public T deserialize(String topic, Headers headers, ByteBuffer data) {
return this.retryOperations.execute(context -> this.delegate.deserialize(topic, headers, data));
return this.retryOperations.execute(context -> this.delegate.deserialize(topic, headers, data), this.recoveryCallback);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package org.springframework.kafka.support.serializer;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
Expand All @@ -28,19 +31,20 @@
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Test;

import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.support.RetryTemplate;

/**
* @author Gary Russell
* @author Wang Zhiyang
*
* @author Soby Chacko
* @since 2.3
*
*/
public class RetryingDeserializerTests {
class RetryingDeserializerTests {

@Test
void testRetry() {
void basicRetryingDeserializer() {
Deser delegate = new Deser();
RetryingDeserializer<String> rdes = new RetryingDeserializer<>(delegate, new RetryTemplate());
assertThat(rdes.deserialize("foo", "bar".getBytes())).isEqualTo("bar");
Expand All @@ -54,6 +58,17 @@ void testRetry() {
rdes.close();
}

@Test
void retryingDeserializerWithRecoveryCallback() throws Exception {
RetryingDeserializer<String> rdes = new RetryingDeserializer<>((s, b) -> {
throw new RuntimeException();
}, new RetryTemplate());
RecoveryCallback<String> recoveryCallback = mock();
rdes.setRecoveryCallback(recoveryCallback);
rdes.deserialize("my-topic", "my-data".getBytes());
verify(recoveryCallback).recover(any(RetryContext.class));
}

public static class Deser implements Deserializer<String> {

int n;
Expand Down

0 comments on commit 284a188

Please sign in to comment.