2025, Nov 15 12:02

Глобальный ограничитель скорости в asyncio для цепочек зависимых HTTP‑запросов

Как удержать 10 запросов/с в aiohttp и сохранить порядок a→b→c. Готовый шаблон: глобальный rate limiter на AsyncLimiter и альтернатива на Semaphore внутри asyncio.

Когда вы разворачиваете матрицу зависимых HTTP‑вызовов, естественно хочется распараллелить работу по категориям, сохраняя при этом строгий порядок a → b → c внутри каждой категории. Загвоздка — в ограничении частоты: сервер допускает до 10 запросов в секунду при ~0,15 с на ответ. Паузы внутри отдельных корутин не гарантируют общий лимит; блокирующий sleep убивает конкурентность. Стабильно работающий в asyncio шаблон — общий ограничитель скорости, через который проходит каждый исходящий запрос.

Постановка задачи

Предположим, в каждой категории k — до трёх шагов, причём последующие зависят от предыдущих. Если бы вызовы были независимыми, их можно было бы отправлять с небольшими интервалами:

gap_per_call = 0.1
async with aiohttp.ClientSession() as http:
    async with asyncio.TaskGroup() as group:
        for cat in [1, 2, 3]:
            group.create_task(grab_a_payload(cat, http))
            await asyncio.sleep(gap_per_call)
            group.create_task(grab_b_payload(cat, http))
            await asyncio.sleep(gap_per_call)
            group.create_task(grab_c_payload(cat, http))
            await asyncio.sleep(gap_per_call)

Однако внутри одной категории порядок a → b → c должен быть последовательным. Логичное преобразование — перенести логику в одну корутину и запускать по такой корутине на категорию:

gap_per_call = 0.1
async with aiohttp.ClientSession() as http:
    async with asyncio.TaskGroup() as group:
        for cat in [1, 2, 3]:
            group.create_task(run_abc_chain(cat, http))
            await asyncio.sleep(gap_per_call)

Теперь возникает вопрос: как удерживать общий темп на уровне 10 запросов в секунду и при этом давать каждой цепочке продвигаться сразу после готовности собственного ответа?

Почему наивные паузы не помогают

Если вставлять await asyncio.sleep перед каждым запросом внутри цепочки, это не ограничит программу целиком: другие задачи продолжат слать запросы, и суммарная частота превысит лимит. Вызов time.sleep действительно замедлит отправку, но он блокирует цикл событий. В итоге 1a, 2a, 3a уйдут в сеть до того, как вы обработаете результат 1a, из‑за чего b и c запоздают, а вся последующая обработка сильно сместится.

Решение: один общий ограничитель скорости вокруг каждого HTTP‑вызова

Надёжный подход — один глобальный ограничитель, который нужно «получить» перед каждым HTTP‑вызовом. Внутри категории последовательность a → b → c остаётся обычной цепочкой await, поэтому зависимости соблюдаются. Все цепочки выполняются параллельно, а ограничитель удерживает общий поток ниже 10 запросов в секунду.

import asyncio, aiohttp
from aiolimiter import AsyncLimiter       # установка: pip install aiolimiter
rate_gate = AsyncLimiter(10, 1)           # 10 запросов каждые 1 с
async def pull_json(path: str, client: aiohttp.ClientSession):
    # ждёт, если за последнюю секунду уже было 10 вызовов
    async with rate_gate:
        async with client.get(path) as resp:
            resp.raise_for_status()
            return await resp.json()
async def run_chain(cat: int, client: aiohttp.ClientSession):
    data_a = await pull_json(f"/data/{cat}a", client)
    if want_b(data_a):                    # ваш предикат
        data_b = await pull_json(f"/data/{cat}b", client)
        if want_c(data_a, data_b):
            data_c = await pull_json(f"/data/{cat}c", client)
    # ... обработка (data_a, data_b, data_c) ...
async def bootstrap():
    send_gap = 0.1                        # необязательная пауза между отправкой задач
    async with aiohttp.ClientSession() as client:
        async with asyncio.TaskGroup() as grp:
            for cat in (1, 2, 3):         # масштабируется до тысяч
                grp.create_task(run_chain(cat, client))
                await asyncio.sleep(send_gap)  # помогает избежать второго типа блокировок, упомянутого ниже
asyncio.run(bootstrap())

Цепочка run_chain соблюдает порядок зависимостей за счёт обычных await: b и c выполняются только тогда и только если этого требует a. Поскольку каждый вызов pull_json сперва захватывает AsyncLimiter, суммарный поток по всем категориям не превышает 10 запросов в секунду.

Если не хочется подключать стороннюю библиотеку, можно запустить фоновую задачу, которая раз в секунду «дозаправляет» asyncio.Semaphore(10); общий принцип остаётся тем же.

О темпе подачи задач

Чтобы избежать второго типа блокировок, описанного выше, полезно добавлять короткий await asyncio.sleep сразу после планирования задачи для категории. Это хорошо сочетается с очередями FIFO в asyncio и aiolimiter и делает продвижение всех цепочек более ровным.

Почему это важно

В задачах с десятками тысяч категорий возможность двигать каждую цепочку сразу по мере прихода её ответов сокращает сквозную задержку до дальнейшей обработки. Одновременно общий ограничитель не даёт случайно перегрузить сервис и удерживает вас в рамках контракта на 10 запросов в секунду. Такая комбинация позволяет получить потенциальное сокращение времени выполнения на 90% благодаря параллелизму, не нарушая гарантии порядка.

Вывод

Ограничивайте глобально, упорядочивайте локально. Оборачивайте каждый HTTP‑вызов общим ограничителем, чтобы удерживать 10 запросов в секунду для всей программы; оставляйте a → b → c в виде простых await внутри категории и при желании добавляйте небольшую паузу после отправки каждой задачи для более плавного планирования. В такой архитектуре asyncio обеспечивает параллельные цепочки, строгий порядок внутри каждой и предсказуемый темп запросов.