2025, Nov 18 21:02
Почему зависает Redis Pub/Sub на eventlet и как перейти на gevent в Flask-SocketIO
Разбираем, почему Redis Pub/Sub в фоновом таске Flask-SocketIO на eventlet замирает, и как перейти на gevent с message_queue Redis для стабильных WebSocket.
Когда приложению на Flask-SocketIO нужно одновременно обслуживать WebSocket‑клиентов и принимать сообщения из Redis Pub/Sub, возникает соблазн запустить фоновую задачу с циклом по pubsub.listen(). На первых порах это кажется рабочим решением, но под нагрузкой или со временем слушатель может незаметно зависнуть. Пример ниже показывает такую конфигурацию: она работала под Gunicorn с одним воркером eventlet и в итоге переставала получать сообщения без каких‑либо ошибок. В итоге помогло отказаться от eventlet и поручить межпроцессные сигналы встроенной интеграции Flask‑SocketIO с очередью сообщений Redis.
Постановка задачи
Сервер использовал Flask‑SocketIO поверх WebSocket и должен был реагировать на события Redis Pub/Sub, приходящие из другого пода. В первоначальном варианте к Socket.IO прикреплялась фоновая задача: она открывала Redis pubsub и прокидывала сообщения в веб‑сокет‑сервисы.
# extensions.py
from flask_socketio import SocketIO
ws_node = SocketIO(
logger=False,
engineio_logger=False,
path='/socket.io',
cors_allowed_origins='*',
async_mode='eventlet',
ping_timeout=10,
ping_interval=60,
)
# redis_bridge.py
from pydantic import TypeAdapter
class RedisLink:
def __init__(self):
...
def consume_stream(self):
while True:
try:
ps = self.redis.pubsub()
ps.subscribe(self.websocket_channel)
try:
for raw in ps.listen():
try:
evt = TypeAdapter(RedisEnvelopeMessage).validate_python(raw)
self.logger.debug(f"Received from REDIS: {raw}")
if evt.type == 'message':
payload = evt.data
self.logger.debug(f"Received message from REDIS: {payload}")
with self.app.app_context():
svc = self.app.extensions.get(payload.module, "RedisWS").ws_services.get(payload.company_id)
if payload.message is not None:
if isinstance(payload.message, list):
getattr(svc, payload.method)(*payload.message)
elif isinstance(payload.message, dict):
getattr(svc, payload.method)(payload.message)
elif isinstance(payload.message, str):
getattr(svc, payload.method)(payload.message)
else:
getattr(svc, payload.method)()
elif evt.type == 'subscribe':
self.logger.info(f"Subscribed to REDIS channel: {evt.channel}")
else:
self.logger.info(f"Received message from REDIS but NOT PROCESSED: {raw}")
except Exception as exc:
self.logger.catch_exception(exc)
self.logger.error(f"Pubsub parsing error: {exc}").save("pubsub_listener")
except Exception as exc:
self.logger.catch_exception(exc, level="critical")
self.logger.error(f"Pubsub listener error: {exc}").save("pubsub_listener")
# ws_node.sleep(2)
except Exception as exc:
self.logger.catch_exception(exc, level="critical")
self.logger.error(f"Pubsub loop error: {exc}").save("pubsub_listener")
# app_factory.py
def build_app(config_class=Config):
...
ws_node.init_app(app)
with app.app_context():
app.extensions["ps"] = ws_node.start_background_task(redis_store_static.consume_stream)
На практике эта схема сначала работала, а затем слушатель pubsub переставал что‑либо получать, при этом исключения не логировались. Перезапуск сервера временно всё исправлял.
Что происходит на самом деле
Корень проблемы — долгоживущий цикл Redis Pub/Sub внутри фоновой задачи Socket.IO, работающей на асинхронном движке eventlet. Симптом — тихая остановка через какое‑то время. В обсуждении такого подхода важно отметить рекомендацию в 2025 году не использовать eventlet и перепроверить приложение с threading или gevent.
eventlet — не лучший выбор в 2025 году: пакет в последние годы толком не поддерживается. Я бы предложил перепроверить приложение, используя либо обычные потоки (async_mode='threading'), либо gevent (async_mode='gevent').
Вместо самописного фонового цикла решение опирается на родную очередь сообщений Redis в Flask‑SocketIO, чтобы межпроцессные события обрабатывались самим сервером, а не отдельным пользовательским потоком.
Решение: перейти на gevent и использовать очередь сообщений Socket.IO
Рабочая конфигурация отказывается от eventlet, переключает асинхронный движок на gevent и инициализирует Socket.IO с Redis в роли очереди сообщений. Отдельный поток‑слушатель pubsub из приложения исчезает; события публикуются либо напрямую в WebSocket‑сервисы, либо в канал Redis — в зависимости от сценария.
# extensions.py
from flask_socketio import SocketIO
aio_bus = SocketIO(
path='/socket.io',
async_mode='gevent',
cors_allowed_origins='*',
ping_timeout=15,
ping_interval=60,
)
# app_factory.py
def build_app(config_class=Config):
...
io_bus.init_app(
app,
message_queue=redis_store_static.redis_url,
channel=app.config.get("WEBSOCKET_CHANNEL"),
)
Производство событий, например из воркера Celery, направляет сообщения в WebSocket‑сервисы, когда module == "RedisWS". В остальных случаях используется обычная публикация в Redis. Это сохраняет гибкость работы как с веб‑сокетами, так и с другими потребителями.
# producers.py
class Dispatcher:
def __init__(self):
...
def broadcast(self, ps_msg: RedisPubSubMessage):
try:
if ps_msg.module == "RedisWS":
svc = self.app.extensions.get("RedisWS").ws_services.get(ps_msg.company_id)
if ps_msg.message is not None:
if isinstance(ps_msg.message, list):
getattr(svc, ps_msg.method)(*ps_msg.message)
elif isinstance(ps_msg.message, dict):
getattr(svc, ps_msg.method)(ps_msg.message)
elif isinstance(ps_msg.message, str):
getattr(svc, ps_msg.method)(ps_msg.message)
else:
getattr(svc, ps_msg.method)()
self.logger.debug(f"Event emitted in socketio {self.socketio}: {ps_msg.model_dump()}")
return "emitted to sockets"
else:
return self.redis.publish(self.channel, ps_msg.model_dump_json())
except Exception as exc:
self.logger.error(f"Pubsub publish error: {exc}").save("pubsub_published")
# ws_services.py
class WSAgent:
def __init__(self, tenant, sio_core):
self._version = '2.2'
self.socket = sio_core
self.logger = logger
def new_message(self, message):
if message.tracking_status != "hidden":
message_payload = message.to_dict()
self.socket.emit('new_message', message_payload, room=message.user.id)
Почему это важно
Инфраструктура WebSocket обычно живёт долго и чувствительна к выбранному асинхронному движку. Если потребитель Redis Pub/Sub замирает без ошибки, это приводит к непонятным сбоям в реальном времени. Переход на gevent и координация Flask‑SocketIO с Redis через очередь сообщений уменьшают объём самописной конкурентности и согласуют стек WebSocket с рекомендованными движками для этой экосистемы. Рекомендация перепроверить работу с threading или gevent особенно актуальна с учётом статуса поддержки eventlet.
Выводы
Если фоновый цикл Pub/Sub на eventlet периодически останавливается, стоит перейти на gevent и подключить Flask‑SocketIO к Redis через параметры message_queue и channel. Держите отправку WebSocket‑событий в своём сервисном слое, а публикацию из воркеров организуйте так, чтобы поддерживались и сокеты, и универсальные каналы. Это упрощает модель исполнения и помогает избежать «тихих» зависаний, сохраняя предсказуемость реального времени.