From 46bbacb5d90472f90b0e142e2626529c395905f0 Mon Sep 17 00:00:00 2001 From: Dave Syer Date: Fri, 5 Apr 2019 14:27:46 +0100 Subject: [PATCH 1/3] Support for CompletableFuture and Future async types If a RetryCallback returns an async type it can be wrapped by the RetryTemplate. The template needs to know about all the callback return types that need wrapping, via its RetryResultProcessors property (not set by default). See AsyncRetryTemplateTests for examples of how to do this. --- pom.xml | 4 +- ...CompletableFutureRetryResultProcessor.java | 90 +++++++ .../support/FutureRetryResultProcessor.java | 136 +++++++++++ .../retry/support/RetryResultProcessor.java | 62 +++++ .../retry/support/RetryTemplate.java | 220 ++++++++++++------ .../support/AsyncRetryTemplateTests.java | 215 +++++++++++++++++ 6 files changed, 659 insertions(+), 68 deletions(-) create mode 100644 src/main/java/org/springframework/retry/support/CompletableFutureRetryResultProcessor.java create mode 100644 src/main/java/org/springframework/retry/support/FutureRetryResultProcessor.java create mode 100644 src/main/java/org/springframework/retry/support/RetryResultProcessor.java create mode 100644 src/test/java/org/springframework/retry/support/AsyncRetryTemplateTests.java diff --git a/pom.xml b/pom.xml index a9389a4f..9791a4da 100644 --- a/pom.xml +++ b/pom.xml @@ -320,8 +320,8 @@ org.apache.maven.plugins maven-compiler-plugin - 1.6 - 1.6 + 1.8 + 1.8 diff --git a/src/main/java/org/springframework/retry/support/CompletableFutureRetryResultProcessor.java b/src/main/java/org/springframework/retry/support/CompletableFutureRetryResultProcessor.java new file mode 100644 index 00000000..3f2f4312 --- /dev/null +++ b/src/main/java/org/springframework/retry/support/CompletableFutureRetryResultProcessor.java @@ -0,0 +1,90 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.retry.support; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.springframework.retry.RetryCallback; +import org.springframework.retry.RetryException; + +/** + * A {@link RetryResultProcessor} for a {@link CompletableFuture}. If a + * {@link RetryCallback} returns a CompletableFuture this processor can be + * used internally by the {@link RetryTemplate} to wrap it and process the result. + * + * @author Dave Syer + */ +public class CompletableFutureRetryResultProcessor + implements RetryResultProcessor> { + + @Override + public Result> process(CompletableFuture completable, + Supplier>> supplier, + Consumer handler) { + @SuppressWarnings("unchecked") + CompletableFuture typed = (CompletableFuture) completable; + CompletableFuture handle = typed + .thenApply(value -> CompletableFuture.completedFuture(value)) + .exceptionally(throwable -> apply(supplier, handler, throwable)) + .thenCompose(Function.identity()); + return new Result<>(handle); + } + + private CompletableFuture apply( + Supplier>> supplier, Consumer handler, + Throwable throwable) { + Throwable error = throwable; + try { + if (throwable instanceof ExecutionException + || throwable instanceof CompletionException) { + error = throwable.getCause(); + } + handler.accept(error); + Result> result = supplier.get(); + if (result.isComplete()) { + @SuppressWarnings("unchecked") + CompletableFuture output = (CompletableFuture) result + .getResult(); + return output; + } + throw result.exception; + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + error = e; + } + catch (CompletionException e) { + error = e.getCause(); + } + catch (ExecutionException e) { + error = e.getCause(); + } + catch (RetryException e) { + error = e.getCause(); + } + catch (Throwable e) { + error = e; + } + throw RetryTemplate.runtimeException(error); + } + +} \ No newline at end of file diff --git a/src/main/java/org/springframework/retry/support/FutureRetryResultProcessor.java b/src/main/java/org/springframework/retry/support/FutureRetryResultProcessor.java new file mode 100644 index 00000000..dea887cb --- /dev/null +++ b/src/main/java/org/springframework/retry/support/FutureRetryResultProcessor.java @@ -0,0 +1,136 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.retry.support; + +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import org.springframework.retry.RetryCallback; +import org.springframework.retry.RetryException; + +/** + * A {@link RetryResultProcessor} for a plain {@link Future}. If a {@link RetryCallback} + * returns a Future this processor can be used internally by the + * {@link RetryTemplate} to wrap it and process the result. + * + * @author Dave Syer + */ +public class FutureRetryResultProcessor implements RetryResultProcessor> { + + @Override + public Result> process(Future future, + Supplier>> supplier, Consumer handler) { + return new Result>(new FutureWrapper(future, supplier, handler)); + } + + private class FutureWrapper implements Future { + + private Future delegate; + + private Supplier>> supplier; + + private Consumer handler; + + FutureWrapper(Future delegate, Supplier>> supplier, + Consumer handler) { + this.delegate = delegate; + this.supplier = supplier; + this.handler = handler; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return this.delegate.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return this.delegate.isCancelled(); + } + + @Override + public boolean isDone() { + return this.delegate.isDone(); + } + + @Override + public Object get() throws InterruptedException, ExecutionException { + try { + return this.delegate.get(); + } + catch (ExecutionException e) { + return handle(e); + } + } + + @Override + public Object get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + try { + return this.delegate.get(timeout, unit); + } + catch (ExecutionException e) { + return handle(e, timeout, unit); + } + } + + private Object handle(ExecutionException throwable) { + return handle(throwable, -1, null); + } + + private Object handle(ExecutionException throwable, long timeout, TimeUnit unit) { + Throwable error = throwable.getCause(); + try { + this.handler.accept(error); + Result> result = this.supplier.get(); + if (result.isComplete()) { + if (timeout < 0) { + return result.getResult().get(); + } + else { + return result.getResult().get(timeout, unit); + } + } + throw result.exception; + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + error = e; + } + catch (CompletionException e) { + error = e.getCause(); + } + catch (ExecutionException e) { + error = e.getCause(); + } + catch (RetryException e) { + error = e.getCause(); + } + catch (Throwable e) { + error = e; + } + throw RetryTemplate.runtimeException(error); + } + + } + +} \ No newline at end of file diff --git a/src/main/java/org/springframework/retry/support/RetryResultProcessor.java b/src/main/java/org/springframework/retry/support/RetryResultProcessor.java new file mode 100644 index 00000000..1e192087 --- /dev/null +++ b/src/main/java/org/springframework/retry/support/RetryResultProcessor.java @@ -0,0 +1,62 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.retry.support; + +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** + * @author Dave Syer + * @param the type of result from the retryable operation + */ +public interface RetryResultProcessor { + + Result process(T input, Supplier> supplier, Consumer handler); + + public static class Result { + + public Throwable exception; + + private T result; + + private boolean complete; + + public Result(Throwable exception) { + this.exception = exception; + this.complete = false; + } + + public Result(T result) { + this.result = result; + this.complete = true; + } + + boolean isComplete() { + return this.complete; + } + + public Throwable getException() { + return exception; + } + + public T getResult() { + return result; + } + + } + +} diff --git a/src/main/java/org/springframework/retry/support/RetryTemplate.java b/src/main/java/org/springframework/retry/support/RetryTemplate.java index cdf6908b..f5897745 100644 --- a/src/main/java/org/springframework/retry/support/RetryTemplate.java +++ b/src/main/java/org/springframework/retry/support/RetryTemplate.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.classify.Classifier; import org.springframework.retry.ExhaustedRetryException; import org.springframework.retry.RecoveryCallback; import org.springframework.retry.RetryCallback; @@ -40,6 +41,7 @@ import org.springframework.retry.policy.MapRetryContextCache; import org.springframework.retry.policy.RetryContextCache; import org.springframework.retry.policy.SimpleRetryPolicy; +import org.springframework.retry.support.RetryResultProcessor.Result; /** * Template class that simplifies the execution of operations with retry semantics. @@ -94,6 +96,8 @@ public class RetryTemplate implements RetryOperations { private RetryContextCache retryContextCache = new MapRetryContextCache(); + private Classifier> processors = null; + private boolean throwLastExceptionOnExhausted; /** @@ -132,6 +136,16 @@ public void setRetryContextCache(RetryContextCache retryContextCache) { this.retryContextCache = retryContextCache; } + /** + * Public setter for the retry result processors (if any). Default null (same as + * empty). + * @param processors the processors to set + */ + public void setRetryResultProcessors( + Classifier> processors) { + this.processors = processors; + } + /** * Setter for listeners. The listeners are executed before and after a retry block * (i.e. before and after all the attempts), and on an error (every attempt). @@ -286,89 +300,143 @@ protected T doExecute(RetryCallback retryCallback } } - /* - * We allow the whole loop to be skipped if the policy or context already - * forbid the first try. This is used in the case of external retry to allow a - * recovery in handleRetryExhausted without the callback processing (which - * would throw an exception). - */ - while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) { + Result result = loop(retryCallback, state, context, backOffContext); + if (result.isComplete()) { + return result.getResult(); + } + lastException = result.exception; + if (state == null && this.logger.isDebugEnabled()) { + this.logger.debug("Retry failed last attempt: count=" + context.getRetryCount()); + } - try { - if (this.logger.isDebugEnabled()) { - this.logger.debug("Retry: count=" + context.getRetryCount()); - } - // Reset the last exception, so if we are successful - // the close interceptors will not think we failed... - lastException = null; - return retryCallback.doWithRetry(context); - } - catch (Throwable e) { + exhausted = true; + return handleRetryExhausted(recoveryCallback, context, state); + + } + catch (Throwable e) { + lastException = e; + throw RetryTemplate.wrapIfNecessary(e); + } + finally { + close(retryPolicy, context, state, lastException == null || exhausted); + doCloseInterceptors(retryCallback, context, lastException); + RetrySynchronizationManager.clear(); + } - lastException = e; + } - try { - registerThrowable(retryPolicy, state, context, e); - } - catch (Exception ex) { - throw new TerminatedRetryException("Could not register throwable", ex); - } - finally { - doOnErrorInterceptors(retryCallback, context, e); - } + private Result safeLoop(RetryCallback retryCallback, + RetryState state, RetryContext context, BackOffContext backOffContext) { + try { + return loop(retryCallback, state, context, backOffContext); + } + catch (Throwable ex) { + throw runtimeException(ex); + } + } - if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) { - try { - backOffPolicy.backOff(backOffContext); - } - catch (BackOffInterruptedException ex) { - lastException = e; - // back off was prevented by another thread - fail the retry - if (this.logger.isDebugEnabled()) { - this.logger.debug("Abort retry because interrupted: count=" + context.getRetryCount()); - } - throw ex; - } - } + private Result loop(RetryCallback retryCallback, + RetryState state, RetryContext context, BackOffContext backOffContext) + throws E { - if (this.logger.isDebugEnabled()) { - this.logger.debug("Checking for rethrow: count=" + context.getRetryCount()); - } + Throwable lastException = null; - if (shouldRethrow(retryPolicy, context, state)) { - if (this.logger.isDebugEnabled()) { - this.logger.debug("Rethrow in retry for policy: count=" + context.getRetryCount()); - } - throw RetryTemplate.wrapIfNecessary(e); - } + /* + * We allow the whole loop to be skipped if the policy or context already forbid + * the first try. This is used in the case of external retry to allow a recovery + * in handleRetryExhausted without the callback processing (which would throw an + * exception). + */ + while (canRetry(this.retryPolicy, context) && !context.isExhaustedOnly()) { - } + try { - /* - * A stateful attempt that can retry may rethrow the exception before now, - * but if we get this far in a stateful retry there's a reason for it, - * like a circuit breaker or a rollback classifier. - */ - if (state != null && context.hasAttribute(GLOBAL_STATE)) { - break; + if (this.logger.isDebugEnabled()) { + this.logger.debug("Retry: count=" + context.getRetryCount()); + } + T result = retryCallback.doWithRetry(context); + if (result != null && this.processors != null) { + @SuppressWarnings("unchecked") + RetryResultProcessor processor = (RetryResultProcessor) this.processors + .classify(result); + if (processor != null) { + return processor.process(result, + () -> safeLoop(retryCallback, state, context, + backOffContext), + error -> safeHandleLoopException(retryCallback, state, + context, backOffContext, error)); + } } + return new Result<>(result); + } + catch (Throwable e) { - if (state == null && this.logger.isDebugEnabled()) { - this.logger.debug("Retry failed last attempt: count=" + context.getRetryCount()); + lastException = e; + handleLoopException(retryCallback, state, context, backOffContext, e); + + } + /* + * A stateful attempt that can retry may rethrow the exception before now, but + * if we get this far in a stateful retry there's a reason for it, like a + * circuit breaker or a rollback classifier. + */ + if (state != null && context.hasAttribute(GLOBAL_STATE)) { + break; } + } + return new Result<>( + lastException == null ? context.getLastThrowable() : lastException); + } - exhausted = true; - return handleRetryExhausted(recoveryCallback, context, state); + private void safeHandleLoopException( + RetryCallback retryCallback, RetryState state, RetryContext context, + BackOffContext backOffContext, Throwable e) { + try { + handleLoopException(retryCallback, state, context, backOffContext, e); + } + catch (Throwable ex) { + throw runtimeException(ex); + } + } + private void handleLoopException( + RetryCallback retryCallback, RetryState state, RetryContext context, + BackOffContext backOffContext, Throwable e) throws E { + try { + registerThrowable(this.retryPolicy, state, context, e); } - catch (Throwable e) { - throw RetryTemplate.wrapIfNecessary(e); + catch (Exception ex) { + throw new TerminatedRetryException("Could not register throwable", ex); } finally { - close(retryPolicy, context, state, lastException == null || exhausted); - doCloseInterceptors(retryCallback, context, lastException); - RetrySynchronizationManager.clear(); + doOnErrorInterceptors(retryCallback, context, e); + } + + if (canRetry(this.retryPolicy, context) && !context.isExhaustedOnly()) { + try { + this.backOffPolicy.backOff(backOffContext); + } + catch (BackOffInterruptedException ex) { + // back off was prevented by another thread - fail the retry + if (this.logger.isDebugEnabled()) { + this.logger.debug("Abort retry because interrupted: count=" + + context.getRetryCount()); + } + throw ex; + } + } + + if (this.logger.isDebugEnabled()) { + this.logger.debug("Checking for rethrow: count=" + context.getRetryCount()); + } + + if (shouldRethrow(this.retryPolicy, context, state)) { + if (this.logger.isDebugEnabled()) { + this.logger.debug( + "Rethrow in retry for policy: count=" + context.getRetryCount()); + } + throw RetryTemplate.wrapIfNecessary(e); } } @@ -590,4 +658,24 @@ else if (throwable instanceof Exception) { } } + /** + * Re-throws the original throwable if it is an RuntimeException, and wraps + * non-exceptions into {@link RetryException}. + * @param throwable the input errror + * @return a RuntimeException if possible + * @throws RetryException if the throwable is checked + */ + static RuntimeException runtimeException(Throwable throwable) throws RetryException { + if (throwable instanceof Error) { + throw (Error) throwable; + } + else if (throwable instanceof RuntimeException) { + RuntimeException rethrow = (RuntimeException) throwable; + return rethrow; + } + else { + throw new RetryException("Exception in retry", throwable); + } + } + } diff --git a/src/test/java/org/springframework/retry/support/AsyncRetryTemplateTests.java b/src/test/java/org/springframework/retry/support/AsyncRetryTemplateTests.java new file mode 100644 index 00000000..dac67e42 --- /dev/null +++ b/src/test/java/org/springframework/retry/support/AsyncRetryTemplateTests.java @@ -0,0 +1,215 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.retry.support; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinTask; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.junit.Before; +import org.junit.Test; + +import org.springframework.classify.SubclassClassifier; +import org.springframework.retry.RetryCallback; +import org.springframework.retry.RetryContext; +import org.springframework.retry.backoff.BackOffContext; +import org.springframework.retry.backoff.BackOffInterruptedException; +import org.springframework.retry.backoff.BackOffPolicy; +import org.springframework.retry.policy.SimpleRetryPolicy; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * @author Dave Syer + */ +public class AsyncRetryTemplateTests { + + private RetryTemplate retryTemplate; + + @Before + @SuppressWarnings({ "unchecked", "rawtypes" }) + public void init() { + this.retryTemplate = new RetryTemplate(); + Map, RetryResultProcessor> map = new HashMap<>(); + map.put(Future.class, new FutureRetryResultProcessor()); + map.put(CompletableFuture.class, new CompletableFutureRetryResultProcessor()); + SubclassClassifier processors = new SubclassClassifier(map, + (RetryResultProcessor) null); + this.retryTemplate.setRetryResultProcessors(processors); + } + + @Test + public void testSuccessfulRetryCompletable() throws Throwable { + for (int x = 1; x <= 10; x++) { + CompletableFutureRetryCallback callback = new CompletableFutureRetryCallback(); + callback.setAttemptsBeforeSuccess(x); + SimpleRetryPolicy policy = new SimpleRetryPolicy(x); + this.retryTemplate.setRetryPolicy(policy); + CompletableFuture result = this.retryTemplate.execute(callback); + assertEquals(CompletableFutureRetryCallback.RESULT, + result.get(1000L, TimeUnit.MILLISECONDS)); + assertEquals(x, callback.attempts); + } + } + + @Test + public void testSuccessfulRetryFuture() throws Throwable { + for (int x = 1; x <= 10; x++) { + FutureRetryCallback callback = new FutureRetryCallback(); + callback.setAttemptsBeforeSuccess(x); + SimpleRetryPolicy policy = new SimpleRetryPolicy(x); + this.retryTemplate.setRetryPolicy(policy); + Future result = this.retryTemplate.execute(callback); + assertEquals(FutureRetryCallback.RESULT, + result.get(1000L, TimeUnit.MILLISECONDS)); + assertEquals(x, callback.attempts); + } + } + + @Test + public void testBackOffInvoked() throws Throwable { + for (int x = 1; x <= 10; x++) { + CompletableFutureRetryCallback callback = new CompletableFutureRetryCallback(); + MockBackOffStrategy backOff = new MockBackOffStrategy(); + callback.setAttemptsBeforeSuccess(x); + SimpleRetryPolicy policy = new SimpleRetryPolicy(10); + this.retryTemplate.setRetryPolicy(policy); + this.retryTemplate.setBackOffPolicy(backOff); + CompletableFuture result = this.retryTemplate.execute(callback); + assertEquals(CompletableFutureRetryCallback.RESULT, + result.get(1000L, TimeUnit.MILLISECONDS)); + assertEquals(x, callback.attempts); + assertEquals(1, backOff.startCalls); + assertEquals(x - 1, backOff.backOffCalls); + } + } + + @Test + public void testNoSuccessRetry() throws Throwable { + CompletableFutureRetryCallback callback = new CompletableFutureRetryCallback(); + // Something that won't be thrown by JUnit... + callback.setExceptionToThrow(new IllegalArgumentException()); + callback.setAttemptsBeforeSuccess(Integer.MAX_VALUE); + int retryAttempts = 2; + this.retryTemplate.setRetryPolicy(new SimpleRetryPolicy(retryAttempts)); + try { + CompletableFuture result = this.retryTemplate.execute(callback); + result.get(1000L, TimeUnit.MILLISECONDS); + fail("Expected IllegalArgumentException"); + } + catch (ExecutionException e) { + assertTrue("Expected IllegalArgumentException", + e.getCause() instanceof IllegalArgumentException); + assertEquals(retryAttempts, callback.attempts); + return; + } + fail("Expected IllegalArgumentException"); + } + + private static class CompletableFutureRetryCallback + implements RetryCallback, Exception> { + + public static Object RESULT = new Object(); + + private int attempts; + + private int attemptsBeforeSuccess; + + private RuntimeException exceptionToThrow = new RuntimeException(); + + @Override + public CompletableFuture doWithRetry(RetryContext status) + throws Exception { + // !!!! Don't do this in real life - use a thread pool + return CompletableFuture.supplyAsync(() -> { + this.attempts++; + if (this.attempts < this.attemptsBeforeSuccess) { + throw this.exceptionToThrow; + } + return RESULT; + }); + } + + public void setAttemptsBeforeSuccess(int attemptsBeforeSuccess) { + this.attemptsBeforeSuccess = attemptsBeforeSuccess; + } + + public void setExceptionToThrow(RuntimeException exceptionToThrow) { + this.exceptionToThrow = exceptionToThrow; + } + + } + + private static class FutureRetryCallback + implements RetryCallback, Exception> { + + public static Object RESULT = new Object(); + + private int attempts; + + private int attemptsBeforeSuccess; + + private RuntimeException exceptionToThrow = new RuntimeException(); + + @Override + public Future doWithRetry(RetryContext status) throws Exception { + // !!!! Don't do this in real life - use a thread pool + return ForkJoinTask.adapt(() -> { + this.attempts++; + if (this.attempts < this.attemptsBeforeSuccess) { + throw this.exceptionToThrow; + } + return RESULT; + }).fork(); + } + + public void setAttemptsBeforeSuccess(int attemptsBeforeSuccess) { + this.attemptsBeforeSuccess = attemptsBeforeSuccess; + } + + } + + private static class MockBackOffStrategy implements BackOffPolicy { + + public int backOffCalls; + + public int startCalls; + + @Override + public BackOffContext start(RetryContext status) { + if (!status.hasAttribute(MockBackOffStrategy.class.getName())) { + this.startCalls++; + status.setAttribute(MockBackOffStrategy.class.getName(), true); + } + return null; + } + + @Override + public void backOff(BackOffContext backOffContext) + throws BackOffInterruptedException { + this.backOffCalls++; + } + + } + +} From ba0b6ed6fd826b52b5fc2877ed09367710b6bd1d Mon Sep 17 00:00:00 2001 From: Aleksandr Shamukov Date: Sat, 18 May 2019 15:45:50 +0300 Subject: [PATCH 2/3] Draft for async retry --- README.md | 42 ++ .../backoff/LastBackoffPeriodSupplier.java | 6 + .../retry/backoff/RememberPeriodSleeper.java | 22 + .../support/AsyncRetryResultProcessor.java | 84 ++++ ...CompletableFutureRetryResultProcessor.java | 87 ++-- .../support/FutureRetryResultProcessor.java | 147 ++++-- .../retry/support/RetryResultProcessor.java | 15 +- .../retry/support/RetryTemplate.java | 42 +- .../retry/support/RetryTemplateBuilder.java | 48 ++ .../retry/support/AbstractAsyncRetryTest.java | 227 +++++++++ .../retry/support/AsyncReschedulingTests.java | 435 ++++++++++++++++++ .../support/AsyncRetryTemplateTests.java | 152 ++---- .../support/StatefulRecoveryRetryTests.java | 3 +- 13 files changed, 1096 insertions(+), 214 deletions(-) create mode 100644 src/main/java/org/springframework/retry/backoff/LastBackoffPeriodSupplier.java create mode 100644 src/main/java/org/springframework/retry/backoff/RememberPeriodSleeper.java create mode 100644 src/main/java/org/springframework/retry/support/AsyncRetryResultProcessor.java create mode 100644 src/test/java/org/springframework/retry/support/AbstractAsyncRetryTest.java create mode 100644 src/test/java/org/springframework/retry/support/AsyncReschedulingTests.java diff --git a/README.md b/README.md index 29653150..19a0ed9f 100644 --- a/README.md +++ b/README.md @@ -551,6 +551,48 @@ The preceding example uses a default `RetryTemplate` inside the interceptor. To policies or listeners, you need only inject an instance of `RetryTemplate` into the interceptor. +## Asynchronous retry +### Terms +```java + +CompletableFuture> completableFuture = retryTemplate.execute( + ctx -> httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()) + ); +``` +- __async callback__ - a callback that returns one of the supported async types (CompletableFuture, Future). Usually, async retry callback is one, that does not perform a heavy work by itself, but schedules the work to some worker and returns an instance of async type to track the progress. Failure of async callback itself usually means failure of scheduling (but not of actual work). +Failure of async callback (of scheduling) and of actual job will both be retried on a common basis, according to configured policies. + +- __job__ - a task with payload, usually heavy, which result will be available through the instance of async type, returned by async callback (and, consequently, by _execute_ method) + +- __rescheduling executor__ - an instance of executor, used for scheduling a new retry attempt after a delay (provided by a backoff policy). The type of executor is restricted by ScheduledExecutorService, to take advantage of its "schedule after delay" feature, which allows us to implement backoff without blocking a thread. Rescheduling executor is used for all retries except of initial scheduling retries (initial invocation of async callback). + +### Initial invocation of async callback +Invocation of template.execute(asyncCallback) returns when first scheduling of job succeeded, or all initial scheduling attempts failed. Retry template does not produce async containers by itself, therefore there is nothing to return from _execute_ until initial invocation succeed. Backoffs between failing initial scheduling attempts will be performed by default sleeper by means of Thread.sleep() on caller thread. Why this approach is used: +- to be compatible with generic API of RetryOperations (where return type of callback equals to retrun type of execute(...)) +- to provide an additional mean of back pressure + +### Subsequent invocations of async callback +If the first execution of the _job_ failed and a retry is allowed by the policy, the next invocation of the async callback will be scheduled on _rescheduling executor_ + +### Async callbacks without executor +If executor is not provided, a backoff will be performed by Thread.sleep() on the client thread (for initial scheduling) or on the worker thread (for job failures, or for subsequent schedulings). + +### Configuration example +```java +RetryTemplate.builder() + // activte the async retry feature with an executor + .asyncRetry(Executors.newScheduledThreadPool(1)) + .fixedBackoff(1000) + .build(); + +RetryTemplate.builder() + // activte the async retry feature without an executor. + // Thread.sleep() will be used for backoff. + .asyncRetry() + .fixedBackoff(1000) + .build(); +``` + ## Contributing Spring Retry is released under the non-restrictive Apache 2.0 license diff --git a/src/main/java/org/springframework/retry/backoff/LastBackoffPeriodSupplier.java b/src/main/java/org/springframework/retry/backoff/LastBackoffPeriodSupplier.java new file mode 100644 index 00000000..0487dd61 --- /dev/null +++ b/src/main/java/org/springframework/retry/backoff/LastBackoffPeriodSupplier.java @@ -0,0 +1,6 @@ +package org.springframework.retry.backoff; + +import java.util.function.Supplier; + +public interface LastBackoffPeriodSupplier extends Supplier { +} diff --git a/src/main/java/org/springframework/retry/backoff/RememberPeriodSleeper.java b/src/main/java/org/springframework/retry/backoff/RememberPeriodSleeper.java new file mode 100644 index 00000000..4a61d369 --- /dev/null +++ b/src/main/java/org/springframework/retry/backoff/RememberPeriodSleeper.java @@ -0,0 +1,22 @@ +package org.springframework.retry.backoff; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class RememberPeriodSleeper implements Sleeper, LastBackoffPeriodSupplier { + + private static final Log logger = LogFactory.getLog(RememberPeriodSleeper.class); + + private volatile Long lastBackoffPeriod; + + @Override + public void sleep(long backOffPeriod) { + logger.debug("Remembering a sleeping period instead of sleeping: " + backOffPeriod); + lastBackoffPeriod = backOffPeriod; + } + + @Override + public Long get() { + return lastBackoffPeriod; + } +} diff --git a/src/main/java/org/springframework/retry/support/AsyncRetryResultProcessor.java b/src/main/java/org/springframework/retry/support/AsyncRetryResultProcessor.java new file mode 100644 index 00000000..ab0b1b1a --- /dev/null +++ b/src/main/java/org/springframework/retry/support/AsyncRetryResultProcessor.java @@ -0,0 +1,84 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.retry.support; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.retry.RetryContext; +import org.springframework.retry.RetryException; +import org.springframework.retry.backoff.LastBackoffPeriodSupplier; + +/** + * @author Dave Syer + */ +public abstract class AsyncRetryResultProcessor implements RetryResultProcessor { + private static final Log logger = LogFactory.getLog(AsyncRetryResultProcessor.class); + + protected T doNewAttempt(Supplier> supplier) throws Throwable { + logger.debug("Performing the next async callback invocation..."); + return supplier.get().getOrThrow(); + } + + protected abstract T scheduleNewAttemptAfterDelay( + Supplier> supplier, + ScheduledExecutorService reschedulingExecutor, + long rescheduleAfterMillis, + RetryContext ctx + ) throws Throwable; + + protected T handleException(Supplier> supplier, + Consumer handler, + Throwable throwable, + ScheduledExecutorService reschedulingExecutor, + LastBackoffPeriodSupplier lastBackoffPeriodSupplier, + RetryContext ctx) { + try { + handler.accept(unwrapIfNeed(throwable)); + + if (reschedulingExecutor == null || lastBackoffPeriodSupplier == null) { + return doNewAttempt(supplier); + } else { + long rescheduleAfterMillis = lastBackoffPeriodSupplier.get(); + logger.debug("Scheduling a next retry with a delay = " + rescheduleAfterMillis + " millis..."); + return scheduleNewAttemptAfterDelay(supplier, reschedulingExecutor, rescheduleAfterMillis, ctx); + } + } + catch (Throwable t) { + throw RetryTemplate.runtimeException(unwrapIfNeed(t)); + } + } + + static Throwable unwrapIfNeed(Throwable throwable) { + if (throwable instanceof ExecutionException + || throwable instanceof CompletionException + || throwable instanceof RetryException) { + return throwable.getCause(); + } else { + return throwable; + } + } +} diff --git a/src/main/java/org/springframework/retry/support/CompletableFutureRetryResultProcessor.java b/src/main/java/org/springframework/retry/support/CompletableFutureRetryResultProcessor.java index 3f2f4312..a444f8d6 100644 --- a/src/main/java/org/springframework/retry/support/CompletableFutureRetryResultProcessor.java +++ b/src/main/java/org/springframework/retry/support/CompletableFutureRetryResultProcessor.java @@ -19,12 +19,18 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.springframework.retry.RetryCallback; +import org.springframework.retry.RetryContext; import org.springframework.retry.RetryException; +import org.springframework.retry.backoff.LastBackoffPeriodSupplier; /** * A {@link RetryResultProcessor} for a {@link CompletableFuture}. If a @@ -33,58 +39,47 @@ * * @author Dave Syer */ -public class CompletableFutureRetryResultProcessor - implements RetryResultProcessor> { +public class CompletableFutureRetryResultProcessor + extends AsyncRetryResultProcessor> { + + protected final Log logger = LogFactory.getLog(getClass()); @Override - public Result> process(CompletableFuture completable, - Supplier>> supplier, - Consumer handler) { - @SuppressWarnings("unchecked") - CompletableFuture typed = (CompletableFuture) completable; - CompletableFuture handle = typed - .thenApply(value -> CompletableFuture.completedFuture(value)) - .exceptionally(throwable -> apply(supplier, handler, throwable)) + public Result> process(CompletableFuture completable, + Supplier>> supplier, + Consumer handler, ScheduledExecutorService reschedulingExecutor, + LastBackoffPeriodSupplier lastBackoffPeriodSupplier, + RetryContext ctx) { + + CompletableFuture handle = completable + .thenApply(CompletableFuture::completedFuture) + .exceptionally(throwable -> handleException( + supplier, handler, throwable, reschedulingExecutor, lastBackoffPeriodSupplier, ctx) + ) .thenCompose(Function.identity()); + return new Result<>(handle); } - private CompletableFuture apply( - Supplier>> supplier, Consumer handler, - Throwable throwable) { - Throwable error = throwable; - try { - if (throwable instanceof ExecutionException - || throwable instanceof CompletionException) { - error = throwable.getCause(); - } - handler.accept(error); - Result> result = supplier.get(); - if (result.isComplete()) { - @SuppressWarnings("unchecked") - CompletableFuture output = (CompletableFuture) result - .getResult(); - return output; + protected CompletableFuture scheduleNewAttemptAfterDelay( + Supplier>> supplier, + ScheduledExecutorService reschedulingExecutor, long rescheduleAfterMillis, + RetryContext ctx) + { + CompletableFuture> futureOfFurtherScheduling = new CompletableFuture<>(); + + reschedulingExecutor.schedule(() -> { + try { + RetrySynchronizationManager.register(ctx); + futureOfFurtherScheduling.complete(doNewAttempt(supplier)); + } catch (Throwable t) { + futureOfFurtherScheduling.completeExceptionally(t); + throw RetryTemplate.runtimeException(t); + } finally { + RetrySynchronizationManager.clear(); } - throw result.exception; - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - error = e; - } - catch (CompletionException e) { - error = e.getCause(); - } - catch (ExecutionException e) { - error = e.getCause(); - } - catch (RetryException e) { - error = e.getCause(); - } - catch (Throwable e) { - error = e; - } - throw RetryTemplate.runtimeException(error); - } + }, rescheduleAfterMillis, TimeUnit.MILLISECONDS); + return futureOfFurtherScheduling.thenCompose(Function.identity()); + } } \ No newline at end of file diff --git a/src/main/java/org/springframework/retry/support/FutureRetryResultProcessor.java b/src/main/java/org/springframework/retry/support/FutureRetryResultProcessor.java index dea887cb..3a9534ea 100644 --- a/src/main/java/org/springframework/retry/support/FutureRetryResultProcessor.java +++ b/src/main/java/org/springframework/retry/support/FutureRetryResultProcessor.java @@ -16,45 +16,79 @@ package org.springframework.retry.support; -import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Supplier; import org.springframework.retry.RetryCallback; -import org.springframework.retry.RetryException; +import org.springframework.retry.RetryContext; +import org.springframework.retry.backoff.LastBackoffPeriodSupplier; /** + * todo: check or remove after discussion + * * A {@link RetryResultProcessor} for a plain {@link Future}. If a {@link RetryCallback} * returns a Future this processor can be used internally by the * {@link RetryTemplate} to wrap it and process the result. - * + * * @author Dave Syer */ -public class FutureRetryResultProcessor implements RetryResultProcessor> { +public class FutureRetryResultProcessor extends AsyncRetryResultProcessor> { + + @Override + public Result> process(Future future, + Supplier>> supplier, Consumer handler, + ScheduledExecutorService reschedulingExecutor, LastBackoffPeriodSupplier lastBackoffPeriodSupplier, + RetryContext ctx) { + return new Result<>(new FutureWrapper(future, supplier, handler, this, reschedulingExecutor, + lastBackoffPeriodSupplier, ctx)); + } @Override - public Result> process(Future future, - Supplier>> supplier, Consumer handler) { - return new Result>(new FutureWrapper(future, supplier, handler)); + protected Future scheduleNewAttemptAfterDelay(Supplier>> supplier, + ScheduledExecutorService reschedulingExecutor, long rescheduleAfterMillis, RetryContext ctx) + throws Throwable + { + ScheduledFuture> scheduledFuture = reschedulingExecutor.schedule(() -> { + try { + return doNewAttempt(supplier); + } catch (Throwable t) { + throw RetryTemplate.runtimeException(t); + } + }, rescheduleAfterMillis, TimeUnit.MILLISECONDS); + + return new FutureFlatter(scheduledFuture); } - private class FutureWrapper implements Future { + private class FutureWrapper implements Future { - private Future delegate; + private Future delegate; - private Supplier>> supplier; + private Supplier>> supplier; private Consumer handler; - - FutureWrapper(Future delegate, Supplier>> supplier, - Consumer handler) { + private AsyncRetryResultProcessor> processor; + private final ScheduledExecutorService reschedulingExecutor; + private final LastBackoffPeriodSupplier lastBackoffPeriodSupplier; + private RetryContext ctx; + + FutureWrapper(Future delegate, Supplier>> supplier, + Consumer handler, AsyncRetryResultProcessor> processor, + ScheduledExecutorService reschedulingExecutor, LastBackoffPeriodSupplier lastBackoffPeriodSupplier, + RetryContext ctx) { this.delegate = delegate; this.supplier = supplier; this.handler = handler; + this.processor = processor; + this.reschedulingExecutor = reschedulingExecutor; + this.lastBackoffPeriodSupplier = lastBackoffPeriodSupplier; + this.ctx = ctx; } @Override @@ -73,62 +107,79 @@ public boolean isDone() { } @Override - public Object get() throws InterruptedException, ExecutionException { + public V get() throws InterruptedException, ExecutionException { try { return this.delegate.get(); } - catch (ExecutionException e) { - return handle(e); + catch (Throwable e) { + return processor.handleException(supplier, handler, e, reschedulingExecutor, lastBackoffPeriodSupplier, ctx) + .get(); } } @Override - public Object get(long timeout, TimeUnit unit) + public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { try { return this.delegate.get(timeout, unit); } - catch (ExecutionException e) { - return handle(e, timeout, unit); + catch (Throwable e) { + return processor.handleException(supplier, handler, e, reschedulingExecutor, lastBackoffPeriodSupplier, ctx) + .get(timeout, unit); } } - private Object handle(ExecutionException throwable) { - return handle(throwable, -1, null); + } + + private class FutureFlatter implements Future { + + private Future> nestedFuture; + + FutureFlatter(Future> nestedFuture) { + this.nestedFuture = nestedFuture; } - private Object handle(ExecutionException throwable, long timeout, TimeUnit unit) { - Throwable error = throwable.getCause(); + @Override + public boolean cancel(boolean mayInterruptIfRunning) { try { - this.handler.accept(error); - Result> result = this.supplier.get(); - if (result.isComplete()) { - if (timeout < 0) { - return result.getResult().get(); - } - else { - return result.getResult().get(timeout, unit); - } - } - throw result.exception; - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - error = e; - } - catch (CompletionException e) { - error = e.getCause(); + if (this.nestedFuture.isDone()) { + return this.nestedFuture.get().cancel(mayInterruptIfRunning); + } else { + return this.nestedFuture.cancel(mayInterruptIfRunning); } - catch (ExecutionException e) { - error = e.getCause(); + } catch (Throwable t) { + throw RetryTemplate.runtimeException(t); } - catch (RetryException e) { - error = e.getCause(); + } + + @Override + public boolean isCancelled() { + try { + return this.nestedFuture.isCancelled() + || (this.nestedFuture.isDone() && this.nestedFuture.get().isCancelled()); + } catch (Throwable t) { + throw RetryTemplate.runtimeException(t); } - catch (Throwable e) { - error = e; + } + + @Override + public boolean isDone() { + try { + return this.nestedFuture.isDone() && this.nestedFuture.get().isDone(); + } catch (Throwable t) { + throw RetryTemplate.runtimeException(t); } - throw RetryTemplate.runtimeException(error); + } + + @Override + public V get() throws InterruptedException, ExecutionException { + return this.nestedFuture.get().get(); + } + + @Override + public V get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return this.nestedFuture.get(timeout, unit).get(timeout, unit); } } diff --git a/src/main/java/org/springframework/retry/support/RetryResultProcessor.java b/src/main/java/org/springframework/retry/support/RetryResultProcessor.java index 1e192087..dce60354 100644 --- a/src/main/java/org/springframework/retry/support/RetryResultProcessor.java +++ b/src/main/java/org/springframework/retry/support/RetryResultProcessor.java @@ -16,16 +16,23 @@ package org.springframework.retry.support; +import java.util.concurrent.ScheduledExecutorService; import java.util.function.Consumer; import java.util.function.Supplier; +import org.springframework.retry.RetryContext; +import org.springframework.retry.backoff.LastBackoffPeriodSupplier; + /** * @author Dave Syer * @param the type of result from the retryable operation */ public interface RetryResultProcessor { - Result process(T input, Supplier> supplier, Consumer handler); + Result process(T input, Supplier> supplier, Consumer handler, + ScheduledExecutorService reschedulingExecutor, + LastBackoffPeriodSupplier lastBackoffPeriodSupplier, + RetryContext ctx); public static class Result { @@ -57,6 +64,12 @@ public T getResult() { return result; } + public T getOrThrow() throws Throwable { + if (isComplete()) { + return result; + } + throw exception; + } } } diff --git a/src/main/java/org/springframework/retry/support/RetryTemplate.java b/src/main/java/org/springframework/retry/support/RetryTemplate.java index f5897745..025040e1 100644 --- a/src/main/java/org/springframework/retry/support/RetryTemplate.java +++ b/src/main/java/org/springframework/retry/support/RetryTemplate.java @@ -19,6 +19,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,11 +40,15 @@ import org.springframework.retry.backoff.BackOffContext; import org.springframework.retry.backoff.BackOffInterruptedException; import org.springframework.retry.backoff.BackOffPolicy; +import org.springframework.retry.backoff.LastBackoffPeriodSupplier; import org.springframework.retry.backoff.NoBackOffPolicy; +import org.springframework.retry.backoff.RememberPeriodSleeper; +import org.springframework.retry.backoff.SleepingBackOffPolicy; import org.springframework.retry.policy.MapRetryContextCache; import org.springframework.retry.policy.RetryContextCache; import org.springframework.retry.policy.SimpleRetryPolicy; import org.springframework.retry.support.RetryResultProcessor.Result; +import org.springframework.util.Assert; /** * Template class that simplifies the execution of operations with retry semantics. @@ -90,15 +97,19 @@ public class RetryTemplate implements RetryOperations { private volatile BackOffPolicy backOffPolicy = new NoBackOffPolicy(); + private volatile LastBackoffPeriodSupplier lastBackoffPeriodSupplier = null; + private volatile RetryPolicy retryPolicy = new SimpleRetryPolicy(3); private volatile RetryListener[] listeners = new RetryListener[0]; - private RetryContextCache retryContextCache = new MapRetryContextCache(); + private volatile RetryContextCache retryContextCache = new MapRetryContextCache(); + + private volatile Classifier> processors = null; - private Classifier> processors = null; + private volatile ScheduledExecutorService reschedulingExecutor = null; - private boolean throwLastExceptionOnExhausted; + private volatile boolean throwLastExceptionOnExhausted; /** * Main entry point to configure RetryTemplate using fluent API. See @@ -156,6 +167,11 @@ public void setListeners(RetryListener[] listeners) { this.listeners = Arrays.asList(listeners).toArray(new RetryListener[listeners.length]); } + public void setReschedulingExecutor(ScheduledExecutorService reschedulingExecutor) { + this.reschedulingExecutor = reschedulingExecutor; + this.backOffPolicy = replaceSleeperIfNeed(backOffPolicy); + } + /** * Register an additional listener. * @param listener the {@link RetryListener} @@ -172,7 +188,18 @@ public void registerListener(RetryListener listener) { * @param backOffPolicy the {@link BackOffPolicy} */ public void setBackOffPolicy(BackOffPolicy backOffPolicy) { - this.backOffPolicy = backOffPolicy; + this.backOffPolicy = replaceSleeperIfNeed(backOffPolicy); + } + + private BackOffPolicy replaceSleeperIfNeed(BackOffPolicy backOffPolicy) { + if (reschedulingExecutor != null && backOffPolicy instanceof SleepingBackOffPolicy) { + this.logger.debug("Replacing the default sleeper by RememberPeriodSleeper to enable scheduler-based backoff."); + RememberPeriodSleeper rememberPeriodSleeper = new RememberPeriodSleeper(); + lastBackoffPeriodSupplier = rememberPeriodSleeper; + return ((SleepingBackOffPolicy) backOffPolicy).withSleeper(rememberPeriodSleeper); + } else { + return backOffPolicy; + } } /** @@ -364,7 +391,9 @@ private Result loop(RetryCallback retryCallbac () -> safeLoop(retryCallback, state, context, backOffContext), error -> safeHandleLoopException(retryCallback, state, - context, backOffContext, error)); + context, backOffContext, error), reschedulingExecutor, + lastBackoffPeriodSupplier, + context); } } return new Result<>(result); @@ -482,6 +511,7 @@ protected void registerThrowable(RetryPolicy retryPolicy, RetryState state, Retr registerContext(context, state); } + // есть стейт, есть ключ, сохраняем данный контекст в кэш: ключ -> контекст private void registerContext(RetryContext context, RetryState state) { if (state != null) { Object key = state.getKey(); @@ -665,7 +695,7 @@ else if (throwable instanceof Exception) { * @return a RuntimeException if possible * @throws RetryException if the throwable is checked */ - static RuntimeException runtimeException(Throwable throwable) throws RetryException { + public static RuntimeException runtimeException(Throwable throwable) throws RetryException { if (throwable instanceof Error) { throw (Error) throwable; } diff --git a/src/main/java/org/springframework/retry/support/RetryTemplateBuilder.java b/src/main/java/org/springframework/retry/support/RetryTemplateBuilder.java index dd5bdbad..cfcc5902 100644 --- a/src/main/java/org/springframework/retry/support/RetryTemplateBuilder.java +++ b/src/main/java/org/springframework/retry/support/RetryTemplateBuilder.java @@ -16,10 +16,17 @@ package org.springframework.retry.support; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.ScheduledExecutorService; import org.springframework.classify.BinaryExceptionClassifier; import org.springframework.classify.BinaryExceptionClassifierBuilder; +import org.springframework.classify.SubclassClassifier; import org.springframework.retry.RetryListener; import org.springframework.retry.RetryPolicy; import org.springframework.retry.backoff.BackOffPolicy; @@ -27,6 +34,7 @@ import org.springframework.retry.backoff.ExponentialRandomBackOffPolicy; import org.springframework.retry.backoff.FixedBackOffPolicy; import org.springframework.retry.backoff.NoBackOffPolicy; +import org.springframework.retry.backoff.SleepingBackOffPolicy; import org.springframework.retry.backoff.UniformRandomBackOffPolicy; import org.springframework.retry.policy.AlwaysRetryPolicy; import org.springframework.retry.policy.BinaryExceptionClassifierRetryPolicy; @@ -94,6 +102,10 @@ public class RetryTemplateBuilder { private BinaryExceptionClassifierBuilder classifierBuilder; + private ScheduledExecutorService executorService; + + private Map, RetryResultProcessor> processors = new HashMap<>(); + /* ---------------- Configure retry policy -------------- */ /** @@ -356,6 +368,28 @@ public RetryTemplateBuilder withListeners(List listeners) { return this; } + /* ---------------- Async -------------- */ + + public RetryTemplateBuilder asyncRetry(ScheduledExecutorService reschedulingExecutor) { + this.executorService = reschedulingExecutor; + return asyncRetry(); + } + + /** + * Enable async retry feature. + * Due to no rescheduling executor is provided, a potential backoff will be performed + * by Thread.sleep(). + */ + public RetryTemplateBuilder asyncRetry() { + // todo: support interface classification (does not work yet) + this.processors.put(Future.class, new FutureRetryResultProcessor<>()); + this.processors.put(FutureTask.class, new FutureRetryResultProcessor<>()); + this.processors.put(CompletableFuture.class, new CompletableFutureRetryResultProcessor()); + + //todo + return this; + } + /* ---------------- Building -------------- */ /** @@ -399,6 +433,20 @@ public RetryTemplate build() { retryTemplate.setListeners(this.listeners.toArray(new RetryListener[0])); } + // Scheduler + if (this.executorService != null) { + retryTemplate.setReschedulingExecutor(executorService); + + Assert.isTrue(backOffPolicy instanceof SleepingBackOffPolicy, + "Usage of a rescheduling executor makes sense " + + "only with an instance of SleepingBackOffPolicy" + ); + } + + SubclassClassifier> classifier = + new SubclassClassifier<>(processors, null); + retryTemplate.setRetryResultProcessors(classifier); + return retryTemplate; } diff --git a/src/test/java/org/springframework/retry/support/AbstractAsyncRetryTest.java b/src/test/java/org/springframework/retry/support/AbstractAsyncRetryTest.java new file mode 100644 index 00000000..ae5bb34e --- /dev/null +++ b/src/test/java/org/springframework/retry/support/AbstractAsyncRetryTest.java @@ -0,0 +1,227 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.retry.support; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.springframework.retry.RetryCallback; +import org.springframework.retry.RetryContext; +import org.springframework.retry.backoff.BackOffContext; +import org.springframework.retry.backoff.BackOffInterruptedException; +import org.springframework.retry.backoff.BackOffPolicy; +import org.springframework.retry.backoff.RememberPeriodSleeper; +import org.springframework.retry.backoff.Sleeper; + +import static org.junit.Assert.assertTrue; +import static org.springframework.retry.util.test.TestUtils.getPropertyValue; + +/** + * @author Dave Syer + */ +public class AbstractAsyncRetryTest { + + /* ---------------- Async callbacks implementations for different types -------------- */ + + static class CompletableFutureRetryCallback + extends AbstractRetryCallback> { + + @Override + public CompletableFuture schedule(Supplier callback, ExecutorService workerExecutor) { + return CompletableFuture.supplyAsync(callback, workerExecutor); + } + + @Override + Object awaitItself(CompletableFuture asyncType) { + return asyncType.join(); + } + } + + static class FutureRetryCallback + extends AbstractRetryCallback> { + + @Override + public Future schedule(Supplier callback, ExecutorService executor) { + return executor.submit(callback::get); + } + + @Override + Object awaitItself(Future asyncType) throws Throwable { + return asyncType.get(); + } + } + + static abstract class AbstractRetryCallback + implements RetryCallback { + + final Object defaultResult = new Object(); + final Log logger = LogFactory.getLog(getClass()); + + final AtomicInteger jobAttempts = new AtomicInteger(); + final AtomicInteger schedulingAttempts = new AtomicInteger(); + + volatile int attemptsBeforeSchedulingSuccess; + volatile int attemptsBeforeJobSuccess; + + volatile RuntimeException exceptionToThrow = new RuntimeException(); + + volatile Function resultSupplier = ctx -> defaultResult; + volatile Consumer customCodeBeforeScheduling = ctx -> {}; + + final List schedulerThreadNames = new CopyOnWriteArrayList<>(); + final List invocationMoments = new CopyOnWriteArrayList<>(); + + final ExecutorService workerExecutor = Executors.newSingleThreadExecutor( + getNamedThreadFactory(WORKER_THREAD_NAME) + ); + + public abstract A schedule(Supplier callback, ExecutorService executor); + + abstract Object awaitItself(A asyncType) throws Throwable; + + @Override + public A doWithRetry(RetryContext ctx) + throws Exception { + rememberThreadName(); + rememberInvocationMoment(); + + throwIfSchedulingTooEarly(); + + customCodeBeforeScheduling.accept(ctx); + + return schedule(() -> { + try { + // a hack to avoid running CompletableFuture#thenApplyAsync in the caller thread + Thread.sleep(100L); + } catch (InterruptedException e) { + e.printStackTrace(); + } + throwIfJobTooEarly(); + logger.debug("Succeeding the callback..."); + return resultSupplier.apply(ctx); + }, workerExecutor); + } + + void rememberInvocationMoment() { + invocationMoments.add(System.currentTimeMillis()); + } + + void rememberThreadName() { + schedulerThreadNames.add(Thread.currentThread().getName()); + } + + void throwIfJobTooEarly() { + if (this.jobAttempts.incrementAndGet() < this.attemptsBeforeJobSuccess) { + logger.debug("Failing job..."); + throw this.exceptionToThrow; + } + } + + void throwIfSchedulingTooEarly() { + if (this.schedulingAttempts.incrementAndGet() < this.attemptsBeforeSchedulingSuccess) { + logger.debug("Failing scheduling..."); + throw this.exceptionToThrow; + } + } + + void setAttemptsBeforeJobSuccess(int attemptsBeforeJobSuccess) { + this.attemptsBeforeJobSuccess = attemptsBeforeJobSuccess; + } + + void setAttemptsBeforeSchedulingSuccess(int attemptsBeforeSchedulingSuccess) { + this.attemptsBeforeSchedulingSuccess = attemptsBeforeSchedulingSuccess; + } + + void setExceptionToThrow(RuntimeException exceptionToThrow) { + this.exceptionToThrow = exceptionToThrow; + } + + void setResultSupplier(Function resultSupplier) { + this.resultSupplier = resultSupplier; + } + + void setCustomCodeBeforeScheduling(Consumer customCodeBeforeScheduling) { + this.customCodeBeforeScheduling = customCodeBeforeScheduling; + } + } + + + static class MockBackOffStrategy implements BackOffPolicy { + + public int backOffCalls; + + public int startCalls; + + @Override + public BackOffContext start(RetryContext status) { + if (!status.hasAttribute(MockBackOffStrategy.class.getName())) { + this.startCalls++; + status.setAttribute(MockBackOffStrategy.class.getName(), true); + } + return null; + } + + @Override + public void backOff(BackOffContext backOffContext) + throws BackOffInterruptedException { + this.backOffCalls++; + } + + } + + /* ---------------- Utilities -------------- */ + + static final String SCHEDULER_THREAD_NAME = "scheduler"; + static final String WORKER_THREAD_NAME = "worker"; + + static ScheduledExecutorService getNamedScheduledExecutor() { + return Executors.newScheduledThreadPool( + 1, + getNamedThreadFactory(AbstractAsyncRetryTest.SCHEDULER_THREAD_NAME) + ); + } + + static ThreadFactory getNamedThreadFactory(String threadName) { + return new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r); + thread.setName(threadName); + return thread; + } + }; + } + + void assertRememberingSleeper(RetryTemplate template) { + // The sleeper of the backoff policy should be an instance of RememberPeriodSleeper, means not Thread.sleep() + BackOffPolicy backOffPolicy = getPropertyValue(template, "backOffPolicy", BackOffPolicy.class); + Sleeper sleeper = getPropertyValue(backOffPolicy, "sleeper", Sleeper.class); + assertTrue(sleeper instanceof RememberPeriodSleeper); + } +} diff --git a/src/test/java/org/springframework/retry/support/AsyncReschedulingTests.java b/src/test/java/org/springframework/retry/support/AsyncReschedulingTests.java new file mode 100644 index 00000000..a5c01195 --- /dev/null +++ b/src/test/java/org/springframework/retry/support/AsyncReschedulingTests.java @@ -0,0 +1,435 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.retry.support; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Supplier; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verifyZeroInteractions; + +public class AsyncReschedulingTests extends AbstractAsyncRetryTest { + + /** + * Scheduling retry + job immediate success. + * + * - async callback succeeds at 3rd attempt + * - actual job succeeds on 1st attempt + * - no backoff + */ + + @Test + public void testInitialSchedulingEventualSuccessCF() throws Throwable { + doTestInitialSchedulingEventualSuccess(new CompletableFutureRetryCallback()); + } + + @Test + public void testInitialSchedulingEventualSuccessF() throws Throwable { + doTestInitialSchedulingEventualSuccess(new FutureRetryCallback()); + } + + private void doTestInitialSchedulingEventualSuccess(AbstractRetryCallback callback) throws Throwable { + RetryTemplate template = RetryTemplate.builder() + .maxAttempts(5) + .noBackoff() + .asyncRetry() + .build(); + + callback.setAttemptsBeforeSchedulingSuccess(3); + callback.setAttemptsBeforeJobSuccess(1); + assertEquals(callback.defaultResult, callback.awaitItself(template.execute(callback))); + + // All invocations before first successful scheduling should be performed by the caller thread + assertEquals(Collections.nCopies(3, Thread.currentThread().getName()), callback.schedulerThreadNames); + + assertEquals(1, callback.jobAttempts.get()); + } + + /** + * Immediate success of both scheduling and job. + * + * - async callback, that does not fail itself + * - actual job succeeds on 1st attempt + * - backoff is not necessary + */ + + @Test + public void testImmediateSuccessCF() throws Throwable { + doTestImmediateSuccess(new CompletableFutureRetryCallback()); + } + + @Test + public void testImmediateSuccessF() throws Throwable { + doTestImmediateSuccess(new FutureRetryCallback()); + } + + private void doTestImmediateSuccess(AbstractRetryCallback callback) throws Throwable { + ScheduledExecutorService executor = mock(ScheduledExecutorService.class); + + RetryTemplate template = RetryTemplate.builder() + .fixedBackoff(10000) + .asyncRetry(executor) + .build(); + + callback.setAttemptsBeforeSchedulingSuccess(1); + callback.setAttemptsBeforeJobSuccess(1); + assertEquals(callback.defaultResult, callback.awaitItself(template.execute(callback))); + + // Single invocation should be performed by the caller thread + assertEquals(Collections.singletonList(Thread.currentThread().getName()), callback.schedulerThreadNames); + + assertEquals(1, callback.jobAttempts.get()); + + // No interaction with the rescheduling executor should be performed if the first execution of the job succeeds. + verifyZeroInteractions(executor); + } + + /** + * Async retry with rescheduler. + * + * - async callback, that does not fail itself + * - actual job succeeds on 3rd attempt + * - backoff is performed using executor, without Thread.sleep() + */ + + @Test + public void testAsyncRetryWithReschedulerCF() throws Throwable { + doTestAsyncRetryWithRescheduler(new CompletableFutureRetryCallback()); + } + + @Test + public void testAsyncRetryWithReschedulerF() throws Throwable { + doTestAsyncRetryWithRescheduler(new FutureRetryCallback()); + } + + private void doTestAsyncRetryWithRescheduler(AbstractRetryCallback callback) throws Throwable { + + int targetFixedBackoff = 150; + + ScheduledExecutorService executor = getNamedScheduledExecutor(); + + RetryTemplate template = RetryTemplate.builder() + .maxAttempts(4) + .fixedBackoff(targetFixedBackoff) + .asyncRetry(executor) + .build(); + + callback.setAttemptsBeforeSchedulingSuccess(1); + callback.setAttemptsBeforeJobSuccess(3); + assertEquals(callback.defaultResult, callback.awaitItself(template.execute(callback))); + assertEquals(3, callback.jobAttempts.get()); + + // All invocations after the first successful scheduling should be performed by the the rescheduler thread + assertEquals(Arrays.asList( + Thread.currentThread().getName(), + SCHEDULER_THREAD_NAME, + SCHEDULER_THREAD_NAME + ), callback.schedulerThreadNames); + + assertRememberingSleeper(template); + + // Expected backoff should be performed + List moments = callback.invocationMoments; + for (int i = 0; i < moments.size() - 1; i++) { + long approxBackoff = moments.get(i + 1) - moments.get(i); + assertTrue(approxBackoff > targetFixedBackoff); + } + } + + /** + * Async retry without backoff + * + * - async callback succeeds on 2nd attempt + * - actual job succeeds on 3nd attempt + * - default zero backoff is used (which has no sleeper at all), + * and therefore rescheduler executor is not used at all + */ + + @Test + public void testAsyncRetryWithoutBackoffCF() throws Throwable { + doTestAsyncRetryWithoutBackoff(new CompletableFutureRetryCallback()); + } + + // todo: problem: a Future can start retrying only when user calls get(). Consider to not support Future at all. + /*@Test + public void testAsyncRetryWithoutBackoffF() throws Throwable { + doTestAsyncRetryWithoutBackoff(new FutureRetryCallback()); + }*/ + + private void doTestAsyncRetryWithoutBackoff(AbstractRetryCallback callback) throws Throwable { + RetryTemplate template = RetryTemplate.builder() + .maxAttempts(4) + .asyncRetry() + .build(); + + callback.setAttemptsBeforeSchedulingSuccess(2); + callback.setAttemptsBeforeJobSuccess(3); + assertEquals(callback.defaultResult, callback.awaitItself(template.execute(callback))); + assertEquals(4, callback.schedulingAttempts.get()); + assertEquals(3, callback.jobAttempts.get()); + + // All invocations after the first successful scheduling should be performed by the + // the worker thread (because not backoff and no rescheduler thread) + assertEquals(Arrays.asList( + Thread.currentThread().getName(), + Thread.currentThread().getName(), + WORKER_THREAD_NAME, + WORKER_THREAD_NAME + ), callback.schedulerThreadNames); + } + + /** + * Exhausted on scheduling retries + */ + + @Test + public void testExhaustOnSchedulingCF() throws Throwable { + doTestExhaustOnScheduling(new CompletableFutureRetryCallback()); + } + + @Test + public void testExhaustOnSchedulingF() throws Throwable { + doTestExhaustOnScheduling(new FutureRetryCallback()); + } + + private void doTestExhaustOnScheduling(AbstractRetryCallback callback) throws Throwable { + RetryTemplate template = RetryTemplate.builder() + .maxAttempts(2) + .asyncRetry() + .fixedBackoff(100) + .build(); + + callback.setAttemptsBeforeSchedulingSuccess(5); + callback.setAttemptsBeforeJobSuccess(5); + + try { + callback.awaitItself(template.execute(callback)); + fail("An exception should be thrown above"); + } catch (Exception e) { + assertSame(e, callback.exceptionToThrow); + } + + assertEquals(Arrays.asList( + Thread.currentThread().getName(), + Thread.currentThread().getName() + ), callback.schedulerThreadNames); + } + + /** + * Exhausted on job retries + */ + + @Test + public void testExhaustOnJobWithReschedulerCF() throws Throwable { + doTestExhaustOnJobWithRescheduler(new CompletableFutureRetryCallback()); + } + + @Test + public void testExhaustOnJobWithReschedulerF() throws Throwable { + doTestExhaustOnJobWithRescheduler(new FutureRetryCallback()); + } + + private void doTestExhaustOnJobWithRescheduler(AbstractRetryCallback callback) throws Throwable { + RetryTemplate template = RetryTemplate.builder() + .maxAttempts(5) + .asyncRetry(getNamedScheduledExecutor()) + .exponentialBackoff(10, 2, 100) + .build(); + + callback.setAttemptsBeforeSchedulingSuccess(1); + callback.setAttemptsBeforeJobSuccess(6); + + try { + Object v = callback.awaitItself(template.execute(callback)); + fail("An exception should be thrown above"); + // Single wrapping by CompletionException is expected by CompletableFuture contract + } catch (Exception ce) { + assertSame(ce.getCause(), callback.exceptionToThrow); + } + + assertEquals(Arrays.asList( + Thread.currentThread().getName(), + SCHEDULER_THREAD_NAME, + SCHEDULER_THREAD_NAME, + SCHEDULER_THREAD_NAME, + SCHEDULER_THREAD_NAME + ), callback.schedulerThreadNames); + } + + // todo: rejected execution + // todo: interrupt executor + // rethrow not too late + + + /* + * Nested rescheduling + */ + + @Test + public void testNested() throws Throwable { + ScheduledExecutorService executor = getNamedScheduledExecutor(); + + RetryTemplate outerTemplate = RetryTemplate.builder() + .infiniteRetry() + .asyncRetry(executor) + .fixedBackoff(10) + .build(); + + RetryTemplate innerTemplate = RetryTemplate.builder() + .infiniteRetry() + .asyncRetry(executor) + .fixedBackoff(10) + .build(); + + CompletableFutureRetryCallback innerCallback = new CompletableFutureRetryCallback(); + innerCallback.setAttemptsBeforeSchedulingSuccess(3); + innerCallback.setAttemptsBeforeJobSuccess(3); + innerCallback.setCustomCodeBeforeScheduling(ctx -> { + // The current context should be available via RetrySynchronizationManager while scheduling + // (withing user's async callback itself) + assertEquals(ctx, RetrySynchronizationManager.getContext()); + + // We have no control over user's worker thread, so we can not implicitly set/get the parent + // context via RetrySynchronizationManager. + assertNull(ctx.getParent()); + }); + innerCallback.setResultSupplier(ctx -> { + // There is no way to implicitly pass the context into the worker thread, because the worker executor, + // thread and callback are fully controlled by the user. The retry engine deals with only + // scheduling/rescheduling and their result (e.g. CompletableFuture) + assertNull(RetrySynchronizationManager.getContext()); + + return innerCallback.defaultResult; + }); + + CompletableFutureRetryCallback outerCallback = new CompletableFutureRetryCallback(); + outerCallback.setAttemptsBeforeSchedulingSuccess(3); + outerCallback.setAttemptsBeforeJobSuccess(3); + outerCallback.setCustomCodeBeforeScheduling(ctx -> { + // The current context should be available via RetrySynchronizationManager while scheduling + // (withing user's async callback itself) + assertEquals(ctx, RetrySynchronizationManager.getContext()); + }); + outerCallback.setResultSupplier(ctx -> { + try { + assertNull(RetrySynchronizationManager.getContext()); + CompletableFuture innerResultFuture = innerTemplate.execute(innerCallback); + assertNull(RetrySynchronizationManager.getContext()); + + Object innerResult = innerCallback.awaitItself(innerResultFuture); + assertNull(RetrySynchronizationManager.getContext()); + + // Return inner result as outer result + return innerResult; + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + + Object outerResult = outerCallback.awaitItself(outerTemplate.execute(outerCallback)); + assertEquals(innerCallback.defaultResult, outerResult); + + assertEquals(Arrays.asList( + // initial scheduling of the outer callback + Thread.currentThread().getName(), + Thread.currentThread().getName(), + Thread.currentThread().getName(), + // rescheduling of the outer callback + SCHEDULER_THREAD_NAME, + SCHEDULER_THREAD_NAME + ), outerCallback.schedulerThreadNames); + + assertEquals(Arrays.asList( + // initial scheduling of the inner callback + WORKER_THREAD_NAME, + WORKER_THREAD_NAME, + WORKER_THREAD_NAME, + // rescheduling of the inner callback + SCHEDULER_THREAD_NAME, + SCHEDULER_THREAD_NAME + ), innerCallback.schedulerThreadNames); + } + + + /** + * Test with additional chained completable futures. + */ + + @Test + public void testAdditionalChainedCF() throws Throwable { + + Object additionalInnerResult = new Object(); + CompletableFutureRetryCallback callback = new CompletableFutureRetryCallback() { + @Override + public CompletableFuture schedule(Supplier callback, ExecutorService workerExecutor) { + return super.schedule(callback, workerExecutor) + // Additional inner cf + .thenApply(r -> { + assertEquals(this.defaultResult, r); + return additionalInnerResult; + }); + } + }; + RetryTemplate template = RetryTemplate.builder() + .maxAttempts(4) + .asyncRetry() + .build(); + + callback.setAttemptsBeforeSchedulingSuccess(2); + callback.setAttemptsBeforeJobSuccess(3); + + Object additionalOuterResult = new Object(); + CompletableFuture cf = template.execute(callback) + // Additional step + .thenApply(r -> { + assertEquals(additionalInnerResult, r); + return additionalOuterResult; + }); + + assertEquals(additionalOuterResult, callback.awaitItself(cf)); + assertEquals(4, callback.schedulingAttempts.get()); + assertEquals(3, callback.jobAttempts.get()); + + // All invocations after the first successful scheduling should be performed by the + // the worker thread (because not backoff and no rescheduler thread) + assertEquals(Arrays.asList( + Thread.currentThread().getName(), + Thread.currentThread().getName(), + WORKER_THREAD_NAME, + WORKER_THREAD_NAME + ), callback.schedulerThreadNames); + } + + + // todo: test stateful rescheduling + // todo: test RejectedExecutionException on rescheduler + // todo: test InterruptedException + // todo: support declarative async +} diff --git a/src/test/java/org/springframework/retry/support/AsyncRetryTemplateTests.java b/src/test/java/org/springframework/retry/support/AsyncRetryTemplateTests.java index dac67e42..eab47805 100644 --- a/src/test/java/org/springframework/retry/support/AsyncRetryTemplateTests.java +++ b/src/test/java/org/springframework/retry/support/AsyncRetryTemplateTests.java @@ -20,35 +20,46 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ForkJoinTask; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.log4j.ConsoleAppender; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.PatternLayout; import org.junit.Before; import org.junit.Test; import org.springframework.classify.SubclassClassifier; -import org.springframework.retry.RetryCallback; -import org.springframework.retry.RetryContext; -import org.springframework.retry.backoff.BackOffContext; -import org.springframework.retry.backoff.BackOffInterruptedException; -import org.springframework.retry.backoff.BackOffPolicy; import org.springframework.retry.policy.SimpleRetryPolicy; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.springframework.retry.util.test.TestUtils.getPropertyValue; /** * @author Dave Syer */ -public class AsyncRetryTemplateTests { +public class AsyncRetryTemplateTests extends AbstractAsyncRetryTest { private RetryTemplate retryTemplate; - + @Before @SuppressWarnings({ "unchecked", "rawtypes" }) public void init() { +// org.apache.log4j.BasicConfigurator.configure(); + + Logger root = Logger.getRootLogger(); + root.removeAllAppenders(); + root.addAppender(new ConsoleAppender(new PatternLayout("%r [%t] %p %c{1} %x - %m%n"))); + Logger.getRootLogger().setLevel(Level.TRACE); + this.retryTemplate = new RetryTemplate(); Map, RetryResultProcessor> map = new HashMap<>(); map.put(Future.class, new FutureRetryResultProcessor()); @@ -62,43 +73,47 @@ public void init() { public void testSuccessfulRetryCompletable() throws Throwable { for (int x = 1; x <= 10; x++) { CompletableFutureRetryCallback callback = new CompletableFutureRetryCallback(); - callback.setAttemptsBeforeSuccess(x); + callback.setAttemptsBeforeSchedulingSuccess(1); + callback.setAttemptsBeforeJobSuccess(x); SimpleRetryPolicy policy = new SimpleRetryPolicy(x); this.retryTemplate.setRetryPolicy(policy); CompletableFuture result = this.retryTemplate.execute(callback); - assertEquals(CompletableFutureRetryCallback.RESULT, - result.get(1000L, TimeUnit.MILLISECONDS)); - assertEquals(x, callback.attempts); + assertEquals(callback.defaultResult, + result.get(10000L, TimeUnit.MILLISECONDS)); + assertEquals(x, callback.jobAttempts.get()); } } - @Test + // todo: remove of fix after discussion + /*@Test public void testSuccessfulRetryFuture() throws Throwable { for (int x = 1; x <= 10; x++) { FutureRetryCallback callback = new FutureRetryCallback(); - callback.setAttemptsBeforeSuccess(x); - SimpleRetryPolicy policy = new SimpleRetryPolicy(x); + callback.setAttemptsBeforeSchedulingSuccess(1); + callback.setAttemptsBeforeJobSuccess(x); + SimpleRetryPolicy policy = new SimpleRetryPolicy(x + 1); this.retryTemplate.setRetryPolicy(policy); Future result = this.retryTemplate.execute(callback); - assertEquals(FutureRetryCallback.RESULT, - result.get(1000L, TimeUnit.MILLISECONDS)); - assertEquals(x, callback.attempts); + assertEquals(callback.defaultResult, + result.get(10000L, TimeUnit.MILLISECONDS)); + assertEquals(x, callback.jobAttempts.get()); } - } + }*/ @Test public void testBackOffInvoked() throws Throwable { for (int x = 1; x <= 10; x++) { CompletableFutureRetryCallback callback = new CompletableFutureRetryCallback(); MockBackOffStrategy backOff = new MockBackOffStrategy(); - callback.setAttemptsBeforeSuccess(x); + callback.setAttemptsBeforeSchedulingSuccess(1); + callback.setAttemptsBeforeJobSuccess(x); SimpleRetryPolicy policy = new SimpleRetryPolicy(10); this.retryTemplate.setRetryPolicy(policy); this.retryTemplate.setBackOffPolicy(backOff); CompletableFuture result = this.retryTemplate.execute(callback); - assertEquals(CompletableFutureRetryCallback.RESULT, - result.get(1000L, TimeUnit.MILLISECONDS)); - assertEquals(x, callback.attempts); + assertEquals(callback.defaultResult, + result.get(10000L, TimeUnit.MILLISECONDS)); + assertEquals(x, callback.jobAttempts.get()); assertEquals(1, backOff.startCalls); assertEquals(x - 1, backOff.backOffCalls); } @@ -109,7 +124,7 @@ public void testNoSuccessRetry() throws Throwable { CompletableFutureRetryCallback callback = new CompletableFutureRetryCallback(); // Something that won't be thrown by JUnit... callback.setExceptionToThrow(new IllegalArgumentException()); - callback.setAttemptsBeforeSuccess(Integer.MAX_VALUE); + callback.setAttemptsBeforeJobSuccess(Integer.MAX_VALUE); int retryAttempts = 2; this.retryTemplate.setRetryPolicy(new SimpleRetryPolicy(retryAttempts)); try { @@ -120,96 +135,9 @@ public void testNoSuccessRetry() throws Throwable { catch (ExecutionException e) { assertTrue("Expected IllegalArgumentException", e.getCause() instanceof IllegalArgumentException); - assertEquals(retryAttempts, callback.attempts); + assertEquals(retryAttempts, callback.jobAttempts.get()); return; } fail("Expected IllegalArgumentException"); } - - private static class CompletableFutureRetryCallback - implements RetryCallback, Exception> { - - public static Object RESULT = new Object(); - - private int attempts; - - private int attemptsBeforeSuccess; - - private RuntimeException exceptionToThrow = new RuntimeException(); - - @Override - public CompletableFuture doWithRetry(RetryContext status) - throws Exception { - // !!!! Don't do this in real life - use a thread pool - return CompletableFuture.supplyAsync(() -> { - this.attempts++; - if (this.attempts < this.attemptsBeforeSuccess) { - throw this.exceptionToThrow; - } - return RESULT; - }); - } - - public void setAttemptsBeforeSuccess(int attemptsBeforeSuccess) { - this.attemptsBeforeSuccess = attemptsBeforeSuccess; - } - - public void setExceptionToThrow(RuntimeException exceptionToThrow) { - this.exceptionToThrow = exceptionToThrow; - } - - } - - private static class FutureRetryCallback - implements RetryCallback, Exception> { - - public static Object RESULT = new Object(); - - private int attempts; - - private int attemptsBeforeSuccess; - - private RuntimeException exceptionToThrow = new RuntimeException(); - - @Override - public Future doWithRetry(RetryContext status) throws Exception { - // !!!! Don't do this in real life - use a thread pool - return ForkJoinTask.adapt(() -> { - this.attempts++; - if (this.attempts < this.attemptsBeforeSuccess) { - throw this.exceptionToThrow; - } - return RESULT; - }).fork(); - } - - public void setAttemptsBeforeSuccess(int attemptsBeforeSuccess) { - this.attemptsBeforeSuccess = attemptsBeforeSuccess; - } - - } - - private static class MockBackOffStrategy implements BackOffPolicy { - - public int backOffCalls; - - public int startCalls; - - @Override - public BackOffContext start(RetryContext status) { - if (!status.hasAttribute(MockBackOffStrategy.class.getName())) { - this.startCalls++; - status.setAttribute(MockBackOffStrategy.class.getName(), true); - } - return null; - } - - @Override - public void backOff(BackOffContext backOffContext) - throws BackOffInterruptedException { - this.backOffCalls++; - } - - } - } diff --git a/src/test/java/org/springframework/retry/support/StatefulRecoveryRetryTests.java b/src/test/java/org/springframework/retry/support/StatefulRecoveryRetryTests.java index 77f7e573..63eb46e7 100644 --- a/src/test/java/org/springframework/retry/support/StatefulRecoveryRetryTests.java +++ b/src/test/java/org/springframework/retry/support/StatefulRecoveryRetryTests.java @@ -140,7 +140,8 @@ public String recover(RetryContext context) { } }; Object result = null; - // On the second retry, the recovery path is taken... + // The recovery path is taken just after the first attempt. + // No rethrow, due to no rollback is required for this type of exception. result = this.retryTemplate.execute(callback, recoveryCallback, state); assertEquals(input, result); // default result is the item assertEquals(1, this.count); From bf8dadb4b05fea8fbd1c2da0f8cfeb561c08341c Mon Sep 17 00:00:00 2001 From: Dave Syer Date: Fri, 6 Sep 2019 11:28:41 +0100 Subject: [PATCH 3/3] Tidy up a few compiler warnings --- ...pplier.java => BackoffPeriodSupplier.java} | 3 +- .../retry/backoff/RememberPeriodSleeper.java | 25 +- .../StatefulRetryOperationsInterceptor.java | 1 - .../support/AsyncRetryResultProcessor.java | 85 +++--- ...CompletableFutureRetryResultProcessor.java | 35 +-- .../support/FutureRetryResultProcessor.java | 64 +++-- .../retry/support/RetryResultProcessor.java | 6 +- .../retry/support/RetryTemplate.java | 59 ++-- .../retry/support/RetryTemplateBuilder.java | 17 +- ...odInvocationRetryListenerSupportTests.java | 8 +- .../retry/support/AbstractAsyncRetryTest.java | 91 +++--- .../retry/support/AsyncReschedulingTests.java | 263 +++++++----------- .../support/AsyncRetryTemplateTests.java | 51 ++-- 13 files changed, 314 insertions(+), 394 deletions(-) rename src/main/java/org/springframework/retry/backoff/{LastBackoffPeriodSupplier.java => BackoffPeriodSupplier.java} (54%) diff --git a/src/main/java/org/springframework/retry/backoff/LastBackoffPeriodSupplier.java b/src/main/java/org/springframework/retry/backoff/BackoffPeriodSupplier.java similarity index 54% rename from src/main/java/org/springframework/retry/backoff/LastBackoffPeriodSupplier.java rename to src/main/java/org/springframework/retry/backoff/BackoffPeriodSupplier.java index 0487dd61..bd97cc3e 100644 --- a/src/main/java/org/springframework/retry/backoff/LastBackoffPeriodSupplier.java +++ b/src/main/java/org/springframework/retry/backoff/BackoffPeriodSupplier.java @@ -2,5 +2,6 @@ import java.util.function.Supplier; -public interface LastBackoffPeriodSupplier extends Supplier { +public interface BackoffPeriodSupplier extends Supplier { + } diff --git a/src/main/java/org/springframework/retry/backoff/RememberPeriodSleeper.java b/src/main/java/org/springframework/retry/backoff/RememberPeriodSleeper.java index 4a61d369..10a7feae 100644 --- a/src/main/java/org/springframework/retry/backoff/RememberPeriodSleeper.java +++ b/src/main/java/org/springframework/retry/backoff/RememberPeriodSleeper.java @@ -3,20 +3,21 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -public class RememberPeriodSleeper implements Sleeper, LastBackoffPeriodSupplier { +public class RememberPeriodSleeper implements Sleeper, BackoffPeriodSupplier { - private static final Log logger = LogFactory.getLog(RememberPeriodSleeper.class); + private static final Log logger = LogFactory.getLog(RememberPeriodSleeper.class); - private volatile Long lastBackoffPeriod; + private volatile Long lastBackoffPeriod; - @Override - public void sleep(long backOffPeriod) { - logger.debug("Remembering a sleeping period instead of sleeping: " + backOffPeriod); - lastBackoffPeriod = backOffPeriod; - } + @Override + public void sleep(long backOffPeriod) { + logger.debug("Remembering a sleeping period instead of sleeping: " + backOffPeriod); + lastBackoffPeriod = backOffPeriod; + } + + @Override + public Long get() { + return lastBackoffPeriod; + } - @Override - public Long get() { - return lastBackoffPeriod; - } } diff --git a/src/main/java/org/springframework/retry/interceptor/StatefulRetryOperationsInterceptor.java b/src/main/java/org/springframework/retry/interceptor/StatefulRetryOperationsInterceptor.java index f769742a..4a540ced 100644 --- a/src/main/java/org/springframework/retry/interceptor/StatefulRetryOperationsInterceptor.java +++ b/src/main/java/org/springframework/retry/interceptor/StatefulRetryOperationsInterceptor.java @@ -25,7 +25,6 @@ import org.springframework.classify.Classifier; import org.springframework.retry.RecoveryCallback; -import org.springframework.retry.RetryCallback; import org.springframework.retry.RetryContext; import org.springframework.retry.RetryOperations; import org.springframework.retry.RetryState; diff --git a/src/main/java/org/springframework/retry/support/AsyncRetryResultProcessor.java b/src/main/java/org/springframework/retry/support/AsyncRetryResultProcessor.java index ab0b1b1a..c9b7e60a 100644 --- a/src/main/java/org/springframework/retry/support/AsyncRetryResultProcessor.java +++ b/src/main/java/org/springframework/retry/support/AsyncRetryResultProcessor.java @@ -16,69 +16,64 @@ package org.springframework.retry.support; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -import java.util.function.Function; import java.util.function.Supplier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + import org.springframework.retry.RetryContext; import org.springframework.retry.RetryException; -import org.springframework.retry.backoff.LastBackoffPeriodSupplier; +import org.springframework.retry.backoff.BackoffPeriodSupplier; /** * @author Dave Syer + * @param The result type */ public abstract class AsyncRetryResultProcessor implements RetryResultProcessor { - private static final Log logger = LogFactory.getLog(AsyncRetryResultProcessor.class); - protected T doNewAttempt(Supplier> supplier) throws Throwable { - logger.debug("Performing the next async callback invocation..."); - return supplier.get().getOrThrow(); - } + private static final Log logger = LogFactory.getLog(AsyncRetryResultProcessor.class); + + protected T doNewAttempt(Supplier> supplier) throws Throwable { + logger.debug("Performing the next async callback invocation..."); + return supplier.get().getOrThrow(); + } + + protected abstract T scheduleNewAttemptAfterDelay(Supplier> supplier, + ScheduledExecutorService reschedulingExecutor, long rescheduleAfterMillis, RetryContext ctx) + throws Throwable; - protected abstract T scheduleNewAttemptAfterDelay( - Supplier> supplier, - ScheduledExecutorService reschedulingExecutor, - long rescheduleAfterMillis, - RetryContext ctx - ) throws Throwable; + protected T handleException(Supplier> supplier, Consumer handler, Throwable throwable, + ScheduledExecutorService reschedulingExecutor, BackoffPeriodSupplier lastBackoffPeriodSupplier, + RetryContext ctx) { + try { + handler.accept(unwrapIfNeed(throwable)); - protected T handleException(Supplier> supplier, - Consumer handler, - Throwable throwable, - ScheduledExecutorService reschedulingExecutor, - LastBackoffPeriodSupplier lastBackoffPeriodSupplier, - RetryContext ctx) { - try { - handler.accept(unwrapIfNeed(throwable)); + if (reschedulingExecutor == null || lastBackoffPeriodSupplier == null) { + return doNewAttempt(supplier); + } + else { + long rescheduleAfterMillis = lastBackoffPeriodSupplier.get(); + logger.debug("Scheduling a next retry with a delay = " + rescheduleAfterMillis + " millis..."); + return scheduleNewAttemptAfterDelay(supplier, reschedulingExecutor, rescheduleAfterMillis, ctx); + } + } + catch (Throwable t) { + throw RetryTemplate.runtimeException(unwrapIfNeed(t)); + } + } - if (reschedulingExecutor == null || lastBackoffPeriodSupplier == null) { - return doNewAttempt(supplier); - } else { - long rescheduleAfterMillis = lastBackoffPeriodSupplier.get(); - logger.debug("Scheduling a next retry with a delay = " + rescheduleAfterMillis + " millis..."); - return scheduleNewAttemptAfterDelay(supplier, reschedulingExecutor, rescheduleAfterMillis, ctx); - } - } - catch (Throwable t) { - throw RetryTemplate.runtimeException(unwrapIfNeed(t)); - } - } + static Throwable unwrapIfNeed(Throwable throwable) { + if (throwable instanceof ExecutionException || throwable instanceof CompletionException + || throwable instanceof RetryException) { + return throwable.getCause(); + } + else { + return throwable; + } + } - static Throwable unwrapIfNeed(Throwable throwable) { - if (throwable instanceof ExecutionException - || throwable instanceof CompletionException - || throwable instanceof RetryException) { - return throwable.getCause(); - } else { - return throwable; - } - } } diff --git a/src/main/java/org/springframework/retry/support/CompletableFutureRetryResultProcessor.java b/src/main/java/org/springframework/retry/support/CompletableFutureRetryResultProcessor.java index a444f8d6..b5658bc0 100644 --- a/src/main/java/org/springframework/retry/support/CompletableFutureRetryResultProcessor.java +++ b/src/main/java/org/springframework/retry/support/CompletableFutureRetryResultProcessor.java @@ -17,8 +17,6 @@ package org.springframework.retry.support; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -27,10 +25,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + import org.springframework.retry.RetryCallback; import org.springframework.retry.RetryContext; -import org.springframework.retry.RetryException; -import org.springframework.retry.backoff.LastBackoffPeriodSupplier; +import org.springframework.retry.backoff.BackoffPeriodSupplier; /** * A {@link RetryResultProcessor} for a {@link CompletableFuture}. If a @@ -38,48 +36,45 @@ * used internally by the {@link RetryTemplate} to wrap it and process the result. * * @author Dave Syer + * @param The result type */ -public class CompletableFutureRetryResultProcessor - extends AsyncRetryResultProcessor> { +public class CompletableFutureRetryResultProcessor extends AsyncRetryResultProcessor> { protected final Log logger = LogFactory.getLog(getClass()); @Override public Result> process(CompletableFuture completable, - Supplier>> supplier, - Consumer handler, ScheduledExecutorService reschedulingExecutor, - LastBackoffPeriodSupplier lastBackoffPeriodSupplier, + Supplier>> supplier, Consumer handler, + ScheduledExecutorService reschedulingExecutor, BackoffPeriodSupplier lastBackoffPeriodSupplier, RetryContext ctx) { CompletableFuture handle = completable - .thenApply(CompletableFuture::completedFuture) - .exceptionally(throwable -> handleException( - supplier, handler, throwable, reschedulingExecutor, lastBackoffPeriodSupplier, ctx) - ) + .thenApply(CompletableFuture::completedFuture).exceptionally(throwable -> handleException(supplier, + handler, throwable, reschedulingExecutor, lastBackoffPeriodSupplier, ctx)) .thenCompose(Function.identity()); return new Result<>(handle); } - protected CompletableFuture scheduleNewAttemptAfterDelay( - Supplier>> supplier, - ScheduledExecutorService reschedulingExecutor, long rescheduleAfterMillis, - RetryContext ctx) - { + protected CompletableFuture scheduleNewAttemptAfterDelay(Supplier>> supplier, + ScheduledExecutorService reschedulingExecutor, long rescheduleAfterMillis, RetryContext ctx) { CompletableFuture> futureOfFurtherScheduling = new CompletableFuture<>(); reschedulingExecutor.schedule(() -> { try { RetrySynchronizationManager.register(ctx); futureOfFurtherScheduling.complete(doNewAttempt(supplier)); - } catch (Throwable t) { + } + catch (Throwable t) { futureOfFurtherScheduling.completeExceptionally(t); throw RetryTemplate.runtimeException(t); - } finally { + } + finally { RetrySynchronizationManager.clear(); } }, rescheduleAfterMillis, TimeUnit.MILLISECONDS); return futureOfFurtherScheduling.thenCompose(Function.identity()); } + } \ No newline at end of file diff --git a/src/main/java/org/springframework/retry/support/FutureRetryResultProcessor.java b/src/main/java/org/springframework/retry/support/FutureRetryResultProcessor.java index 3a9534ea..5c4f52be 100644 --- a/src/main/java/org/springframework/retry/support/FutureRetryResultProcessor.java +++ b/src/main/java/org/springframework/retry/support/FutureRetryResultProcessor.java @@ -23,12 +23,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; -import java.util.function.Function; import java.util.function.Supplier; import org.springframework.retry.RetryCallback; import org.springframework.retry.RetryContext; -import org.springframework.retry.backoff.LastBackoffPeriodSupplier; +import org.springframework.retry.backoff.BackoffPeriodSupplier; /** * todo: check or remove after discussion @@ -36,16 +35,16 @@ * A {@link RetryResultProcessor} for a plain {@link Future}. If a {@link RetryCallback} * returns a Future this processor can be used internally by the * {@link RetryTemplate} to wrap it and process the result. - * + * * @author Dave Syer + * @param The result type */ public class FutureRetryResultProcessor extends AsyncRetryResultProcessor> { @Override - public Result> process(Future future, - Supplier>> supplier, Consumer handler, - ScheduledExecutorService reschedulingExecutor, LastBackoffPeriodSupplier lastBackoffPeriodSupplier, - RetryContext ctx) { + public Result> process(Future future, Supplier>> supplier, + Consumer handler, ScheduledExecutorService reschedulingExecutor, + BackoffPeriodSupplier lastBackoffPeriodSupplier, RetryContext ctx) { return new Result<>(new FutureWrapper(future, supplier, handler, this, reschedulingExecutor, lastBackoffPeriodSupplier, ctx)); } @@ -53,12 +52,12 @@ public Result> process(Future future, @Override protected Future scheduleNewAttemptAfterDelay(Supplier>> supplier, ScheduledExecutorService reschedulingExecutor, long rescheduleAfterMillis, RetryContext ctx) - throws Throwable - { + throws Throwable { ScheduledFuture> scheduledFuture = reschedulingExecutor.schedule(() -> { try { return doNewAttempt(supplier); - } catch (Throwable t) { + } + catch (Throwable t) { throw RetryTemplate.runtimeException(t); } }, rescheduleAfterMillis, TimeUnit.MILLISECONDS); @@ -73,15 +72,18 @@ private class FutureWrapper implements Future { private Supplier>> supplier; private Consumer handler; + private AsyncRetryResultProcessor> processor; + private final ScheduledExecutorService reschedulingExecutor; - private final LastBackoffPeriodSupplier lastBackoffPeriodSupplier; + + private final BackoffPeriodSupplier lastBackoffPeriodSupplier; + private RetryContext ctx; - FutureWrapper(Future delegate, Supplier>> supplier, - Consumer handler, AsyncRetryResultProcessor> processor, - ScheduledExecutorService reschedulingExecutor, LastBackoffPeriodSupplier lastBackoffPeriodSupplier, - RetryContext ctx) { + FutureWrapper(Future delegate, Supplier>> supplier, Consumer handler, + AsyncRetryResultProcessor> processor, ScheduledExecutorService reschedulingExecutor, + BackoffPeriodSupplier lastBackoffPeriodSupplier, RetryContext ctx) { this.delegate = delegate; this.supplier = supplier; this.handler = handler; @@ -112,19 +114,20 @@ public V get() throws InterruptedException, ExecutionException { return this.delegate.get(); } catch (Throwable e) { - return processor.handleException(supplier, handler, e, reschedulingExecutor, lastBackoffPeriodSupplier, ctx) + return processor + .handleException(supplier, handler, e, reschedulingExecutor, lastBackoffPeriodSupplier, ctx) .get(); } } @Override - public V get(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { + public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { try { return this.delegate.get(timeout, unit); } catch (Throwable e) { - return processor.handleException(supplier, handler, e, reschedulingExecutor, lastBackoffPeriodSupplier, ctx) + return processor + .handleException(supplier, handler, e, reschedulingExecutor, lastBackoffPeriodSupplier, ctx) .get(timeout, unit); } } @@ -142,22 +145,25 @@ private class FutureFlatter implements Future { @Override public boolean cancel(boolean mayInterruptIfRunning) { try { - if (this.nestedFuture.isDone()) { - return this.nestedFuture.get().cancel(mayInterruptIfRunning); - } else { - return this.nestedFuture.cancel(mayInterruptIfRunning); + if (this.nestedFuture.isDone()) { + return this.nestedFuture.get().cancel(mayInterruptIfRunning); + } + else { + return this.nestedFuture.cancel(mayInterruptIfRunning); + } } - } catch (Throwable t) { + catch (Throwable t) { throw RetryTemplate.runtimeException(t); } - } + } @Override public boolean isCancelled() { try { return this.nestedFuture.isCancelled() || (this.nestedFuture.isDone() && this.nestedFuture.get().isCancelled()); - } catch (Throwable t) { + } + catch (Throwable t) { throw RetryTemplate.runtimeException(t); } } @@ -166,7 +172,8 @@ public boolean isCancelled() { public boolean isDone() { try { return this.nestedFuture.isDone() && this.nestedFuture.get().isDone(); - } catch (Throwable t) { + } + catch (Throwable t) { throw RetryTemplate.runtimeException(t); } } @@ -177,8 +184,7 @@ public V get() throws InterruptedException, ExecutionException { } @Override - public V get(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { + public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return this.nestedFuture.get(timeout, unit).get(timeout, unit); } diff --git a/src/main/java/org/springframework/retry/support/RetryResultProcessor.java b/src/main/java/org/springframework/retry/support/RetryResultProcessor.java index dce60354..eb4e44b0 100644 --- a/src/main/java/org/springframework/retry/support/RetryResultProcessor.java +++ b/src/main/java/org/springframework/retry/support/RetryResultProcessor.java @@ -21,7 +21,7 @@ import java.util.function.Supplier; import org.springframework.retry.RetryContext; -import org.springframework.retry.backoff.LastBackoffPeriodSupplier; +import org.springframework.retry.backoff.BackoffPeriodSupplier; /** * @author Dave Syer @@ -30,8 +30,7 @@ public interface RetryResultProcessor { Result process(T input, Supplier> supplier, Consumer handler, - ScheduledExecutorService reschedulingExecutor, - LastBackoffPeriodSupplier lastBackoffPeriodSupplier, + ScheduledExecutorService reschedulingExecutor, BackoffPeriodSupplier lastBackoffPeriodSupplier, RetryContext ctx); public static class Result { @@ -70,6 +69,7 @@ public T getOrThrow() throws Throwable { } throw exception; } + } } diff --git a/src/main/java/org/springframework/retry/support/RetryTemplate.java b/src/main/java/org/springframework/retry/support/RetryTemplate.java index 025040e1..e3f206fa 100644 --- a/src/main/java/org/springframework/retry/support/RetryTemplate.java +++ b/src/main/java/org/springframework/retry/support/RetryTemplate.java @@ -19,9 +19,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -40,7 +38,7 @@ import org.springframework.retry.backoff.BackOffContext; import org.springframework.retry.backoff.BackOffInterruptedException; import org.springframework.retry.backoff.BackOffPolicy; -import org.springframework.retry.backoff.LastBackoffPeriodSupplier; +import org.springframework.retry.backoff.BackoffPeriodSupplier; import org.springframework.retry.backoff.NoBackOffPolicy; import org.springframework.retry.backoff.RememberPeriodSleeper; import org.springframework.retry.backoff.SleepingBackOffPolicy; @@ -48,7 +46,6 @@ import org.springframework.retry.policy.RetryContextCache; import org.springframework.retry.policy.SimpleRetryPolicy; import org.springframework.retry.support.RetryResultProcessor.Result; -import org.springframework.util.Assert; /** * Template class that simplifies the execution of operations with retry semantics. @@ -97,7 +94,7 @@ public class RetryTemplate implements RetryOperations { private volatile BackOffPolicy backOffPolicy = new NoBackOffPolicy(); - private volatile LastBackoffPeriodSupplier lastBackoffPeriodSupplier = null; + private volatile BackoffPeriodSupplier lastBackoffPeriodSupplier = null; private volatile RetryPolicy retryPolicy = new SimpleRetryPolicy(3); @@ -152,8 +149,7 @@ public void setRetryContextCache(RetryContextCache retryContextCache) { * empty). * @param processors the processors to set */ - public void setRetryResultProcessors( - Classifier> processors) { + public void setRetryResultProcessors(Classifier> processors) { this.processors = processors; } @@ -193,11 +189,13 @@ public void setBackOffPolicy(BackOffPolicy backOffPolicy) { private BackOffPolicy replaceSleeperIfNeed(BackOffPolicy backOffPolicy) { if (reschedulingExecutor != null && backOffPolicy instanceof SleepingBackOffPolicy) { - this.logger.debug("Replacing the default sleeper by RememberPeriodSleeper to enable scheduler-based backoff."); + this.logger + .debug("Replacing the default sleeper by RememberPeriodSleeper to enable scheduler-based backoff."); RememberPeriodSleeper rememberPeriodSleeper = new RememberPeriodSleeper(); lastBackoffPeriodSupplier = rememberPeriodSleeper; - return ((SleepingBackOffPolicy) backOffPolicy).withSleeper(rememberPeriodSleeper); - } else { + return ((SleepingBackOffPolicy) backOffPolicy).withSleeper(rememberPeriodSleeper); + } + else { return backOffPolicy; } } @@ -352,8 +350,8 @@ protected T doExecute(RetryCallback retryCallback } - private Result safeLoop(RetryCallback retryCallback, - RetryState state, RetryContext context, BackOffContext backOffContext) { + private Result safeLoop(RetryCallback retryCallback, RetryState state, + RetryContext context, BackOffContext backOffContext) { try { return loop(retryCallback, state, context, backOffContext); } @@ -362,9 +360,8 @@ private Result safeLoop(RetryCallback retryCal } } - private Result loop(RetryCallback retryCallback, - RetryState state, RetryContext context, BackOffContext backOffContext) - throws E { + private Result loop(RetryCallback retryCallback, RetryState state, + RetryContext context, BackOffContext backOffContext) throws E { Throwable lastException = null; @@ -384,16 +381,11 @@ private Result loop(RetryCallback retryCallbac T result = retryCallback.doWithRetry(context); if (result != null && this.processors != null) { @SuppressWarnings("unchecked") - RetryResultProcessor processor = (RetryResultProcessor) this.processors - .classify(result); + RetryResultProcessor processor = (RetryResultProcessor) this.processors.classify(result); if (processor != null) { - return processor.process(result, - () -> safeLoop(retryCallback, state, context, - backOffContext), - error -> safeHandleLoopException(retryCallback, state, - context, backOffContext, error), reschedulingExecutor, - lastBackoffPeriodSupplier, - context); + return processor.process(result, () -> safeLoop(retryCallback, state, context, backOffContext), + error -> safeHandleLoopException(retryCallback, state, context, backOffContext, error), + reschedulingExecutor, lastBackoffPeriodSupplier, context); } } return new Result<>(result); @@ -414,13 +406,11 @@ private Result loop(RetryCallback retryCallbac break; } } - return new Result<>( - lastException == null ? context.getLastThrowable() : lastException); + return new Result<>(lastException == null ? context.getLastThrowable() : lastException); } - private void safeHandleLoopException( - RetryCallback retryCallback, RetryState state, RetryContext context, - BackOffContext backOffContext, Throwable e) { + private void safeHandleLoopException(RetryCallback retryCallback, RetryState state, + RetryContext context, BackOffContext backOffContext, Throwable e) { try { handleLoopException(retryCallback, state, context, backOffContext, e); } @@ -429,9 +419,8 @@ private void safeHandleLoopException( } } - private void handleLoopException( - RetryCallback retryCallback, RetryState state, RetryContext context, - BackOffContext backOffContext, Throwable e) throws E { + private void handleLoopException(RetryCallback retryCallback, RetryState state, + RetryContext context, BackOffContext backOffContext, Throwable e) throws E { try { registerThrowable(this.retryPolicy, state, context, e); } @@ -449,8 +438,7 @@ private void handleLoopException( catch (BackOffInterruptedException ex) { // back off was prevented by another thread - fail the retry if (this.logger.isDebugEnabled()) { - this.logger.debug("Abort retry because interrupted: count=" - + context.getRetryCount()); + this.logger.debug("Abort retry because interrupted: count=" + context.getRetryCount()); } throw ex; } @@ -462,8 +450,7 @@ private void handleLoopException( if (shouldRethrow(this.retryPolicy, context, state)) { if (this.logger.isDebugEnabled()) { - this.logger.debug( - "Rethrow in retry for policy: count=" + context.getRetryCount()); + this.logger.debug("Rethrow in retry for policy: count=" + context.getRetryCount()); } throw RetryTemplate.wrapIfNecessary(e); } diff --git a/src/main/java/org/springframework/retry/support/RetryTemplateBuilder.java b/src/main/java/org/springframework/retry/support/RetryTemplateBuilder.java index cfcc5902..10e40fbc 100644 --- a/src/main/java/org/springframework/retry/support/RetryTemplateBuilder.java +++ b/src/main/java/org/springframework/retry/support/RetryTemplateBuilder.java @@ -376,17 +376,17 @@ public RetryTemplateBuilder asyncRetry(ScheduledExecutorService reschedulingExec } /** - * Enable async retry feature. - * Due to no rescheduling executor is provided, a potential backoff will be performed - * by Thread.sleep(). + * Enable async retry feature. Due to no rescheduling executor is provided, a + * potential backoff will be performed by Thread.sleep(). + * @return A new RetryTemplateBuilder for an async retry */ public RetryTemplateBuilder asyncRetry() { // todo: support interface classification (does not work yet) this.processors.put(Future.class, new FutureRetryResultProcessor<>()); this.processors.put(FutureTask.class, new FutureRetryResultProcessor<>()); - this.processors.put(CompletableFuture.class, new CompletableFutureRetryResultProcessor()); + this.processors.put(CompletableFuture.class, new CompletableFutureRetryResultProcessor()); - //todo + // todo return this; } @@ -438,13 +438,10 @@ public RetryTemplate build() { retryTemplate.setReschedulingExecutor(executorService); Assert.isTrue(backOffPolicy instanceof SleepingBackOffPolicy, - "Usage of a rescheduling executor makes sense " - + "only with an instance of SleepingBackOffPolicy" - ); + "Usage of a rescheduling executor makes sense " + "only with an instance of SleepingBackOffPolicy"); } - SubclassClassifier> classifier = - new SubclassClassifier<>(processors, null); + SubclassClassifier> classifier = new SubclassClassifier<>(processors, null); retryTemplate.setRetryResultProcessors(classifier); return retryTemplate; diff --git a/src/test/java/org/springframework/retry/listener/MethodInvocationRetryListenerSupportTests.java b/src/test/java/org/springframework/retry/listener/MethodInvocationRetryListenerSupportTests.java index 8f36e62d..13bf65bf 100644 --- a/src/test/java/org/springframework/retry/listener/MethodInvocationRetryListenerSupportTests.java +++ b/src/test/java/org/springframework/retry/listener/MethodInvocationRetryListenerSupportTests.java @@ -51,7 +51,7 @@ protected void doClose(RetryContext context, } }; RetryContext context = mock(RetryContext.class); - MethodInvocationRetryCallback callback = mock(MethodInvocationRetryCallback.class); + MethodInvocationRetryCallback callback = mock(MethodInvocationRetryCallback.class); support.close(context, callback, null); assertEquals(1, callsOnDoCloseMethod.get()); @@ -68,7 +68,7 @@ protected void doClose(RetryContext context, } }; RetryContext context = mock(RetryContext.class); - RetryCallback callback = mock(RetryCallback.class); + RetryCallback callback = mock(RetryCallback.class); support.close(context, callback, null); assertEquals(0, callsOnDoCloseMethod.get()); @@ -96,7 +96,7 @@ protected void doOnError(RetryContext context, } }; RetryContext context = mock(RetryContext.class); - MethodInvocationRetryCallback callback = mock(MethodInvocationRetryCallback.class); + MethodInvocationRetryCallback callback = mock(MethodInvocationRetryCallback.class); support.onError(context, callback, null); assertEquals(1, callsOnDoOnErrorMethod.get()); @@ -120,7 +120,7 @@ protected boolean doOpen(RetryContext context, } }; RetryContext context = mock(RetryContext.class); - MethodInvocationRetryCallback callback = mock(MethodInvocationRetryCallback.class); + MethodInvocationRetryCallback callback = mock(MethodInvocationRetryCallback.class); assertTrue(support.open(context, callback)); assertEquals(1, callsOnDoOpenMethod.get()); diff --git a/src/test/java/org/springframework/retry/support/AbstractAsyncRetryTest.java b/src/test/java/org/springframework/retry/support/AbstractAsyncRetryTest.java index ae5bb34e..5f3c5a51 100644 --- a/src/test/java/org/springframework/retry/support/AbstractAsyncRetryTest.java +++ b/src/test/java/org/springframework/retry/support/AbstractAsyncRetryTest.java @@ -46,11 +46,12 @@ * @author Dave Syer */ public class AbstractAsyncRetryTest { - - /* ---------------- Async callbacks implementations for different types -------------- */ - static class CompletableFutureRetryCallback - extends AbstractRetryCallback> { + /* + * ---------------- Async callbacks implementations for different types -------------- + */ + + static class CompletableFutureRetryCallback extends AbstractRetryCallback> { @Override public CompletableFuture schedule(Supplier callback, ExecutorService workerExecutor) { @@ -61,11 +62,11 @@ public CompletableFuture schedule(Supplier callback, ExecutorSer Object awaitItself(CompletableFuture asyncType) { return asyncType.join(); } + } - static class FutureRetryCallback - extends AbstractRetryCallback> { - + static class FutureRetryCallback extends AbstractRetryCallback> { + @Override public Future schedule(Supplier callback, ExecutorService executor) { return executor.submit(callback::get); @@ -75,39 +76,43 @@ public Future schedule(Supplier callback, ExecutorService execut Object awaitItself(Future asyncType) throws Throwable { return asyncType.get(); } + } - static abstract class AbstractRetryCallback - implements RetryCallback { + static abstract class AbstractRetryCallback implements RetryCallback { final Object defaultResult = new Object(); + final Log logger = LogFactory.getLog(getClass()); final AtomicInteger jobAttempts = new AtomicInteger(); + final AtomicInteger schedulingAttempts = new AtomicInteger(); volatile int attemptsBeforeSchedulingSuccess; + volatile int attemptsBeforeJobSuccess; volatile RuntimeException exceptionToThrow = new RuntimeException(); volatile Function resultSupplier = ctx -> defaultResult; - volatile Consumer customCodeBeforeScheduling = ctx -> {}; + + volatile Consumer customCodeBeforeScheduling = ctx -> { + }; final List schedulerThreadNames = new CopyOnWriteArrayList<>(); + final List invocationMoments = new CopyOnWriteArrayList<>(); - final ExecutorService workerExecutor = Executors.newSingleThreadExecutor( - getNamedThreadFactory(WORKER_THREAD_NAME) - ); + final ExecutorService workerExecutor = Executors + .newSingleThreadExecutor(getNamedThreadFactory(WORKER_THREAD_NAME)); public abstract A schedule(Supplier callback, ExecutorService executor); abstract Object awaitItself(A asyncType) throws Throwable; @Override - public A doWithRetry(RetryContext ctx) - throws Exception { + public A doWithRetry(RetryContext ctx) throws Exception { rememberThreadName(); rememberInvocationMoment(); @@ -117,9 +122,11 @@ public A doWithRetry(RetryContext ctx) return schedule(() -> { try { - // a hack to avoid running CompletableFuture#thenApplyAsync in the caller thread + // a hack to avoid running CompletableFuture#thenApplyAsync in the + // caller thread Thread.sleep(100L); - } catch (InterruptedException e) { + } + catch (InterruptedException e) { e.printStackTrace(); } throwIfJobTooEarly(); @@ -127,7 +134,7 @@ public A doWithRetry(RetryContext ctx) return resultSupplier.apply(ctx); }, workerExecutor); } - + void rememberInvocationMoment() { invocationMoments.add(System.currentTimeMillis()); } @@ -169,8 +176,8 @@ void setResultSupplier(Function resultSupplier) { void setCustomCodeBeforeScheduling(Consumer customCodeBeforeScheduling) { this.customCodeBeforeScheduling = customCodeBeforeScheduling; } - } + } static class MockBackOffStrategy implements BackOffPolicy { @@ -188,8 +195,7 @@ public BackOffContext start(RetryContext status) { } @Override - public void backOff(BackOffContext backOffContext) - throws BackOffInterruptedException { + public void backOff(BackOffContext backOffContext) throws BackOffInterruptedException { this.backOffCalls++; } @@ -198,30 +204,29 @@ public void backOff(BackOffContext backOffContext) /* ---------------- Utilities -------------- */ static final String SCHEDULER_THREAD_NAME = "scheduler"; - static final String WORKER_THREAD_NAME = "worker"; + static final String WORKER_THREAD_NAME = "worker"; static ScheduledExecutorService getNamedScheduledExecutor() { - return Executors.newScheduledThreadPool( - 1, - getNamedThreadFactory(AbstractAsyncRetryTest.SCHEDULER_THREAD_NAME) - ); + return Executors.newScheduledThreadPool(1, getNamedThreadFactory(AbstractAsyncRetryTest.SCHEDULER_THREAD_NAME)); + } + + static ThreadFactory getNamedThreadFactory(String threadName) { + return new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r); + thread.setName(threadName); + return thread; + } + }; + } + + void assertRememberingSleeper(RetryTemplate template) { + // The sleeper of the backoff policy should be an instance of + // RememberPeriodSleeper, means not Thread.sleep() + BackOffPolicy backOffPolicy = getPropertyValue(template, "backOffPolicy", BackOffPolicy.class); + Sleeper sleeper = getPropertyValue(backOffPolicy, "sleeper", Sleeper.class); + assertTrue(sleeper instanceof RememberPeriodSleeper); } - static ThreadFactory getNamedThreadFactory(String threadName) { - return new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread thread = new Thread(r); - thread.setName(threadName); - return thread; - } - }; - } - - void assertRememberingSleeper(RetryTemplate template) { - // The sleeper of the backoff policy should be an instance of RememberPeriodSleeper, means not Thread.sleep() - BackOffPolicy backOffPolicy = getPropertyValue(template, "backOffPolicy", BackOffPolicy.class); - Sleeper sleeper = getPropertyValue(backOffPolicy, "sleeper", Sleeper.class); - assertTrue(sleeper instanceof RememberPeriodSleeper); - } } diff --git a/src/test/java/org/springframework/retry/support/AsyncReschedulingTests.java b/src/test/java/org/springframework/retry/support/AsyncReschedulingTests.java index a5c01195..ff77f171 100644 --- a/src/test/java/org/springframework/retry/support/AsyncReschedulingTests.java +++ b/src/test/java/org/springframework/retry/support/AsyncReschedulingTests.java @@ -36,14 +36,12 @@ public class AsyncReschedulingTests extends AbstractAsyncRetryTest { - /** + /* * Scheduling retry + job immediate success. * - * - async callback succeeds at 3rd attempt - * - actual job succeeds on 1st attempt - * - no backoff + * - async callback succeeds at 3rd attempt - actual job succeeds on 1st attempt - no + * backoff */ - @Test public void testInitialSchedulingEventualSuccessCF() throws Throwable { doTestInitialSchedulingEventualSuccess(new CompletableFutureRetryCallback()); @@ -55,30 +53,25 @@ public void testInitialSchedulingEventualSuccessF() throws Throwable { } private void doTestInitialSchedulingEventualSuccess(AbstractRetryCallback callback) throws Throwable { - RetryTemplate template = RetryTemplate.builder() - .maxAttempts(5) - .noBackoff() - .asyncRetry() - .build(); + RetryTemplate template = RetryTemplate.builder().maxAttempts(5).noBackoff().asyncRetry().build(); callback.setAttemptsBeforeSchedulingSuccess(3); callback.setAttemptsBeforeJobSuccess(1); assertEquals(callback.defaultResult, callback.awaitItself(template.execute(callback))); - // All invocations before first successful scheduling should be performed by the caller thread + // All invocations before first successful scheduling should be performed by the + // caller thread assertEquals(Collections.nCopies(3, Thread.currentThread().getName()), callback.schedulerThreadNames); assertEquals(1, callback.jobAttempts.get()); } - /** + /* * Immediate success of both scheduling and job. * - * - async callback, that does not fail itself - * - actual job succeeds on 1st attempt - * - backoff is not necessary - */ - + * - async callback, that does not fail itself - actual job succeeds on 1st attempt - + * backoff is not necessary + */ @Test public void testImmediateSuccessCF() throws Throwable { doTestImmediateSuccess(new CompletableFutureRetryCallback()); @@ -92,10 +85,7 @@ public void testImmediateSuccessF() throws Throwable { private void doTestImmediateSuccess(AbstractRetryCallback callback) throws Throwable { ScheduledExecutorService executor = mock(ScheduledExecutorService.class); - RetryTemplate template = RetryTemplate.builder() - .fixedBackoff(10000) - .asyncRetry(executor) - .build(); + RetryTemplate template = RetryTemplate.builder().fixedBackoff(10000).asyncRetry(executor).build(); callback.setAttemptsBeforeSchedulingSuccess(1); callback.setAttemptsBeforeJobSuccess(1); @@ -106,18 +96,17 @@ private void doTestImmediateSuccess(AbstractRetryCallback callback) throw assertEquals(1, callback.jobAttempts.get()); - // No interaction with the rescheduling executor should be performed if the first execution of the job succeeds. + // No interaction with the rescheduling executor should be performed if the first + // execution of the job succeeds. verifyZeroInteractions(executor); } - /** + /* * Async retry with rescheduler. - * - * - async callback, that does not fail itself - * - actual job succeeds on 3rd attempt - * - backoff is performed using executor, without Thread.sleep() - */ - + * + * - async callback, that does not fail itself - actual job succeeds on 3rd attempt - + * backoff is performed using executor, without Thread.sleep() + */ @Test public void testAsyncRetryWithReschedulerCF() throws Throwable { doTestAsyncRetryWithRescheduler(new CompletableFutureRetryCallback()); @@ -127,87 +116,74 @@ public void testAsyncRetryWithReschedulerCF() throws Throwable { public void testAsyncRetryWithReschedulerF() throws Throwable { doTestAsyncRetryWithRescheduler(new FutureRetryCallback()); } - + private void doTestAsyncRetryWithRescheduler(AbstractRetryCallback callback) throws Throwable { - int targetFixedBackoff = 150; + int targetFixedBackoff = 150; ScheduledExecutorService executor = getNamedScheduledExecutor(); - RetryTemplate template = RetryTemplate.builder() - .maxAttempts(4) - .fixedBackoff(targetFixedBackoff) - .asyncRetry(executor) - .build(); - - callback.setAttemptsBeforeSchedulingSuccess(1); + RetryTemplate template = RetryTemplate.builder().maxAttempts(4).fixedBackoff(targetFixedBackoff) + .asyncRetry(executor).build(); + + callback.setAttemptsBeforeSchedulingSuccess(1); callback.setAttemptsBeforeJobSuccess(3); assertEquals(callback.defaultResult, callback.awaitItself(template.execute(callback))); assertEquals(3, callback.jobAttempts.get()); - // All invocations after the first successful scheduling should be performed by the the rescheduler thread - assertEquals(Arrays.asList( - Thread.currentThread().getName(), - SCHEDULER_THREAD_NAME, - SCHEDULER_THREAD_NAME - ), callback.schedulerThreadNames); + // All invocations after the first successful scheduling should be performed by + // the the rescheduler thread + assertEquals(Arrays.asList(Thread.currentThread().getName(), SCHEDULER_THREAD_NAME, SCHEDULER_THREAD_NAME), + callback.schedulerThreadNames); assertRememberingSleeper(template); - // Expected backoff should be performed - List moments = callback.invocationMoments; - for (int i = 0; i < moments.size() - 1; i++) { - long approxBackoff = moments.get(i + 1) - moments.get(i); - assertTrue(approxBackoff > targetFixedBackoff); - } - } + // Expected backoff should be performed + List moments = callback.invocationMoments; + for (int i = 0; i < moments.size() - 1; i++) { + long approxBackoff = moments.get(i + 1) - moments.get(i); + assertTrue(approxBackoff > targetFixedBackoff); + } + } - /** + /* * Async retry without backoff - * - * - async callback succeeds on 2nd attempt - * - actual job succeeds on 3nd attempt - * - default zero backoff is used (which has no sleeper at all), - * and therefore rescheduler executor is not used at all - */ - + * + * - async callback succeeds on 2nd attempt - actual job succeeds on 3nd attempt - + * default zero backoff is used (which has no sleeper at all), and therefore + * rescheduler executor is not used at all + */ @Test public void testAsyncRetryWithoutBackoffCF() throws Throwable { doTestAsyncRetryWithoutBackoff(new CompletableFutureRetryCallback()); } - // todo: problem: a Future can start retrying only when user calls get(). Consider to not support Future at all. - /*@Test - public void testAsyncRetryWithoutBackoffF() throws Throwable { - doTestAsyncRetryWithoutBackoff(new FutureRetryCallback()); - }*/ - - private void doTestAsyncRetryWithoutBackoff(AbstractRetryCallback callback) throws Throwable { - RetryTemplate template = RetryTemplate.builder() - .maxAttempts(4) - .asyncRetry() - .build(); - - callback.setAttemptsBeforeSchedulingSuccess(2); - callback.setAttemptsBeforeJobSuccess(3); - assertEquals(callback.defaultResult, callback.awaitItself(template.execute(callback))); - assertEquals(4, callback.schedulingAttempts.get()); - assertEquals(3, callback.jobAttempts.get()); - - // All invocations after the first successful scheduling should be performed by the + // todo: problem: a Future can start retrying only when user calls get(). Consider to + // not support Future at all. + /* + * @Test public void testAsyncRetryWithoutBackoffF() throws Throwable { + * doTestAsyncRetryWithoutBackoff(new FutureRetryCallback()); } + */ + + private void doTestAsyncRetryWithoutBackoff(AbstractRetryCallback callback) throws Throwable { + RetryTemplate template = RetryTemplate.builder().maxAttempts(4).asyncRetry().build(); + + callback.setAttemptsBeforeSchedulingSuccess(2); + callback.setAttemptsBeforeJobSuccess(3); + assertEquals(callback.defaultResult, callback.awaitItself(template.execute(callback))); + assertEquals(4, callback.schedulingAttempts.get()); + assertEquals(3, callback.jobAttempts.get()); + + // All invocations after the first successful scheduling should be performed by + // the // the worker thread (because not backoff and no rescheduler thread) - assertEquals(Arrays.asList( - Thread.currentThread().getName(), - Thread.currentThread().getName(), - WORKER_THREAD_NAME, - WORKER_THREAD_NAME - ), callback.schedulerThreadNames); - } - - /** + assertEquals(Arrays.asList(Thread.currentThread().getName(), Thread.currentThread().getName(), + WORKER_THREAD_NAME, WORKER_THREAD_NAME), callback.schedulerThreadNames); + } + + /* * Exhausted on scheduling retries */ - @Test public void testExhaustOnSchedulingCF() throws Throwable { doTestExhaustOnScheduling(new CompletableFutureRetryCallback()); @@ -217,13 +193,9 @@ public void testExhaustOnSchedulingCF() throws Throwable { public void testExhaustOnSchedulingF() throws Throwable { doTestExhaustOnScheduling(new FutureRetryCallback()); } - + private void doTestExhaustOnScheduling(AbstractRetryCallback callback) throws Throwable { - RetryTemplate template = RetryTemplate.builder() - .maxAttempts(2) - .asyncRetry() - .fixedBackoff(100) - .build(); + RetryTemplate template = RetryTemplate.builder().maxAttempts(2).asyncRetry().fixedBackoff(100).build(); callback.setAttemptsBeforeSchedulingSuccess(5); callback.setAttemptsBeforeJobSuccess(5); @@ -231,20 +203,18 @@ private void doTestExhaustOnScheduling(AbstractRetryCallback callback) th try { callback.awaitItself(template.execute(callback)); fail("An exception should be thrown above"); - } catch (Exception e) { + } + catch (Exception e) { assertSame(e, callback.exceptionToThrow); } - assertEquals(Arrays.asList( - Thread.currentThread().getName(), - Thread.currentThread().getName() - ), callback.schedulerThreadNames); + assertEquals(Arrays.asList(Thread.currentThread().getName(), Thread.currentThread().getName()), + callback.schedulerThreadNames); } - /** + /* * Exhausted on job retries */ - @Test public void testExhaustOnJobWithReschedulerCF() throws Throwable { doTestExhaustOnJobWithRescheduler(new CompletableFutureRetryCallback()); @@ -256,37 +226,31 @@ public void testExhaustOnJobWithReschedulerF() throws Throwable { } private void doTestExhaustOnJobWithRescheduler(AbstractRetryCallback callback) throws Throwable { - RetryTemplate template = RetryTemplate.builder() - .maxAttempts(5) - .asyncRetry(getNamedScheduledExecutor()) - .exponentialBackoff(10, 2, 100) - .build(); + RetryTemplate template = RetryTemplate.builder().maxAttempts(5).asyncRetry(getNamedScheduledExecutor()) + .exponentialBackoff(10, 2, 100).build(); callback.setAttemptsBeforeSchedulingSuccess(1); callback.setAttemptsBeforeJobSuccess(6); try { + @SuppressWarnings("unused") Object v = callback.awaitItself(template.execute(callback)); fail("An exception should be thrown above"); - // Single wrapping by CompletionException is expected by CompletableFuture contract - } catch (Exception ce) { + // Single wrapping by CompletionException is expected by CompletableFuture + // contract + } + catch (Exception ce) { assertSame(ce.getCause(), callback.exceptionToThrow); } - assertEquals(Arrays.asList( - Thread.currentThread().getName(), - SCHEDULER_THREAD_NAME, - SCHEDULER_THREAD_NAME, - SCHEDULER_THREAD_NAME, - SCHEDULER_THREAD_NAME - ), callback.schedulerThreadNames); + assertEquals(Arrays.asList(Thread.currentThread().getName(), SCHEDULER_THREAD_NAME, SCHEDULER_THREAD_NAME, + SCHEDULER_THREAD_NAME, SCHEDULER_THREAD_NAME), callback.schedulerThreadNames); } // todo: rejected execution // todo: interrupt executor // rethrow not too late - /* * Nested rescheduling */ @@ -295,33 +259,31 @@ private void doTestExhaustOnJobWithRescheduler(AbstractRetryCallback call public void testNested() throws Throwable { ScheduledExecutorService executor = getNamedScheduledExecutor(); - RetryTemplate outerTemplate = RetryTemplate.builder() - .infiniteRetry() - .asyncRetry(executor) - .fixedBackoff(10) + RetryTemplate outerTemplate = RetryTemplate.builder().infiniteRetry().asyncRetry(executor).fixedBackoff(10) .build(); - RetryTemplate innerTemplate = RetryTemplate.builder() - .infiniteRetry() - .asyncRetry(executor) - .fixedBackoff(10) + RetryTemplate innerTemplate = RetryTemplate.builder().infiniteRetry().asyncRetry(executor).fixedBackoff(10) .build(); CompletableFutureRetryCallback innerCallback = new CompletableFutureRetryCallback(); innerCallback.setAttemptsBeforeSchedulingSuccess(3); innerCallback.setAttemptsBeforeJobSuccess(3); innerCallback.setCustomCodeBeforeScheduling(ctx -> { - // The current context should be available via RetrySynchronizationManager while scheduling + // The current context should be available via RetrySynchronizationManager + // while scheduling // (withing user's async callback itself) assertEquals(ctx, RetrySynchronizationManager.getContext()); - // We have no control over user's worker thread, so we can not implicitly set/get the parent + // We have no control over user's worker thread, so we can not implicitly + // set/get the parent // context via RetrySynchronizationManager. assertNull(ctx.getParent()); }); innerCallback.setResultSupplier(ctx -> { - // There is no way to implicitly pass the context into the worker thread, because the worker executor, - // thread and callback are fully controlled by the user. The retry engine deals with only + // There is no way to implicitly pass the context into the worker thread, + // because the worker executor, + // thread and callback are fully controlled by the user. The retry engine + // deals with only // scheduling/rescheduling and their result (e.g. CompletableFuture) assertNull(RetrySynchronizationManager.getContext()); @@ -332,7 +294,8 @@ public void testNested() throws Throwable { outerCallback.setAttemptsBeforeSchedulingSuccess(3); outerCallback.setAttemptsBeforeJobSuccess(3); outerCallback.setCustomCodeBeforeScheduling(ctx -> { - // The current context should be available via RetrySynchronizationManager while scheduling + // The current context should be available via RetrySynchronizationManager + // while scheduling // (withing user's async callback itself) assertEquals(ctx, RetrySynchronizationManager.getContext()); }); @@ -347,41 +310,31 @@ public void testNested() throws Throwable { // Return inner result as outer result return innerResult; - } catch (Exception e) { + } + catch (Exception e) { throw new RuntimeException(e); } }); - Object outerResult = outerCallback.awaitItself(outerTemplate.execute(outerCallback)); assertEquals(innerCallback.defaultResult, outerResult); assertEquals(Arrays.asList( // initial scheduling of the outer callback - Thread.currentThread().getName(), - Thread.currentThread().getName(), - Thread.currentThread().getName(), + Thread.currentThread().getName(), Thread.currentThread().getName(), Thread.currentThread().getName(), // rescheduling of the outer callback - SCHEDULER_THREAD_NAME, - SCHEDULER_THREAD_NAME - ), outerCallback.schedulerThreadNames); + SCHEDULER_THREAD_NAME, SCHEDULER_THREAD_NAME), outerCallback.schedulerThreadNames); assertEquals(Arrays.asList( // initial scheduling of the inner callback - WORKER_THREAD_NAME, - WORKER_THREAD_NAME, - WORKER_THREAD_NAME, + WORKER_THREAD_NAME, WORKER_THREAD_NAME, WORKER_THREAD_NAME, // rescheduling of the inner callback - SCHEDULER_THREAD_NAME, - SCHEDULER_THREAD_NAME - ), innerCallback.schedulerThreadNames); + SCHEDULER_THREAD_NAME, SCHEDULER_THREAD_NAME), innerCallback.schedulerThreadNames); } - - /** + /* * Test with additional chained completable futures. */ - @Test public void testAdditionalChainedCF() throws Throwable { @@ -397,10 +350,7 @@ public CompletableFuture schedule(Supplier callback, ExecutorSer }); } }; - RetryTemplate template = RetryTemplate.builder() - .maxAttempts(4) - .asyncRetry() - .build(); + RetryTemplate template = RetryTemplate.builder().maxAttempts(4).asyncRetry().build(); callback.setAttemptsBeforeSchedulingSuccess(2); callback.setAttemptsBeforeJobSuccess(3); @@ -417,19 +367,16 @@ public CompletableFuture schedule(Supplier callback, ExecutorSer assertEquals(4, callback.schedulingAttempts.get()); assertEquals(3, callback.jobAttempts.get()); - // All invocations after the first successful scheduling should be performed by the + // All invocations after the first successful scheduling should be performed by + // the // the worker thread (because not backoff and no rescheduler thread) - assertEquals(Arrays.asList( - Thread.currentThread().getName(), - Thread.currentThread().getName(), - WORKER_THREAD_NAME, - WORKER_THREAD_NAME - ), callback.schedulerThreadNames); + assertEquals(Arrays.asList(Thread.currentThread().getName(), Thread.currentThread().getName(), + WORKER_THREAD_NAME, WORKER_THREAD_NAME), callback.schedulerThreadNames); } - // todo: test stateful rescheduling // todo: test RejectedExecutionException on rescheduler // todo: test InterruptedException // todo: support declarative async + } diff --git a/src/test/java/org/springframework/retry/support/AsyncRetryTemplateTests.java b/src/test/java/org/springframework/retry/support/AsyncRetryTemplateTests.java index eab47805..4a8fc057 100644 --- a/src/test/java/org/springframework/retry/support/AsyncRetryTemplateTests.java +++ b/src/test/java/org/springframework/retry/support/AsyncRetryTemplateTests.java @@ -23,8 +23,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.log4j.ConsoleAppender; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -36,12 +34,8 @@ import org.springframework.retry.policy.SimpleRetryPolicy; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.springframework.retry.util.test.TestUtils.getPropertyValue; /** * @author Dave Syer @@ -49,23 +43,22 @@ public class AsyncRetryTemplateTests extends AbstractAsyncRetryTest { private RetryTemplate retryTemplate; - + @Before @SuppressWarnings({ "unchecked", "rawtypes" }) public void init() { -// org.apache.log4j.BasicConfigurator.configure(); - + // org.apache.log4j.BasicConfigurator.configure(); + Logger root = Logger.getRootLogger(); root.removeAllAppenders(); root.addAppender(new ConsoleAppender(new PatternLayout("%r [%t] %p %c{1} %x - %m%n"))); Logger.getRootLogger().setLevel(Level.TRACE); - + this.retryTemplate = new RetryTemplate(); Map, RetryResultProcessor> map = new HashMap<>(); map.put(Future.class, new FutureRetryResultProcessor()); map.put(CompletableFuture.class, new CompletableFutureRetryResultProcessor()); - SubclassClassifier processors = new SubclassClassifier(map, - (RetryResultProcessor) null); + SubclassClassifier processors = new SubclassClassifier(map, (RetryResultProcessor) null); this.retryTemplate.setRetryResultProcessors(processors); } @@ -78,27 +71,22 @@ public void testSuccessfulRetryCompletable() throws Throwable { SimpleRetryPolicy policy = new SimpleRetryPolicy(x); this.retryTemplate.setRetryPolicy(policy); CompletableFuture result = this.retryTemplate.execute(callback); - assertEquals(callback.defaultResult, - result.get(10000L, TimeUnit.MILLISECONDS)); + assertEquals(callback.defaultResult, result.get(10000L, TimeUnit.MILLISECONDS)); assertEquals(x, callback.jobAttempts.get()); } } // todo: remove of fix after discussion - /*@Test - public void testSuccessfulRetryFuture() throws Throwable { - for (int x = 1; x <= 10; x++) { - FutureRetryCallback callback = new FutureRetryCallback(); - callback.setAttemptsBeforeSchedulingSuccess(1); - callback.setAttemptsBeforeJobSuccess(x); - SimpleRetryPolicy policy = new SimpleRetryPolicy(x + 1); - this.retryTemplate.setRetryPolicy(policy); - Future result = this.retryTemplate.execute(callback); - assertEquals(callback.defaultResult, - result.get(10000L, TimeUnit.MILLISECONDS)); - assertEquals(x, callback.jobAttempts.get()); - } - }*/ + /* + * @Test public void testSuccessfulRetryFuture() throws Throwable { for (int x = 1; x + * <= 10; x++) { FutureRetryCallback callback = new FutureRetryCallback(); + * callback.setAttemptsBeforeSchedulingSuccess(1); + * callback.setAttemptsBeforeJobSuccess(x); SimpleRetryPolicy policy = new + * SimpleRetryPolicy(x + 1); this.retryTemplate.setRetryPolicy(policy); Future + * result = this.retryTemplate.execute(callback); assertEquals(callback.defaultResult, + * result.get(10000L, TimeUnit.MILLISECONDS)); assertEquals(x, + * callback.jobAttempts.get()); } } + */ @Test public void testBackOffInvoked() throws Throwable { @@ -111,8 +99,7 @@ public void testBackOffInvoked() throws Throwable { this.retryTemplate.setRetryPolicy(policy); this.retryTemplate.setBackOffPolicy(backOff); CompletableFuture result = this.retryTemplate.execute(callback); - assertEquals(callback.defaultResult, - result.get(10000L, TimeUnit.MILLISECONDS)); + assertEquals(callback.defaultResult, result.get(10000L, TimeUnit.MILLISECONDS)); assertEquals(x, callback.jobAttempts.get()); assertEquals(1, backOff.startCalls); assertEquals(x - 1, backOff.backOffCalls); @@ -133,11 +120,11 @@ public void testNoSuccessRetry() throws Throwable { fail("Expected IllegalArgumentException"); } catch (ExecutionException e) { - assertTrue("Expected IllegalArgumentException", - e.getCause() instanceof IllegalArgumentException); + assertTrue("Expected IllegalArgumentException", e.getCause() instanceof IllegalArgumentException); assertEquals(retryAttempts, callback.jobAttempts.get()); return; } fail("Expected IllegalArgumentException"); } + }