2025, Nov 13 07:00
Stop Flask-SocketIO Redis Pub/Sub from stalling: switch from eventlet to gevent and use the built-in message queue
Fix silent stalls in Flask-SocketIO consuming Redis Pub/Sub: drop eventlet, switch to gevent, and use the Socket.IO Redis message queue for reliable WebSockets.
When a Flask-SocketIO app must both serve WebSocket clients and consume Redis Pub/Sub, it’s tempting to spin up a background task that loops over pubsub.listen(). That approach can appear fine in early tests, but under load or over time the listener may silently stall. The example below illustrates such a setup that ran on Gunicorn with a single eventlet worker and eventually stopped consuming messages without errors. The resolution was to move away from eventlet and delegate cross-process signaling to Flask-SocketIO’s Redis message queue integration.
Problem setup
The server ran Flask-SocketIO over WebSockets and also needed to react to Redis Pub/Sub events coming from another pod. The initial design attached a background task to Socket.IO that opened a Redis pubsub and dispatched messages into websocket services.
# extensions.py
from flask_socketio import SocketIO

ws_node = SocketIO(
    logger=False,
    engineio_logger=False,
    path='/socket.io',
    cors_allowed_origins='*',
    async_mode='eventlet',
    ping_timeout=10,
    ping_interval=60,
)
# redis_bridge.py
from pydantic import TypeAdapter

class RedisLink:
    def __init__(self):
        ...

    def consume_stream(self):
        # Outer loop: re-create the subscription whenever the listener fails.
        while True:
            try:
                ps = self.redis.pubsub()
                ps.subscribe(self.websocket_channel)
                try:
                    # Blocks until the next Pub/Sub message arrives.
                    for raw in ps.listen():
                        try:
                            evt = TypeAdapter(RedisEnvelopeMessage).validate_python(raw)
                            self.logger.debug(f"Received from REDIS: {raw}")
                            if evt.type == 'message':
                                payload = evt.data
                                self.logger.debug(f"Received message from REDIS: {payload}")
                                with self.app.app_context():
                                    # NOTE: as written, a missing module key falls back to the
                                    # string "RedisWS", not to an extension object.
                                    svc = self.app.extensions.get(payload.module, "RedisWS").ws_services.get(payload.company_id)
                                    if payload.message is not None:
                                        if isinstance(payload.message, list):
                                            getattr(svc, payload.method)(*payload.message)
                                        elif isinstance(payload.message, dict):
                                            getattr(svc, payload.method)(payload.message)
                                        elif isinstance(payload.message, str):
                                            getattr(svc, payload.method)(payload.message)
                                    else:
                                        getattr(svc, payload.method)()
                            elif evt.type == 'subscribe':
                                self.logger.info(f"Subscribed to REDIS channel: {evt.channel}")
                            else:
                                self.logger.info(f"Received message from REDIS but NOT PROCESSED: {raw}")
                        except Exception as exc:
                            # A bad message is logged and skipped; the listener keeps running.
                            self.logger.catch_exception(exc)
                            self.logger.error(f"Pubsub parsing error: {exc}").save("pubsub_listener")
                except Exception as exc:
                    self.logger.catch_exception(exc, level="critical")
                    self.logger.error(f"Pubsub listener error: {exc}").save("pubsub_listener")
                    # ws_node.sleep(2)
            except Exception as exc:
                self.logger.catch_exception(exc, level="critical")
                self.logger.error(f"Pubsub loop error: {exc}").save("pubsub_listener")
# app_factory.py
def build_app(config_class=Config):
    ...
    ws_node.init_app(app)
    with app.app_context():
        app.extensions["ps"] = ws_node.start_background_task(redis_store_static.consume_stream)
In practice, this configuration worked at first and then the pubsub listener stopped receiving, with no exceptions logged. Restarting the server brought it back.
What’s really happening
The core of the issue is a long-lived Redis Pub/Sub loop running inside a Socket.IO background task under the eventlet async engine, which stalled silently after some time. The discussion around this pattern produced one key recommendation: avoid eventlet in 2025 and retest with threading or gevent.
“eventlet is not a great package to use in 2025 because it hasn’t been properly maintained in the last few years. I would suggest you retest your application using either regular threads (async_mode='threading') or gevent (async_mode='gevent').”
Instead of keeping a bespoke background loop, the fix leaned on Flask-SocketIO’s native Redis message queue so that cross-process events are handled by the server itself, not by a custom thread.
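The retest suggested in the quote is easier when the async engine can be switched without editing code. The following is a minimal sketch, not part of the original project: the helper name resolve_async_mode and the environment variable SOCKETIO_ASYNC_MODE are hypothetical, and the helper only accepts the two engines the quote recommends.

```python
import os

# Engines the quoted advice suggests retesting with; eventlet is left out on purpose.
SUPPORTED_MODES = ("threading", "gevent")


def resolve_async_mode(environ=os.environ, default="threading"):
    """Return the async_mode to hand to SocketIO.

    SOCKETIO_ASYNC_MODE is a hypothetical variable name chosen for this sketch.
    """
    mode = environ.get("SOCKETIO_ASYNC_MODE", default).strip().lower()
    if mode not in SUPPORTED_MODES:
        raise ValueError(
            f"unsupported async_mode {mode!r}; expected one of {SUPPORTED_MODES}"
        )
    return mode


# In extensions.py the result would be passed to Flask-SocketIO, for example:
#   SocketIO(async_mode=resolve_async_mode(), path='/socket.io', ...)
```

With this in place, the same build can be started once with SOCKETIO_ASYNC_MODE=threading and once with SOCKETIO_ASYNC_MODE=gevent to compare behavior under load.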
Solution: switch to gevent and use the Socket.IO message queue
The working configuration drops eventlet, switches the async engine to gevent, and initializes Socket.IO with Redis as its message queue. The dedicated pubsub listener thread disappears from the app; events are published either directly to WebSocket services or to a Redis channel, depending on the use case.
# extensions.py
from flask_socketio import SocketIO

# Named io_bus to match the init_app() call in app_factory.py.
io_bus = SocketIO(
    path='/socket.io',
    async_mode='gevent',
    cors_allowed_origins='*',
    ping_timeout=15,
    ping_interval=60,
)
# app_factory.py
def build_app(config_class=Config):
    ...
    io_bus.init_app(
        app,
        message_queue=redis_store_static.redis_url,
        channel=app.config.get("WEBSOCKET_CHANNEL"),
    )
On the producing side (a Celery worker, for example), messages whose module is "RedisWS" are routed straight into the WebSocket services; everything else goes out through a generic Redis publish. This keeps the same dispatcher usable for WebSocket clients and for other consumers.
# producers.py
class Dispatcher:
    def __init__(self):
        ...

    def broadcast(self, ps_msg: RedisPubSubMessage):
        try:
            if ps_msg.module == "RedisWS":
                svc = self.app.extensions.get("RedisWS").ws_services.get(ps_msg.company_id)
                if ps_msg.message is not None:
                    if isinstance(ps_msg.message, list):
                        getattr(svc, ps_msg.method)(*ps_msg.message)
                    elif isinstance(ps_msg.message, dict):
                        getattr(svc, ps_msg.method)(ps_msg.message)
                    elif isinstance(ps_msg.message, str):
                        getattr(svc, ps_msg.method)(ps_msg.message)
                else:
                    getattr(svc, ps_msg.method)()
                self.logger.debug(f"Event emitted in socketio {self.socketio}: {ps_msg.model_dump()}")
                return "emitted to sockets"
            else:
                return self.redis.publish(self.channel, ps_msg.model_dump_json())
        except Exception as exc:
            self.logger.error(f"Pubsub publish error: {exc}").save("pubsub_published")
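The argument-unpacking rule inside broadcast() (a list is spread as positional arguments, a dict or str is passed as one argument, and None means a call with no arguments) can be isolated and exercised on its own. The sketch below is illustrative only: call_service_method and EchoService are hypothetical names, and unlike the original, which silently ignores any other message type, this version raises TypeError.

```python
def call_service_method(service, method_name, message=None):
    """Invoke service.<method_name>, unpacking message the way broadcast() does."""
    handler = getattr(service, method_name)
    if message is None:
        return handler()          # no payload: call without arguments
    if isinstance(message, list):
        return handler(*message)  # list: spread as positional arguments
    if isinstance(message, (dict, str)):
        return handler(message)   # dict or str: pass as a single argument
    # Assumption of this sketch: fail loudly instead of skipping unknown types.
    raise TypeError(f"unsupported message type: {type(message).__name__}")


class EchoService:
    """Hypothetical stand-in for a WebSocket service object."""

    def ping(self):
        return "pong"

    def add(self, a, b):
        return a + b

    def describe(self, payload):
        return f"got {payload!r}"


if __name__ == "__main__":
    svc = EchoService()
    print(call_service_method(svc, "ping"))
    print(call_service_method(svc, "add", [2, 3]))
    print(call_service_method(svc, "describe", {"id": 1}))
```

Keeping the rule in one function would let the consumer and the producer share it instead of repeating the isinstance chain.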
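# ws_services.py
class WSAgent:
    def __init__(self, tenant, sio_core):
        self._version = '2.2'
        self.socket = sio_core  # SocketIO instance used for emitting
        self.logger = logger    # assumed to be a module-level logger defined elsewhere

    def new_message(self, message):
        # Hidden messages are skipped; everything else goes to the recipient's room.
        if message.tracking_status != "hidden":
            message_payload = message.to_dict()
            self.socket.emit('new_message', message_payload, room=message.user.id)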
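WSAgent receives its Socket.IO handle as sio_core. In a process that serves no clients, such as the Celery worker mentioned earlier, Flask-SocketIO lets that handle be a SocketIO instance created without a Flask app and with only a message queue, so that emit() publishes to Redis and the web process delivers the event. The following is a minimal sketch under the assumption that the worker can read the same Redis URL and channel as the web process; make_worker_emitter, the example URL, and the example channel name are hypothetical.

```python
def make_worker_emitter(redis_url, channel):
    """Build an emit-only SocketIO handle for a process that serves no clients."""
    # Imported lazily so that importing this module does not require
    # Flask-SocketIO until an emitter is actually requested.
    from flask_socketio import SocketIO

    # No Flask app is passed: the instance only publishes to the message queue.
    # redis_url and channel must match what the web process gave to init_app().
    return SocketIO(message_queue=redis_url, channel=channel)


# Hypothetical usage inside a Celery task (values are placeholders):
#   sio = make_worker_emitter("redis://localhost:6379/0", "websocket-events")
#   sio.emit("new_message", {"text": "hello"}, room="user-42")
```

An instance built this way could be passed as sio_core, leaving WSAgent unchanged whether it runs in the web process or in a worker.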
Why this matters
WebSocket infrastructure is often long-lived and sensitive to the async engine underneath. A Redis Pub/Sub consumer that stalls without an error can create puzzling outages in real-time features. Moving to gevent and letting Flask-SocketIO coordinate with Redis via its message queue reduces custom concurrency code and aligns the WebSocket stack with recommended async engines for this ecosystem. The guidance to retest with threading or gevent is particularly relevant given the maintenance status of eventlet.
Takeaways
If a background Pub/Sub loop under eventlet intermittently stops, consider switching to gevent and wiring Flask-SocketIO to Redis through message_queue and channel. Keep WebSocket dispatching in your service layer, and publish events from workers in a way that supports both sockets and generic channels. This simplifies the runtime model and helps avoid silent stalls, while keeping your real-time pipeline predictable.