
Dramatiq retry sends more retries instead of max 3 that are configured #605

bvidovic1 opened this issue Jan 25, 2024 · 4 comments

bvidovic1 commented Jan 25, 2024

What OS are you using?

Running on python docker:
python:3.8-slim

What version of Dramatiq are you using?

v1.15.0

What did you do?

I have Dramatiq set up to take a message from the queue and send it to the relevant endpoint, which performs some ML processing.
I start Dramatiq with 1 process and 4 threads.

The Dramatiq actor is set up like this:

@dramatiq.actor(
    **config.actors.handle_bla.params,
    time_limit=1000 * TIMEOUT,
    min_backoff=1000 * MIN_BACKOFF,
    max_backoff=1000 * MAX_BACKOFF,
    retry_when=should_retry_bla_handler,
)
def handle_bla(message: Mapping[str, Any]):
    # code that does some processing, sends a POST request to a specific
    # endpoint, and gets the result back
    ...

Implementation of the retry handler:

def should_retry_bla_handler(retries_so_far: int, exception: Exception):
    """
    Determines whether to retry processing a job based on the number of retries so far and the type of exception encountered.

    Args:
        retries_so_far (int): The number of retries attempted so far.
        exception (Exception): The exception that was raised.

    Returns:
        bool: True if processing should be retried, False otherwise.
    """
    max_retries = config.actors.handle_bla.get("max_retries", 3)

    logger.info(f"Attempted retries so far: {retries_so_far}")
    if retries_so_far >= max_retries:
        logger.error(f"Max number of job retries exceeded ({max_retries})")
        return False

    # When an invalid message is passed, we do not want to retry processing
    if isinstance(
        exception,
        (
            InvalidMessagePayloadError,
            InvalidProcessingRequestError,
        ),
    ):
        logger.error(f"Invalid job processing arguments provided: {exception}")
        return False
    # When there is a document mismatch between request and response we do not want to retry processing
    elif isinstance(
        exception,
        DocumentsMismatchInResponse,
    ):
        logger.error(f"Documents in response vs documents sent mismatch: {exception}")
        return False
    elif isinstance(
        exception,
        ConfigurationError,
    ):
        logger.error(f"Missing ML endpoint in configuration: {exception}")

    if isinstance(exception, ServiceRequestHandlingError):
        logger.warning("Encountered issue while handling processing request - retrying...")

    return True
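
Calling the handler directly (outside Dramatiq) behaves as I would expect; a quick sketch, assuming max_retries resolves to 3 and a generic exception that is none of the special cases above:

err = RuntimeError("transient failure")
should_retry_bla_handler(0, err)  # True  - first failure, retry
should_retry_bla_handler(2, err)  # True  - still under max_retries (3)
should_retry_bla_handler(3, err)  # False - limit reached, stop retrying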

What did you expect would happen?

I expect retries to be attempted 3 times at most and, if still unsuccessful, processing to stop.

What happened?

First, I receive a message saying that 2 retries will be attempted after a specific number of milliseconds based on the min_backoff and max_backoff variables. Why 2 right away? Why not one at a time, sequentially?

Second, the request is retried more times than that (6, 7, 8 times) even though the maximum is 3. This is causing a problem for my service, because it sends multiple requests to the endpoint and clogs the workers there with the same request. Is this happening because threads are used? Should I add some kind of threading lock mechanism?

Thanks in advance for any guidance.

@kitahkitah

@bvidovic1, hi, have you found a solution to this issue?

@bvidovic1

@kitahkitah

I rewrote the retry handler logic, added custom intervals, and changed a bit how/when the retry is triggered.

If you need it, I can share a code snippet when I am at my desk.

@kitahkitah

@bvidovic1, that would be very kind of you, thank you!

@bvidovic1

bvidovic1 commented Feb 3, 2025

@kitahkitah

This is my retry method and how I use it in the actor. It is customized to check a certain job ID and some other things, but you will get the idea. Heads up: the version I am using is 1.16.0, and this has been running successfully for a year or so now. I haven't had time to upgrade or check whether anything has improved recently.

custom retry:

from typing import Dict

import dramatiq

# logger, save_status_to_db, JobProcessingStatus and ServiceRequestHandlingError
# are project-specific helpers and exceptions.


def handle_retry(
    job_id: str,
    e: Exception,
    backoff_values: Dict[int, int],
    error_msg: str,
) -> None:
    """
    Handle exceptions raised during a request to the ML service by managing retries and delays.

    Args:
        job_id (str): The ID of the job being processed.
        e (Exception): The exception raised during the request.
        backoff_values (dict[int, int]): A dictionary mapping retry counts to corresponding backoff delays.
        error_msg (str): A descriptive error message associated with the exception.

    Raises:
        ServiceRequestHandlingError: If the number of retries so far is not in the predefined backoff values.
        dramatiq.errors.Retry: If the exception should trigger a retry with a specified delay.

    Notes:
        This method logs information about the message, retries, and backoff.

    Example:
        try:
            # Code that may raise requests.exceptions.RequestException
        except requests.exceptions.RequestException as e:
            backoff_values = {
                0: 5_000,   # backoff before first retry - 5 seconds
                1: 10_000,  # backoff before second retry - 10 seconds
                2: 60_000,  # backoff before third retry - 60 seconds
            }
            handle_retry(job_id, e, backoff_values, "Error in ML service request")
    """
    msg: dramatiq.Message = dramatiq.middleware.CurrentMessage.get_current_message()
    logger.debug(f"Message info:\n{msg.options}")

    retries_so_far: int = msg.options.get("retries", 0)
    logger.debug(f"Retries so far: {retries_so_far}")

    if retries_so_far not in backoff_values:
        logger.error(f"Exceeded maximum number of retries: {retries_so_far}")
        save_status_to_db(job_id, JobProcessingStatus.ERROR, f"{error_msg}")
        raise ServiceRequestHandlingError(error_msg) from e
    # Log the retry and backoff delay
    backoff_delay = backoff_values[retries_so_far]
    logger.warning(f"Retrying after exception: {str(e)} | Backoff delay: {backoff_delay}ms")

    raise dramatiq.errors.Retry(str(e), delay=backoff_delay)
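
One thing to keep in mind: CurrentMessage.get_current_message() only returns the message when the CurrentMessage middleware has been added to the broker. A minimal sketch of that setup (the broker URL is just a placeholder):

import dramatiq
from dramatiq.brokers.rabbitmq import RabbitmqBroker
from dramatiq.middleware import CurrentMessage

broker = RabbitmqBroker(url="amqp://guest:guest@localhost:5672/")
broker.add_middleware(CurrentMessage())
dramatiq.set_broker(broker)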

How it is used in the actor:

"""
Important:
* 'broker_priority' refers to global message prioritisation by RMQ, where
  priority is in range [0, 255], `255` being the highest
  local actor `priority` parameter refers to worker-local prioritisation of *prefetched* messages,
  where priority is in range: [255, 0], `0` being the highest (opposite of `broker_priority`)

* throws argument in actor means that retry will not be started if exceptions from given tuple are thrown
"""


@dramatiq.actor(
    **config.actors.handle_actor.params,
    time_limit=1000 * TIMEOUT,
    max_retries=MAX_RETRIES,
    throws=(
        ConfigurationError,
        DocumentsMismatchInResponse,
        InvalidProcessingRequestError,
        InvalidMessagePayloadError,
        ServiceRequestHandlingError,
        ResponseValidationError,
        NoDocumentsInBackofficeError,
    ),
)
def handle_actor(message: Mapping[str, Any]):
    ...

So, I do not use the default retry from Dramatiq but this one: for some exceptions I do not want to retry at all (those listed in `throws`), and for the others where I need it, I just call `handle_retry` in the `except` block.
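
Put together, the actor body looks roughly like this (a trimmed sketch; process_job is a placeholder for the code that calls the ML endpoint, and the backoff values are the ones from the docstring example above):

import requests


@dramatiq.actor(max_retries=MAX_RETRIES, throws=(InvalidMessagePayloadError,))
def handle_actor(message: Mapping[str, Any]):
    job_id = message["job_id"]
    backoff_values = {
        0: 5_000,   # before the first retry
        1: 10_000,  # before the second retry
        2: 60_000,  # before the third retry
    }
    try:
        process_job(message)  # placeholder for the real request code
    except requests.exceptions.RequestException as e:
        # handle_retry raises dramatiq.errors.Retry with the mapped delay,
        # or ServiceRequestHandlingError once the retries are exhausted.
        handle_retry(job_id, e, backoff_values, "Error in ML service request")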
