2025, Oct 06 21:17
Надёжная остановка asyncio-сервера из другого процесса
Как корректно остановить asyncio-сервер по сигналу multiprocessing.Event без опросов: ожидание в потоке через run_in_executor и мост к asyncio.Event надежно
Остановить сервер на asyncio из другого процесса кажется простым: есть multiprocessing.Event. Но смешение межпроцессной синхронизации с асинхронным циклом нередко превращается в ненадёжный опрос. Ключевая задача — отреагировать на сигнал завершения, не блокируя цикл и не полагаясь на проверки, чувствительные к таймингам.
Постановка задачи
Рассмотрим асинхронный TCP‑сервер, который периодически опрашивает multiprocessing.Event, чтобы понять, когда завершиться:
self._stop_signal = multiprocessing.Event()
tcp_srv = await asyncio.start_server(
    self.on_client,
    self.bind_host,
    self.bind_port,
    backlog=self.backlog_size
)
async with tcp_srv:
    while not self._stop_signal.is_set():
        await asyncio.sleep(0.01)
Ожидается, что другой процесс вызовет set() у события, цикл это увидит, и сервер корректно завершится. На деле такой опрос ведёт себя нестабильно: то срабатывает, то нет.
Что происходит
Опрос multiprocessing.Event с паузами в цикле asyncio по природе чувствителен к таймингам. Цикл просыпается по расписанию и проверяет флаг — это создаёт условия для гонок. И ещё важнее правило: не вызывать напрямую блокирующие примитивы multiprocessing в контексте asyncio. Надёжнее вынести ожидание в поток или «перемостить» его в asyncio.Event, а не крутить постоянные проверки состояния.
Есть и дополнительный нюанс. Простая проверка флага сама по себе синхронна и неблокирующая, но опора на плотный опрос с sleep всё равно хрупка для сигналов завершения. Гораздо чище — отдельное ожидание, которое не блокирует event loop.
Надёжный подход
Запустите блокирующее ожидание в отдельном потоке и ожидайте его из asyncio. Так межпроцессная синхронизация отделяется от event loop, и необходимость в самодельном опросе отпадает.
import asyncio
import multiprocessing
async def await_shutdown(stop_evt):
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, stop_evt.wait)
# In the server task:
async with tcp_srv:
    shutdown_task = asyncio.create_task(
        await_shutdown(self._stop_signal)
    )
    await shutdown_task
Этот приём поддерживает отзывчивость event loop, пока поток ждёт межпроцессное событие. Как только событие установлено, таск завершается, и можно переходить к логике остановки.
Альтернатива: мост к asyncio.Event
Другой вариант — «перемостить» multiprocessing.Event в asyncio.Event через фоновый поток. Поток блокируется на межпроцессном событии, а затем безопасно для цикла сигналит асинхронное событие.
class EventRelay:
    def __init__(self, proc_evt):
        self.proc_evt = proc_evt
        self.aio_evt = asyncio.Event()
        
    def launch_watch(self):
        def watcher():
            self.proc_evt.wait()
            asyncio.run_coroutine_threadsafe(
                self._notify_aio(), asyncio.get_running_loop()
            )
        threading.Thread(target=watcher, daemon=True).start()
        
    async def wait(self):
        await self.aio_evt.wait()
Так сохраняется разделение между блокирующей синхронизацией и асинхронным рантаймом, а остальная система работает с нативным для asyncio примитивом.
Почему это важно
Асинхронные серверы держатся на отзывчивом event loop. Блокирующие вызовы и опора на циклы опроса подрывают это. Вынесите ожидание в поток или свяжите его с примитивами asyncio — так цикл остаётся «здоровым», а остановка становится детерминированной, без произвольных пауз sleep.
Вывод
Если нужно остановить код на asyncio по сигналу multiprocessing.Event, не опрашивайте его внутри цикла. Либо запускайте wait() события в потоке через run_in_executor (или аналогичный помощник), либо свяжите его с asyncio.Event с помощью фонового потока. Оба подхода не блокируют цикл и убирают из управления остановкой зависимость от таймингов.
Статья основана на вопросе на StackOverflow от UnemployedBrat и ответе Mahrez BenHamad.