Exception when combining kafka blocking/non-blocking retries #2658
-
Hello, I'm currently configuring both blocking and non-blocking retries, so, following the documentation, I created the following configuration: @Slf4j
@Configuration
@EnableKafka
public class KafkaRetryConfig extends RetryTopicConfigurationSupport {
// Attempt count from the "kafka.retry.maxRetryAttempts" property; passed to both the
// non-blocking maxAttempts(...) setting and the blocking FixedBackOff below.
@Value("${kafka.retry.maxRetryAttempts}")
private int maxRetryAtempts;
// Retry interval from "kafka.retry.retryIntervalMilliseconds" (milliseconds, per the
// property name); shared by the non-blocking fixedBackOff(...) and the blocking FixedBackOff.
@Value("${kafka.retry.retryIntervalMilliseconds}")
private int retryIntervalMillis;
// Value handed to timeoutAfter(...) on the retry topic builder.
// NOTE(review): the unit is not visible here — presumably milliseconds; confirm.
@Value("${kafka.retry.timeout}")
private int retryTimeout;
// Name of the main topic that the retry topic configuration is applied to (includeTopic).
@Value("${kafka.topic.name}")
private String topicName;
// Suffix passed to retryTopicSuffix(...) for the retry topic.
private static final String RETRY_TOPIC_SUFFIX = ".retry";
// Suffix passed to dltSuffix(...) for the dead-letter topic.
private static final String DLT_TOPIC_SUFFIX = ".DLT";
/**
 * Declares the non-blocking retry topic configuration for {@code topicName}.
 *
 * <p>The configuration uses a fixed back-off of {@code retryIntervalMillis}, at most
 * {@code maxRetryAtempts} attempts, a timeout of {@code retryTimeout}, the
 * {@code SINGLE_TOPIC} same-interval reuse strategy, the {@code .retry} / {@code .DLT}
 * suffixes, and does not auto-create the retry topics.
 *
 * <p>Note: per the reply in this discussion and the linked Spring Kafka reference
 * documentation, a {@code TaskScheduler} bean must also be present in the context; it is
 * used to schedule partition resumption after the delay.
 *
 * @param retryableKafkaTemplate the template (qualified as {@code retryableKafkaTemplate})
 *                               handed to the builder's {@code create} call
 * @return the retry topic configuration built from the settings above
 */
@Bean
public RetryTopicConfiguration attRetryableTopic(
        @Qualifier("retryableKafkaTemplate") KafkaTemplate<String, CustomMessage> retryableKafkaTemplate) {
    RetryTopicConfigurationBuilder retryTopicBuilder = RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(retryIntervalMillis)
            .maxAttempts(maxRetryAtempts)
            .timeoutAfter(retryTimeout)
            .includeTopic(topicName)
            .listenerFactory("retryKafkaListenerContainerFactory")
            .sameIntervalTopicReuseStrategy(SameIntervalTopicReuseStrategy.SINGLE_TOPIC)
            .retryTopicSuffix(RETRY_TOPIC_SUFFIX)
            .dltSuffix(DLT_TOPIC_SUFFIX)
            .doNotAutoCreateRetryTopics();
    return retryTopicBuilder.create(retryableKafkaTemplate);
}
/**
 * Configures blocking retries: {@code WebClientRequestException} is retried using a
 * {@code FixedBackOff} built from {@code retryIntervalMillis} and {@code maxRetryAtempts}.
 *
 * @param blockingRetries the configurer supplied by the framework
 */
@Override
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
    FixedBackOff blockingBackOff = new FixedBackOff(retryIntervalMillis, maxRetryAtempts);
    blockingRetries
            .retryOn(WebClientRequestException.class)
            .backOff(blockingBackOff);
}
/**
 * Appends this application's exception types to the supplied list of exceptions that
 * are treated as fatal for non-blocking retries.
 *
 * @param nonBlockingFatalExceptions the mutable list to extend
 */
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
    List<Class<? extends Throwable>> additionalFatalTypes = Arrays.asList(
            MessageHandlingException.class,
            SerializationException.class,
            NullPointerException.class,
            IllegalArgumentException.class,
            DataIntegrityViolationException.class,
            DeserializationException.class);
    nonBlockingFatalExceptions.addAll(additionalFatalTypes);
}
/**
 * Customizes the error handler so that it does not seek after an error.
 *
 * @param customizersConfigurer the configurer supplied by the framework
 */
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
    // Mechanism introduced in 2.9: with seek-after-error disabled, the same records
    // are not re-fetched after a pause.
    customizersConfigurer.customizeErrorHandler(errorHandler -> errorHandler.setSeekAfterError(false));
}
/**
 * Supplies a customizer for the dead-letter publishing recoverer factory that turns off
 * retention of all retry header values.
 *
 * @return a consumer that calls {@code setRetainAllRetryHeaderValues(false)} on the factory
 */
@Override
protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
    Consumer<DeadLetterPublishingRecovererFactory> factoryCustomizer =
            recovererFactory -> recovererFactory.setRetainAllRetryHeaderValues(false);
    return factoryCustomizer;
}
} For some reason I got the following error: Could someone give me insight into what's missing? |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 8 replies
-
Simply add a `TaskScheduler` bean - it is used to schedule partition resumption after the delay. See the documentation: https://docs.spring.io/spring-kafka/docs/current/reference/html/#programmatic-construction |
Beta Was this translation helpful? Give feedback.
Simply add a `TaskScheduler` bean - it is used to schedule partition resumption after the delay. See the documentation: https://docs.spring.io/spring-kafka/docs/current/reference/html/#programmatic-construction