Dec 16, 2025, 15:00
Event-Driven Celery Task Notifications with Redis Streams in FastAPI: Replace Polling with Push
Stream Celery task outcomes to Redis Streams in FastAPI to eliminate polling. Push events, keep results as Redis strings, and update clients in real time.
Polling Celery task state from a FastAPI endpoint works, but once producers write directly to Redis and you store results as Redis strings, constant status checks become wasteful. The requirement shifts from pull to push: as soon as the worker finishes, the client should be notified without hitting your API every few seconds. The clean way to do this in a Redis-centric setup is to emit task outcomes as events. Redis Streams fit that model well.
Context: from polling to event-driven updates
The classic pattern looks like this: FastAPI schedules work via Celery, the broker queues tasks, and the API exposes an endpoint that queries AsyncResult by id. That’s polling. In the new setup, producers push their payloads straight into Redis: tasks land in Redis lists, and results are written as Redis strings. This reduces moving parts, but the team still needs to know when a job finishes. Instead of asking every N seconds, publish the outcome as an event and let consumers react automatically.
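For reference, here is a minimal sketch of the polling endpoint this article replaces. It assumes the Celery app from the bootstrap module shown later is importable as worker_app; the module path and endpoint name are illustrative.

"""
Classic polling endpoint: the client asks for status until the task is done.
"""
from celery.result import AsyncResult
from fastapi import FastAPI

from celery_app import worker_app  # hypothetical module path for the bootstrap shown below

api = FastAPI()


@api.get("/tasks/{task_id}")
def task_status(task_id: str) -> dict:
    # AsyncResult hits the result backend on every call; clients repeat this every N seconds
    res = AsyncResult(task_id, app=worker_app)
    return {"task_id": task_id, "status": res.status}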
Straight to the point: push results via Redis Streams
Task outcomes already exist at the worker boundary. Celery raises signals when jobs succeed or fail, and those signals can be bridged into a Redis Stream. Keep writing the final payload as a Redis string if that’s part of your contract, but additionally append a compact record into a stream. The frontend or any external consumer can listen to that stream and trigger updates as soon as entries arrive.
"""
Event-style task result propagation via Redis Streams.
Captures Celery task outcomes with signals and pushes them into a Redis stream.
"""
from datetime import datetime
import json
from typing import Any
import redis
from celery import Task
from celery.signals import task_success, task_failure
# Redis connection holder
_redis_pool = None
def acquire_redis_conn() -> redis.Redis:
"""Return a Redis client tied to the same connection settings as Celery."""
global _redis_pool
if _redis_pool is None:
redis_dsn = f"rediss://:{youpass}@{yourhost}:{yourport}/0"
_redis_pool = redis.Redis.from_url(
redis_dsn,
ssl_cert_reqs=None,
decode_responses=True
)
return _redis_pool
STREAM_KEY = "your_redis_prefix"
@task_success.connect
def on_task_ok(sender: Task | None = None, result: Any | None = None, **kwargs) -> None:
"""Publish successful task outcomes to a Redis stream."""
try:
job_id = sender.request.id if sender else "unknown"
job_name = sender.name if sender else "unknown"
rds = acquire_redis_conn()
payload = {
"task_id": job_id,
"task_name": job_name,
"status": "SUCCESS",
"timestamp": datetime.now().isoformat(),
}
try:
if isinstance(result, (dict, list)):
payload["result"] = json.dumps(result)
else:
payload["result"] = str(result)
except Exception as exc:
payload["result"] = f"Error serializing: {str(exc)}"
rds.xadd(
STREAM_KEY,
payload,
maxlen=10000,
approximate=True
)
except Exception:
log.error("Error publishing task success to Redis stream with task_id: %s", job_id)
@task_failure.connect
def on_task_error(sender: Task | None = None, exception: Exception | None = None, **kwargs) -> None:
"""Publish failed task outcomes to a Redis stream."""
try:
job_id = sender.request.id if sender else "unknown"
job_name = sender.name if sender else "unknown"
rds = acquire_redis_conn()
payload = {
"task_id": job_id,
"task_name": job_name,
"status": "FAILURE",
"timestamp": datetime.now().isoformat(),
"error": str(exception) if exception else "Unknown error"
}
rds.xadd(
STREAM_KEY,
payload,
maxlen=10000,
approximate=True
)
except Exception:
log.error("Error publishing task failure to Redis stream with task_id: %s", job_id)
The handlers publish a compact, flat field map to the stream, capturing task_id, task_name, status, timestamp, and either result or error. The stream retains a bounded history through maxlen with approximate trimming, which is a pragmatic default for operational feeds.
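To sanity-check that outcomes are landing, the stream can be read back directly. A quick sketch with redis-py, reusing the bridge module's connection helper:

# Quick inspection of the outcome stream
from job_stream_bridge import acquire_redis_conn

rds = acquire_redis_conn()
print(rds.xlen("your_redis_prefix"))  # bounded by maxlen=10000 with approximate trimming
for entry_id, fields in rds.xrange("your_redis_prefix", count=5):
    # fields carry task_id, task_name, status, timestamp, and result or error
    print(entry_id, fields)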
Wiring it into the Celery worker
The bridge is passive until the worker imports it. Loading it during the app bootstrap ensures the signal handlers are registered.
"""
Celery app bootstrap and discovery.
"""
import config as workerconf
from celery import Celery
worker_app = Celery("YOUR_APP_NAME")
worker_app.config_from_object(workerconf)
worker_app.autodiscover_tasks(["DIR_TO_YOUR_TASKS"])
# Register signal handlers by importing the bridge module
import job_stream_bridge
# Expose the app instance
process_bus = worker_app
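The usual celery -A ... worker invocation picks this up unchanged. For a quick local check, the worker can also be launched programmatically; a sketch, assuming the bootstrap module above is saved as celery_app.py (the file name is illustrative):

# run_worker.py: programmatic equivalent of "celery -A celery_app worker --loglevel=info"
from celery_app import worker_app  # importing this registers the stream-bridge signal handlers

if __name__ == "__main__":
    worker_app.worker_main(argv=["worker", "--loglevel=info"])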
What problem does this solve, exactly?
Without an event channel, consumers must poll a Redis string or ask an API for status. Moving the result notification into a Redis Stream turns completion into an event that clients can consume continuously. The frontend can subscribe to the stream and trigger UI updates automatically the moment entries appear, while still reading the authoritative result from the Redis string if needed.
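On the consuming side, any client that can issue XREAD can react to entries as they arrive. Here is a minimal sketch of a FastAPI relay that forwards new entries to the browser as server-sent events, using the same stream key as the bridge module; the endpoint path, DSN, and client setup are illustrative.

"""
Consumer sketch: relay new stream entries to browsers via server-sent events.
"""
import json

import redis.asyncio as aioredis
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()
STREAM_KEY = "your_redis_prefix"


@app.get("/task-events")
async def task_events() -> StreamingResponse:
    async def event_source():
        # Same connection settings as the worker side; DSN elided here
        rds = aioredis.Redis.from_url("rediss://...", decode_responses=True)
        last_id = "$"  # deliver only entries appended after the client connects
        try:
            while True:
                # Block up to 15s for entries newer than last_id, then loop
                batches = await rds.xread({STREAM_KEY: last_id}, count=10, block=15000)
                for _stream, entries in batches:
                    for entry_id, fields in entries:
                        last_id = entry_id
                        yield f"data: {json.dumps(fields)}\n\n"
        finally:
            await rds.aclose()  # redis-py 5+; use close() on older versions

    return StreamingResponse(event_source(), media_type="text/event-stream")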
Notes on adjacent Redis patterns
A Redis-first design offers other ways to signal completion: pub/sub provides fire-and-forget topics, and blocking reads like BRPOP suit handoff workflows with a single consumer. The approach above sticks with Redis Streams because consumers can listen and respond to appended entries while the feed keeps a short rolling history.
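For comparison, the pub/sub variant is a one-liner on the publishing side, at the cost of losing events when no subscriber is connected; the channel name and payload here are illustrative.

# Fire-and-forget alternative: pub/sub keeps no backlog, so offline consumers miss events
import json

from job_stream_bridge import acquire_redis_conn

rds = acquire_redis_conn()
rds.publish("task_events", json.dumps({"task_id": "some-task-id", "status": "SUCCESS"}))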
Why this matters
Evented completion eliminates periodic polling traffic and shortens perceived latency for downstream systems. With Redis Streams wired to Celery signals, teams that already publish tasks to Redis lists and write results as strings can add push-based notifications without changing the task execution path. This also cleanly separates concerns: workers emit outcomes, and clients decide how to handle them.
Takeaways
If producers are writing directly into Redis and you want to notify on completion without polling, bridge Celery task_success and task_failure to a Redis Stream. Keep the existing result-as-string contract if it’s part of your integration, and let consumers listen to the stream to trigger their flows automatically. For teams exploring patterns, pub/sub and BRPOP are relevant alternatives in the Redis toolbox, but a stream-based feed works naturally when you want event-driven updates with a bounded backlog.