Skip to content

Commit

Permalink
GH-3476: Minor Code refactoring
Browse files Browse the repository at this point in the history
Fixes: #3476 

#3476

Following code is refactored.

1) `generateFromSleepingBackOffPolicy` method in `org.springframework.kafka.retrytopic.BackOffValuesGenerator`

Removed the code to set the `maxBackOffPeriod`, if the BackOffPolicy is `UniformRandomBackOffPolicy`. This is not required since, when sleeper is added to the `UniformRandomBackOffPolicy` `maxBackOffPeriod` is not modified. This could be an issue with the earlier versions of spring-retry. But, latest version of spring-kafka 3.3 and spring-retry 2.0.8, this is not an issue.

2) Modified `addIfAbsent` in `org.springframework.kafka.retrytopic.ListenerContainerFactoryResolver.Cache` to use `putIfAbsent`, rather than explictly checking for the key, if it is not there than adding it to the `HashMap`.
  • Loading branch information
LokeshAlamuri authored Sep 4, 2024
1 parent 95b750a commit 9ccd314
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.springframework.retry.backoff.NoBackOffPolicy;
import org.springframework.retry.backoff.Sleeper;
import org.springframework.retry.backoff.SleepingBackOffPolicy;
import org.springframework.retry.backoff.UniformRandomBackOffPolicy;
import org.springframework.retry.support.RetrySynchronizationManager;

/**
Expand Down Expand Up @@ -82,12 +81,6 @@ private List<Long> generateFromSleepingBackOffPolicy(int maxAttempts, BackOffPol
BackoffRetainerSleeper sleeper = new BackoffRetainerSleeper();
SleepingBackOffPolicy<?> retainingBackOffPolicy =
((SleepingBackOffPolicy<?>) providedBackOffPolicy).withSleeper(sleeper);

// UniformRandomBackOffPolicy loses the max value when a sleeper is set.
if (providedBackOffPolicy instanceof UniformRandomBackOffPolicy) {
((UniformRandomBackOffPolicy) retainingBackOffPolicy)
.setMaxBackOffPeriod(((UniformRandomBackOffPolicy) providedBackOffPolicy).getMaxBackOffPeriod());
}
BackOffContext backOffContext = retainingBackOffPolicy.start(RetrySynchronizationManager.getContext());
IntStream.range(0, maxAttempts)
.forEach(index -> retainingBackOffPolicy.backOff(backOffContext));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -205,9 +205,7 @@ static class Cache {
ConcurrentKafkaListenerContainerFactory<?, ?> resolvedFactory) {
synchronized (this.cacheMap) {
Key key = cacheKey(factoryFromKafkaListenerAnnotation, config);
if (!this.cacheMap.containsKey(key)) {
this.cacheMap.put(key, resolvedFactory);
}
this.cacheMap.putIfAbsent(key, resolvedFactory);
return resolvedFactory;
}
}
Expand Down

0 comments on commit 9ccd314

Please sign in to comment.