2025, Nov 18 21:02

Почему зависает Redis Pub/Sub на eventlet и как перейти на gevent в Flask-SocketIO

Разбираем, почему Redis Pub/Sub в фоновом таске Flask-SocketIO на eventlet замирает, и как перейти на gevent с message_queue Redis для стабильных WebSocket.

Когда приложению на Flask-SocketIO нужно одновременно обслуживать WebSocket‑клиентов и принимать сообщения из Redis Pub/Sub, возникает соблазн запустить фоновую задачу с циклом по pubsub.listen(). На первых порах это кажется рабочим решением, но под нагрузкой или со временем слушатель может незаметно зависнуть. Пример ниже показывает такую конфигурацию: она работала под Gunicorn с одним воркером eventlet и в итоге переставала получать сообщения без каких‑либо ошибок. В итоге помогло отказаться от eventlet и поручить межпроцессные сигналы встроенной интеграции Flask‑SocketIO с очередью сообщений Redis.

Постановка задачи

Сервер использовал Flask‑SocketIO поверх WebSocket и должен был реагировать на события Redis Pub/Sub, приходящие из другого пода. В первоначальном варианте к Socket.IO прикреплялась фоновая задача: она открывала Redis pubsub и прокидывала сообщения в веб‑сокет‑сервисы.

# extensions.py
from flask_socketio import SocketIO

ws_node = SocketIO(
    logger=False,
    engineio_logger=False,
    path='/socket.io',
    cors_allowed_origins='*',
    async_mode='eventlet',
    ping_timeout=10,
    ping_interval=60,
)
# redis_bridge.py
from pydantic import TypeAdapter

class RedisLink:
    def __init__(self):
        ...

    def consume_stream(self):
        while True:
            try:
                ps = self.redis.pubsub()
                ps.subscribe(self.websocket_channel)
                try:
                    for raw in ps.listen():
                        try:
                            evt = TypeAdapter(RedisEnvelopeMessage).validate_python(raw)
                            self.logger.debug(f"Received from REDIS: {raw}")
                            if evt.type == 'message':
                                payload = evt.data
                                self.logger.debug(f"Received message from REDIS: {payload}")
                                with self.app.app_context():
                                    svc = self.app.extensions.get(payload.module, "RedisWS").ws_services.get(payload.company_id)
                                    if payload.message is not None:
                                        if isinstance(payload.message, list):
                                            getattr(svc, payload.method)(*payload.message)
                                        elif isinstance(payload.message, dict):
                                            getattr(svc, payload.method)(payload.message)
                                        elif isinstance(payload.message, str):
                                            getattr(svc, payload.method)(payload.message)
                                    else:
                                        getattr(svc, payload.method)()
                            elif evt.type == 'subscribe':
                                self.logger.info(f"Subscribed to REDIS channel: {evt.channel}")
                            else:
                                self.logger.info(f"Received message from REDIS but NOT PROCESSED: {raw}")
                        except Exception as exc:
                            self.logger.catch_exception(exc)
                            self.logger.error(f"Pubsub parsing error: {exc}").save("pubsub_listener")
                except Exception as exc:
                    self.logger.catch_exception(exc, level="critical")
                    self.logger.error(f"Pubsub listener error: {exc}").save("pubsub_listener")
                    # ws_node.sleep(2)
            except Exception as exc:
                self.logger.catch_exception(exc, level="critical")
                self.logger.error(f"Pubsub loop error: {exc}").save("pubsub_listener")
# app_factory.py

def build_app(config_class=Config):
    ...
    ws_node.init_app(app)
    with app.app_context():
        app.extensions["ps"] = ws_node.start_background_task(redis_store_static.consume_stream)

На практике эта схема сначала работала, а затем слушатель pubsub переставал что‑либо получать, при этом исключения не логировались. Перезапуск сервера временно всё исправлял.

Что происходит на самом деле

Корень проблемы — долгоживущий цикл Redis Pub/Sub внутри фоновой задачи Socket.IO, работающей на асинхронном движке eventlet. Симптом — тихая остановка через какое‑то время. В обсуждении такого подхода важно отметить рекомендацию в 2025 году не использовать eventlet и перепроверить приложение с threading или gevent.

