Skip to content

Commit

Permalink
GH-2967: RetryableTopic related code cleanup
Browse files Browse the repository at this point in the history
Fixes: #2967 

* some minor improvements on RetryableTopicAnnotationProcessor
* further improvement on nullness checking
  • Loading branch information
NathanQingyangXu authored Dec 22, 2023
1 parent 2cf5fbb commit 7cbef70
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ public class RetryableTopicAnnotationProcessor {
* @param beanFactory the bean factory.
*/
public RetryableTopicAnnotationProcessor(BeanFactory beanFactory) {
this(beanFactory, new StandardBeanExpressionResolver(), beanFactory instanceof ConfigurableBeanFactory
? new BeanExpressionContext((ConfigurableBeanFactory) beanFactory, null)
this(beanFactory, new StandardBeanExpressionResolver(), beanFactory instanceof ConfigurableBeanFactory cbf
? new BeanExpressionContext(cbf, null)
: null); // NOSONAR
}

Expand All @@ -101,7 +101,6 @@ public RetryableTopicAnnotationProcessor(BeanFactory beanFactory, BeanExpression
this.expressionContext = expressionContext;
}

@SuppressWarnings("deprecation")
public RetryTopicConfiguration processAnnotation(String[] topics, Method method, RetryableTopic annotation,
Object bean) {

Expand All @@ -121,16 +120,14 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method,
traverse = traverseResolved;
}
else {
traverse = includes.size() > 0 || excludes.size() > 0;
traverse = !includes.isEmpty() || !excludes.isEmpty();
}
}
Boolean autoStartDlt = null;
if (StringUtils.hasText(annotation.autoStartDltHandler())) {
autoStartDlt = resolveExpressionAsBoolean(annotation.autoStartDltHandler(), "autoStartDltContainer");
}
return RetryTopicConfigurationBuilder.newInstance()
.maxAttempts(resolveExpressionAsInteger(annotation.attempts(), "attempts", true))
.concurrency(resolveExpressionAsInteger(annotation.concurrency(), "concurrency", false))
RetryTopicConfigurationBuilder builder = RetryTopicConfigurationBuilder.newInstance()
.customBackoff(createBackoffFromAnnotation(annotation.backoff(), this.beanFactory))
.retryTopicSuffix(resolveExpressionAsString(annotation.retryTopicSuffix(), "retryTopicSuffix"))
.dltSuffix(resolveExpressionAsString(annotation.dltTopicSuffix(), "dltTopicSuffix"))
Expand All @@ -147,8 +144,17 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method,
.autoStartDltHandler(autoStartDlt)
.setTopicSuffixingStrategy(annotation.topicSuffixingStrategy())
.sameIntervalTopicReuseStrategy(annotation.sameIntervalTopicReuseStrategy())
.timeoutAfter(timeout)
.create(getKafkaTemplate(resolveExpressionAsString(annotation.kafkaTemplate(), "kafkaTemplate"), topics));
.timeoutAfter(timeout);

Integer attempts = resolveExpressionAsInteger(annotation.attempts(), "attempts", true);
if (attempts != null) {
builder.maxAttempts(attempts);
}
Integer concurrency = resolveExpressionAsInteger(annotation.concurrency(), "concurrency", false);
if (concurrency != null) {
builder.concurrency(concurrency);
}
return builder.create(getKafkaTemplate(resolveExpressionAsString(annotation.kafkaTemplate(), "kafkaTemplate"), topics));
}

