Skip to content

GH-2967: Some minor improvements in RetryableTopicAnnotationProcessor #2967

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ public class RetryableTopicAnnotationProcessor {
* @param beanFactory the bean factory.
*/
public RetryableTopicAnnotationProcessor(BeanFactory beanFactory) {
this(beanFactory, new StandardBeanExpressionResolver(), beanFactory instanceof ConfigurableBeanFactory
? new BeanExpressionContext((ConfigurableBeanFactory) beanFactory, null)
this(beanFactory, new StandardBeanExpressionResolver(), beanFactory instanceof ConfigurableBeanFactory cbf
? new BeanExpressionContext(cbf, null)
: null); // NOSONAR
}

Expand All @@ -101,7 +101,6 @@ public RetryableTopicAnnotationProcessor(BeanFactory beanFactory, BeanExpression
this.expressionContext = expressionContext;
}

@SuppressWarnings("deprecation")
public RetryTopicConfiguration processAnnotation(String[] topics, Method method, RetryableTopic annotation,
Object bean) {

Expand All @@ -121,16 +120,14 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method,
traverse = traverseResolved;
}
else {
traverse = includes.size() > 0 || excludes.size() > 0;
traverse = !includes.isEmpty() || !excludes.isEmpty();
}
}
Boolean autoStartDlt = null;
if (StringUtils.hasText(annotation.autoStartDltHandler())) {
autoStartDlt = resolveExpressionAsBoolean(annotation.autoStartDltHandler(), "autoStartDltContainer");
}
return RetryTopicConfigurationBuilder.newInstance()
.maxAttempts(resolveExpressionAsInteger(annotation.attempts(), "attempts", true))
.concurrency(resolveExpressionAsInteger(annotation.concurrency(), "concurrency", false))
RetryTopicConfigurationBuilder builder = RetryTopicConfigurationBuilder.newInstance()
.customBackoff(createBackoffFromAnnotation(annotation.backoff(), this.beanFactory))
.retryTopicSuffix(resolveExpressionAsString(annotation.retryTopicSuffix(), "retryTopicSuffix"))
.dltSuffix(resolveExpressionAsString(annotation.dltTopicSuffix(), "dltTopicSuffix"))
Expand All @@ -147,8 +144,17 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method,
.autoStartDltHandler(autoStartDlt)
.setTopicSuffixingStrategy(annotation.topicSuffixingStrategy())
.sameIntervalTopicReuseStrategy(annotation.sameIntervalTopicReuseStrategy())
.timeoutAfter(timeout)
.create(getKafkaTemplate(resolveExpressionAsString(annotation.kafkaTemplate(), "kafkaTemplate"), topics));
.timeoutAfter(timeout);

Integer attempts = resolveExpressionAsInteger(annotation.attempts(), "attempts", true);
if (attempts != null) {
builder.maxAttempts(attempts);
}
Integer concurrency = resolveExpressionAsInteger(annotation.concurrency(), "concurrency", false);
if (concurrency != null) {
builder.concurrency(concurrency);
}
return builder.create(getKafkaTemplate(resolveExpressionAsString(annotation.kafkaTemplate(), "kafkaTemplate"), topics));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are many loose ends regarding nullness safety net. Not sure why there is no usage of the following two entries in the package-info.java within the package of annotation:

@org.springframework.lang.NonNullApi
@org.springframework.lang.NonNullFields

if we added them, many NPE issues in this package would be better exposed.

Copy link
Contributor

@sobychacko sobychacko Dec 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Feel free to send another PR with that change if you are up to it.

}

