Update RetryWithBackoffEntrypointExecutor docs (#292)
janbjorge authored Jan 16, 2025
1 parent 5cadbd1 commit 37bb72d
Showing 2 changed files with 64 additions and 10 deletions.
63 changes: 56 additions & 7 deletions docs/pgqueuer.md
@@ -191,24 +191,73 @@ The `RetryWithBackoffEntrypointExecutor` is a specialized custom executor design

#### How Does It Work?

This executor automatically retries jobs that fail during processing. It calculates delays between retries using exponential backoff, where the delay increases with each attempt, and adds jitter to avoid contention when multiple jobs are retried simultaneously.
This executor automatically retries jobs that fail during processing. It calculates delays between retries using exponential backoff, where the delay increases with each attempt, and adds jitter to avoid contention when multiple jobs are retried simultaneously. Retrying this way helps keep transient issues (e.g., temporary API unavailability or network glitches) from failing jobs outright.
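
As a rough illustration of the backoff-with-jitter idea (a standalone sketch, not PGQueuer's internal implementation; the `base` and `cap` parameters are invented for this example):

```python
import random

def backoff_delay(attempt: int, base: float = 0.1, cap: float = 10.0) -> float:
    """Exponential backoff with full jitter.

    The ceiling base * 2**attempt doubles on every attempt and is clipped
    at cap; sampling uniformly below that ceiling spreads concurrent
    retries out so they do not all wake up at the same instant.
    """
    return random.uniform(0.0, min(cap, base * 2**attempt))

# Sampled delays for attempts 0..4 with a 0.5 second cap
print([round(backoff_delay(a, cap=0.5), 3) for a in range(5)])
```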

#### Features of the RetryWithBackoffEntrypointExecutor

1. **Retry Mechanism**: Automatically attempts to reprocess failed jobs, reducing manual intervention.
2. **Exponential Backoff**: Ensures retries are spaced progressively further apart, minimizing strain on external systems.
3. **Jitter**: Introduces randomness to retry delays, preventing job collisions in high-concurrency scenarios.
1. **Retry Mechanism**: Automatically attempts to reprocess failed jobs, reducing manual intervention and increasing fault tolerance.
2. **Exponential Backoff**: Ensures retries are spaced progressively further apart, minimizing strain on external systems while allowing time for transient issues to resolve.
3. **Jitter**: Introduces randomness to retry delays, preventing job collisions and reducing the risk of contention in high-concurrency scenarios.
4. **Customizable Limits**: Configure the maximum number of retry attempts, the cap on exponential backoff delay, and the total allowed retry time.

#### Example Use Case

The `RetryWithBackoffEntrypointExecutor` is ideal for scenarios like:

- Interacting with unreliable external APIs.
- Handling transient network failures.
- Retrying database operations during temporary outages.
- Interacting with unreliable external APIs prone to rate limiting or downtime.
- Handling transient network failures, such as timeouts or temporary disconnections.
- Retrying database operations during temporary outages or deadlock scenarios.

By using this executor, you can enhance system resilience and maintain smooth operations without overloading resources.

#### Example Implementation

Here’s an example of how to use the `RetryWithBackoffEntrypointExecutor` in a PGQueuer setup:

```python
import asyncpg
from datetime import timedelta
from pgqueuer import PgQueuer
from pgqueuer.db import AsyncpgDriver
from pgqueuer.executors import RetryWithBackoffEntrypointExecutor
from pgqueuer.models import Job

async def create_pgqueuer() -> PgQueuer:
    # Connect to the PostgreSQL database
    connection = await asyncpg.connect()
    driver = AsyncpgDriver(connection)
    pgq = PgQueuer(driver)

    # Define an entrypoint with retry and exponential backoff logic
    @pgq.entrypoint(
        "retry_with_backoff",
        executor_factory=lambda parameters: RetryWithBackoffEntrypointExecutor(
            parameters=parameters,
            max_attempts=5,  # Retry the job up to 5 times
            max_delay=timedelta(seconds=0.5),  # Cap exponential backoff at 0.5 seconds
            max_time=timedelta(seconds=1),  # Ensure the entire retry process finishes within 1 second
        ),
    )
    async def retry_with_backoff(job: Job) -> None:
        # A transient failure raised here would be retried with backoff
        print(f"Processing job with retry logic: {job!r}")

    return pgq
```
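
To exercise this entrypoint, a producer enqueues a job under the same name. The snippet below is a hypothetical sketch: the `Queries.enqueue` call and its exact signature are assumptions based on PGQueuer's queries API, so verify them against your installed version.

```python
import asyncio

import asyncpg

from pgqueuer.db import AsyncpgDriver
from pgqueuer.queries import Queries  # assumed import path; check your version

async def enqueue_retry_job() -> None:
    connection = await asyncpg.connect()
    driver = AsyncpgDriver(connection)
    # The entrypoint name must match the one registered in create_pgqueuer();
    # payloads are passed as raw bytes.
    await Queries(driver).enqueue("retry_with_backoff", b"example-payload")

if __name__ == "__main__":
    asyncio.run(enqueue_retry_job())
```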

#### Explanation of the Example

1. **Executor Configuration**:
- `max_attempts`: Limits the retries to 5 attempts.
- `max_delay`: Caps the exponential backoff delay at 0.5 seconds to prevent excessively long waits.
- `max_time`: Ensures the entire retry process, including all attempts, completes within 1 second to avoid prolonged processing.

2. **Why Use It?**:
- The retry logic handles failures gracefully, especially in scenarios where a brief wait or retry can resolve the issue.
- Exponential backoff with jitter reduces the likelihood of resource contention, ensuring system stability even during high load.

By integrating `RetryWithBackoffEntrypointExecutor`, you can build robust workflows that recover automatically from transient issues, reducing the need for manual intervention.
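
As a rough worked example of how those limits interact (assuming, purely for illustration, an initial delay of 0.1 seconds that doubles per attempt; the actual base delay is up to the executor's implementation):

```python
from datetime import timedelta

max_delay = timedelta(seconds=0.5).total_seconds()  # cap from the example above
base = 0.1  # assumed initial delay, for illustration only

# Upper bound on the sleep before each retry; jitter picks an actual
# value somewhere below this ceiling.
ceilings = [min(base * 2**attempt, max_delay) for attempt in range(5)]
print(ceilings)  # [0.1, 0.2, 0.4, 0.5, 0.5]
```

Even before the five attempts are used up, `max_time=timedelta(seconds=1)` bounds the whole retry window, so the worst-case ceilings above (roughly 1.7 seconds in total) would be cut short at one second.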

---

### Scheduler
11 changes: 8 additions & 3 deletions pgqueuer/executors.py
@@ -126,8 +126,8 @@ class RetryWithBackoffEntrypointExecutor(EntrypointExecutor):
    )

    # maximum delay for retry
-   max_delay: float = dataclasses.field(
-       default=10.0,
+   max_delay: float | timedelta = dataclasses.field(
+       default=timedelta(seconds=10),
    )

    # maximum time used on retry
@@ -181,7 +181,12 @@ async def execute(self, job: models.Job, context: models.Context) -> None:
                if self.max_attempts and attempt >= self.max_attempts:
                    raise errors.MaxRetriesExceeded(self.max_attempts) from e

-               await asyncio.sleep(min(self.exponential_delay(attempt), self.max_delay))
+               max_delay = (
+                   self.max_delay
+                   if isinstance(self.max_delay, float | int)
+                   else self.max_delay.total_seconds()
+               )
+               await asyncio.sleep(min(self.exponential_delay(attempt), max_delay))
            except (
                TimeoutError,
                asyncio.exceptions.TimeoutError,
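The change above lets `max_delay` be given either as plain seconds (`float`) or as a `timedelta`. A minimal standalone sketch of the same normalization pattern, assuming nothing beyond the standard library (and Python 3.10+ for the `float | timedelta` annotation):

```python
from datetime import timedelta

def to_seconds(delay: float | timedelta) -> float:
    # Accept raw seconds or a timedelta and normalize to float seconds
    # before handing the value to asyncio.sleep / min(...).
    return float(delay) if isinstance(delay, (float, int)) else delay.total_seconds()

assert to_seconds(10) == 10.0
assert to_seconds(timedelta(seconds=10)) == 10.0
```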
