2025, Oct 20 09:19

Как ускорить и стабилизировать асинхронную загрузку файлов в Python

Почему asyncio‑загрузки тормозят и рвут файлы, и как исправить: ограничение параллелизма семафором, потоковая запись на диск, обработка ошибок в aiohttp и httpx.

Скачивание крупных медиафайлов по сети кажется простой задачей, пока не появляются параллелизм, потоковая передача и координация ввода‑вывода. Типичная ловушка — написать «полностью асинхронный» код, который запускает десятки запросов одновременно, а затем удивляться, почему он работает медленнее небольшого пула потоков или, хуже того, почему сохранённые файлы оказываются повреждёнными. Разберём реальный случай: посмотрим, почему асинхронная версия тормозит и выдаёт неполные файлы, а затем исправим это без перестройки всей архитектуры.

Постановка задачи

Задача — одновременно загрузить изображения по нескольким URL. Решение на потоках с небольшим пулом работает удовлетворительно. Асинхронная версия, однако, неожиданно медленная и, как выясняется при проверке, записывает неполные, неоткрываемые файлы.

Минимальный асинхронный пример, демонстрирующий проблему

Фрагмент ниже создаёт по одной задаче на каждый URL, читает весь HTTP‑ответ целиком в память и записывает его одним махом. На первый взгляд всё нормально, но именно здесь проявляются тормоза и испорченные выходные файлы.

import asyncio
import aiohttp
import aiofiles
import os
import time
async def fetch_image_raw(link, out_dir, sess):
    async with sess.get(link) as resp:
        data = await resp.read()
        name = link.split('/')[-1]
        dest = os.path.join(out_dir, name)
        async with aiofiles.open(dest, "wb") as fh:
            await fh.write(data)
async def run_all():
    async with aiohttp.ClientSession() as sess:
        tasks = [fetch_image_raw(u, "async_slow", sess) for u in links]
        await asyncio.gather(*tasks)
if __name__ == "__main__":
    links = [
        "https://images-assets.nasa.gov/image/PIA03149/PIA03149~orig.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/3/37/African_Bush_Elephant.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/9/97/The_Earth_seen_from_Apollo_17.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/2/29/%22Arte_Colonial%22.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/d/d2/%22CUT%22_June_1929_04.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/8/82/%22CUT%22_June_1929_05.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/b/b1/%22Council_of_the_Gods%22_in_Galleria_Borghese_%28Rome%29_ceiling.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/7/71/%22East_front_of_Court_House_and_Planter%27s_House.%22.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/b/b6/%22Greater_Germany%22._Major_administrative_divisions._July_1._1944._100_kms_-_btv1b531213280.jpg",
    ] * 2
    os.makedirs("async_slow", exist_ok=True)
    t0 = time.time()
    asyncio.run(run_all())
    t1 = time.time()
    print(f"Asyncio download time: {t1 - t0:.2f} seconds")

Что на самом деле происходит

Два наблюдения объясняют поведение. Во‑первых, файлы не докачиваются: они создаются, но не открываются — это признак ошибки загрузки, а не декодирования. Во‑вторых, решение на потоках явно ограничивает параллелизм параметром max_workers=3, тогда как асинхронная версия пытается обратиться ко всем источникам сразу. Если сократить число одновременных запросов в асинхронном коде до уровня пула потоков, расхождение исчезает. При выровненном параллелизме и корректной потоковой записи на диск оба подхода завершаются за сопоставимое время.

Исправление: потоковая передача, обработка ошибок и ограничение конкуренции

Исправленная асинхронная версия использует семафор для ограничения числа одновременных запросов, пишет данные на диск порциями и проверяет ошибки ответа. Это стабилизирует загрузки и избавляет от обрезанных файлов.

from functools import wraps
import aiofiles
import aiohttp
import asyncio
import os
import time
def cap_parallelism(limit=10):
    gate = asyncio.Semaphore(limit)
    def decorator(fn):
        @wraps(fn)
        async def inner(*args, **kwargs):
            async with gate:
                return await fn(*args, **kwargs)
        return inner
    return decorator
@cap_parallelism(limit=3)
async def fetch_streaming(sess: aiohttp.ClientSession, link: str, dest_path: str):
    try:
        async with sess.get(link, headers={'User-Agent': 'Downloader/1.0'}) as res:
            assert res.status == 200
            async with aiofiles.open(dest_path, mode='wb') as out:
                async for chunk in res.content.iter_chunked(8192):
                    await out.write(chunk)
    except Exception as exc:
        print(f"Error retrieving {link}: {exc}")
def to_path(root: str, link: str) -> str:
    name = link.split('/')[-1]
    return os.path.join(root, name)
async def run_fixed():
    urls = [
        "https://images-assets.nasa.gov/image/PIA03149/PIA03149~orig.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/3/37/African_Bush_Elephant.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/9/97/The_Earth_seen_from_Apollo_17.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/2/29/%22Arte_Colonial%22.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/d/d2/%22CUT%22_June_1929_04.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/8/82/%22CUT%22_June_1929_05.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/b/b1/%22Council_of_the_Gods%22_in_Galleria_Borghese_%28Rome%29_ceiling.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/7/71/%22East_front_of_Court_House_and_Planter%27s_House.%22.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/b/b6/%22Greater_Germany%22._Major_administrative_divisions._July_1._1944._100_kms_-_btv1b531213280.jpg",
    ] * 2
    target_dir = 'async_fixed'
    os.makedirs(target_dir, exist_ok=True)
    async with aiohttp.ClientSession() as sess:
        jobs = [fetch_streaming(sess, u, to_path(target_dir, u)) for u in urls]
        await asyncio.gather(*jobs)
