2025, Sep 30 15:18

Как параллелить IO-нагруженные циклы в Python asyncio: create_task, gather и Semaphore

Как ускорить IO в asyncio: запускаем create_task и gather, ограничиваем до 4 запросов через Semaphore, обрабатываем результаты as_completed. Практика.

Параллелизация IO-нагруженных циклов в asyncio — это часто малозатратная по усилиям и ощутимая по результату оптимизация. Когда корутина в плотном цикле ожидает каждую операцию, она по определению обрабатывает элементы по одному. Если задача упирается в сеть, значительная часть пропускной способности простаивает. Ниже — сфокусированный разбор того, как отправлять запросы для нескольких координат параллельно и как ограничить параллелизм четырьмя одновременными запросами.

Базовая версия: ожидание по одному

Исходная версия обрабатывает по одной координате за итерацию и сохраняет результат в словарь. Используются aiohttp и вызов find_panorama_async для получения метаданных панорамы. Логика проста, но цикл дожидается каждого вызова, прежде чем переходить к следующему.

import asyncio
from aiohttp import ClientSession
from tqdm.asyncio import tqdm
import pandas as pd
from streetlevel import streetview


geo_map = {
    1: {'country': 'AZE', 'longitude': 48.84328388521507, 'latitude': 39.75349231850633},
    2: {'country': 'AZE', 'longitude': 46.42568983461019, 'latitude': 41.74686028542068},
    3: {'country': 'AZE', 'longitude': 46.77391084838791, 'latitude': 41.05880534123746},
    4: {'country': 'AZE', 'longitude': 46.57032078287734, 'latitude': 39.312871420751485},
    5: {'country': 'AZE', 'longitude': 47.26319069021316, 'latitude': 41.28950907274436},
    6: {'country': 'AZE', 'longitude': 46.49956247696345, 'latitude': 40.96062005058899},
    7: {'country': 'AZE', 'longitude': 48.291171317357815, 'latitude': 38.939108897065445},
    8: {'country': 'AZE', 'longitude': 47.12081533506723, 'latitude': 39.681052295694656},
}


async def scan_points(geo_map):
    pano_map = {}

    async with ClientSession() as http:
        for ident, meta in tqdm(geo_map.items(), desc="Parsing coordinates", unit="coordinate"):
            snap = await streetview.find_panorama_async(meta['latitude'], meta['longitude'], http)

            if snap is not None:
                pano_map[ident] = {
                    'longitude': snap.lon,
                    'latitude': snap.lat,
                    'date': str(snap.date)
                }
            else:
                pano_map[ident] = {
                    'longitude': None,
                    'latitude': None,
                    'date': None
                }

    return pano_map


async def run_pipeline(geo_map):
    outcome = await scan_points(geo_map)

    frame = pd.DataFrame.from_dict(outcome, orient='index')
    frame.reset_index(inplace=True)
    frame.rename(columns={'index': 'uuid'}, inplace=True)
    frame.to_parquet('results.parquet', index=False)


asyncio.run(run_pipeline(geo_map))

Что замедляет

await блокирует корутину до завершения ожидаемой операции. Внутри цикла это не позволяет запускать другие сетевые вызовы, пока не завершится текущий. Подход корректный, но по сути последовательный. Чтобы увеличить пропускную способность, имеет смысл запустить несколько корутин и подождать их результаты позже.

Запускаем сразу много, ждём позже с create_task

Один из способов распараллелить IO — планировать сразу много корутин через create_task и ожидать их потом. Так они стартуют немедленно, а цикл событий чередует их выполнение.

async def scan_points_parallel_fire_and_wait(geo_map):
    results_map = {}

    async with ClientSession() as http:
        pending = []

        # запускаем все задачи сразу
        for ident, meta in tqdm(geo_map.items(), desc="Parsing coordinates", unit="coordinate"):
            fut = asyncio.create_task(
                streetview.find_panorama_async(meta['latitude'], meta['longitude'], http)
            )
            pending.append(fut)

        # позже дожидаемся результата каждой задачи
        for idx, fut in enumerate(pending):
            snap = await fut
            if snap is not None:
                results_map[idx] = {
                    'longitude': snap.lon,
                    'latitude': snap.lat,
                    'date': str(snap.date)
                }
            else:
                results_map[idx] = {
                    'longitude': None,
                    'latitude': None,
                    'date': None
                }

    return results_map

