Crash Loop Due To Poison Message Without Quorum Queues #35

Open
Arseniy-Popov opened this issue Dec 22, 2024 · 1 comment
Arseniy-Popov commented Dec 22, 2024

If a task crashes the consumer (for example, because a memory leak exhausts available memory), the consumer enters a crash loop that it cannot recover from.

  • The consumer receives a message and starts a task that eats up memory.
  • The process is killed with SIGKILL due to an OOM condition.
  • Because SIGKILL cannot be caught as an exception, the consumer is unable to acknowledge the message and, if the retry middleware is used, unable to update the retry count and republish the message with the updated count.
  • Instead, the original message is requeued as is. Its redelivered flag is set to True, but the message carries no count of how many times it has been delivered.
  • The consumer picks it up again and the loop continues.
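The loop above can be modelled in a few lines of plain Python. This is a toy sketch, not RabbitMQ or taskiq code: `ClassicQueueModel`, `simulate` and the `"poison"` body are hypothetical names, and a requeue is simplified to putting the message back at the head of the queue. It shows that the only signal left on the requeued message is the `redelivered` flag, so a consumer cannot tell the 2nd delivery from the 200th.

```python
from __future__ import annotations

from collections import deque
from dataclasses import dataclass


@dataclass
class Message:
    body: str
    redelivered: bool = False  # the only redelivery hint a classic queue exposes


class ClassicQueueModel:
    """Toy model: a message that is never acked goes back into the queue unchanged."""

    def __init__(self) -> None:
        self._messages: deque[Message] = deque()

    def publish(self, body: str) -> None:
        self._messages.append(Message(body))

    def get(self) -> Message | None:
        return self._messages.popleft() if self._messages else None

    def requeue_unacked(self, msg: Message) -> None:
        # The consumer died before acking: the message is put back with the
        # redelivered flag set, but without any delivery count.
        msg.redelivered = True
        self._messages.appendleft(msg)


def simulate(queue: ClassicQueueModel, max_rounds: int) -> list[str]:
    """Each round is one (re)started worker taking one message.

    Returns the bodies processed successfully. max_rounds only exists so the
    simulation terminates; the real crash loop has no such bound.
    """
    processed: list[str] = []
    for _ in range(max_rounds):
        msg = queue.get()
        if msg is None:
            break
        if msg.body == "poison":
            # Stands in for the SIGKILL: no ack, no retry bookkeeping.
            queue.requeue_unacked(msg)
            continue
        processed.append(msg.body)  # normal task: acked and removed
    return processed


q = ClassicQueueModel()
for body in ("1", "2", "poison"):
    q.publish(body)
print(simulate(q, max_rounds=1000))  # ['1', '2']
```

After 1000 rounds the poison message is still in the queue; raising `max_rounds` only makes the loop longer.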

This can be prevented with quorum queues, which track the number of deliveries in the x-delivery-count header and automatically drop or dead-letter a message once that count exceeds the delivery-limit setting.
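For comparison, here is a similarly simplified model of a quorum queue with a delivery limit. The class and function names are hypothetical, and the counting rule used here (dead-letter the message once it has been returned more times than `delivery_limit`) is an approximation of RabbitMQ's behaviour, whose exact semantics can differ between versions.

```python
from __future__ import annotations

from collections import deque
from dataclasses import dataclass, field


@dataclass
class Message:
    body: str
    headers: dict[str, int] = field(default_factory=dict)


class QuorumQueueModel:
    """Toy model of a quorum queue that enforces a delivery limit."""

    def __init__(self, delivery_limit: int) -> None:
        self.delivery_limit = delivery_limit
        self._messages: deque[Message] = deque()
        self.dead_lettered: list[Message] = []

    def publish(self, body: str) -> None:
        self._messages.append(Message(body))

    def get(self) -> Message | None:
        return self._messages.popleft() if self._messages else None

    def return_unacked(self, msg: Message) -> None:
        # Unlike the classic queue, every failed delivery is counted on the message.
        count = msg.headers.get("x-delivery-count", 0) + 1
        msg.headers["x-delivery-count"] = count
        if count > self.delivery_limit:
            # Returned more often than allowed: drop or dead-letter it.
            self.dead_lettered.append(msg)
        else:
            self._messages.appendleft(msg)


def count_deliveries(queue: QuorumQueueModel, max_rounds: int = 1000) -> int:
    """Deliver until the queue is empty; every 'poison' delivery crashes the worker."""
    deliveries = 0
    for _ in range(max_rounds):
        msg = queue.get()
        if msg is None:
            break
        deliveries += 1
        if msg.body == "poison":
            queue.return_unacked(msg)  # stands in for the OOM kill
    return deliveries


q = QuorumQueueModel(delivery_limit=3)
q.publish("poison")
print(count_deliveries(q), len(q.dead_lettered))  # 4 1
```

With a limit of 3 the poison message is delivered four times (the first delivery plus three returns) before it leaves the queue, so the loop is bounded instead of endless.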

Here's a reproducible example:

Consumer/producer code:

  import asyncio
  import time
  
  from taskiq import SimpleRetryMiddleware
  from taskiq_aio_pika import AioPikaBroker
  
  
  broker = AioPikaBroker("amqp://user:password@localhost:5672").with_middlewares(
      SimpleRetryMiddleware(default_retry_count=3),
  )
  
  
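  def eat_memory(max_gb=1.0, chunk_mb=100.0):
      # Allocate memory in chunks until max_gb is reached. Under the 500MB
      # container limit the worker is OOM-killed (SIGKILL) well before that,
      # so no Python exception is raised and the message is never acked.
      chunk_size = int(chunk_mb * 1024 * 1024)
      max_bytes = int(max_gb * 1024 * 1024 * 1024)
      chunks = []
      total = 0
  
      try:
          while total < max_bytes:
              chunks.append(bytearray(chunk_size))
              total += chunk_size
              time.sleep(0.1)  # allocate gradually; this blocks the worker
      except MemoryError:
          # Only reached if allocation fails inside Python; an OOM kill skips it.
          pass
      finally:
          chunks.clear()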
  
  
  @broker.task
  async def test(input: int) -> None:
      if input == 3:
          eat_memory()
  
  
  async def main() -> None:
      await broker.startup()
      await test.kiq(1)
      await test.kiq(2)
      await test.kiq(3)
      await broker.shutdown()
  
  
  if __name__ == "__main__":
      asyncio.run(main())
  

docker-compose with a memory limit:

version: '3.8'

services:
  taskiq-consumer:
    build:
      context: .
      dockerfile: Dockerfile_taskiq_poison_message
    volumes:
      - .:/app
    command: poetry run taskiq worker taskiq_poison_message:broker
    restart: unless-stopped
    environment:
      - PYTHONUNBUFFERED=1
    network_mode: "host"
    deploy:
      resources:
        limits:
          memory: 500MB
    depends_on:
      - rabbitmq

  rabbitmq:
    image: 'rabbitmq:4-management'
    ports:
      - '5672:5672'
      - '15672:15672'
    environment:
      - RABBITMQ_DEFAULT_USER=user
      - RABBITMQ_DEFAULT_PASS=password

Logs showing the workers repeatedly picking up the third message (ID 37f99831dc1e41f78bd54c7c342452fe) and crashing indefinitely:

[2024-12-20 12:51:57,539][taskiq.process-manager][INFO   ][MainProcess] Process worker-0 restarted with pid 22
[2024-12-20 12:51:57,619][taskiq.receiver.receiver][INFO   ][worker-0] Listening started.
[2024-12-20 12:51:57,641][taskiq.process-manager][INFO   ][MainProcess] Process worker-1 restarted with pid 24
[2024-12-20 12:51:57,704][taskiq.receiver.receiver][INFO   ][worker-1] Listening started.
[2024-12-20 12:58:17,253][taskiq.receiver.receiver][INFO   ][worker-0] Executing task taskiq_poison_message:test with ID: 7dbe4ff8d044446aa9a3fbafec4ad25b
[2024-12-20 12:58:17,255][taskiq.receiver.receiver][INFO   ][worker-1] Executing task taskiq_poison_message:test with ID: 87f46763d49b420f8b0b7216055e118f
[2024-12-20 12:58:17,256][taskiq.receiver.receiver][INFO   ][worker-0] Executing task taskiq_poison_message:test with ID: 37f99831dc1e41f78bd54c7c342452fe
[2024-12-20 12:58:18,858][taskiq.process-manager][INFO   ][MainProcess] worker-0 is dead. Scheduling reload.
[2024-12-20 12:58:18,976][taskiq.receiver.receiver][INFO   ][worker-1] Executing task taskiq_poison_message:test with ID: 37f99831dc1e41f78bd54c7c342452fe
[2024-12-20 12:58:19,963][taskiq.process-manager][INFO   ][MainProcess] Process worker-0 restarted with pid 26
[2024-12-20 12:58:21,153][taskiq.process-manager][INFO   ][MainProcess] worker-1 is dead. Scheduling reload.
[2024-12-20 12:58:22,083][taskiq.receiver.receiver][INFO   ][worker-0] Listening started.
[2024-12-20 12:58:22,111][taskiq.receiver.receiver][INFO   ][worker-0] Executing task taskiq_poison_message:test with ID: 37f99831dc1e41f78bd54c7c342452fe
[2024-12-20 12:58:22,232][taskiq.process-manager][INFO   ][MainProcess] Process worker-1 restarted with pid 28
[2024-12-20 12:58:22,348][taskiq.receiver.receiver][INFO   ][worker-1] Listening started.
[2024-12-20 12:58:23,794][taskiq.receiver.receiver][INFO   ][worker-1] Executing task taskiq_poison_message:test with ID: 37f99831dc1e41f78bd54c7c342452fe
[2024-12-20 12:58:24,365][taskiq.process-manager][INFO   ][MainProcess] worker-0 is dead. Scheduling reload.
[2024-12-20 12:58:25,464][taskiq.process-manager][INFO   ][MainProcess] Process worker-0 restarted with pid 30
[2024-12-20 12:58:25,573][taskiq.process-manager][INFO   ][MainProcess] worker-1 is dead. Scheduling reload.
[2024-12-20 12:58:26,002][taskiq.receiver.receiver][INFO   ][worker-0] Listening started.
[2024-12-20 12:58:26,022][taskiq.receiver.receiver][INFO   ][worker-0] Executing task taskiq_poison_message:test with ID: 37f99831dc1e41f78bd54c7c342452fe
[2024-12-20 12:58:26,631][taskiq.process-manager][INFO   ][MainProcess] Process worker-1 restarted with pid 32
[2024-12-20 12:58:27,310][taskiq.receiver.receiver][INFO   ][worker-1] Listening started.
[2024-12-20 12:58:27,564][taskiq.receiver.receiver][INFO   ][worker-1] Executing task taskiq_poison_message:test with ID: 37f99831dc1e41f78bd54c7c342452fe
[2024-12-20 12:58:27,757][taskiq.process-manager][INFO   ][MainProcess] worker-0 is dead. Scheduling reload.
[2024-12-20 12:58:28,825][taskiq.process-manager][INFO   ][MainProcess] Process worker-0 restarted with pid 34
[2024-12-20 12:58:29,545][taskiq.receiver.receiver][INFO   ][worker-0] Listening started.
[2024-12-20 12:58:29,570][taskiq.receiver.receiver][INFO   ][worker-0] Executing task taskiq_poison_message:test with ID: 37f99831dc1e41f78bd54c7c342452fe
[2024-12-20 12:58:29,931][taskiq.process-manager][INFO   ][MainProcess] worker-1 is dead. Scheduling reload.
[2024-12-20 12:58:30,994][taskiq.process-manager][INFO   ][MainProcess] Process worker-1 restarted with pid 36
[2024-12-20 12:58:31,687][taskiq.receiver.receiver][INFO   ][worker-1] Listening started.
[2024-12-20 12:58:31,713][taskiq.receiver.receiver][INFO   ][worker-1] Executing task taskiq_poison_message:test with ID: 37f99831dc1e41f78bd54c7c342452fe
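To confirm from a log like the one above that a single message is being redelivered, rather than new tasks arriving, one can count the "Executing task ... with ID" lines per task ID. A small sketch; the regex assumes the exact log format shown in this issue and the function name is hypothetical.

```python
from __future__ import annotations

import re
from collections import Counter

# Matches the receiver's "Executing task <name> with ID: <id>" log lines.
EXECUTING = re.compile(r"Executing task (?P<name>\S+) with ID: (?P<task_id>[0-9a-f]+)")


def executions_per_task(log_text: str) -> Counter[str]:
    """Count how many times each task ID was picked up for execution."""
    return Counter(m.group("task_id") for m in EXECUTING.finditer(log_text))


sample = """\
[2024-12-20 12:58:17,253][taskiq.receiver.receiver][INFO   ][worker-0] Executing task taskiq_poison_message:test with ID: 7dbe4ff8d044446aa9a3fbafec4ad25b
[2024-12-20 12:58:17,256][taskiq.receiver.receiver][INFO   ][worker-0] Executing task taskiq_poison_message:test with ID: 37f99831dc1e41f78bd54c7c342452fe
[2024-12-20 12:58:18,976][taskiq.receiver.receiver][INFO   ][worker-1] Executing task taskiq_poison_message:test with ID: 37f99831dc1e41f78bd54c7c342452fe
"""
print(executions_per_task(sample).most_common(1))
# [('37f99831dc1e41f78bd54c7c342452fe', 2)]
```

Run over the full log excerpt above, 37f99831dc1e41f78bd54c7c342452fe is counted 8 times, while the other two task IDs appear once each.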
Arseniy-Popov commented Dec 29, 2024

I've opened two PRs that address this issue:

  • Adding support for RabbitMQ's quorum queues, which keep track of the number of message deliveries, to taskiq-aio-pika.
  • Adding a parameter to taskiq for the maximum permitted number of message processing attempts. Once that number is reached, the message is logged and ACK'ed as soon as it is received, instead of being processed again.
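On the consumer side, the second PR's behaviour (log and ACK a message once its attempts are exhausted) could look roughly like the sketch below. It is not the PR's code: `IncomingMessage`, `attempts_exhausted` and `on_message` are hypothetical stand-ins, and the sketch assumes `x-delivery-count` is absent on the first delivery and otherwise holds the number of earlier failed deliveries. Under that assumption a limit of 3 lets the task run three times and acks it on the fourth receipt, which matches the logs that follow.

```python
from __future__ import annotations

import logging
from dataclasses import dataclass, field

logger = logging.getLogger("poison-guard")


@dataclass
class IncomingMessage:
    """Hypothetical stand-in for a broker message (not taskiq's or aio-pika's type)."""

    body: bytes
    headers: dict[str, int] = field(default_factory=dict)
    acked: bool = False

    def ack(self) -> None:
        self.acked = True


def attempts_exhausted(message: IncomingMessage, max_attempts: int) -> bool:
    # Assumption: the header is missing on the first delivery and afterwards
    # counts earlier failed deliveries, so a value of max_attempts means that
    # many processing attempts have already been made.
    previous = message.headers.get("x-delivery-count", 0)
    return previous >= max_attempts


def on_message(message: IncomingMessage, max_attempts: int) -> bool:
    """Return True if the task should run; otherwise log, ack and skip it."""
    if attempts_exhausted(message, max_attempts):
        logger.error(
            "Permitted number of attempts at processing message %r "
            "has been exhausted after %d attempts.",
            message.body,
            max_attempts,
        )
        message.ack()  # remove it from the queue so the crash loop ends
        return False
    return True
```

Acking rather than rejecting keeps the sketch in line with the behaviour described in the issue; a dead-letter exchange would be the alternative if the message should be kept for inspection.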

The logs from the above example, with the broker configured as follows, now look like this:

  broker = AioPikaBroker(
      "amqp://guest:guest@localhost:5672",
      queue_type=QueueType.QUORUM,
      max_attempts_at_message=3,
  )

[2024-12-29 14:33:01,263][taskiq.process-manager][INFO   ][MainProcess] Started process worker-0 with pid 9
[2024-12-29 14:33:01,265][taskiq.process-manager][INFO   ][MainProcess] Started process worker-1 with pid 10
[2024-12-29 14:33:01,388][taskiq.receiver.receiver][INFO   ][worker-0] Listening started.
[2024-12-29 14:33:01,388][taskiq.receiver.receiver][INFO   ][worker-1] Listening started.
[2024-12-29 14:33:15,762][taskiq.receiver.receiver][INFO   ][worker-1] Executing task taskiq_produce:test with ID: 4d3adb8415b341a191d0e96d4f77d38d
[2024-12-29 14:33:15,768][taskiq.receiver.receiver][INFO   ][worker-0] Executing task taskiq_produce:test with ID: 531d7932ca0f4d56b08042970c497275
[2024-12-29 14:33:15,776][taskiq.receiver.receiver][INFO   ][worker-1] Executing task taskiq_produce:test with ID: cd02b77d1c8c4c5d94ab7ab5a4b9c2a2
[2024-12-29 14:33:15,782][taskiq.receiver.receiver][INFO   ][worker-0] Executing task taskiq_produce:test with ID: 72b0212e0f874b3e87d47818d225ee8b
[2024-12-29 14:33:17,462][taskiq.receiver.receiver][INFO   ][worker-0] Executing task taskiq_produce:test with ID: cd02b77d1c8c4c5d94ab7ab5a4b9c2a2
[2024-12-29 14:33:17,547][taskiq.process-manager][INFO   ][MainProcess] worker-1 is dead. Scheduling reload.
[2024-12-29 14:33:18,699][taskiq.process-manager][INFO   ][MainProcess] Process worker-1 restarted with pid 14
[2024-12-29 14:33:19,457][taskiq.receiver.receiver][INFO   ][worker-1] Listening started.
[2024-12-29 14:33:19,502][taskiq.receiver.receiver][INFO   ][worker-1] Executing task taskiq_produce:test with ID: cd02b77d1c8c4c5d94ab7ab5a4b9c2a2
[2024-12-29 14:33:19,812][taskiq.process-manager][INFO   ][MainProcess] worker-0 is dead. Scheduling reload.
[2024-12-29 14:33:21,044][taskiq.process-manager][INFO   ][MainProcess] Process worker-0 restarted with pid 16
[2024-12-29 14:33:21,220][taskiq.process-manager][INFO   ][MainProcess] worker-1 is dead. Scheduling reload.
[2024-12-29 14:33:21,806][taskiq.receiver.receiver][INFO   ][worker-0] Listening started.
[2024-12-29 14:33:21,839][taskiq.receiver.receiver][ERROR  ][worker-0] Permitted number of attempts at processing message b'{"task_id": "cd02b77d1c8c4c5d94ab7ab5a4b9c2a2", "task_name": "taskiq_produce:test", "labels": {}, "labels_types": {}, "args": [3], "kwargs": {}}' has been exhausted after 3 attempts.
[2024-12-29 14:33:22,223][taskiq.process-manager][INFO   ][MainProcess] Process worker-1 restarted with pid 18
[2024-12-29 14:33:22,304][taskiq.receiver.receiver][INFO   ][worker-1] Listening started.

Instead of making a 4th processing attempt, the worker logs the "Permitted number of attempts at processing message ... has been exhausted after 3 attempts." error and acks the message.
