diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java index 673e62b95e..db8c10e01e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java @@ -82,8 +82,8 @@ public class RetryableTopicAnnotationProcessor { * @param beanFactory the bean factory. */ public RetryableTopicAnnotationProcessor(BeanFactory beanFactory) { - this(beanFactory, new StandardBeanExpressionResolver(), beanFactory instanceof ConfigurableBeanFactory - ? new BeanExpressionContext((ConfigurableBeanFactory) beanFactory, null) + this(beanFactory, new StandardBeanExpressionResolver(), beanFactory instanceof ConfigurableBeanFactory cbf + ? new BeanExpressionContext(cbf, null) : null); // NOSONAR } @@ -101,7 +101,6 @@ public RetryableTopicAnnotationProcessor(BeanFactory beanFactory, BeanExpression this.expressionContext = expressionContext; } - @SuppressWarnings("deprecation") public RetryTopicConfiguration processAnnotation(String[] topics, Method method, RetryableTopic annotation, Object bean) { @@ -121,16 +120,14 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method, traverse = traverseResolved; } else { - traverse = includes.size() > 0 || excludes.size() > 0; + traverse = !includes.isEmpty() || !excludes.isEmpty(); } } Boolean autoStartDlt = null; if (StringUtils.hasText(annotation.autoStartDltHandler())) { autoStartDlt = resolveExpressionAsBoolean(annotation.autoStartDltHandler(), "autoStartDltContainer"); } - return RetryTopicConfigurationBuilder.newInstance() - .maxAttempts(resolveExpressionAsInteger(annotation.attempts(), "attempts", true)) - .concurrency(resolveExpressionAsInteger(annotation.concurrency(), "concurrency", false)) + RetryTopicConfigurationBuilder builder = RetryTopicConfigurationBuilder.newInstance() .customBackoff(createBackoffFromAnnotation(annotation.backoff(), this.beanFactory)) .retryTopicSuffix(resolveExpressionAsString(annotation.retryTopicSuffix(), "retryTopicSuffix")) .dltSuffix(resolveExpressionAsString(annotation.dltTopicSuffix(), "dltTopicSuffix")) @@ -147,8 +144,17 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method, .autoStartDltHandler(autoStartDlt) .setTopicSuffixingStrategy(annotation.topicSuffixingStrategy()) .sameIntervalTopicReuseStrategy(annotation.sameIntervalTopicReuseStrategy()) - .timeoutAfter(timeout) - .create(getKafkaTemplate(resolveExpressionAsString(annotation.kafkaTemplate(), "kafkaTemplate"), topics)); + .timeoutAfter(timeout); + + Integer attempts = resolveExpressionAsInteger(annotation.attempts(), "attempts", true); + if (attempts != null) { + builder.maxAttempts(attempts); + } + Integer concurrency = resolveExpressionAsInteger(annotation.concurrency(), "concurrency", false); + if (concurrency != null) { + builder.concurrency(concurrency); + } + return builder.create(getKafkaTemplate(resolveExpressionAsString(annotation.kafkaTemplate(), "kafkaTemplate"), topics)); } private SleepingBackOffPolicy createBackoffFromAnnotation(Backoff backoff, BeanFactory beanFactory) { // NOSONAR @@ -173,9 +179,13 @@ private SleepingBackOffPolicy createBackoffFromAnnotation(Backoff backoff, Be if (backoff.random()) { policy = new ExponentialRandomBackOffPolicy(); } - policy.setInitialInterval(min); + if (min != null) { + policy.setInitialInterval(min); + } policy.setMultiplier(multiplier); - policy.setMaxInterval(max > min ? max : ExponentialBackOffPolicy.DEFAULT_MAX_INTERVAL); + if (max != null && min != null && max > min) { + policy.setMaxInterval(max); + } return policy; } if (max != null && min != null && max > min) { @@ -200,7 +210,6 @@ private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean .orElse(RetryTopicConfigurer.DEFAULT_DLT_HANDLER); } - @SuppressWarnings("deprecation") private KafkaOperations getKafkaTemplate(String kafkaTemplateName, String[] topics) { if (StringUtils.hasText(kafkaTemplateName)) { Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain kafka template by bean name"); @@ -213,6 +222,7 @@ private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean + " with id '" + kafkaTemplateName + "' was found in the application context", ex); } } + Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain kafka template by default bean name"); try { return this.beanFactory.getBean(RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME, KafkaOperations.class); @@ -228,8 +238,8 @@ private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean private String resolveExpressionAsString(String value, String attribute) { Object resolved = resolveExpression(value); - if (resolved instanceof String) { - return (String) resolved; + if (resolved instanceof String str) { + return str; } else if (resolved != null) { throw new IllegalStateException(THE_OSQ + attribute + "] must resolve to a String. " @@ -241,16 +251,13 @@ else if (resolved != null) { private Integer resolveExpressionAsInteger(String value, String attribute, boolean required) { Object resolved = resolveExpression(value); Integer result = null; - if (resolved instanceof String) { - if (!required && !StringUtils.hasText((String) resolved)) { - result = null; - } - else { - result = Integer.parseInt((String) resolved); + if (resolved instanceof String str) { + if (required || StringUtils.hasText(str)) { + result = Integer.parseInt(str); } } - else if (resolved instanceof Number) { - result = ((Number) resolved).intValue(); + else if (resolved instanceof Number num) { + result = num.intValue(); } else if (resolved != null || required) { throw new IllegalStateException( @@ -264,16 +271,13 @@ else if (resolved != null || required) { private Short resolveExpressionAsShort(String value, String attribute, boolean required) { Object resolved = resolveExpression(value); Short result = null; - if (resolved instanceof String) { - if (!required && !StringUtils.hasText((String) resolved)) { - result = null; - } - else { - result = Short.parseShort((String) resolved); + if (resolved instanceof String str) { + if (required || StringUtils.hasText(str)) { + result = Short.parseShort(str); } } - else if (resolved instanceof Number) { - result = ((Number) resolved).shortValue(); + else if (resolved instanceof Number num) { + result = num.shortValue(); } else if (resolved != null || required) { throw new IllegalStateException( @@ -287,16 +291,13 @@ else if (resolved != null || required) { private Long resolveExpressionAsLong(String value, String attribute, boolean required) { Object resolved = resolveExpression(value); Long result = null; - if (resolved instanceof String) { - if (!required && !StringUtils.hasText((String) resolved)) { - result = null; - } - else { - result = Long.parseLong((String) resolved); + if (resolved instanceof String str) { + if (required || StringUtils.hasText(str)) { + result = Long.parseLong(str); } } - else if (resolved instanceof Number) { - result = ((Number) resolved).longValue(); + else if (resolved instanceof Number num) { + result = num.longValue(); } else if (resolved != null || required) { throw new IllegalStateException( @@ -310,16 +311,13 @@ else if (resolved != null || required) { private Double resolveExpressionAsDouble(String value, String attribute, boolean required) { Object resolved = resolveExpression(value); Double result = null; - if (resolved instanceof String) { - if (!required && !StringUtils.hasText((String) resolved)) { - result = null; - } - else { - result = Double.parseDouble((String) resolved); + if (resolved instanceof String str) { + if (required || StringUtils.hasText(str)) { + result = Double.parseDouble(str); } } - else if (resolved instanceof Number) { - result = ((Number) resolved).doubleValue(); + else if (resolved instanceof Number num) { + result = num.doubleValue(); } else if (resolved != null || required) { throw new IllegalStateException( @@ -333,11 +331,11 @@ else if (resolved != null || required) { private Boolean resolveExpressionAsBoolean(String value, String attribute) { Object resolved = resolveExpression(value); Boolean result = null; - if (resolved instanceof Boolean) { - result = (Boolean) resolved; + if (resolved instanceof Boolean bool) { + result = bool; } - else if (resolved instanceof String) { - result = Boolean.parseBoolean((String) resolved); + else if (resolved instanceof String str) { + result = Boolean.parseBoolean(str); } else if (resolved != null) { throw new IllegalStateException( @@ -368,8 +366,8 @@ private List> resolveClasses(Class listenerContainerFactory; + @Nullable private String listenerContainerFactoryName; @Nullable @@ -206,7 +209,7 @@ public RetryTopicConfigurationBuilder excludeTopic(String topicName) { * @param suffix the suffix. * @return the builder. */ - public RetryTopicConfigurationBuilder retryTopicSuffix(String suffix) { + public RetryTopicConfigurationBuilder retryTopicSuffix(@Nullable String suffix) { this.retryTopicSuffix = suffix; return this; } @@ -216,7 +219,7 @@ public RetryTopicConfigurationBuilder retryTopicSuffix(String suffix) { * @param suffix the suffix. * @return the builder. */ - public RetryTopicConfigurationBuilder dltSuffix(String suffix) { + public RetryTopicConfigurationBuilder dltSuffix(@Nullable String suffix) { this.dltSuffix = suffix; return this; } @@ -422,7 +425,7 @@ public RetryTopicConfigurationBuilder doNotAutoCreateRetryTopics() { * broker is version 2.4 or later). * @return the builder. */ - public RetryTopicConfigurationBuilder autoCreateTopicsWith(int numPartitions, short replicationFactor) { + public RetryTopicConfigurationBuilder autoCreateTopicsWith(@Nullable Integer numPartitions, @Nullable Short replicationFactor) { this.topicCreationConfiguration = new RetryTopicConfiguration.TopicCreation(true, numPartitions, replicationFactor); return this; @@ -437,8 +440,8 @@ public RetryTopicConfigurationBuilder autoCreateTopicsWith(int numPartitions, sh * broker is version 2.4 or later). * @return the builder. */ - public RetryTopicConfigurationBuilder autoCreateTopics(boolean shouldCreate, int numPartitions, - short replicationFactor) { + public RetryTopicConfigurationBuilder autoCreateTopics(@Nullable Boolean shouldCreate, @Nullable Integer numPartitions, + @Nullable Short replicationFactor) { this.topicCreationConfiguration = new RetryTopicConfiguration.TopicCreation(shouldCreate, numPartitions, replicationFactor); @@ -535,7 +538,7 @@ public RetryTopicConfigurationBuilder listenerFactory(ConcurrentKafkaListenerCon * @param factoryBeanName the factory bean name. * @return the builder. */ - public RetryTopicConfigurationBuilder listenerFactory(String factoryBeanName) { + public RetryTopicConfigurationBuilder listenerFactory(@Nullable String factoryBeanName) { this.listenerContainerFactoryName = factoryBeanName; return this; }