From 930bbee7db7b501350e67383a1172b2098cf4795 Mon Sep 17 00:00:00 2001 From: Nathan Xu Date: Thu, 21 Dec 2023 21:46:58 -0500 Subject: [PATCH 1/2] some minor improvements on RetryableTopicAnnotationProcessor --- .../RetryableTopicAnnotationProcessor.java | 87 +++++++++---------- 1 file changed, 39 insertions(+), 48 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java index 673e62b95e..ac9c0ebdea 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java @@ -82,8 +82,8 @@ public class RetryableTopicAnnotationProcessor { * @param beanFactory the bean factory. */ public RetryableTopicAnnotationProcessor(BeanFactory beanFactory) { - this(beanFactory, new StandardBeanExpressionResolver(), beanFactory instanceof ConfigurableBeanFactory - ? new BeanExpressionContext((ConfigurableBeanFactory) beanFactory, null) + this(beanFactory, new StandardBeanExpressionResolver(), beanFactory instanceof ConfigurableBeanFactory cbf + ? new BeanExpressionContext(cbf, null) : null); // NOSONAR } @@ -101,7 +101,6 @@ public RetryableTopicAnnotationProcessor(BeanFactory beanFactory, BeanExpression this.expressionContext = expressionContext; } - @SuppressWarnings("deprecation") public RetryTopicConfiguration processAnnotation(String[] topics, Method method, RetryableTopic annotation, Object bean) { @@ -121,7 +120,7 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method, traverse = traverseResolved; } else { - traverse = includes.size() > 0 || excludes.size() > 0; + traverse = !includes.isEmpty() || !excludes.isEmpty(); } } Boolean autoStartDlt = null; @@ -173,9 +172,13 @@ private SleepingBackOffPolicy createBackoffFromAnnotation(Backoff backoff, Be if (backoff.random()) { policy = new ExponentialRandomBackOffPolicy(); } - policy.setInitialInterval(min); + if (min != null) { + policy.setInitialInterval(min); + } policy.setMultiplier(multiplier); - policy.setMaxInterval(max > min ? max : ExponentialBackOffPolicy.DEFAULT_MAX_INTERVAL); + if (max != null && min != null && max > min) { + policy.setMaxInterval(max); + } return policy; } if (max != null && min != null && max > min) { @@ -200,7 +203,6 @@ private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean .orElse(RetryTopicConfigurer.DEFAULT_DLT_HANDLER); } - @SuppressWarnings("deprecation") private KafkaOperations getKafkaTemplate(String kafkaTemplateName, String[] topics) { if (StringUtils.hasText(kafkaTemplateName)) { Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain kafka template by bean name"); @@ -213,6 +215,7 @@ private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean + " with id '" + kafkaTemplateName + "' was found in the application context", ex); } } + Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain kafka template by default bean name"); try { return this.beanFactory.getBean(RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME, KafkaOperations.class); @@ -228,8 +231,8 @@ private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean private String resolveExpressionAsString(String value, String attribute) { Object resolved = resolveExpression(value); - if (resolved instanceof String) { - return (String) resolved; + if (resolved instanceof String sResolved) { + return sResolved; } else if (resolved != null) { throw new IllegalStateException(THE_OSQ + attribute + "] must resolve to a String. " @@ -241,16 +244,13 @@ else if (resolved != null) { private Integer resolveExpressionAsInteger(String value, String attribute, boolean required) { Object resolved = resolveExpression(value); Integer result = null; - if (resolved instanceof String) { - if (!required && !StringUtils.hasText((String) resolved)) { - result = null; - } - else { - result = Integer.parseInt((String) resolved); + if (resolved instanceof String sResolved) { + if (required || StringUtils.hasText(sResolved)) { + result = Integer.parseInt(sResolved); } } - else if (resolved instanceof Number) { - result = ((Number) resolved).intValue(); + else if (resolved instanceof Number nResolved) { + result = nResolved.intValue(); } else if (resolved != null || required) { throw new IllegalStateException( @@ -264,16 +264,13 @@ else if (resolved != null || required) { private Short resolveExpressionAsShort(String value, String attribute, boolean required) { Object resolved = resolveExpression(value); Short result = null; - if (resolved instanceof String) { - if (!required && !StringUtils.hasText((String) resolved)) { - result = null; - } - else { - result = Short.parseShort((String) resolved); + if (resolved instanceof String sResolved) { + if (required || StringUtils.hasText(sResolved)) { + result = Short.parseShort(sResolved); } } - else if (resolved instanceof Number) { - result = ((Number) resolved).shortValue(); + else if (resolved instanceof Number nResolved) { + result = nResolved.shortValue(); } else if (resolved != null || required) { throw new IllegalStateException( @@ -287,16 +284,13 @@ else if (resolved != null || required) { private Long resolveExpressionAsLong(String value, String attribute, boolean required) { Object resolved = resolveExpression(value); Long result = null; - if (resolved instanceof String) { - if (!required && !StringUtils.hasText((String) resolved)) { - result = null; - } - else { - result = Long.parseLong((String) resolved); + if (resolved instanceof String sResolved) { + if (required || StringUtils.hasText(sResolved)) { + result = Long.parseLong(sResolved); } } - else if (resolved instanceof Number) { - result = ((Number) resolved).longValue(); + else if (resolved instanceof Number nResolved) { + result = nResolved.longValue(); } else if (resolved != null || required) { throw new IllegalStateException( @@ -310,16 +304,13 @@ else if (resolved != null || required) { private Double resolveExpressionAsDouble(String value, String attribute, boolean required) { Object resolved = resolveExpression(value); Double result = null; - if (resolved instanceof String) { - if (!required && !StringUtils.hasText((String) resolved)) { - result = null; - } - else { - result = Double.parseDouble((String) resolved); + if (resolved instanceof String sResolved) { + if (required || StringUtils.hasText(sResolved)) { + result = Double.parseDouble(sResolved); } } - else if (resolved instanceof Number) { - result = ((Number) resolved).doubleValue(); + else if (resolved instanceof Number nResolved) { + result = nResolved.doubleValue(); } else if (resolved != null || required) { throw new IllegalStateException( @@ -333,11 +324,11 @@ else if (resolved != null || required) { private Boolean resolveExpressionAsBoolean(String value, String attribute) { Object resolved = resolveExpression(value); Boolean result = null; - if (resolved instanceof Boolean) { - result = (Boolean) resolved; + if (resolved instanceof Boolean bResolved) { + result = bResolved; } - else if (resolved instanceof String) { - result = Boolean.parseBoolean((String) resolved); + else if (resolved instanceof String sResolved) { + result = Boolean.parseBoolean(sResolved); } else if (resolved != null) { throw new IllegalStateException( @@ -368,8 +359,8 @@ private List> resolveClasses(Class Date: Fri, 22 Dec 2023 05:56:25 -0500 Subject: [PATCH 2/2] further improvement on nullness checking --- .../RetryableTopicAnnotationProcessor.java | 69 ++++++++++--------- .../retrytopic/RetryTopicConfiguration.java | 10 +-- .../RetryTopicConfigurationBuilder.java | 15 ++-- 3 files changed, 52 insertions(+), 42 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java index ac9c0ebdea..db8c10e01e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java @@ -127,9 +127,7 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method, if (StringUtils.hasText(annotation.autoStartDltHandler())) { autoStartDlt = resolveExpressionAsBoolean(annotation.autoStartDltHandler(), "autoStartDltContainer"); } - return RetryTopicConfigurationBuilder.newInstance() - .maxAttempts(resolveExpressionAsInteger(annotation.attempts(), "attempts", true)) - .concurrency(resolveExpressionAsInteger(annotation.concurrency(), "concurrency", false)) + RetryTopicConfigurationBuilder builder = RetryTopicConfigurationBuilder.newInstance() .customBackoff(createBackoffFromAnnotation(annotation.backoff(), this.beanFactory)) .retryTopicSuffix(resolveExpressionAsString(annotation.retryTopicSuffix(), "retryTopicSuffix")) .dltSuffix(resolveExpressionAsString(annotation.dltTopicSuffix(), "dltTopicSuffix")) @@ -146,8 +144,17 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method, .autoStartDltHandler(autoStartDlt) .setTopicSuffixingStrategy(annotation.topicSuffixingStrategy()) .sameIntervalTopicReuseStrategy(annotation.sameIntervalTopicReuseStrategy()) - .timeoutAfter(timeout) - .create(getKafkaTemplate(resolveExpressionAsString(annotation.kafkaTemplate(), "kafkaTemplate"), topics)); + .timeoutAfter(timeout); + + Integer attempts = resolveExpressionAsInteger(annotation.attempts(), "attempts", true); + if (attempts != null) { + builder.maxAttempts(attempts); + } + Integer concurrency = resolveExpressionAsInteger(annotation.concurrency(), "concurrency", false); + if (concurrency != null) { + builder.concurrency(concurrency); + } + return builder.create(getKafkaTemplate(resolveExpressionAsString(annotation.kafkaTemplate(), "kafkaTemplate"), topics)); } private SleepingBackOffPolicy createBackoffFromAnnotation(Backoff backoff, BeanFactory beanFactory) { // NOSONAR @@ -231,8 +238,8 @@ private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean private String resolveExpressionAsString(String value, String attribute) { Object resolved = resolveExpression(value); - if (resolved instanceof String sResolved) { - return sResolved; + if (resolved instanceof String str) { + return str; } else if (resolved != null) { throw new IllegalStateException(THE_OSQ + attribute + "] must resolve to a String. " @@ -244,13 +251,13 @@ else if (resolved != null) { private Integer resolveExpressionAsInteger(String value, String attribute, boolean required) { Object resolved = resolveExpression(value); Integer result = null; - if (resolved instanceof String sResolved) { - if (required || StringUtils.hasText(sResolved)) { - result = Integer.parseInt(sResolved); + if (resolved instanceof String str) { + if (required || StringUtils.hasText(str)) { + result = Integer.parseInt(str); } } - else if (resolved instanceof Number nResolved) { - result = nResolved.intValue(); + else if (resolved instanceof Number num) { + result = num.intValue(); } else if (resolved != null || required) { throw new IllegalStateException( @@ -264,13 +271,13 @@ else if (resolved != null || required) { private Short resolveExpressionAsShort(String value, String attribute, boolean required) { Object resolved = resolveExpression(value); Short result = null; - if (resolved instanceof String sResolved) { - if (required || StringUtils.hasText(sResolved)) { - result = Short.parseShort(sResolved); + if (resolved instanceof String str) { + if (required || StringUtils.hasText(str)) { + result = Short.parseShort(str); } } - else if (resolved instanceof Number nResolved) { - result = nResolved.shortValue(); + else if (resolved instanceof Number num) { + result = num.shortValue(); } else if (resolved != null || required) { throw new IllegalStateException( @@ -284,13 +291,13 @@ else if (resolved != null || required) { private Long resolveExpressionAsLong(String value, String attribute, boolean required) { Object resolved = resolveExpression(value); Long result = null; - if (resolved instanceof String sResolved) { - if (required || StringUtils.hasText(sResolved)) { - result = Long.parseLong(sResolved); + if (resolved instanceof String str) { + if (required || StringUtils.hasText(str)) { + result = Long.parseLong(str); } } - else if (resolved instanceof Number nResolved) { - result = nResolved.longValue(); + else if (resolved instanceof Number num) { + result = num.longValue(); } else if (resolved != null || required) { throw new IllegalStateException( @@ -304,13 +311,13 @@ else if (resolved != null || required) { private Double resolveExpressionAsDouble(String value, String attribute, boolean required) { Object resolved = resolveExpression(value); Double result = null; - if (resolved instanceof String sResolved) { - if (required || StringUtils.hasText(sResolved)) { - result = Double.parseDouble(sResolved); + if (resolved instanceof String str) { + if (required || StringUtils.hasText(str)) { + result = Double.parseDouble(str); } } - else if (resolved instanceof Number nResolved) { - result = nResolved.doubleValue(); + else if (resolved instanceof Number num) { + result = num.doubleValue(); } else if (resolved != null || required) { throw new IllegalStateException( @@ -324,11 +331,11 @@ else if (resolved != null || required) { private Boolean resolveExpressionAsBoolean(String value, String attribute) { Object resolved = resolveExpression(value); Boolean result = null; - if (resolved instanceof Boolean bResolved) { - result = bResolved; + if (resolved instanceof Boolean bool) { + result = bool; } - else if (resolved instanceof String sResolved) { - result = Boolean.parseBoolean(sResolved); + else if (resolved instanceof String str) { + result = Boolean.parseBoolean(str); } else if (resolved != null) { throw new IllegalStateException( diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfiguration.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfiguration.java index f8a217e787..426d5ec467 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfiguration.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -100,10 +100,10 @@ static class TopicCreation { private final int numPartitions; private final short replicationFactor; - TopicCreation(boolean shouldCreate, int numPartitions, short replicationFactor) { - this.shouldCreateTopics = shouldCreate; - this.numPartitions = numPartitions; - this.replicationFactor = replicationFactor; + TopicCreation(@Nullable Boolean shouldCreate, @Nullable Integer numPartitions, @Nullable Short replicationFactor) { + this.shouldCreateTopics = shouldCreate == null || shouldCreate; + this.numPartitions = numPartitions == null ? 1 : numPartitions; + this.replicationFactor = replicationFactor == null ? -1 : replicationFactor; } TopicCreation() { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java index 186ece3a77..4e24b1deb7 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationBuilder.java @@ -58,14 +58,17 @@ public class RetryTopicConfigurationBuilder { private EndpointHandlerMethod dltHandlerMethod; + @Nullable private String retryTopicSuffix; + @Nullable private String dltSuffix; private RetryTopicConfiguration.TopicCreation topicCreationConfiguration = new RetryTopicConfiguration.TopicCreation(); private ConcurrentKafkaListenerContainerFactory listenerContainerFactory; + @Nullable private String listenerContainerFactoryName; @Nullable @@ -206,7 +209,7 @@ public RetryTopicConfigurationBuilder excludeTopic(String topicName) { * @param suffix the suffix. * @return the builder. */ - public RetryTopicConfigurationBuilder retryTopicSuffix(String suffix) { + public RetryTopicConfigurationBuilder retryTopicSuffix(@Nullable String suffix) { this.retryTopicSuffix = suffix; return this; } @@ -216,7 +219,7 @@ public RetryTopicConfigurationBuilder retryTopicSuffix(String suffix) { * @param suffix the suffix. * @return the builder. */ - public RetryTopicConfigurationBuilder dltSuffix(String suffix) { + public RetryTopicConfigurationBuilder dltSuffix(@Nullable String suffix) { this.dltSuffix = suffix; return this; } @@ -422,7 +425,7 @@ public RetryTopicConfigurationBuilder doNotAutoCreateRetryTopics() { * broker is version 2.4 or later). * @return the builder. */ - public RetryTopicConfigurationBuilder autoCreateTopicsWith(int numPartitions, short replicationFactor) { + public RetryTopicConfigurationBuilder autoCreateTopicsWith(@Nullable Integer numPartitions, @Nullable Short replicationFactor) { this.topicCreationConfiguration = new RetryTopicConfiguration.TopicCreation(true, numPartitions, replicationFactor); return this; @@ -437,8 +440,8 @@ public RetryTopicConfigurationBuilder autoCreateTopicsWith(int numPartitions, sh * broker is version 2.4 or later). * @return the builder. */ - public RetryTopicConfigurationBuilder autoCreateTopics(boolean shouldCreate, int numPartitions, - short replicationFactor) { + public RetryTopicConfigurationBuilder autoCreateTopics(@Nullable Boolean shouldCreate, @Nullable Integer numPartitions, + @Nullable Short replicationFactor) { this.topicCreationConfiguration = new RetryTopicConfiguration.TopicCreation(shouldCreate, numPartitions, replicationFactor); @@ -535,7 +538,7 @@ public RetryTopicConfigurationBuilder listenerFactory(ConcurrentKafkaListenerCon * @param factoryBeanName the factory bean name. * @return the builder. */ - public RetryTopicConfigurationBuilder listenerFactory(String factoryBeanName) { + public RetryTopicConfigurationBuilder listenerFactory(@Nullable String factoryBeanName) { this.listenerContainerFactoryName = factoryBeanName; return this; }