Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dltHandlerMethod field to @RetryableTopic annotation matching RetryTopicConfigurationBuilder #3183

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.kafka.support.EndpointHandlerMethod;

/**
* Annotation to configure the {@link org.springframework.kafka.support.EndpointHandlerMethod} for handling the DLT,
* as part of {@link RetryableTopic} annotation configuration.
* This is equivalent to the builder way of {@link org.springframework.kafka.retrytopic.RetryTopicConfigurer}
* configuration via {@link org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder#dltHandlerMethod(EndpointHandlerMethod)}.
*
* @author Joo Hyuk Kim
*
* @since 3.2
*
* @see org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder#dltHandlerMethod(String, String)
*
*/
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DltHandlerMethod {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, thanks for looking into this!

I wonder if, according to the annotation-driven model, it would be more natural to have something like this instead:

@DltHandler
public void handleFailed(Object message) {
}

in the same class where we have that @RetryableTopic.

See similar model with @Retryable and @Recover in Spring Retry.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it can be subjective as to what natural is 🤔.

Copy link
Contributor Author

@JooHyukKim JooHyukKim Apr 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking into how @RetryableTopic is implemented, @RetryableTopic is really is just annotation-representation of RetryTopicConfigurationBuilder which gets constructed using the RetryTopicConfigurationBuilder itself during startup.

This is why it also sort of felt natural (unfortunately 😅) to make current proprosal (PR). We are trying to resolve this asymmetry between the two approaches.

  • With RetryTopicConfiguration, we only need a RetryTopicConfigurationBuilder to configure both in one go, builder-style
  • With @RetryableTopic, we are restricted to same-class, same method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But still, maintainer's design perspective also counts alot, so lemme know what you think, @artembilan! ✌🏼✌🏼

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you said before, the RetryTopicConfiguration is global. The @RetryableTopic is for specific service, therefore it feels logical that DLT handler must be that service-specific as well.
So, when you encapsulate logic for your @RetryableTopic, you definitely might go the way to encapsulate DLT over there as well.
Therefore, in most cases, your new annotation would point to the same service and method in there.
Let's just stick with best practice how to design this kind of services!

Copy link
Contributor Author

@JooHyukKim JooHyukKim Apr 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm. yes I said the RetryTopicConfiguration is global. At the same time, I also believe that @RetryableTopic being for specific service is of capability/choice, not as restriction? That much of freedom feels reasonable knowing that DLT can already be configured via RetryTopicConfigurationBuilder.

And as per encapsulation of DLT, we have RetryTopicConfigurationBuilder which already allows configuration of DLT, meaning retry and DLT's can be associated?

This PR is not introducing a new dependency between two isolated concepts -between Retry and DLT, but already existing capability of one-go-configuration via RetryTopicConfiguration.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, again: with an annotation we already have a scope of the method where it is declared, so comparing RetryTopicConfigurationBuilder and annotation configuration is not really one-to-one.
Therefore, if we go the way of annotation DLT, then I'd prefer to have it in a scope where its @RetryableTopic counterpart is declared.
Plus we already have a positive experience with the patter we have in Spring Retry.
Or even see here @KafkaHandler and its isDefault() attribute.
Another example is a @RestController with its @..Mapping and @ExceptionHandler.

So, the general point is: if we have a service and give some its method a specific @KafkaListener aspect, the other related aspects must be declared in this service as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the insightful explanation🙏🏽🙏🏽

Took me a while to realize the differences between the annotation and builder way, and how it all fits overall design of Spring libraries.

It also makes sense to consider the positive feedback for the current state implementation👍🏼. Thanks again @artembilan!


/**
* The bean name of the DLT handler method.
*
* @return the bean name.
*/
String beanName() default "";

/**
* The method name of the DLT handler method.
*
* @return the method name.
*/
String methodName() default "";

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
* @author João Lima
* @author Adrian Chlebosz
* @author Wang Zhiyang
* @author Joo Hyuk Kim
*
* @since 2.7
*
Expand Down Expand Up @@ -222,4 +223,13 @@
*/
String concurrency() default "";

/**
* Configure the DLT handler method.
* Note that this will override the {@link DltHandler}-annotated method in the same declared class.
*
* @return the DLT handler method configuration.
* @since 3.2
*/
DltHandlerMethod dltHandlerMethod() default @DltHandlerMethod;

}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
* @author Gary Russell
* @author Adrian Chlebosz
* @author Wang Zhiyang
* @author Joo Hyuk Kim
*
* @since 2.7
*
Expand Down Expand Up @@ -149,7 +150,7 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Class<?> clazz
.customBackoff(createBackoffFromAnnotation(annotation.backoff(), this.beanFactory))
.retryTopicSuffix(resolveExpressionAsString(annotation.retryTopicSuffix(), "retryTopicSuffix"))
.dltSuffix(resolveExpressionAsString(annotation.dltTopicSuffix(), "dltTopicSuffix"))
.dltHandlerMethod(getDltProcessor(clazz, bean))
.dltHandlerMethod(getDltProcessor(annotation.dltHandlerMethod(), clazz, bean))
.includeTopics(Arrays.asList(topics))
.listenerFactory(resolveExpressionAsString(annotation.listenerContainerFactory(), "listenerContainerFactory"))
.autoCreateTopics(resolveExpressionAsBoolean(annotation.autoCreateTopics(), "autoCreateTopics"),
Expand Down Expand Up @@ -227,6 +228,13 @@ private Map<String, Set<Class<? extends Throwable>>> createDltRoutingSpecFromAnn
.collect(Collectors.toMap(ExceptionBasedDltDestination::suffix, excBasedDestDlt -> Set.of(excBasedDestDlt.exceptions())));
}

