From 99f1c45c51c8336549455edcc3dc639fe2e79628 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Wed, 9 Oct 2024 10:14:24 -0400 Subject: [PATCH] GH-3544: Support SpEL in @KafkaListener containerPostProcessor (#3548) Fixes: #3544 https://github.com/spring-projects/spring-kafka/issues/3544 - Enhance resolveContainerPostProcessor method in KafkaListenerAnnotationBeanPostProcessor to evaluate SpEL expressions - Verify containerPostProcessor property in KafkaListener annotation can be specified as a SpEL expression (cherry picked from commit 3f45fc01f2db6a083b94930e71a6ac359bdc9dfa) --- .../KafkaListenerAnnotationBeanPostProcessor.java | 15 ++++++++++----- .../listener/ContainerCustomizationTests.java | 5 +++-- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index 836342bbe9..00a0889cef 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -745,11 +745,16 @@ private KafkaListenerContainerFactory resolveContainerFactory(KafkaListener k private void resolveContainerPostProcessor(MethodKafkaListenerEndpoint endpoint, KafkaListener kafkaListener) { - - String containerPostProcessor = kafkaListener.containerPostProcessor(); - if (StringUtils.hasText(containerPostProcessor)) { - endpoint.setContainerPostProcessor(this.beanFactory.getBean(containerPostProcessor, - ContainerPostProcessor.class)); + Object containerPostProcessor = resolveExpression(kafkaListener.containerPostProcessor()); + if (containerPostProcessor instanceof ContainerPostProcessor cpp) { + endpoint.setContainerPostProcessor(cpp); + } + else { + String containerPostProcessorBeanName = resolveExpressionAsString(kafkaListener.containerPostProcessor(), "containerPostProcessor"); + if (StringUtils.hasText(containerPostProcessorBeanName)) { + endpoint.setContainerPostProcessor( + this.beanFactory.getBean(containerPostProcessorBeanName, ContainerPostProcessor.class)); + } } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerCustomizationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerCustomizationTests.java index b93ff04406..ce8f5f368d 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerCustomizationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerCustomizationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2023 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -43,6 +43,7 @@ * Tests for container customizations. * * @author Francois Rosiere + * @author Soby Chacko * @since 3.1 */ @SuppressWarnings("unused") @@ -129,7 +130,7 @@ public void postProcessor(String foo) { id = CONTAINER_CUSTOMIZER_AND_POST_PROCESSOR, topics = TOPIC, containerFactory = "containerFactoryWithCustomizer", - containerPostProcessor = "infoContainerPostProcessor") + containerPostProcessor = "#{__listener.infoContainerPostProcessor}") public void containerCustomizerAndPostProcessor(String foo) { }