eventlet — не лучший выбор в 2025 году: пакет в последние годы толком не поддерживается. Я бы предложил перепроверить приложение, используя либо обычные потоки (async_mode='threading'), либо gevent (async_mode='gevent').

Вместо самописного фонового цикла решение опирается на родную очередь сообщений Redis в Flask‑SocketIO, чтобы межпроцессные события обрабатывались самим сервером, а не отдельным пользовательским потоком.

Решение: перейти на gevent и использовать очередь сообщений Socket.IO

Рабочая конфигурация отказывается от eventlet, переключает асинхронный движок на gevent и инициализирует Socket.IO с Redis в роли очереди сообщений. Отдельный поток‑слушатель pubsub из приложения исчезает; события публикуются либо напрямую в WebSocket‑сервисы, либо в канал Redis — в зависимости от сценария.

# extensions.py
from flask_socketio import SocketIO

aio_bus = SocketIO(
    path='/socket.io',
    async_mode='gevent',
    cors_allowed_origins='*',
    ping_timeout=15,
    ping_interval=60,
)
# app_factory.py

def build_app(config_class=Config):
    ...
    io_bus.init_app(
        app,
        message_queue=redis_store_static.redis_url,
        channel=app.config.get("WEBSOCKET_CHANNEL"),
    )

Производство событий, например из воркера Celery, направляет сообщения в WebSocket‑сервисы, когда module == "RedisWS". В остальных случаях используется обычная публикация в Redis. Это сохраняет гибкость работы как с веб‑сокетами, так и с другими потребителями.

# producers.py

class Dispatcher:
    def __init__(self):
        ...

    def broadcast(self, ps_msg: RedisPubSubMessage):
        try:
            if ps_msg.module == "RedisWS":
                svc = self.app.extensions.get("RedisWS").ws_services.get(ps_msg.company_id)
                if ps_msg.message is not None:
                    if isinstance(ps_msg.message, list):
                        getattr(svc, ps_msg.method)(*ps_msg.message)
                    elif isinstance(ps_msg.message, dict):
                        getattr(svc, ps_msg.method)(ps_msg.message)
                    elif isinstance(ps_msg.message, str):
                        getattr(svc, ps_msg.method)(ps_msg.message)
                else:
                    getattr(svc, ps_msg.method)()
                self.logger.debug(f"Event emitted in socketio {self.socketio}: {ps_msg.model_dump()}")
                return "emitted to sockets"
            else:
                return self.redis.publish(self.channel, ps_msg.model_dump_json())
        except Exception as exc:
            self.logger.error(f"Pubsub publish error: {exc}").save("pubsub_published")
# ws_services.py

class WSAgent:
    def __init__(self, tenant, sio_core):
        self._version = '2.2'
        self.socket = sio_core
        self.logger = logger

    def new_message(self, message):
        if message.tracking_status != "hidden":
            message_payload = message.to_dict()
            self.socket.emit('new_message', message_payload, room=message.user.id)

Почему это важно

Инфраструктура WebSocket обычно живёт долго и чувствительна к выбранному асинхронному движку. Если потребитель Redis Pub/Sub замирает без ошибки, это приводит к непонятным сбоям в реальном времени. Переход на gevent и координация Flask‑SocketIO с Redis через очередь сообщений уменьшают объём самописной конкурентности и согласуют стек WebSocket с рекомендованными движками для этой экосистемы. Рекомендация перепроверить работу с threading или gevent особенно актуальна с учётом статуса поддержки eventlet.

Выводы

Если фоновый цикл Pub/Sub на eventlet периодически останавливается, стоит перейти на gevent и подключить Flask‑SocketIO к Redis через параметры message_queue и channel. Держите отправку WebSocket‑событий в своём сервисном слое, а публикацию из воркеров организуйте так, чтобы поддерживались и сокеты, и универсальные каналы. Это упрощает модель исполнения и помогает избежать «тихих» зависаний, сохраняя предсказуемость реального времени.