private EndpointHandlerMethod getDltProcessor(DltHandlerMethod dltHandlerMethod, Class<?> clazz, Object bean) {
if (StringUtils.hasText(dltHandlerMethod.beanName()) && StringUtils.hasText(dltHandlerMethod.methodName())) {
return new EndpointHandlerMethod(dltHandlerMethod.beanName(), dltHandlerMethod.methodName());
}
return getDltProcessor(clazz, bean);
}

private EndpointHandlerMethod getDltProcessor(Class<?> clazz, Object bean) {
return Arrays.stream(ReflectionUtils.getDeclaredMethods(clazz))
.filter(method -> AnnotationUtils.findAnnotation(method, DltHandler.class) != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.DltHandlerMethod;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
Expand Down Expand Up @@ -94,7 +95,10 @@
* @author Tomaz Fernandes
* @author Gary Russell
* @author Wang Zhiyang
* @author Joo Hyuk Kim
*
* @since 2.7
*
*/
@SpringJUnitConfig
@DirtiesContext
Expand All @@ -121,6 +125,8 @@ public class RetryTopicIntegrationTests {

public final static String MANUAL_TOPIC = "myRetryTopic6";

public final static String ANNOTATION_DLT_HANDLER_TOPIC = "myRetryTopic7-dlt";

public final static String NOT_RETRYABLE_EXCEPTION_TOPIC = "noRetryTopic";

public final static String FIRST_REUSE_RETRY_TOPIC = "reuseRetry1";
Expand Down Expand Up @@ -305,6 +311,13 @@ public void shouldGoStraightToDlt() {
assertThat(awaitLatch(latchContainer.countDownLatchDltTwo)).isTrue();
}

@Test
public void shouldGoStraightToAnnotatedDlt() {
logger.debug("Sending message to topic " + ANNOTATION_DLT_HANDLER_TOPIC);
kafkaTemplate.send(ANNOTATION_DLT_HANDLER_TOPIC, "Testing topic with annotation");
assertThat(awaitLatch(latchContainer.countDownLatchAnnotationDltHandlerFive)).isTrue();
}

private boolean awaitLatch(CountDownLatch latch) {
try {
return latch.await(60, TimeUnit.SECONDS);
Expand Down Expand Up @@ -507,6 +520,32 @@ public void annotatedDltMethod(Object message) {
}
}

@Component
static class AnnotatedDltHandlerListener {

@Autowired
CountDownLatchContainer container;

@RetryableTopic(dltHandlerMethod = @DltHandlerMethod(beanName = "annotationDltHandlerService", methodName = "handle"))
@KafkaListener(topics = ANNOTATION_DLT_HANDLER_TOPIC, containerFactory = MAIN_TOPIC_CONTAINER_FACTORY)
public void listenWithAnnotation3(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
logger.info("Message {} received by AnnotatedDltHandlerListener in topic {} ", message, receivedTopic);
throw new MyDontRetryException(ANNOTATION_DLT_HANDLER_TOPIC + " dlt handler woooops... " + receivedTopic);
}
}

@Component
static class AnnotationDltHandlerService {

@Autowired
CountDownLatchContainer container;

public void handle(Object message) {
logger.info("Received message in AnnotationDltHandlerService!");
container.countDownLatchAnnotationDltHandlerFive.countDown();
}
}

@Component
static class FirstReuseRetryTopicListener {

Expand Down Expand Up @@ -594,6 +633,8 @@ static class CountDownLatchContainer {

CountDownLatch countDownLatchDltFour = new CountDownLatch(1);

CountDownLatch countDownLatchAnnotationDltHandlerFive = new CountDownLatch(1);

CountDownLatch countDownLatchReuseOne = new CountDownLatch(2);

CountDownLatch countDownLatchReuseTwo = new CountDownLatch(5);
Expand Down Expand Up @@ -766,6 +807,16 @@ CountDownLatchContainer latchContainer() {
MyCustomDltProcessor myCustomDltProcessor() {
return new MyCustomDltProcessor();
}

@Bean
AnnotatedDltHandlerListener annotatedDltHandlerListener() {
return new AnnotatedDltHandlerListener();
}

@Bean
AnnotationDltHandlerService annotationDltHandlerService() {
return new AnnotationDltHandlerService();
}
}

@Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.DltHandlerMethod;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.annotation.RetryableTopicAnnotationProcessor;
Expand All @@ -57,6 +58,7 @@
* @author Gary Russell
* @author Adrian Chlebosz
* @author Wang Zhiyang
* @author Joo Hyuk Kim
*
* @since 2.7
*/
Expand Down Expand Up @@ -95,6 +97,15 @@ class RetryableTopicAnnotationProcessorTests {

private final Object beanWithDlt = createBean();

// Retry including DLT annotation
private final Method retryIncludingDlt = ReflectionUtils
.findMethod(RetryableTopicIncludingDlt.class, listenerMethodName);

private final RetryableTopic annotationIncludingDlt = AnnotationUtils.findAnnotation(retryIncludingDlt,
RetryableTopic.class);

private final Object beanIncludingDlt = createBean();

// Retry without DLT
private final Method listenWithRetry = ReflectionUtils.findMethod(RetryableTopicAnnotationFactory.class,
listenerMethodName);
Expand Down Expand Up @@ -377,6 +388,22 @@ void shouldCreateExceptionBasedDltRoutingSpec(boolean isMethod) {
assertThat(destinationTopicProperties.get(2).suffix()).isEqualTo("-dlt");
}

@Test
void dltHandlerMethodInsideRetryableTopicShouldOverrideDltHandlerAnnotatedMethod() {
// setup
given(this.beanFactory.getBean(RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME, KafkaOperations.class))
.willReturn(kafkaOperationsFromDefaultName);
RetryableTopicAnnotationProcessor processor = new RetryableTopicAnnotationProcessor(beanFactory);

// given
RetryTopicConfiguration configuration = processor.processAnnotation(topics, retryIncludingDlt,
annotationIncludingDlt, beanIncludingDlt);

// then
EndpointHandlerMethod dltHandlerMethod = configuration.getDltHandlerMethod();
assertThat(dltHandlerMethod.getMethodName()).isEqualTo("yesHandleDlt");
}

static class RetryableTopicAnnotationFactory {

@KafkaListener
Expand Down Expand Up @@ -419,6 +446,20 @@ void handleDlt() {

}

static class RetryableTopicIncludingDlt {

@KafkaListener
@RetryableTopic(dltHandlerMethod = @DltHandlerMethod(beanName = "dltHandlerBean", methodName = "yesHandleDlt"))
void listenWithRetry() {
// NoOps
}

@DltHandler
void noHandleDlt() {
// NoOps
}
}

static class RetryableTopicAnnotationFactoryWithCustomDltRouting {
@KafkaListener
@RetryableTopic(
Expand Down
Loading