2025, Oct 30 23:46
Как подружить повторы Celery с dead-letter в RabbitMQ
Разбираем, почему при повторах Celery сообщения не попадают в dead-letter RabbitMQ, и как это исправить: acks_late, поведение Reject, примеры кода и схема.
Когда задачи Celery подключены к очередям «мёртвых» писем (dead-letter) в RabbitMQ, обычные ошибки легко направляются по нужному маршруту. Сложности начинаются, когда в дело вступают повторные попытки: задачи корректно повторяются, затем окончательно падают, но сообщение так и не появляется в очереди dead-letter. Ниже — краткое объяснение, что происходит, и как настроить согласованную работу повторов и dead-letter.
Как воспроизвести проблему
В следующем примере задача падает, запускает повторы, а после их исчерпания поднимает Reject, чтобы отправить сообщение в очередь dead-letter. Однако без правильного поведения подтверждений сообщение в конце не будет отправлено в dead-letter.
from celery import Celery, Task
from celery.exceptions import Reject
app = Celery("dlq_retry_demo")
class DlqAwareTask(Task):
    # acks_late намеренно не задан здесь
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # После исчерпания повторных попыток нужно отправить в DLQ
        raise Reject()
@app.task(bind=True, base=DlqAwareTask, autoretry_for=(RuntimeError,), retry_kwargs={"max_retries": 3, "countdown": 1})
def flaky_work(self):
    # Имитируем сбой, который запустит автоматический повтор
    raise RuntimeError("boom")
Почему так происходит
При включённых повторах Celery подтверждает задачу непосредственно перед выполнением. Если она затем падает, брокер уже считает сообщение обработанным. В этот момент поднятие Reject не отправляет сообщение в очередь dead-letter, потому что у брокера больше нет неподтверждённого сообщения, которое можно отклонить или отправить в dead-letter.
Похоже, вам нужно использовать acks_late. При повторных попытках задача подтверждается непосредственно перед выполнением, даже если затем завершается исключением. См. FAQ. Более подробная документация по acks_late.
Рабочее решение
Включите отложенные подтверждения, чтобы сообщение оставалось неподтверждённым до успешного завершения задачи. С такими подтверждениями явные ошибки или Reject смогут отправить сообщение в вашу очередь dead-letter после исчерпания повторов.
from celery import Celery, Task
from celery.exceptions import Reject
app = Celery("dlq_retry_demo")
# Включаем отложенные подтверждения
app.conf.task_acks_late = True
class DlqAwareTask(Task):
    # Либо задайте на уровне конкретной задачи
    acks_late = True
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # После исчерпания повторов это отправит в DLQ
        raise Reject()
# Автоматические повторы работают с отложенными подтверждениями
@app.task(bind=True, base=DlqAwareTask, autoretry_for=(RuntimeError,), retry_kwargs={"max_retries": 3, "countdown": 1})
def flaky_work(self):
    raise RuntimeError("boom")
# Явный повтор тоже сработает
@app.task(bind=True, base=DlqAwareTask)
def flaky_manual(self):
    try:
        raise RuntimeError("oops")
    except RuntimeError as exc:
        if self.request.retries < 3:
            raise self.retry(exc=exc, countdown=1)
        # После последней попытки — отправляем сообщение в DLQ
        raise Reject()
Есть важный нюанс при сочетании on_failure и повторов. Если on_failure сам инициирует повторную попытку, Reject в этом сценарии обрабатывается иначе, и сообщение не будет направлено в очередь dead-letter. Иными словами, используйте on_failure, чтобы сделать Reject после исчерпания повторов, но не запускайте повтор из on_failure, если ожидаете того же поведения с dead-letter.
class RetryInFailureTask(Task):
    acks_late = True
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # Запуск повтора здесь меняет то, как обрабатывается Reject
        if self.request.retries < 3:
            raise self.retry(exc=exc, countdown=1)
        # После исчерпания повторов этот Reject будет обработан иначе
        raise Reject()
Почему это важно
Повторные попытки — нормальная часть выполнения задач, но они не должны прятать провалившиеся сообщения от вашей dead-letter логики. Включённый acks_late синхронизирует жизненный цикл сообщения с обработкой ошибок, чтобы финальные сбои были видны там, где вы этого ждёте.
Практические выводы
Настройте отложенные подтверждения для задач, чтобы брокер мог отправлять финальные сбои в dead-letter после повторов. Используйте Reject либо внутри самой задачи, либо в on_failure после исчерпания попыток. Если же вы инициируете повтор из on_failure, учитывайте, что Reject будет маршрутизироваться иначе, и сообщение не попадёт в очередь dead-letter.
Статья основана на вопросе на StackOverflow от grahamlyons и ответе grahamlyons.