Skip to content

Commit

Permalink
GH-3463: Allow @DltHandler on super class (#3464)
Browse files Browse the repository at this point in the history
Fixes: #3463

Currently, a `@DltHandler`-annotated method must be in the same class as the corresponding `@KafkaListener` annotation.
Some logic might be the same for different `@KafkaListener` services.

* Use `MethodIntrospector` in the `RetryableTopicAnnotationProcessor` to be able to process methods from super classes as well

**Auto-cherry-pick to `3.2.x` & `3.1.x`**
  • Loading branch information
artembilan authored Aug 28, 2024
1 parent 1318092 commit 4dc0976
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.expression.BeanFactoryResolver;
import org.springframework.context.expression.StandardBeanExpressionResolver;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.kafka.core.KafkaOperations;
Expand Down Expand Up @@ -64,6 +65,7 @@
* @author Gary Russell
* @author Adrian Chlebosz
* @author Wang Zhiyang
* @author Artem Bilan
*
* @since 2.7
*
Expand Down Expand Up @@ -228,8 +230,10 @@ private Map<String, Set<Class<? extends Throwable>>> createDltRoutingSpecFromAnn
}

private EndpointHandlerMethod getDltProcessor(Class<?> clazz, Object bean) {
return Arrays.stream(ReflectionUtils.getDeclaredMethods(clazz))
.filter(method -> AnnotationUtils.findAnnotation(method, DltHandler.class) != null)
ReflectionUtils.MethodFilter selector =
(method) -> AnnotationUtils.findAnnotation(method, DltHandler.class) != null;
return MethodIntrospector.selectMethods(clazz, selector)
.stream()
.map(method -> RetryTopicConfigurer.createHandlerMethodWith(bean, method))
.findFirst()
.orElse(RetryTopicConfigurer.DEFAULT_DLT_HANDLER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
* Test class level non-blocking retries.
*
* @author Wang Zhiyang
* @author Artem Bilan
*
* @since 3.2
*/
Expand Down Expand Up @@ -227,7 +228,6 @@ void shouldRetryFifthTopicWithTwoListenersAndManualAssignment(@Autowired FifthTo
assertThat(awaitLatch(latchContainer.countDownLatch51)).isTrue();
assertThat(awaitLatch(latchContainer.countDownLatch52)).isTrue();
assertThat(awaitLatch(latchContainer.countDownLatchDltThree)).isTrue();
assertThat(awaitLatch(latchContainer.countDownLatchDltFour)).isTrue();
assertThat(listener1.topics).containsExactly(TWO_LISTENERS_TOPIC, TWO_LISTENERS_TOPIC
+ "-listener1-0", TWO_LISTENERS_TOPIC + "-listener1-1", TWO_LISTENERS_TOPIC + "-listener1-2",
TWO_LISTENERS_TOPIC + "-listener1-dlt");
Expand Down Expand Up @@ -387,6 +387,21 @@ public void shouldNotGetHere() {
}
}

static class AbstractFifthTopicListener {

final List<String> topics = Collections.synchronizedList(new ArrayList<>());

@Autowired
CountDownLatchContainer container;

@DltHandler
public void annotatedDltMethod(ConsumerRecord<?, ?> record) {
this.topics.add(record.topic());
container.countDownLatchDltThree.countDown();
}

}

@RetryableTopic(attempts = "4",
backoff = @Backoff(250),
numPartitions = "2",
Expand All @@ -397,12 +412,7 @@ public void shouldNotGetHere() {
@KafkaListener(id = "fifthTopicId1", topicPartitions = {@TopicPartition(topic = TWO_LISTENERS_TOPIC,
partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))},
containerFactory = MAIN_TOPIC_CONTAINER_FACTORY)
static class FifthTopicListener1 {

final List<String> topics = Collections.synchronizedList(new ArrayList<>());

@Autowired
CountDownLatchContainer container;
static class FifthTopicListener1 extends AbstractFifthTopicListener {

@KafkaHandler
public void listenWithAnnotation(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
Expand All @@ -411,12 +421,6 @@ public void listenWithAnnotation(String message, @Header(KafkaHeaders.RECEIVED_T
throw new RuntimeException("Annotated woooops... " + receivedTopic);
}

@DltHandler
public void annotatedDltMethod(ConsumerRecord<?, ?> record) {
this.topics.add(record.topic());
container.countDownLatchDltThree.countDown();
}

}

@RetryableTopic(attempts = "4",
Expand All @@ -429,12 +433,7 @@ public void annotatedDltMethod(ConsumerRecord<?, ?> record) {
@KafkaListener(id = "fifthTopicId2", topicPartitions = {@TopicPartition(topic = TWO_LISTENERS_TOPIC,
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "0"))},
containerFactory = MAIN_TOPIC_CONTAINER_FACTORY)
static class FifthTopicListener2 {

final List<String> topics = Collections.synchronizedList(new ArrayList<>());

@Autowired
CountDownLatchContainer container;
static class FifthTopicListener2 extends AbstractFifthTopicListener {

@KafkaHandler
public void listenWithAnnotation2(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
Expand All @@ -443,12 +442,6 @@ public void listenWithAnnotation2(String message, @Header(KafkaHeaders.RECEIVED_
throw new RuntimeException("Annotated woooops... " + receivedTopic);
}

@DltHandler
public void annotatedDltMethod(ConsumerRecord<?, ?> record) {
this.topics.add(record.topic());
container.countDownLatchDltFour.countDown();
}

}

@Component
Expand Down Expand Up @@ -575,9 +568,7 @@ static class CountDownLatchContainer {

CountDownLatch countDownLatchDltTwo = new CountDownLatch(1);

CountDownLatch countDownLatchDltThree = new CountDownLatch(1);

CountDownLatch countDownLatchDltFour = new CountDownLatch(1);
CountDownLatch countDownLatchDltThree = new CountDownLatch(2);

CountDownLatch countDownLatchReuseOne = new CountDownLatch(2);

Expand Down

0 comments on commit 4dc0976

Please sign in to comment.