if __name__ == '__main__':
    start = time.time()
    asyncio.run(run_fixed())
    stop = time.time()
    print(f"Asyncio download time: {stop - start:.2f} seconds")

Если вам ближе вариант на потоках, эквивалентное исправление — читать ответ из requests в потоковом режиме и удерживать размер пула равным трём. Это совпадает с асинхронным ограничением выше и даёт аналогичный результат.

from concurrent import futures
import os
import requests
import time
def stream_download(link, out_file):
    try:
        with requests.get(link, stream=True, headers={'User-Agent': 'Downloader/1.0'}) as resp:
            resp.raise_for_status()
            with open(out_file, 'wb') as sink:
                for block in resp.iter_content(chunk_size=8192):
                    if not block:
                        break
                    sink.write(block)
    except Exception as exc:
        print(f"Error retrieving {link}: {exc}")
def to_dest(folder, link):
    name = link.split('/')[-1]
    return os.path.join(folder, name)
def run_pool():
    urls = [
        "https://images-assets.nasa.gov/image/PIA03149/PIA03149~orig.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/3/37/African_Bush_Elephant.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/9/97/The_Earth_seen_from_Apollo_17.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/2/29/%22Arte_Colonial%22.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/d/d2/%22CUT%22_June_1929_04.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/8/82/%22CUT%22_June_1929_05.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/b/b1/%22Council_of_the_Gods%22_in_Galleria_Borghese_%28Rome%29_ceiling.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/7/71/%22East_front_of_Court_House_and_Planter%27s_House.%22.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/b/b6/%22Greater_Germany%22._Major_administrative_divisions._July_1._1944._100_kms_-_btv1b531213280.jpg",
    ] * 2
    out_dir = 'threadpool_fixed'
    os.makedirs(out_dir, exist_ok=True)
    with futures.ThreadPoolExecutor(max_workers=3) as pool:
        pool.map(lambda u: stream_download(u, to_dest(out_dir, u)), urls)
if __name__ == '__main__':
    t0 = time.time()
    run_pool()
    t1 = time.time()
    print(f"ThreadPoolExecutor download time: {t1 - t0:.2f} seconds")

В качестве альтернативного асинхронного HTTP‑клиента httpx при тех же ограничениях даёт тот же итог. Схема та же: ограничиваем конкуренцию, пишем на диск чанками и явно обрабатываем HTTP‑ошибки.

from functools import wraps
import aiofiles
import asyncio
import httpx
import os
import time
def cap_parallelism(limit=10):
    gate = asyncio.Semaphore(limit)
    def decorator(fn):
        @wraps(fn)
        async def inner(*args, **kwargs):
            async with gate:
                return await fn(*args, **kwargs)
        return inner
    return decorator
@cap_parallelism(limit=3)
async def fetch_httpx(client: httpx.AsyncClient, link: str, dest: str):
    try:
        resp = await client.get(link, headers={'User-Agent': 'Downloader/1.0'})
        resp.raise_for_status()
        async with aiofiles.open(dest, 'wb') as out:
            for chunk in resp.iter_bytes(chunk_size=8192):
                await out.write(chunk)
    except Exception as exc:
        print(f"Error retrieving {link}: {exc}")
def make_path(root: str, link: str) -> str:
    name = link.split('/')[-1]
    return os.path.join(root, name)
async def run_httpx():
    urls = [
        "https://images-assets.nasa.gov/image/PIA03149/PIA03149~orig.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/3/37/African_Bush_Elephant.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/9/97/The_Earth_seen_from_Apollo_17.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/2/29/%22Arte_Colonial%22.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/d/d2/%22CUT%22_June_1929_04.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/8/82/%22CUT%22_June_1929_05.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/b/b1/%22Council_of_the_Gods%22_in_Galleria_Borghese_%28Rome%29_ceiling.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/7/71/%22East_front_of_Court_House_and_Planter%27s_House.%22.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/b/b6/%22Greater_Germany%22._Major_administrative_divisions._July_1._1944._100_kms_-_btv1b531213280.jpg",
    ] * 2
    out_dir = 'httpx_async'
    os.makedirs(out_dir, exist_ok=True)
    async with httpx.AsyncClient() as client:
        jobs = [fetch_httpx(client, u, make_path(out_dir, u)) for u in urls]
        await asyncio.gather(*jobs)
if __name__ == '__main__':
    start = time.time()
    asyncio.run(run_httpx())
    stop = time.time()
    print(f"Asyncio download time: {stop - start:.2f} seconds")

Почему это важно

Разрыв между «работает у меня» и «стабильно работает в масштабе» часто прячется в деталях параллелизма и ввода‑вывода. Неограниченное количество асинхронных запросов может перегрузить удалённые серверы или вашу среду и привести к скрытым сбоям вроде усечённых файлов. Согласование уровня параллелизма между реализациями и потоковая передача ответа решают сразу два вопроса — корректность и пропускную способность. В замерах при таких настройках решения на пуле потоков и на async завершаются примерно за одно и то же время — хороший ориентир при выборе подхода.

Итоги

При параллельной загрузке больших файлов относитесь к сети и диску как к потоковым конвейерам. Осознанно ограничивайте параллелизм, проверяйте HTTP‑ошибки и записывайте данные на диск постепенно. Придерживаясь этих принципов — будь то небольшой ThreadPoolExecutor, aiohttp или httpx — вы получите стабильную работу и сопоставимую производительность.

Статья основана на вопросе на StackOverflow от Daniil Yefimov и ответе Detlef.