diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/DltHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/DltHandler.java index 272c363d41..f69c11eca4 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/DltHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/DltHandler.java @@ -28,8 +28,9 @@ /** * Annotation to determine the method that should process the DLT topic message. - * The method can have the same parameters as a {@link KafkaListener} method can (Message, Acknowledgement, etc). + * The method can have the same parameters as a {@link KafkaListener} method can have (Message, Acknowledgement, etc). * + *

* The annotated method must be in the same class as the corresponding {@link KafkaListener} annotation. * * @author Tomaz Fernandes diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableKafkaRetryTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableKafkaRetryTopic.java index 958a1144ae..a798c76c3f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableKafkaRetryTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableKafkaRetryTopic.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 the original author or authors. + * Copyright 2022-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -55,6 +55,7 @@ * bean. This annotation is meta-annotated with {@code @EnableKafka} so it is not * necessary to specify both. * + *

* To configure the feature's components, extend the * {@link RetryTopicConfigurationSupport} class and override the appropriate methods on a * {@link Configuration @Configuration} class, such as: diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaNullAwarePayloadArgumentResolver.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaNullAwarePayloadArgumentResolver.java index fdb3fbef2a..bfa45ea6a6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaNullAwarePayloadArgumentResolver.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaNullAwarePayloadArgumentResolver.java @@ -20,6 +20,7 @@ import org.springframework.core.MethodParameter; import org.springframework.kafka.support.KafkaNull; +import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver; @@ -59,7 +60,7 @@ public Object resolveArgument(MethodParameter parameter, Message message) thr } @Override - protected boolean isEmptyPayload(Object payload) { + protected boolean isEmptyPayload(@Nullable Object payload) { return payload == null || payload instanceof KafkaNull; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryTopicConfigurationProvider.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryTopicConfigurationProvider.java index cdd72ae10e..230c41c61d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryTopicConfigurationProvider.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryTopicConfigurationProvider.java @@ -33,6 +33,7 @@ import org.springframework.core.annotation.RepeatableContainers; import org.springframework.core.log.LogAccessor; import org.springframework.kafka.retrytopic.RetryTopicConfiguration; +import org.springframework.lang.Nullable; /** @@ -42,9 +43,11 @@ * one from a {@link RetryableTopic} annotation, or from the bean container if no * annotation is available. * + *

* If beans are found in the container there's a check to determine whether or not the * provided topics should be handled by any of such instances. * + *

* If the annotation is provided, a * {@link org.springframework.kafka.annotation.DltHandler} annotated method is looked up. * @@ -58,10 +61,13 @@ */ public class RetryTopicConfigurationProvider { + @Nullable private final BeanFactory beanFactory; + @Nullable private final BeanExpressionResolver resolver; + @Nullable private final BeanExpressionContext expressionContext; private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(RetryTopicConfigurationProvider.class)); @@ -71,7 +77,7 @@ public class RetryTopicConfigurationProvider { * expression context. * @param beanFactory the bean factory. */ - public RetryTopicConfigurationProvider(BeanFactory beanFactory) { + public RetryTopicConfigurationProvider(@Nullable BeanFactory beanFactory) { this(beanFactory, new StandardBeanExpressionResolver(), beanFactory instanceof ConfigurableBeanFactory ? new BeanExpressionContext((ConfigurableBeanFactory) beanFactory, null) : null); // NOSONAR @@ -83,13 +89,14 @@ public RetryTopicConfigurationProvider(BeanFactory beanFactory) { * @param resolver the bean expression resolver. * @param expressionContext the bean expression context. */ - public RetryTopicConfigurationProvider(BeanFactory beanFactory, BeanExpressionResolver resolver, - BeanExpressionContext expressionContext) { + public RetryTopicConfigurationProvider(@Nullable BeanFactory beanFactory, @Nullable BeanExpressionResolver resolver, + @Nullable BeanExpressionContext expressionContext) { this.beanFactory = beanFactory; this.resolver = resolver; this.expressionContext = expressionContext; } + @Nullable public RetryTopicConfiguration findRetryConfigurationFor(String[] topics, Method method, Object bean) { RetryableTopic annotation = MergedAnnotations.from(method, SearchStrategy.TYPE_HIERARCHY, RepeatableContainers.none()) @@ -102,6 +109,7 @@ public RetryTopicConfiguration findRetryConfigurationFor(String[] topics, Method : maybeGetFromContext(topics); } + @Nullable private RetryTopicConfiguration maybeGetFromContext(String[] topics) { if (this.beanFactory == null || !ListableBeanFactory.class.isAssignableFrom(this.beanFactory.getClass())) { LOGGER.warn("No ListableBeanFactory found, skipping RetryTopic configuration."); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java index db8c10e01e..5b716bf719 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java @@ -19,7 +19,9 @@ import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Objects; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanInitializationException; @@ -38,6 +40,7 @@ import org.springframework.kafka.retrytopic.RetryTopicConfigurer; import org.springframework.kafka.retrytopic.RetryTopicConstants; import org.springframework.kafka.support.EndpointHandlerMethod; +import org.springframework.lang.Nullable; import org.springframework.retry.annotation.Backoff; import org.springframework.retry.backoff.ExponentialBackOffPolicy; import org.springframework.retry.backoff.ExponentialRandomBackOffPolicy; @@ -70,10 +73,13 @@ public class RetryableTopicAnnotationProcessor { private static final String CSQ_FOR_OSQ = "] for ["; + @Nullable private final BeanFactory beanFactory; + @Nullable private final BeanExpressionResolver resolver; + @Nullable private final BeanExpressionContext expressionContext; /** @@ -93,8 +99,8 @@ public RetryableTopicAnnotationProcessor(BeanFactory beanFactory) { * @param resolver the bean expression resolver. * @param expressionContext the bean expression context. */ - public RetryableTopicAnnotationProcessor(BeanFactory beanFactory, BeanExpressionResolver resolver, - BeanExpressionContext expressionContext) { + public RetryableTopicAnnotationProcessor(@Nullable BeanFactory beanFactory, @Nullable BeanExpressionResolver resolver, + @Nullable BeanExpressionContext expressionContext) { this.beanFactory = beanFactory; this.resolver = resolver; @@ -116,12 +122,10 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method, boolean traverse = false; if (StringUtils.hasText(annotation.traversingCauses())) { Boolean traverseResolved = resolveExpressionAsBoolean(annotation.traversingCauses(), "traversingCauses"); - if (traverseResolved != null) { - traverse = traverseResolved; - } - else { - traverse = !includes.isEmpty() || !excludes.isEmpty(); - } + traverse = Objects.requireNonNullElseGet( + traverseResolved, + () -> !includes.isEmpty() || !excludes.isEmpty() + ); } Boolean autoStartDlt = null; if (StringUtils.hasText(annotation.autoStartDltHandler())) { @@ -157,9 +161,11 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method, return builder.create(getKafkaTemplate(resolveExpressionAsString(annotation.kafkaTemplate(), "kafkaTemplate"), topics)); } - private SleepingBackOffPolicy createBackoffFromAnnotation(Backoff backoff, BeanFactory beanFactory) { // NOSONAR + private SleepingBackOffPolicy createBackoffFromAnnotation(Backoff backoff, @Nullable BeanFactory beanFactory) { // NOSONAR StandardEvaluationContext evaluationContext = new StandardEvaluationContext(); - evaluationContext.setBeanResolver(new BeanFactoryResolver(beanFactory)); + if (beanFactory != null) { + evaluationContext.setBeanResolver(new BeanFactoryResolver(beanFactory)); + } // Code from Spring Retry Long min = backoff.delay() == 0 ? backoff.value() : backoff.delay(); @@ -210,7 +216,7 @@ private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean .orElse(RetryTopicConfigurer.DEFAULT_DLT_HANDLER); } - private KafkaOperations getKafkaTemplate(String kafkaTemplateName, String[] topics) { + private KafkaOperations getKafkaTemplate(@Nullable String kafkaTemplateName, String[] topics) { if (StringUtils.hasText(kafkaTemplateName)) { Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain kafka template by bean name"); try { @@ -236,6 +242,7 @@ private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean } } + @Nullable private String resolveExpressionAsString(String value, String attribute) { Object resolved = resolveExpression(value); if (resolved instanceof String str) { @@ -248,6 +255,7 @@ else if (resolved != null) { return null; } + @Nullable private Integer resolveExpressionAsInteger(String value, String attribute, boolean required) { Object resolved = resolveExpression(value); Integer result = null; @@ -268,6 +276,8 @@ else if (resolved != null || required) { return result; } + @SuppressWarnings("SameParameterValue") + @Nullable private Short resolveExpressionAsShort(String value, String attribute, boolean required) { Object resolved = resolveExpression(value); Short result = null; @@ -288,6 +298,7 @@ else if (resolved != null || required) { return result; } + @Nullable private Long resolveExpressionAsLong(String value, String attribute, boolean required) { Object resolved = resolveExpression(value); Long result = null; @@ -308,6 +319,8 @@ else if (resolved != null || required) { return result; } + @SuppressWarnings("SameParameterValue") + @Nullable private Double resolveExpressionAsDouble(String value, String attribute, boolean required) { Object resolved = resolveExpression(value); Double result = null; @@ -328,6 +341,7 @@ else if (resolved != null || required) { return result; } + @Nullable private Boolean resolveExpressionAsBoolean(String value, String attribute) { Object resolved = resolveExpression(value); Boolean result = null; @@ -349,7 +363,8 @@ else if (resolved != null) { private List> resolveClasses(Class[] fromAnnot, String[] names, String type) { - List> classes = new ArrayList<>(Arrays.asList(fromAnnot)); + List> classes = new ArrayList<>(fromAnnot.length + names.length); + Collections.addAll(classes, fromAnnot); try { for (String name : names) { Class clazz = ClassUtils.forName(name, ClassUtils.getDefaultClassLoader()); @@ -365,8 +380,9 @@ private List> resolveClasses(Class exceptionType) { * @author Gary Russell * */ + @SuppressWarnings("serial") private static final class ExtendedBinaryExceptionClassifier extends BinaryExceptionClassifier { ExtendedBinaryExceptionClassifier(Map, Boolean> typeMap, boolean defaultValue) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicComponentFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicComponentFactory.java index 2d6c5bb160..94c9f49c5c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicComponentFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicComponentFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 the original author or authors. + * Copyright 2022-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,7 @@ import org.springframework.kafka.listener.ListenerContainerRegistry; import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter; +import org.springframework.lang.Nullable; /** * Provide the component instances that will be used with @@ -154,7 +155,7 @@ public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() { * @param applicationContext the application context. * @return the instance. */ - public KafkaBackOffManagerFactory kafkaBackOffManagerFactory(ListenerContainerRegistry registry, + public KafkaBackOffManagerFactory kafkaBackOffManagerFactory(@Nullable ListenerContainerRegistry registry, ApplicationContext applicationContext) { return new ContainerPartitionPausingBackOffManagerFactory(registry, applicationContext); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupport.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupport.java index 4e79a851c8..5d2fe41cf6 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupport.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupport.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 the original author or authors. + * Copyright 2022-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -310,7 +310,7 @@ protected Consumer configureDestinationTopicResolver() @Bean(name = KafkaListenerConfigUtils.KAFKA_CONSUMER_BACK_OFF_MANAGER_BEAN_NAME) public KafkaConsumerBackoffManager kafkaConsumerBackoffManager(ApplicationContext applicationContext, @Qualifier(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME) - ListenerContainerRegistry registry, + @Nullable ListenerContainerRegistry registry, ObjectProvider componentFactoryProvider, @Nullable RetryTopicSchedulerWrapper wrapper, @Nullable TaskScheduler taskScheduler) { @@ -325,7 +325,7 @@ public KafkaConsumerBackoffManager kafkaConsumerBackoffManager(ApplicationContex } private void configurePartitionPausingFactory(ContainerPartitionPausingBackOffManagerFactory factory, - ListenerContainerRegistry registry, @Nullable TaskScheduler scheduler) { + @Nullable ListenerContainerRegistry registry, @Nullable TaskScheduler scheduler) { Assert.notNull(scheduler, "Either a RetryTopicSchedulerWrapper or TaskScheduler bean is required"); factory.setBackOffHandler(new ContainerPausingBackOffHandler( @@ -346,8 +346,10 @@ protected RetryTopicComponentFactory createComponentFactory() { */ public static class BlockingRetriesConfigurer { + @Nullable private BackOff backOff; + @Nullable private Class[] retryableExceptions; /** @@ -378,10 +380,12 @@ public BlockingRetriesConfigurer backOff(BackOff backoff) { return this; } + @Nullable BackOff getBackOff() { return this.backOff; } + @Nullable Class[] getRetryableExceptions() { return this.retryableExceptions; } @@ -393,10 +397,13 @@ Class[] getRetryableExceptions() { */ public static class CustomizersConfigurer { + @Nullable private Consumer errorHandlerCustomizer; + @Nullable private Consumer> listenerContainerCustomizer; + @Nullable private Consumer deadLetterPublishingRecovererCustomizer; /** @@ -406,6 +413,7 @@ public static class CustomizersConfigurer { * @return the configurer. * @see DefaultErrorHandler */ + @SuppressWarnings("unused") public CustomizersConfigurer customizeErrorHandler(Consumer errorHandlerCustomizer) { this.errorHandlerCustomizer = errorHandlerCustomizer; return this; @@ -433,14 +441,17 @@ public CustomizersConfigurer customizeDeadLetterPublishingRecoverer(Consumer getErrorHandlerCustomizer() { return this.errorHandlerCustomizer; } + @Nullable Consumer> getListenerContainerCustomizer() { return this.listenerContainerCustomizer; } + @Nullable Consumer getDeadLetterPublishingRecovererCustomizer() { return this.deadLetterPublishingRecovererCustomizer; }