2025, Oct 06 21:17

Надёжная остановка asyncio-сервера из другого процесса

Как корректно остановить asyncio-сервер по сигналу multiprocessing.Event без опросов: ожидание в потоке через run_in_executor и мост к asyncio.Event надежно

Остановить сервер на asyncio из другого процесса кажется простым: есть multiprocessing.Event. Но смешение межпроцессной синхронизации с асинхронным циклом нередко превращается в ненадёжный опрос. Ключевая задача — отреагировать на сигнал завершения, не блокируя цикл и не полагаясь на проверки, чувствительные к таймингам.

Постановка задачи

Рассмотрим асинхронный TCP‑сервер, который периодически опрашивает multiprocessing.Event, чтобы понять, когда завершиться:

self._stop_signal = multiprocessing.Event()
tcp_srv = await asyncio.start_server(
    self.on_client,
    self.bind_host,
    self.bind_port,
    backlog=self.backlog_size
)

async with tcp_srv:
    while not self._stop_signal.is_set():
        await asyncio.sleep(0.01)

Ожидается, что другой процесс вызовет set() у события, цикл это увидит, и сервер корректно завершится. На деле такой опрос ведёт себя нестабильно: то срабатывает, то нет.

Что происходит

Опрос multiprocessing.Event с паузами в цикле asyncio по природе чувствителен к таймингам. Цикл просыпается по расписанию и проверяет флаг — это создаёт условия для гонок. И ещё важнее правило: не вызывать напрямую блокирующие примитивы multiprocessing в контексте asyncio. Надёжнее вынести ожидание в поток или «перемостить» его в asyncio.Event, а не крутить постоянные проверки состояния.

Есть и дополнительный нюанс. Простая проверка флага сама по себе синхронна и неблокирующая, но опора на плотный опрос с sleep всё равно хрупка для сигналов завершения. Гораздо чище — отдельное ожидание, которое не блокирует event loop.

Надёжный подход

Запустите блокирующее ожидание в отдельном потоке и ожидайте его из asyncio. Так межпроцессная синхронизация отделяется от event loop, и необходимость в самодельном опросе отпадает.

import asyncio
import multiprocessing

async def await_shutdown(stop_evt):
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, stop_evt.wait)

# In the server task:
async with tcp_srv:
    shutdown_task = asyncio.create_task(
        await_shutdown(self._stop_signal)
    )
    await shutdown_task

Этот приём поддерживает отзывчивость event loop, пока поток ждёт межпроцессное событие. Как только событие установлено, таск завершается, и можно переходить к логике остановки.

Альтернатива: мост к asyncio.Event

Другой вариант — «перемостить» multiprocessing.Event в asyncio.Event через фоновый поток. Поток блокируется на межпроцессном событии, а затем безопасно для цикла сигналит асинхронное событие.

class EventRelay:
    def __init__(self, proc_evt):
        self.proc_evt = proc_evt
        self.aio_evt = asyncio.Event()
        
    def launch_watch(self):
        def watcher():
            self.proc_evt.wait()
            asyncio.run_coroutine_threadsafe(
                self._notify_aio(), asyncio.get_running_loop()
            )
        threading.Thread(target=watcher, daemon=True).start()
        
    async def wait(self):
        await self.aio_evt.wait()

Так сохраняется разделение между блокирующей синхронизацией и асинхронным рантаймом, а остальная система работает с нативным для asyncio примитивом.

Почему это важно

Асинхронные серверы держатся на отзывчивом event loop. Блокирующие вызовы и опора на циклы опроса подрывают это. Вынесите ожидание в поток или свяжите его с примитивами asyncio — так цикл остаётся «здоровым», а остановка становится детерминированной, без произвольных пауз sleep.

Вывод

Если нужно остановить код на asyncio по сигналу multiprocessing.Event, не опрашивайте его внутри цикла. Либо запускайте wait() события в потоке через run_in_executor (или аналогичный помощник), либо свяжите его с asyncio.Event с помощью фонового потока. Оба подхода не блокируют цикл и убирают из управления остановкой зависимость от таймингов.

Статья основана на вопросе на StackOverflow от UnemployedBrat и ответе Mahrez BenHamad.