private SleepingBackOffPolicy<?> createBackoffFromAnnotation(Backoff backoff, BeanFactory beanFactory) { // NOSONAR
Expand All @@ -173,9 +179,13 @@ private SleepingBackOffPolicy<?> createBackoffFromAnnotation(Backoff backoff, Be
if (backoff.random()) {
policy = new ExponentialRandomBackOffPolicy();
}
policy.setInitialInterval(min);
if (min != null) {
policy.setInitialInterval(min);
}
policy.setMultiplier(multiplier);
policy.setMaxInterval(max > min ? max : ExponentialBackOffPolicy.DEFAULT_MAX_INTERVAL);
if (max != null && min != null && max > min) {
Copy link
Contributor

@sobychacko sobychacko Dec 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will max be if max <= min? In this case, currently, we set it to ExponentialBackOffPolicy.DEFAULT_MAX_INTERVAL. The new code seems to change that semantics.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good comment! Below is the relevant code snippet:

public class ExponentialBackOffPolicy implements SleepingBackOffPolicy<ExponentialBackOffPolicy> {
        ... ...
	/**
	 * The maximum value of the backoff period in milliseconds.
	 */
	private long maxInterval = DEFAULT_MAX_INTERVAL;

so regardless of whether we set the same default value or not in RetryableTopicAnnotationProcessor above, the end result is the same. I avoided duplication to make it more elegant.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know whether we need to revert back old behaviour. I am perfectly fine with either way.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But don't we need to set that value in RetryableTopicAnnotationProcessor? Otherwise, how does this max variable get to that value? I am fine with cleaning it up if it is redundant. I am trying to see the connection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I showed above, the policy class will set the fields to default values already just after the instance was initiated but before constructor is invoked, so after the default constructor was invoked, we have ended up with valid state already.

Am I missing something here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I got it. Thanks for the explanation.

policy.setMaxInterval(max);
}
return policy;
}
if (max != null && min != null && max > min) {
Expand All @@ -200,7 +210,6 @@ private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean
.orElse(RetryTopicConfigurer.DEFAULT_DLT_HANDLER);
}

@SuppressWarnings("deprecation")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems leftover of previous JDK. In JDK17, I saw no compiling warnings after the removal of it.

private KafkaOperations<?, ?> getKafkaTemplate(String kafkaTemplateName, String[] topics) {
if (StringUtils.hasText(kafkaTemplateName)) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain kafka template by bean name");
Expand All @@ -213,6 +222,7 @@ private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean
+ " with id '" + kafkaTemplateName + "' was found in the application context", ex);
}
}
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain kafka template by default bean name");
try {
return this.beanFactory.getBean(RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME,
KafkaOperations.class);
Expand All @@ -228,8 +238,8 @@ private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean

private String resolveExpressionAsString(String value, String attribute) {
Object resolved = resolveExpression(value);
if (resolved instanceof String) {
return (String) resolved;
if (resolved instanceof String str) {
return str;
}
else if (resolved != null) {
throw new IllegalStateException(THE_OSQ + attribute + "] must resolve to a String. "
Expand All @@ -241,16 +251,13 @@ else if (resolved != null) {
private Integer resolveExpressionAsInteger(String value, String attribute, boolean required) {
Object resolved = resolveExpression(value);
Integer result = null;
if (resolved instanceof String) {
if (!required && !StringUtils.hasText((String) resolved)) {
result = null;
}
else {
result = Integer.parseInt((String) resolved);
if (resolved instanceof String str) {
if (required || StringUtils.hasText(str)) {
result = Integer.parseInt(str);
}
}
else if (resolved instanceof Number) {
result = ((Number) resolved).intValue();
else if (resolved instanceof Number num) {
result = num.intValue();
}
else if (resolved != null || required) {
throw new IllegalStateException(
Expand All @@ -264,16 +271,13 @@ else if (resolved != null || required) {
private Short resolveExpressionAsShort(String value, String attribute, boolean required) {
Object resolved = resolveExpression(value);
Short result = null;
if (resolved instanceof String) {
if (!required && !StringUtils.hasText((String) resolved)) {
result = null;
}
else {
result = Short.parseShort((String) resolved);
if (resolved instanceof String str) {
if (required || StringUtils.hasText(str)) {
result = Short.parseShort(str);
}
}
else if (resolved instanceof Number) {
result = ((Number) resolved).shortValue();
else if (resolved instanceof Number num) {
result = num.shortValue();
}
else if (resolved != null || required) {
throw new IllegalStateException(
Expand All @@ -287,16 +291,13 @@ else if (resolved != null || required) {
private Long resolveExpressionAsLong(String value, String attribute, boolean required) {
Object resolved = resolveExpression(value);
Long result = null;
if (resolved instanceof String) {
if (!required && !StringUtils.hasText((String) resolved)) {
result = null;
}
else {
result = Long.parseLong((String) resolved);
if (resolved instanceof String str) {
if (required || StringUtils.hasText(str)) {
result = Long.parseLong(str);
}
}
else if (resolved instanceof Number) {
result = ((Number) resolved).longValue();
else if (resolved instanceof Number num) {
result = num.longValue();
}
else if (resolved != null || required) {
throw new IllegalStateException(
Expand All @@ -310,16 +311,13 @@ else if (resolved != null || required) {
private Double resolveExpressionAsDouble(String value, String attribute, boolean required) {
Object resolved = resolveExpression(value);
Double result = null;
if (resolved instanceof String) {
if (!required && !StringUtils.hasText((String) resolved)) {
result = null;
}
else {
result = Double.parseDouble((String) resolved);
if (resolved instanceof String str) {
if (required || StringUtils.hasText(str)) {
result = Double.parseDouble(str);
}
}
else if (resolved instanceof Number) {
result = ((Number) resolved).doubleValue();
else if (resolved instanceof Number num) {
result = num.doubleValue();
}
else if (resolved != null || required) {
throw new IllegalStateException(
Expand All @@ -333,11 +331,11 @@ else if (resolved != null || required) {
private Boolean resolveExpressionAsBoolean(String value, String attribute) {
Object resolved = resolveExpression(value);
Boolean result = null;
if (resolved instanceof Boolean) {
result = (Boolean) resolved;
if (resolved instanceof Boolean bool) {
result = bool;
}
else if (resolved instanceof String) {
result = Boolean.parseBoolean((String) resolved);
else if (resolved instanceof String str) {
result = Boolean.parseBoolean(str);
}
else if (resolved != null) {
throw new IllegalStateException(
Expand Down Expand Up @@ -368,8 +366,8 @@ private List<Class<? extends Throwable>> resolveClasses(Class<? extends Throwabl
}

private Object resolveExpression(String value) {
String resolved = resolve(value);
if (this.expressionContext != null) {
String resolved = resolve(value);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems the resolved is only used within the if block, so it makes more sense to move it within, so when expressionContext is null, no unnecessary overhead will incur.

return this.resolver.evaluate(resolved, this.expressionContext);
}
else {
Expand All @@ -378,8 +376,8 @@ private Object resolveExpression(String value) {
}

private String resolve(String value) {
if (this.beanFactory != null && this.beanFactory instanceof ConfigurableBeanFactory) {
return ((ConfigurableBeanFactory) this.beanFactory).resolveEmbeddedValue(value);
if (this.beanFactory instanceof ConfigurableBeanFactory cbf) {
return cbf.resolveEmbeddedValue(value);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

of course, the original nullness checking is unnecessary for instanceof has included it already.

}
return value;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 the original author or authors.
* Copyright 2018-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -100,10 +100,10 @@ static class TopicCreation {
private final int numPartitions;
private final short replicationFactor;

TopicCreation(boolean shouldCreate, int numPartitions, short replicationFactor) {
this.shouldCreateTopics = shouldCreate;
this.numPartitions = numPartitions;
this.replicationFactor = replicationFactor;
TopicCreation(@Nullable Boolean shouldCreate, @Nullable Integer numPartitions, @Nullable Short replicationFactor) {
this.shouldCreateTopics = shouldCreate == null || shouldCreate;
this.numPartitions = numPartitions == null ? 1 : numPartitions;
this.replicationFactor = replicationFactor == null ? -1 : replicationFactor;
}

TopicCreation() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,17 @@ public class RetryTopicConfigurationBuilder {

private EndpointHandlerMethod dltHandlerMethod;

@Nullable
private String retryTopicSuffix;

@Nullable
private String dltSuffix;

private RetryTopicConfiguration.TopicCreation topicCreationConfiguration = new RetryTopicConfiguration.TopicCreation();

private ConcurrentKafkaListenerContainerFactory<?, ?> listenerContainerFactory;

@Nullable
private String listenerContainerFactoryName;

@Nullable
Expand Down Expand Up @@ -206,7 +209,7 @@ public RetryTopicConfigurationBuilder excludeTopic(String topicName) {
* @param suffix the suffix.
* @return the builder.
*/
public RetryTopicConfigurationBuilder retryTopicSuffix(String suffix) {
public RetryTopicConfigurationBuilder retryTopicSuffix(@Nullable String suffix) {
this.retryTopicSuffix = suffix;
return this;
}
Expand All @@ -216,7 +219,7 @@ public RetryTopicConfigurationBuilder retryTopicSuffix(String suffix) {
* @param suffix the suffix.
* @return the builder.
*/
public RetryTopicConfigurationBuilder dltSuffix(String suffix) {
public RetryTopicConfigurationBuilder dltSuffix(@Nullable String suffix) {
this.dltSuffix = suffix;
return this;
}
Expand Down Expand Up @@ -422,7 +425,7 @@ public RetryTopicConfigurationBuilder doNotAutoCreateRetryTopics() {
* broker is version 2.4 or later).
* @return the builder.
*/
public RetryTopicConfigurationBuilder autoCreateTopicsWith(int numPartitions, short replicationFactor) {
public RetryTopicConfigurationBuilder autoCreateTopicsWith(@Nullable Integer numPartitions, @Nullable Short replicationFactor) {
this.topicCreationConfiguration = new RetryTopicConfiguration.TopicCreation(true, numPartitions,
replicationFactor);
return this;
Expand All @@ -437,8 +440,8 @@ public RetryTopicConfigurationBuilder autoCreateTopicsWith(int numPartitions, sh
* broker is version 2.4 or later).
* @return the builder.
*/
public RetryTopicConfigurationBuilder autoCreateTopics(boolean shouldCreate, int numPartitions,
short replicationFactor) {
public RetryTopicConfigurationBuilder autoCreateTopics(@Nullable Boolean shouldCreate, @Nullable Integer numPartitions,
@Nullable Short replicationFactor) {

this.topicCreationConfiguration = new RetryTopicConfiguration.TopicCreation(shouldCreate, numPartitions,
replicationFactor);
Expand Down Expand Up @@ -535,7 +538,7 @@ public RetryTopicConfigurationBuilder listenerFactory(ConcurrentKafkaListenerCon
* @param factoryBeanName the factory bean name.
* @return the builder.
*/
public RetryTopicConfigurationBuilder listenerFactory(String factoryBeanName) {
public RetryTopicConfigurationBuilder listenerFactory(@Nullable String factoryBeanName) {
this.listenerContainerFactoryName = factoryBeanName;
return this;
}
Expand Down