Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

An idea for async retry #176

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,48 @@ The preceding example uses a default `RetryTemplate` inside the interceptor. To
policies or listeners, you need only inject an instance of `RetryTemplate` into the
interceptor.

## Asynchronous retry
### Terms
```java

CompletableFuture<HttpResponse<String>> completableFuture = retryTemplate.execute(
ctx -> httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())
);
```
- __async callback__ - a callback that returns one of the supported async types (CompletableFuture, Future). Usually, async retry callback is one, that does not perform a heavy work by itself, but schedules the work to some worker and returns an instance of async type to track the progress. Failure of async callback itself usually means failure of scheduling (but not of actual work).
Failure of async callback (of scheduling) and of actual job will both be retried on a common basis, according to configured policies.

- __job__ - a task with payload, usually heavy, which result will be available through the instance of async type, returned by async callback (and, consequently, by _execute_ method)

- __rescheduling executor__ - an instance of executor, used for scheduling a new retry attempt after a delay (provided by a backoff policy). The type of executor is restricted by ScheduledExecutorService, to take advantage of its "schedule after delay" feature, which allows us to implement backoff without blocking a thread. Rescheduling executor is used for all retries except of initial scheduling retries (initial invocation of async callback).

### Initial invocation of async callback
Invocation of template.execute(asyncCallback) returns when first scheduling of job succeeded, or all initial scheduling attempts failed. Retry template does not produce async containers by itself, therefore there is nothing to return from _execute_ until initial invocation succeed. Backoffs between failing initial scheduling attempts will be performed by default sleeper by means of Thread.sleep() on caller thread. Why this approach is used:
- to be compatible with generic API of RetryOperations (where return type of callback equals to retrun type of execute(...))
- to provide an additional mean of back pressure

### Subsequent invocations of async callback
If the first execution of the _job_ failed and a retry is allowed by the policy, the next invocation of the async callback will be scheduled on _rescheduling executor_

### Async callbacks without executor
If executor is not provided, a backoff will be performed by Thread.sleep() on the client thread (for initial scheduling) or on the worker thread (for job failures, or for subsequent schedulings).

### Configuration example
```java
RetryTemplate.builder()
// activte the async retry feature with an executor
.asyncRetry(Executors.newScheduledThreadPool(1))
.fixedBackoff(1000)
.build();

RetryTemplate.builder()
// activte the async retry feature without an executor.
// Thread.sleep() will be used for backoff.
.asyncRetry()
.fixedBackoff(1000)
.build();
```

## Contributing

Spring Retry is released under the non-restrictive Apache 2.0 license
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,8 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.springframework.retry.backoff;

import java.util.function.Supplier;

public interface BackoffPeriodSupplier extends Supplier<Long> {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.springframework.retry.backoff;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class RememberPeriodSleeper implements Sleeper, BackoffPeriodSupplier {

private static final Log logger = LogFactory.getLog(RememberPeriodSleeper.class);

private volatile Long lastBackoffPeriod;

@Override
public void sleep(long backOffPeriod) {
logger.debug("Remembering a sleeping period instead of sleeping: " + backOffPeriod);
lastBackoffPeriod = backOffPeriod;
}

@Override
public Long get() {
return lastBackoffPeriod;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

import org.springframework.classify.Classifier;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryOperations;
import org.springframework.retry.RetryState;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.retry.support;

import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryException;
import org.springframework.retry.backoff.BackoffPeriodSupplier;

/**
* @author Dave Syer
* @param <T> The result type
*/
public abstract class AsyncRetryResultProcessor<T> implements RetryResultProcessor<T> {

private static final Log logger = LogFactory.getLog(AsyncRetryResultProcessor.class);

protected T doNewAttempt(Supplier<Result<T>> supplier) throws Throwable {
logger.debug("Performing the next async callback invocation...");
return supplier.get().getOrThrow();
}

protected abstract T scheduleNewAttemptAfterDelay(Supplier<Result<T>> supplier,
ScheduledExecutorService reschedulingExecutor, long rescheduleAfterMillis, RetryContext ctx)
throws Throwable;

protected T handleException(Supplier<Result<T>> supplier, Consumer<Throwable> handler, Throwable throwable,
ScheduledExecutorService reschedulingExecutor, BackoffPeriodSupplier lastBackoffPeriodSupplier,
RetryContext ctx) {
try {
handler.accept(unwrapIfNeed(throwable));

if (reschedulingExecutor == null || lastBackoffPeriodSupplier == null) {
return doNewAttempt(supplier);
}
else {
long rescheduleAfterMillis = lastBackoffPeriodSupplier.get();
logger.debug("Scheduling a next retry with a delay = " + rescheduleAfterMillis + " millis...");
return scheduleNewAttemptAfterDelay(supplier, reschedulingExecutor, rescheduleAfterMillis, ctx);
}
}
catch (Throwable t) {
throw RetryTemplate.runtimeException(unwrapIfNeed(t));
}
}

static Throwable unwrapIfNeed(Throwable throwable) {
if (throwable instanceof ExecutionException || throwable instanceof CompletionException
|| throwable instanceof RetryException) {
return throwable.getCause();
}
else {
return throwable;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.retry.support;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.backoff.BackoffPeriodSupplier;

/**
* A {@link RetryResultProcessor} for a {@link CompletableFuture}. If a
* {@link RetryCallback} returns a <code>CompletableFuture</code> this processor can be
* used internally by the {@link RetryTemplate} to wrap it and process the result.
*
* @author Dave Syer
* @param <V> The result type
*/
public class CompletableFutureRetryResultProcessor<V> extends AsyncRetryResultProcessor<CompletableFuture<V>> {

protected final Log logger = LogFactory.getLog(getClass());

@Override
public Result<CompletableFuture<V>> process(CompletableFuture<V> completable,
Supplier<Result<CompletableFuture<V>>> supplier, Consumer<Throwable> handler,
ScheduledExecutorService reschedulingExecutor, BackoffPeriodSupplier lastBackoffPeriodSupplier,
RetryContext ctx) {

CompletableFuture<V> handle = completable
.thenApply(CompletableFuture::completedFuture).exceptionally(throwable -> handleException(supplier,
handler, throwable, reschedulingExecutor, lastBackoffPeriodSupplier, ctx))
.thenCompose(Function.identity());

return new Result<>(handle);
}

protected CompletableFuture<V> scheduleNewAttemptAfterDelay(Supplier<Result<CompletableFuture<V>>> supplier,
ScheduledExecutorService reschedulingExecutor, long rescheduleAfterMillis, RetryContext ctx) {
CompletableFuture<CompletableFuture<V>> futureOfFurtherScheduling = new CompletableFuture<>();

reschedulingExecutor.schedule(() -> {
try {
RetrySynchronizationManager.register(ctx);
futureOfFurtherScheduling.complete(doNewAttempt(supplier));
}
catch (Throwable t) {
futureOfFurtherScheduling.completeExceptionally(t);
throw RetryTemplate.runtimeException(t);
}
finally {
RetrySynchronizationManager.clear();
}
}, rescheduleAfterMillis, TimeUnit.MILLISECONDS);

return futureOfFurtherScheduling.thenCompose(Function.identity());
}

}
Loading