2025, Nov 15 12:02
Глобальный ограничитель скорости в asyncio для цепочек зависимых HTTP‑запросов
Как удержать 10 запросов/с в aiohttp и сохранить порядок a→b→c. Готовый шаблон: глобальный rate limiter на AsyncLimiter и альтернатива на Semaphore внутри asyncio.
Когда вы разворачиваете матрицу зависимых HTTP‑вызовов, естественно хочется распараллелить работу по категориям, сохраняя при этом строгий порядок a → b → c внутри каждой категории. Загвоздка — в ограничении частоты: сервер допускает до 10 запросов в секунду при ~0,15 с на ответ. Паузы внутри отдельных корутин не гарантируют общий лимит; блокирующий sleep убивает конкурентность. Стабильно работающий в asyncio шаблон — общий ограничитель скорости, через который проходит каждый исходящий запрос.
Постановка задачи
Предположим, в каждой категории k — до трёх шагов, причём последующие зависят от предыдущих. Если бы вызовы были независимыми, их можно было бы отправлять с небольшими интервалами:
gap_per_call = 0.1
async with aiohttp.ClientSession() as http:
async with asyncio.TaskGroup() as group:
for cat in [1, 2, 3]:
group.create_task(grab_a_payload(cat, http))
await asyncio.sleep(gap_per_call)
group.create_task(grab_b_payload(cat, http))
await asyncio.sleep(gap_per_call)
group.create_task(grab_c_payload(cat, http))
await asyncio.sleep(gap_per_call)
Однако внутри одной категории порядок a → b → c должен быть последовательным. Логичное преобразование — перенести логику в одну корутину и запускать по такой корутине на категорию:
gap_per_call = 0.1
async with aiohttp.ClientSession() as http:
async with asyncio.TaskGroup() as group:
for cat in [1, 2, 3]:
group.create_task(run_abc_chain(cat, http))
await asyncio.sleep(gap_per_call)
Теперь возникает вопрос: как удерживать общий темп на уровне 10 запросов в секунду и при этом давать каждой цепочке продвигаться сразу после готовности собственного ответа?
Почему наивные паузы не помогают
Если вставлять await asyncio.sleep перед каждым запросом внутри цепочки, это не ограничит программу целиком: другие задачи продолжат слать запросы, и суммарная частота превысит лимит. Вызов time.sleep действительно замедлит отправку, но он блокирует цикл событий. В итоге 1a, 2a, 3a уйдут в сеть до того, как вы обработаете результат 1a, из‑за чего b и c запоздают, а вся последующая обработка сильно сместится.
Решение: один общий ограничитель скорости вокруг каждого HTTP‑вызова
Надёжный подход — один глобальный ограничитель, который нужно «получить» перед каждым HTTP‑вызовом. Внутри категории последовательность a → b → c остаётся обычной цепочкой await, поэтому зависимости соблюдаются. Все цепочки выполняются параллельно, а ограничитель удерживает общий поток ниже 10 запросов в секунду.
import asyncio, aiohttp
from aiolimiter import AsyncLimiter # установка: pip install aiolimiter
rate_gate = AsyncLimiter(10, 1) # 10 запросов каждые 1 с
async def pull_json(path: str, client: aiohttp.ClientSession):
# ждёт, если за последнюю секунду уже было 10 вызовов
async with rate_gate:
async with client.get(path) as resp:
resp.raise_for_status()
return await resp.json()
async def run_chain(cat: int, client: aiohttp.ClientSession):
data_a = await pull_json(f"/data/{cat}a", client)
if want_b(data_a): # ваш предикат
data_b = await pull_json(f"/data/{cat}b", client)
if want_c(data_a, data_b):
data_c = await pull_json(f"/data/{cat}c", client)
# ... обработка (data_a, data_b, data_c) ...
async def bootstrap():
send_gap = 0.1 # необязательная пауза между отправкой задач
async with aiohttp.ClientSession() as client:
async with asyncio.TaskGroup() as grp:
for cat in (1, 2, 3): # масштабируется до тысяч
grp.create_task(run_chain(cat, client))
await asyncio.sleep(send_gap) # помогает избежать второго типа блокировок, упомянутого ниже
asyncio.run(bootstrap())
Цепочка run_chain соблюдает порядок зависимостей за счёт обычных await: b и c выполняются только тогда и только если этого требует a. Поскольку каждый вызов pull_json сперва захватывает AsyncLimiter, суммарный поток по всем категориям не превышает 10 запросов в секунду.
Если не хочется подключать стороннюю библиотеку, можно запустить фоновую задачу, которая раз в секунду «дозаправляет» asyncio.Semaphore(10); общий принцип остаётся тем же.
О темпе подачи задач
Чтобы избежать второго типа блокировок, описанного выше, полезно добавлять короткий await asyncio.sleep сразу после планирования задачи для категории. Это хорошо сочетается с очередями FIFO в asyncio и aiolimiter и делает продвижение всех цепочек более ровным.
Почему это важно
В задачах с десятками тысяч категорий возможность двигать каждую цепочку сразу по мере прихода её ответов сокращает сквозную задержку до дальнейшей обработки. Одновременно общий ограничитель не даёт случайно перегрузить сервис и удерживает вас в рамках контракта на 10 запросов в секунду. Такая комбинация позволяет получить потенциальное сокращение времени выполнения на 90% благодаря параллелизму, не нарушая гарантии порядка.
Вывод
Ограничивайте глобально, упорядочивайте локально. Оборачивайте каждый HTTP‑вызов общим ограничителем, чтобы удерживать 10 запросов в секунду для всей программы; оставляйте a → b → c в виде простых await внутри категории и при желании добавляйте небольшую паузу после отправки каждой задачи для более плавного планирования. В такой архитектуре asyncio обеспечивает параллельные цепочки, строгий порядок внутри каждой и предсказуемый темп запросов.