From 9ccd314f65f5b7275dd458f38c7c7f9717b486c8 Mon Sep 17 00:00:00 2001 From: LokeshAlamuri <32193635+LokeshAlamuri@users.noreply.github.com> Date: Wed, 4 Sep 2024 23:50:57 +0530 Subject: [PATCH] GH-3476: Minor Code refactoring Fixes: #3476 https://github.com/spring-projects/spring-kafka/issues/3476 Following code is refactored. 1) `generateFromSleepingBackOffPolicy` method in `org.springframework.kafka.retrytopic.BackOffValuesGenerator` Removed the code to set the `maxBackOffPeriod`, if the BackOffPolicy is `UniformRandomBackOffPolicy`. This is not required since, when sleeper is added to the `UniformRandomBackOffPolicy` `maxBackOffPeriod` is not modified. This could be an issue with the earlier versions of spring-retry. But, latest version of spring-kafka 3.3 and spring-retry 2.0.8, this is not an issue. 2) Modified `addIfAbsent` in `org.springframework.kafka.retrytopic.ListenerContainerFactoryResolver.Cache` to use `putIfAbsent`, rather than explictly checking for the key, if it is not there than adding it to the `HashMap`. --- .../kafka/retrytopic/BackOffValuesGenerator.java | 7 ------- .../kafka/retrytopic/ListenerContainerFactoryResolver.java | 6 ++---- 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/BackOffValuesGenerator.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/BackOffValuesGenerator.java index 21a51987fc..cfa5afa4dc 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/BackOffValuesGenerator.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/BackOffValuesGenerator.java @@ -28,7 +28,6 @@ import org.springframework.retry.backoff.NoBackOffPolicy; import org.springframework.retry.backoff.Sleeper; import org.springframework.retry.backoff.SleepingBackOffPolicy; -import org.springframework.retry.backoff.UniformRandomBackOffPolicy; import org.springframework.retry.support.RetrySynchronizationManager; /** @@ -82,12 +81,6 @@ private List generateFromSleepingBackOffPolicy(int maxAttempts, BackOffPol BackoffRetainerSleeper sleeper = new BackoffRetainerSleeper(); SleepingBackOffPolicy retainingBackOffPolicy = ((SleepingBackOffPolicy) providedBackOffPolicy).withSleeper(sleeper); - - // UniformRandomBackOffPolicy loses the max value when a sleeper is set. - if (providedBackOffPolicy instanceof UniformRandomBackOffPolicy) { - ((UniformRandomBackOffPolicy) retainingBackOffPolicy) - .setMaxBackOffPeriod(((UniformRandomBackOffPolicy) providedBackOffPolicy).getMaxBackOffPeriod()); - } BackOffContext backOffContext = retainingBackOffPolicy.start(RetrySynchronizationManager.getContext()); IntStream.range(0, maxAttempts) .forEach(index -> retainingBackOffPolicy.backOff(backOffContext)); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryResolver.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryResolver.java index 8521d70915..c78cec38c0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryResolver.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryResolver.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -205,9 +205,7 @@ static class Cache { ConcurrentKafkaListenerContainerFactory resolvedFactory) { synchronized (this.cacheMap) { Key key = cacheKey(factoryFromKafkaListenerAnnotation, config); - if (!this.cacheMap.containsKey(key)) { - this.cacheMap.put(key, resolvedFactory); - } + this.cacheMap.putIfAbsent(key, resolvedFactory); return resolvedFactory; } }