private SleepingBackOffPolicy<?> createBackoffFromAnnotation(Backoff backoff, BeanFactory beanFactory) { // NOSONAR
Expand All @@ -173,9 +179,13 @@ private SleepingBackOffPolicy<?> createBackoffFromAnnotation(Backoff backoff, Be
if (backoff.random()) {
policy = new ExponentialRandomBackOffPolicy();
}
policy.setInitialInterval(min);
if (min != null) {
policy.setInitialInterval(min);
}
policy.setMultiplier(multiplier);
policy.setMaxInterval(max > min ? max : ExponentialBackOffPolicy.DEFAULT_MAX_INTERVAL);
if (max != null && min != null && max > min) {
policy.setMaxInterval(max);
}
return policy;
}
if (max != null && min != null && max > min) {
Expand All @@ -200,7 +210,6 @@ private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean
.orElse(RetryTopicConfigurer.DEFAULT_DLT_HANDLER);
}

@SuppressWarnings("deprecation")
private KafkaOperations<?, ?> getKafkaTemplate(String kafkaTemplateName, String[] topics) {
if (StringUtils.hasText(kafkaTemplateName)) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain kafka template by bean name");
Expand All @@ -213,6 +222,7 @@ private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean
+ " with id '" + kafkaTemplateName + "' was found in the application context", ex);
}
}
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain kafka template by default bean name");
try {
return this.beanFactory.getBean(RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME,
KafkaOperations.class);
Expand All @@ -228,8 +238,8 @@ private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean

private String resolveExpressionAsString(String value, String attribute) {
Object resolved = resolveExpression(value);
if (resolved instanceof String) {
return (String) resolved;
if (resolved instanceof String str) {
return str;
}
else if (resolved != null) {
throw new IllegalStateException(THE_OSQ + attribute + "] must resolve to a String. "
Expand All @@ -241,16 +251,13 @@ else if (resolved != null) {
private Integer resolveExpressionAsInteger(String value, String attribute, boolean required) {
Object resolved = resolveExpression(value);
Integer result = null;
if (resolved instanceof String) {
if (!required && !StringUtils.hasText((String) resolved)) {
result = null;
}
else {
result = Integer.parseInt((String) resolved);
if (resolved instanceof String str) {
if (required || StringUtils.hasText(str)) {
result = Integer.parseInt(str);
}
}
else if (resolved instanceof Number) {
result = ((Number) resolved).intValue();
else if (resolved instanceof Number num) {
result = num.intValue();
}
else if (resolved != null || required) {
throw new IllegalStateException(
Expand All @@ -264,16 +271,13 @@ else if (resolved != null || required) {
private Short resolveExpressionAsShort(String value, String attribute, boolean required) {
Object resolved = resolveExpression(value);
Short result = null;
if (resolved instanceof String) {
if (!required && !StringUtils.hasText((String) resolved)) {
result = null;
}
else {
result = Short.parseShort((String) resolved);
if (resolved instanceof String str) {
if (required || StringUtils.hasText(str)) {
result = Short.parseShort(str);
}
}
else if (resolved instanceof Number) {
result = ((Number) resolved).shortValue();
else if (resolved instanceof Number num) {
result = num.shortValue();
}
else if (resolved != null || required) {
throw new IllegalStateException(
Expand All @@ -287,16 +291,13 @@ else if (resolved != null || required) {
private Long resolveExpressionAsLong(String value, String attribute, boolean required) {
Object resolved = resolveExpression(value);
Long result = null;
if (resolved instanceof String) {
if (!required && !StringUtils.hasText((String) resolved)) {
result = null;
}
else {
result = Long.parseLong((String) resolved);
if (resolved instanceof String str) {
if (required || StringUtils.hasText(str)) {
result = Long.parseLong(str);
}
}
else if (resolved instanceof Number) {
result = ((Number) resolved).longValue();
else if (resolved instanceof Number num) {
result = num.longValue();
}
else if (resolved != null || required) {
throw new IllegalStateException(
Expand All @@ -310,16 +311,13 @@ else if (resolved != null || required) {
private Double resolveExpressionAsDouble(String value, String attribute, boolean required) {
Object resolved = resolveExpression(value);
Double result = null;
if (resolved instanceof String) {
if (!required && !StringUtils.hasText((String) resolved)) {
result = null;
}
else {
result = Double.parseDouble((String) resolved);
if (resolved instanceof String str) {
if (required || StringUtils.hasText(str)) {
result = Double.parseDouble(str);
}
}
else if (resolved instanceof Number) {
result = ((Number) resolved).doubleValue();
else if (resolved instanceof Number num) {
result = num.doubleValue();
}
else if (resolved != null || required) {
throw new IllegalStateException(
Expand All @@ -333,11 +331,11 @@ else if (resolved != null || required) {
private Boolean resolveExpressionAsBoolean(String value, String attribute) {
Object resolved = resolveExpression(value);
Boolean result = null;
if (resolved instanceof Boolean) {
result = (Boolean) resolved;
if (resolved instanceof Boolean bool) {
result = bool;
}
else if (resolved instanceof String) {
result = Boolean.parseBoolean((String) resolved);
else if (resolved instanceof String str) {
result = Boolean.parseBoolean(str);
}
else if (resolved != null) {
throw new IllegalStateException(
Expand Down Expand Up @@ -368,8 +366,8 @@ private List<Class<? extends Throwable>> resolveClasses(Class<? extends Throwabl
}

private Object resolveExpression(String value) {
String resolved = resolve(value);
if (this.expressionContext != null) {
String resolved = resolve(value);
return this.resolver.evaluate(resolved, this.expressionContext);
}
else {
Expand All @@ -378,8 +376,8 @@ private Object resolveExpression(String value) {
}

private String resolve(String value) {
if (this.beanFactory != null && this.beanFactory instanceof ConfigurableBeanFactory) {
return ((ConfigurableBeanFactory) this.beanFactory).resolveEmbeddedValue(value);
if (this.beanFactory instanceof ConfigurableBeanFactory cbf) {
return cbf.resolveEmbeddedValue(value);
}
return value;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 the original author or authors.
* Copyright 2018-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -100,10 +100,10 @@ static class TopicCreation {
private final int numPartitions;
private final short replicationFactor;

TopicCreation(boolean shouldCreate, int numPartitions, short replicationFactor) {
this.shouldCreateTopics = shouldCreate;
this.numPartitions = numPartitions;
this.replicationFactor = replicationFactor;
TopicCreation(@Nullable Boolean shouldCreate, @Nullable Integer numPartitions, @Nullable Short replicationFactor) {
this.shouldCreateTopics = shouldCreate == null || shouldCreate;
this.numPartitions = numPartitions == null ? 1 : numPartitions;
this.replicationFactor = replicationFactor == null ? -1 : replicationFactor;
}

TopicCreation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,17 @@ public class RetryTopicConfigurationBuilder {

private EndpointHandlerMethod dltHandlerMethod;

@Nullable
private String retryTopicSuffix;

@Nullable
private String dltSuffix;

private RetryTopicConfiguration.TopicCreation topicCreationConfiguration = new RetryTopicConfiguration.TopicCreation();

private ConcurrentKafkaListenerContainerFactory<?, ?> listenerContainerFactory;

@Nullable
private String listenerContainerFactoryName;

@Nullable
Expand Down Expand Up @@ -206,7 +209,7 @@ public RetryTopicConfigurationBuilder excludeTopic(String topicName) {
* @param suffix the suffix.
* @return the builder.
*/
public RetryTopicConfigurationBuilder retryTopicSuffix(String suffix) {
public RetryTopicConfigurationBuilder retryTopicSuffix(@Nullable String suffix) {
this.retryTopicSuffix = suffix;
return this;
}
Expand All @@ -216,7 +219,7 @@ public RetryTopicConfigurationBuilder retryTopicSuffix(String suffix) {
* @param suffix the suffix.
* @return the builder.
*/
public RetryTopicConfigurationBuilder dltSuffix(String suffix) {
public RetryTopicConfigurationBuilder dltSuffix(@Nullable String suffix) {
this.dltSuffix = suffix;
return this;
}
Expand Down Expand Up @@ -422,7 +425,7 @@ public RetryTopicConfigurationBuilder doNotAutoCreateRetryTopics() {
* broker is version 2.4 or later).
* @return the builder.
*/
public RetryTopicConfigurationBuilder autoCreateTopicsWith(int numPartitions, short replicationFactor) {
public RetryTopicConfigurationBuilder autoCreateTopicsWith(@Nullable Integer numPartitions, @Nullable Short replicationFactor) {
this.topicCreationConfiguration = new RetryTopicConfiguration.TopicCreation(true, numPartitions,
replicationFactor);
return this;
Expand All @@ -437,8 +440,8 @@ public RetryTopicConfigurationBuilder autoCreateTopicsWith(int numPartitions, sh
* broker is version 2.4 or later).
* @return the builder.
*/
public RetryTopicConfigurationBuilder autoCreateTopics(boolean shouldCreate, int numPartitions,
short replicationFactor) {
public RetryTopicConfigurationBuilder autoCreateTopics(@Nullable Boolean shouldCreate, @Nullable Integer numPartitions,
@Nullable Short replicationFactor) {

this.topicCreationConfiguration = new RetryTopicConfiguration.TopicCreation(shouldCreate, numPartitions,
replicationFactor);
Expand Down Expand Up @@ -535,7 +538,7 @@ public RetryTopicConfigurationBuilder listenerFactory(ConcurrentKafkaListenerCon
* @param factoryBeanName the factory bean name.
* @return the builder.
*/
public RetryTopicConfigurationBuilder listenerFactory(String factoryBeanName) {
public RetryTopicConfigurationBuilder listenerFactory(@Nullable String factoryBeanName) {
this.listenerContainerFactoryName = factoryBeanName;
return this;
}
Expand Down

0 comments on commit 7cbef70

Please sign in to comment.