2025, Dec 07 09:02

Как подружить asyncio с синхронными колбэками: weasyprint, aiohttp и run_coroutine_threadsafe

Разбираем паттерн интеграции asyncio и синхронных колбэков: weasyprint в потоке, сеть через aiohttp, мост run_coroutine_threadsafe без блокировки цикла.

Совместить asyncio со строго синхронной сторонней библиотекой — классическая точка трения: ваше приложение крутится на событийном цикле, вы полагаетесь на aiohttp и его middleware, а библиотека вроде weasyprint ждёт синхронного колбэка, например url fetcher. Хочется переиспользовать имеющуюся асинхронную логику, а не тянуть параллельный синхронный путь. Но стоит попытаться склеить всё вместе — и тут же вылезают ошибки про циклы, потоки и границы ответственности.

Где всё ломается

Рассмотрим подход: выносите блокирующую библиотеку в рабочий поток и пытаетесь из синхронного колбэка вызвать асинхронную корутину через loop.run_until_complete. Идея понятна, но событийный цикл уже запущен, так что повторный вход в него срывает весь план.

import asyncio
import io
import aiohttp
from weasyprint import HTML
async def render_pdf_weasy(html: io.IOBase, session: aiohttp.ClientSession):
    loop = asyncio.get_running_loop()
    resolver = _compose_fetcher(session, loop)
    # Запускаем блокирующий рендер вне событийного цикла:
    return await asyncio.to_thread(_render_pdf_sync, resolver, html)
def _render_pdf_sync(resolver, html: io.IOBase):
    sink = io.BytesIO()
    HTML(file_obj=html, url_fetcher=resolver).write_pdf(sink)
    return sink
def _compose_fetcher(session: aiohttp.ClientSession, loop: asyncio.AbstractEventLoop):
    def fetch(url, *args, **kwargs):
        # Это не сработает: цикл уже запущен.
        data, mime = loop.run_until_complete(_pull_bytes(session, url))
        return {"string": data, "mime_type": mime}
    return fetch
async def _pull_bytes(session: aiohttp.ClientSession, url: str):
    async with session.get(url) as resp:
        body = await resp.content.read()
        return body, resp.content_type

Это отражает типичный сценарий сбоя. Даже если блокирующая библиотека запускается в рабочем потоке, колбэк выполняется там же, а loop.run_until_complete нацеливается на главный цикл, который уже работает. В другой попытке можно создать второй цикл в отдельном потоке и гонять значения через очередь. Но тут вступает в силу другое ограничение: объекты вроде aiohttp.ClientSession привязаны к тому циклу, где были созданы, и безболезненно на другой цикл не переедут. Итог — целая россыпь ошибок про «не тот событийный цикл» и «запущенный цикл, который нельзя пере-enter’ить».

Суть проблемы

Ваша асинхронная функция вызывает синхронный API с синхронным колбэком, но сама логика внутри колбэка должна быть асинхронной. Если вызвать синхронный API прямо из корутины, это заблокирует цикл; перенос в рабочий поток решает блокировку, но теперь колбэк исполняется вне цикла и не может напрямую await’ить. Попытка использовать loop.run_until_complete из того потока проваливается, потому что главный цикл уже идёт.

Нужно безопасно запланировать корутину на главном цикле из стороннего потока и синхронно дождаться результата в этом потоке. Ровно это делает asyncio.run_coroutine_threadsafe.

«Можете попробовать run_coroutine_threadsafe»

Рабочий паттерн

Держите блокирующий вызов вне цикла через asyncio.to_thread (или asyncio.run_in_executor). Захватите главный событийный цикл, передайте его в рабочий поток и, когда сработает синхронный колбэк, запланируйте настоящую асинхронную логику на этом цикле с помощью asyncio.run_coroutine_threadsafe и дождитесь результата. Событийный цикл продолжает крутиться; рабочий поток блокируется, пока корутина не завершится; для библиотеки колбэк остаётся синхронным.

import asyncio
from collections.abc import Callable
from functools import partial
async def launch_async_entrypoint() -> None:
    payload: int = 1
    # Передаём блокирующий синхронный вызов в поток, чтобы не блокировать цикл:
    print('api call argument:', payload)
    outcome = await asyncio.to_thread(blocking_bridge, asyncio.get_running_loop(), payload)
    print('api call result:', outcome)
def blocking_bridge(loop: asyncio.AbstractEventLoop, payload: int) -> int:
    # Вызываем синхронный API и даём синхронный колбэк, который вернётся в цикл:
    return pretend_api(payload, partial(bridge_callback, loop))
def bridge_callback(loop: asyncio.AbstractEventLoop, value: int) -> None:
    fut = asyncio.run_coroutine_threadsafe(async_handler(value), loop)
    fut.result()  # Ждём завершения асинхронного колбэка
async def async_handler(payload: int) -> None:
    print('callback argument:', payload)
    # ... здесь асинхронная работа
def pretend_api(x: int, cb: Callable[[int], None]) -> int:
    y = x * 2
    cb(y)
    return y
if __name__ == '__main__':
    asyncio.run(launch_async_entrypoint())

Ровно то, что нужно: событийный цикл остаётся отзывчивым, синхронный API работает в отдельном потоке, синхронный колбэк безопасно возвращает работу в цикл, а итоговый результат уходит обратно асинхронному вызывающему.

Как это применить к weasyprint и aiohttp

Если weasyprint — это блокирующая библиотека, а aiohttp отвечает за сеть, держите weasyprint в отдельном потоке с помощью asyncio.to_thread. Постройте синхронный url_fetcher, который захватывает главный событийный цикл, и внутри него вызывайте asyncio.run_coroutine_threadsafe для вашей текущей асинхронной функции загрузки, использующей уже существующую сессию aiohttp. Так вы сохраняете единую точку правды, включая middleware, и избегаете поддержки дублирующего синхронного HTTP-стека.

Почему это важно

Смешение синхронного и асинхронного кода в Python — это не столько про «быстрее», сколько про корректность управления потоком выполнения. Событийный цикл нельзя блокировать длительной синхронной работой. Объекты, привязанные к циклу, нельзя безнаказанно переносить между циклами и потоками. Безопасная передача управления между потоками и циклом критична: без неё вы получите дедлоки, повторный вход в работающий цикл или нарушение привязки асинхронных ресурсов к своему циклу. Этот паттерн решает тот самый узел, не вынуждая переписывать сетевую часть на синхронный клиент.

Выводы

Запускайте блокирующую библиотеку в рабочем потоке, держите главный событийный цикл чистым и используйте asyncio.run_coroutine_threadsafe из потока, чтобы планировать асинхронную логику колбэков обратно на цикл. Если библиотека когда‑нибудь предоставит нативный асинхронный вариант колбэка, выбирайте его; до тех пор этот подход сохраняет целостность кода и здоровье вашего event loop.