2025, Dec 12 06:01

Как из синхронной функции вызывать корутину в asyncio без блокировок

Почему Thread.join блокирует событийный цикл и ведёт к дедлокам. Правильный вариант: run_in_executor и run_coroutine_threadsafe для запуска корутин из sync-кода

Вызывать корутину asyncio из синхронной функции, не переводя саму функцию в async, — типичная задача в смешанных кодовых базах. Заманчиво уйти в потоки и связать всё через run_coroutine_threadsafe. Но если добавить к этому привычный Thread.join, программа может зависнуть. Ниже — почему так происходит и как сделать правильно.

Постановка задачи

Нужно запустить асинхронную корутину из обычной функции, не объявляя её async и без await. Первая мысль — создать поток и воспользоваться run_coroutine_threadsafe, но в итоге приложение клинит. Новый поток не спасает, если вы всё равно блокируете событийный цикл вызовом join.

Воспроизведение: блокирующий вариант

import asyncio
from threading import Thread
async def job_b():
    print("starting job_b...")
    await asyncio.sleep(1)
    print("finished job_b.")
    return "done"
def sync_task(hub=None):
    print("running sync_task...")
    if not hub:
        hub = asyncio.get_running_loop()
    assert hub
    fut = asyncio.run_coroutine_threadsafe(
        job_b(),
        hub)
    outcome = fut.result()
    print(f"{outcome=}")
    print("done sync_task...")
async def job_a():
    print("starting job_a...")
    await asyncio.sleep(1)
    loop = asyncio.get_running_loop()
    t = Thread(target=sync_task, args=(loop,))
    t.start()
    t.join()
    print("finished job_a.")
if __name__ == '__main__':
    asyncio.run(job_a())

Почему возникает дедлок

Синхронизационные примитивы потоков Python, например Thread.join, приостанавливают текущий поток. Если вызвать join из корутины, которая выполняется в потоке событийного цикла, вы блокируете этот цикл. Заблокированный цикл не сможет исполнять корутину, запланированную через run_coroutine_threadsafe, поэтому Future никогда не завершится, и программа «зависнет». Отказ от отдельного потока и блокировка прямо в цикле приводят к тому же результату по той же причине.

Правильный подход: используйте run_in_executor

Вместо того чтобы блокировать событийный цикл join'ом, вынесите синхронную функцию в поток через loop.run_in_executor. Тогда цикл останется отзывчивым, пока поток вызывает run_coroutine_threadsafe и ждёт результата. Future продвигается, потому что цикл продолжает работать.

import asyncio
async def job_b():
    print("starting job_b...")
    await asyncio.sleep(1)
    print("finished job_b.")
    return "done"
def sync_task(hub: asyncio.AbstractEventLoop):
    print("running sync_task...")
    fut = asyncio.run_coroutine_threadsafe(
        job_b(),
        hub)
    outcome = fut.result()
    print(f"{outcome=}")
    print("done sync_task...")
async def job_a():
    print("starting job_a...")
    await asyncio.sleep(1)
    loop = asyncio.get_running_loop()
    task = loop.run_in_executor(None, sync_task, loop)
    await task  # the event loop keeps running
    print("finished job_a.")
if __name__ == '__main__':
    asyncio.run(job_a())

У каждого событийного цикла есть встроенный ThreadPoolExecutor по умолчанию — он берётся, когда вы передаёте None в качестве executora. В нём ограниченное число рабочих потоков; он рассчитан на вычислительные и неблокирующие задачи, тогда как приведённый шаблон применяет его для блокирующей работы. Можно увеличить число воркеров через loop.set_default_executor или выделить отдельные экземпляры ThreadPoolExecutor для разных подсистем, чтобы одна часть не «съедала» все потоки. Потоки создаются по требованию.

Альтернативы

Можно вызвать asyncio.run и создать новый событийный цикл в текущем потоке, запустив корутину там. Делайте это только из потока, в котором ещё не работает собственный цикл, иначе получите исключение. Пересылать корутину в исходный цикл, как показано выше, — тоже корректный и эффективный вариант. Ещё один подход — держать общий демон-поток со своим циклом событий и отправлять корутины туда.

Зачем это важно

Понимание того, как блокирующие вызовы взаимодействуют с asyncio, критично для надёжности и производительности. Случайная блокировка событийного цикла отключает планирование, таймеры и обработку ввода-вывода, что ведёт к дедлокам и тайм-аутам. Использование run_in_executor сохраняет отзывчивость и позволяет безопасно стыковать наследуемый или просто синхронный код с асинхронным.

Итоги

Не блокируйте событийный цикл с помощью Thread.join или других блокирующих примитивов, когда запускаете корутины из синхронного кода. Передайте входную синхронную функцию в отдельный поток через loop.run_in_executor, а общайтесь с работающим циклом через run_coroutine_threadsafe. Если нужен более тонкий контроль конкуренции, настройте executor по умолчанию или используйте отдельные экземпляры ThreadPoolExecutor. Когда уместно, создавайте новый цикл в отдельном потоке с asyncio.run, убедившись, что в этом потоке ещё не запущен цикл. Держите цикл незаблокированным — и всё заработает как надо.