2026, Jan 08 03:02

Как отправлять результаты задач Celery в Redis Streams без опроса

Переход от опроса к push: свяжите сигналы Celery (task_success и task_failure) с Redis Streams и получайте события завершения задач без запросов к API.

Опрос состояния задач Celery через endpoint FastAPI работает, но когда продюсеры пишут напрямую в Redis, а результаты хранятся как строки Redis, постоянные проверки статуса становятся избыточными. Требование смещается от pull к push: как только worker завершил работу, клиент должен получить уведомление без обращений к вашему API каждые несколько секунд. Аккуратный способ для Redis-центричной архитектуры — публиковать исходы задач как события. Для этой модели отлично подходят Redis Streams.

Контекст: от опроса к событийным обновлениям

Классический паттерн такой: FastAPI планирует работу через Celery, брокер ставит задачи в очередь, а API открывает endpoint, который запрашивает AsyncResult по id. Это и есть опрос. В новой схеме продюсеры отправляют данные прямо в Redis. Задачи попадают в списки Redis, результаты записываются строками Redis. Узлов меньше, но команде всё равно нужно знать, когда задача завершилась. Вместо вопросов каждые N секунд публикуйте исход как событие — и потребители будут реагировать автоматически.

К делу: отправляйте результаты через Redis Streams

Исходы задач известны на стороне worker’а. Celery генерирует сигналы при успешном выполнении и ошибках, и эти сигналы можно прокинуть в Redis Stream. Если по контракту вы продолжаете записывать итоговый payload как строку Redis — сохраняйте это. Но вдобавок добавляйте в поток компактную запись. Фронтенд или любой внешний потребитель может подписаться на этот поток и запускать обновления сразу по приходу новых записей.

"""
Событийная доставка результатов задач через Redis Streams.

Фиксирует исходы задач Celery по сигналам и отправляет их в поток Redis.
"""

from datetime import datetime
import json
from typing import Any
import redis
from celery import Task
from celery.signals import task_success, task_failure

# Держатель соединения Redis
_redis_pool = None

def acquire_redis_conn() -> redis.Redis:
    """Вернуть клиент Redis с теми же параметрами подключения, что и у Celery."""
    global _redis_pool
    if _redis_pool is None:
        redis_dsn = f"rediss://:{youpass}@{yourhost}:{yourport}/0"
        _redis_pool = redis.Redis.from_url(
            redis_dsn,
            ssl_cert_reqs=None,
            decode_responses=True
        )
    return _redis_pool

STREAM_KEY = "your_redis_prefix"

@task_success.connect
def on_task_ok(sender: Task | None = None, result: Any | None = None, **kwargs) -> None:
    """Публиковать успешные исходы задач в поток Redis."""
    try:
        job_id = sender.request.id if sender else "unknown"
        job_name = sender.name if sender else "unknown"

        rds = acquire_redis_conn()

        payload = {
            "task_id": job_id,
            "task_name": job_name,
            "status": "SUCCESS",
            "timestamp": datetime.now().isoformat(),
        }

        try:
            if isinstance(result, (dict, list)):
                payload["result"] = json.dumps(result)
            else:
                payload["result"] = str(result)
        except Exception as exc:
            payload["result"] = f"Error serializing: {str(exc)}"

        rds.xadd(
            STREAM_KEY,
            payload,
            maxlen=10000,
            approximate=True
        )
    except Exception:
        log.error("Error publishing task success to Redis stream with task_id: %s", job_id)

@task_failure.connect
def on_task_error(sender: Task | None = None, exception: Exception | None = None, **kwargs) -> None:
    """Публиковать неуспешные исходы задач в поток Redis."""
    try:
        job_id = sender.request.id if sender else "unknown"
        job_name = sender.name if sender else "unknown"

        rds = acquire_redis_conn()

        payload = {
            "task_id": job_id,
            "task_name": job_name,
            "status": "FAILURE",
            "timestamp": datetime.now().isoformat(),
            "error": str(exception) if exception else "Unknown error"
        }

        rds.xadd(
            STREAM_KEY,
            payload,
            maxlen=10000,
            approximate=True
        )
    except Exception:
        log.error("Error publishing task failure to Redis stream with task_id: %s", job_id)

Обработчик публикует в поток компактный JSON‑подобный payload, фиксируя task_id, имя, статус, метку времени и либо результат, либо ошибку. Поток хранит ограниченную историю благодаря maxlen с приблизительной обрезкой — практичный дефолт для операционных лент.

Подключение к воркеру Celery

Мост остаётся пассивным, пока воркер его не импортирует. Загрузите его на этапе инициализации приложения — так обработчики сигналов гарантированно зарегистрируются.

"""
Инициализация приложения Celery и обнаружение задач.
"""

import config as workerconf
from celery import Celery

worker_app = Celery("YOUR_APP_NAME")
worker_app.config_from_object(workerconf)
worker_app.autodiscover_tasks(["DIR_TO_YOUR_TASKS"])

# Зарегистрируйте обработчики сигналов, импортировав модуль-мост
import job_stream_bridge

# Предоставьте экземпляр приложения
process_bus = worker_app

Какую именно проблему это решает

Без событийного канала потребителям приходится опрашивать строку Redis или спрашивать статус у API. Переместив уведомление о результате в Redis Stream, вы превращаете завершение в событие, которое клиенты могут непрерывно потреблять. Фронтенд может подписаться на поток и автоматически обновлять интерфейс в момент появления записей, при необходимости по‑прежнему читая эталонный результат из строки Redis.

Замечания о соседних паттернах Redis

В Redis‑first дизайне есть и другие механизмы уведомления о завершении. Pub/Sub даёт «fire‑and‑forget» темы, а блокирующие операции вроде BRPOP уместны в некоторых сценариях. Приведённый подход делает ставку на Redis Streams, где потребители слушают добавляемые записи, а лента хранит небольшой скользящий бэклог.

Почему это важно

Событийное завершение убирает периодический трафик опроса и сокращает субъективную задержку для последующих систем. Подключив Redis Streams к сигналам Celery, команды, которые уже публикуют задачи в списки Redis и записывают результаты строками, могут добавить push‑уведомления без изменений пути выполнения задач. Это также аккуратно разделяет обязанности: воркеры фиксируют исходы, клиенты решают, что с ними делать.

Выводы

Если продюсеры пишут напрямую в Redis и вам нужно уведомлять о завершении без опроса, свяжите сигналы Celery task_success и task_failure с Redis Stream. Сохраняйте текущий контракт с результатом‑строкой, если он важен для интеграции, а потребителям позвольте слушать поток и автоматически запускать свои процессы. Для изучения вариантов в арсенале Redis есть и pub/sub, и BRPOP, но потоковая лента органично решает задачу событийных обновлений с ограниченным бэклогом.