2025, Dec 07 09:02
Как подружить asyncio с синхронными колбэками: weasyprint, aiohttp и run_coroutine_threadsafe
Разбираем паттерн интеграции asyncio и синхронных колбэков: weasyprint в потоке, сеть через aiohttp, мост run_coroutine_threadsafe без блокировки цикла.
Совместить asyncio со строго синхронной сторонней библиотекой — классическая точка трения: ваше приложение крутится на событийном цикле, вы полагаетесь на aiohttp и его middleware, а библиотека вроде weasyprint ждёт синхронного колбэка, например url fetcher. Хочется переиспользовать имеющуюся асинхронную логику, а не тянуть параллельный синхронный путь. Но стоит попытаться склеить всё вместе — и тут же вылезают ошибки про циклы, потоки и границы ответственности.
Где всё ломается
Рассмотрим подход: выносите блокирующую библиотеку в рабочий поток и пытаетесь из синхронного колбэка вызвать асинхронную корутину через loop.run_until_complete. Идея понятна, но событийный цикл уже запущен, так что повторный вход в него срывает весь план.
import asyncio
import io
import aiohttp
from weasyprint import HTML
async def render_pdf_weasy(html: io.IOBase, session: aiohttp.ClientSession):
loop = asyncio.get_running_loop()
resolver = _compose_fetcher(session, loop)
# Запускаем блокирующий рендер вне событийного цикла:
return await asyncio.to_thread(_render_pdf_sync, resolver, html)
def _render_pdf_sync(resolver, html: io.IOBase):
sink = io.BytesIO()
HTML(file_obj=html, url_fetcher=resolver).write_pdf(sink)
return sink
def _compose_fetcher(session: aiohttp.ClientSession, loop: asyncio.AbstractEventLoop):
def fetch(url, *args, **kwargs):
# Это не сработает: цикл уже запущен.
data, mime = loop.run_until_complete(_pull_bytes(session, url))
return {"string": data, "mime_type": mime}
return fetch
async def _pull_bytes(session: aiohttp.ClientSession, url: str):
async with session.get(url) as resp:
body = await resp.content.read()
return body, resp.content_type
Это отражает типичный сценарий сбоя. Даже если блокирующая библиотека запускается в рабочем потоке, колбэк выполняется там же, а loop.run_until_complete нацеливается на главный цикл, который уже работает. В другой попытке можно создать второй цикл в отдельном потоке и гонять значения через очередь. Но тут вступает в силу другое ограничение: объекты вроде aiohttp.ClientSession привязаны к тому циклу, где были созданы, и безболезненно на другой цикл не переедут. Итог — целая россыпь ошибок про «не тот событийный цикл» и «запущенный цикл, который нельзя пере-enter’ить».
Суть проблемы
Ваша асинхронная функция вызывает синхронный API с синхронным колбэком, но сама логика внутри колбэка должна быть асинхронной. Если вызвать синхронный API прямо из корутины, это заблокирует цикл; перенос в рабочий поток решает блокировку, но теперь колбэк исполняется вне цикла и не может напрямую await’ить. Попытка использовать loop.run_until_complete из того потока проваливается, потому что главный цикл уже идёт.
Нужно безопасно запланировать корутину на главном цикле из стороннего потока и синхронно дождаться результата в этом потоке. Ровно это делает asyncio.run_coroutine_threadsafe.
«Можете попробовать run_coroutine_threadsafe»
Рабочий паттерн
Держите блокирующий вызов вне цикла через asyncio.to_thread (или asyncio.run_in_executor). Захватите главный событийный цикл, передайте его в рабочий поток и, когда сработает синхронный колбэк, запланируйте настоящую асинхронную логику на этом цикле с помощью asyncio.run_coroutine_threadsafe и дождитесь результата. Событийный цикл продолжает крутиться; рабочий поток блокируется, пока корутина не завершится; для библиотеки колбэк остаётся синхронным.
import asyncio
from collections.abc import Callable
from functools import partial
async def launch_async_entrypoint() -> None:
payload: int = 1
# Передаём блокирующий синхронный вызов в поток, чтобы не блокировать цикл:
print('api call argument:', payload)
outcome = await asyncio.to_thread(blocking_bridge, asyncio.get_running_loop(), payload)
print('api call result:', outcome)
def blocking_bridge(loop: asyncio.AbstractEventLoop, payload: int) -> int:
# Вызываем синхронный API и даём синхронный колбэк, который вернётся в цикл:
return pretend_api(payload, partial(bridge_callback, loop))
def bridge_callback(loop: asyncio.AbstractEventLoop, value: int) -> None:
fut = asyncio.run_coroutine_threadsafe(async_handler(value), loop)
fut.result() # Ждём завершения асинхронного колбэка
async def async_handler(payload: int) -> None:
print('callback argument:', payload)
# ... здесь асинхронная работа
def pretend_api(x: int, cb: Callable[[int], None]) -> int:
y = x * 2
cb(y)
return y
if __name__ == '__main__':
asyncio.run(launch_async_entrypoint())
Ровно то, что нужно: событийный цикл остаётся отзывчивым, синхронный API работает в отдельном потоке, синхронный колбэк безопасно возвращает работу в цикл, а итоговый результат уходит обратно асинхронному вызывающему.
Как это применить к weasyprint и aiohttp
Если weasyprint — это блокирующая библиотека, а aiohttp отвечает за сеть, держите weasyprint в отдельном потоке с помощью asyncio.to_thread. Постройте синхронный url_fetcher, который захватывает главный событийный цикл, и внутри него вызывайте asyncio.run_coroutine_threadsafe для вашей текущей асинхронной функции загрузки, использующей уже существующую сессию aiohttp. Так вы сохраняете единую точку правды, включая middleware, и избегаете поддержки дублирующего синхронного HTTP-стека.
Почему это важно
Смешение синхронного и асинхронного кода в Python — это не столько про «быстрее», сколько про корректность управления потоком выполнения. Событийный цикл нельзя блокировать длительной синхронной работой. Объекты, привязанные к циклу, нельзя безнаказанно переносить между циклами и потоками. Безопасная передача управления между потоками и циклом критична: без неё вы получите дедлоки, повторный вход в работающий цикл или нарушение привязки асинхронных ресурсов к своему циклу. Этот паттерн решает тот самый узел, не вынуждая переписывать сетевую часть на синхронный клиент.
Выводы
Запускайте блокирующую библиотеку в рабочем потоке, держите главный событийный цикл чистым и используйте asyncio.run_coroutine_threadsafe из потока, чтобы планировать асинхронную логику колбэков обратно на цикл. Если библиотека когда‑нибудь предоставит нативный асинхронный вариант колбэка, выбирайте его; до тех пор этот подход сохраняет целостность кода и здоровье вашего event loop.