Такой подход резко повышает скорость, отображаемую в tqdm. Важно понимать, что этот показатель отражает темп итераций цикла, а не общее время выполнения от начала до конца. Тем не менее разница заметна: в одном из запусков — от 27.03 coordinate/s до шестизначных значений в прогресс-баре.

Запускаем одновременно через gather

Можно собрать корутины в список без create_task и запустить их все разом через asyncio.gather — результаты вернутся в том же порядке.

async def scan_points_parallel_gather(geo_map):
    results_map = {}

    async with ClientSession() as http:
        batch = []

        # подготавливаем все корутины
        for ident, meta in tqdm(geo_map.items(), desc="Parsing coordinates", unit="coordinate"):
            coro = streetview.find_panorama_async(meta['latitude'], meta['longitude'], http)
            batch.append(coro)

        # запускаем все сразу и собираем результаты
        payloads = await asyncio.gather(*batch)

        for idx, snap in enumerate(payloads):
            if snap is not None:
                results_map[idx] = {
                    'longitude': snap.lon,
                    'latitude': snap.lat,
                    'date': str(snap.date)
                }
            else:
                results_map[idx] = {
                    'longitude': None,
                    'latitude': None,
                    'date': None
                }

    return results_map

В другом запуске отчётная скорость выросла ещё сильнее. И снова: счётчик — не метрика общего времени, но хорошо показывает, сколько простоя удаётся убрать благодаря перекрытию IO.

Ограничиваем параллелизм до четырёх с Semaphore

Неограниченная параллельность подходит не всегда. Чтобы держать в полёте только четыре запроса, оберните каждую корутину семафором. Он дозирует выполнение и гарантирует, что одновременно запущено не больше заданного числа задач.

async def within_limit(lock, label, job):
    async with lock:
        return await job

async def scan_points_bounded(geo_map, max_parallel=4):
    results_map = {}

    async with ClientSession() as http:
        token = asyncio.Semaphore(max_parallel)
        batch = []

        for ident, meta in tqdm(geo_map.items(), desc="Parsing coordinates", unit="coordinate"):
            job = within_limit(
                token,
                ident,
                streetview.find_panorama_async(meta['latitude'], meta['longitude'], http)
            )
            batch.append(job)

        final = await asyncio.gather(*batch)

        for idx, snap in enumerate(final):
            if snap is not None:
                results_map[idx] = {
                    'longitude': snap.lon,
                    'latitude': snap.lat,
                    'date': str(snap.date)
                }
            else:
                results_map[idx] = {
                    'longitude': None,
                    'latitude': None,
                    'date': None
                }

    return results_map

Такой шаблон держит параллелизм под контролем и решает задачу запуска ровно четырёх проверок координат одновременно. Для наглядности во время экспериментов можно добавить искусственную задержку, но для реального выполнения это не требуется.

Порядок против времени завершения

Когда важен порядок, asyncio.gather сохраняет исходную последовательность переданных корутин. Если хотите обрабатывать результаты по мере готовности, используйте asyncio.as_completed — он выдаёт их в порядке завершения. Выбирайте режим в зависимости от того, как вы планируете собирать итог. Для отслеживания прогресса также можно интегрировать gather с tqdm в дружелюбном к асинхронности формате.

Почему это важно

Ожидание внутри цикла последовательно выполняет IO и недоиспользует параллельность. Запуская сетевые операции пакетом, вы убираете паузы и сокращаете «стеночное» время. На практике даже простой переход на create_task или gather заставляет счётчик прогресса взлетать — явный сигнал, что работа перекрывается, а не выстраивается в очередь. Семафор помогает не перегружать внешние сервисы и дозировать нагрузку без переписывания остального конвейера.

Выводы

Используйте await осторожно внутри циклов, которые делают IO. Нужны параллельные запросы — планируйте корутины через create_task и ждите их позднее или собирайте и запускайте разом через gather. Когда требуется жёсткий лимит параллельности, оборачивайте вызовы в Semaphore с нужной шириной, например четыре. Важен порядок — gather его сохраняет; важна обработка по мере готовности — подойдёт as_completed. Эти приёмы легко сочетаются с существующей сессией и шагами с DataFrame и дают мгновимый, хорошо заметный прирост скорости.

Статья основана на вопросе на StackOverflow от Daniel AG и ответе от furas.