October 20, 2025, 09:00

Why Python asyncio downloads are slow and corrupt files, and how streaming with bounded concurrency fixes them

Diagnose and fix slow Python asyncio downloads: stream large files to disk, limit concurrency with semaphores, and handle errors. Covers aiohttp, httpx, and threads.

Downloading large media over the network sounds simple until concurrency, streaming, and I/O coordination enter the picture. A common trap is to spin up “fully async” code that fires off dozens of requests in parallel and then wonder why it’s slower than a small thread pool or, worse, why the saved files are corrupt. Let’s walk through a real-world case, see why the async version stalls and produces incomplete files, and then fix it without changing the overall architecture.

Problem setup

The task is to fetch images from multiple URLs concurrently. A thread-based solution with a small pool performs acceptably. The async version, however, is unexpectedly slow and, as it turns out on inspection, writes incomplete files that can’t be opened.

Minimal async example that demonstrates the issue

The following snippet launches one task per URL, reads the entire HTTP response into memory, and writes it out in one go. It looks fine at a glance, but this is where the slowdown and broken outputs show up.

import asyncio
import aiohttp
import aiofiles
import os
import time

async def fetch_image_raw(link, out_dir, sess):
    async with sess.get(link) as resp:
        data = await resp.read()  # buffers the entire body in memory; the status is never checked
        name = link.split('/')[-1]
        dest = os.path.join(out_dir, name)
        async with aiofiles.open(dest, "wb") as fh:
            await fh.write(data)  # writes whatever arrived, even an error page or a partial body

async def run_all():
    async with aiohttp.ClientSession() as sess:
        # One task per URL: every request fires at once, with no concurrency cap.
        tasks = [fetch_image_raw(u, "async_slow", sess) for u in links]
        await asyncio.gather(*tasks)

if __name__ == "__main__":
    links = [
        "https://images-assets.nasa.gov/image/PIA03149/PIA03149~orig.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/3/37/African_Bush_Elephant.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/9/97/The_Earth_seen_from_Apollo_17.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/2/29/%22Arte_Colonial%22.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/d/d2/%22CUT%22_June_1929_04.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/8/82/%22CUT%22_June_1929_05.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/b/b1/%22Council_of_the_Gods%22_in_Galleria_Borghese_%28Rome%29_ceiling.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/7/71/%22East_front_of_Court_House_and_Planter%27s_House.%22.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/b/b6/%22Greater_Germany%22._Major_administrative_divisions._July_1._1944._100_kms_-_btv1b531213280.jpg",
    ] * 2
    os.makedirs("async_slow", exist_ok=True)
    t0 = time.time()
    asyncio.run(run_all())
    t1 = time.time()
    print(f"Asyncio download time: {t1 - t0:.2f} seconds")

What’s really going on

Two observations explain the behavior. First, the files aren’t fully downloaded: the outputs exist but can’t be opened, which points to a download failure rather than a decoding problem. The original code never checks the HTTP status, so an error response body gets written to disk as if it were the image. Second, the thread-based solution explicitly caps parallelism with max_workers=3, while the async version fires all requests at once. Capping the async flow at the same concurrency as the thread pool removes the discrepancy: with concurrency aligned and responses streamed to disk, both approaches finish in comparable time.
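Before touching the code, it’s worth confirming that the outputs are truncated rather than merely mis-decoded. Since the example downloads JPEGs, one quick heuristic (a sketch, not part of the original code) is to check the format markers: a JPEG begins with the SOI marker FF D8 and normally ends with the EOI marker FF D9, and a cut-off download usually loses the trailing marker.

```python
import os

def looks_complete_jpeg(path: str) -> bool:
    # A JPEG starts with the SOI marker (FF D8) and normally ends with the
    # EOI marker (FF D9); a truncated download usually loses the trailing EOI.
    if os.path.getsize(path) < 4:
        return False
    with open(path, "rb") as fh:
        head = fh.read(2)
        fh.seek(-2, os.SEEK_END)
        tail = fh.read(2)
    return head == b"\xff\xd8" and tail == b"\xff\xd9"
```

Run this over the output directory; files that fail the check were likely cut off mid-transfer. Some encoders append trailing bytes after EOI, so treat it as a heuristic, not a guarantee.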

Fix: stream responses, handle errors, and bound concurrency

The corrected async version uses a semaphore to cap the number of in-flight requests, streams data in chunks to disk, and checks for request errors. This stabilizes downloads and eliminates partial files.

from functools import wraps
import aiofiles
import aiohttp
import asyncio
import os
import time

def cap_parallelism(limit=10):
    # One shared semaphore per decorated function: at most `limit` calls run at once.
    gate = asyncio.Semaphore(limit)
    def decorator(fn):
        @wraps(fn)
        async def inner(*args, **kwargs):
            async with gate:
                return await fn(*args, **kwargs)
        return inner
    return decorator

@cap_parallelism(limit=3)
async def fetch_streaming(sess: aiohttp.ClientSession, link: str, dest_path: str):
    try:
        async with sess.get(link, headers={'User-Agent': 'Downloader/1.0'}) as res:
            res.raise_for_status()  # unlike assert, this survives -O and reports the status code
            async with aiofiles.open(dest_path, mode='wb') as out:
                async for chunk in res.content.iter_chunked(8192):
                    await out.write(chunk)
    except Exception as exc:
        print(f"Error retrieving {link}: {exc}")

def to_path(root: str, link: str) -> str:
    name = link.split('/')[-1]
    return os.path.join(root, name)

async def run_fixed():
    urls = [
        "https://images-assets.nasa.gov/image/PIA03149/PIA03149~orig.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/3/37/African_Bush_Elephant.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/9/97/The_Earth_seen_from_Apollo_17.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/2/29/%22Arte_Colonial%22.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/d/d2/%22CUT%22_June_1929_04.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/8/82/%22CUT%22_June_1929_05.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/b/b1/%22Council_of_the_Gods%22_in_Galleria_Borghese_%28Rome%29_ceiling.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/7/71/%22East_front_of_Court_House_and_Planter%27s_House.%22.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/b/b6/%22Greater_Germany%22._Major_administrative_divisions._July_1._1944._100_kms_-_btv1b531213280.jpg",
    ] * 2
    target_dir = 'async_fixed'
    os.makedirs(target_dir, exist_ok=True)
    async with aiohttp.ClientSession() as sess:
        jobs = [fetch_streaming(sess, u, to_path(target_dir, u)) for u in urls]
        await asyncio.gather(*jobs)

if __name__ == '__main__':
    start = time.time()
    asyncio.run(run_fixed())
    stop = time.time()
    print(f"Asyncio download time: {stop - start:.2f} seconds")

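As an aside, aiohttp can also enforce the cap without a semaphore: its TCPConnector accepts a limit on simultaneous connections, so every request made through the session is bounded even when all coroutines are scheduled at once. A minimal sketch under the same limit of three (the function name main is illustrative):

```python
import asyncio
import aiohttp

async def main() -> None:
    # TCPConnector(limit=3) bounds simultaneous connections at the transport
    # level; requests beyond the cap wait for a free slot, which makes the
    # semaphore decorator unnecessary for pool-level limiting.
    connector = aiohttp.TCPConnector(limit=3)
    async with aiohttp.ClientSession(connector=connector) as sess:
        # ...schedule the streaming download coroutines here, as above...
        print(sess.connector.limit)

asyncio.run(main())
```

The semaphore version caps in-flight calls to one specific coroutine, while the connector limit applies to every request the session makes; pick whichever granularity fits.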
If you prefer a thread-based variant, the equivalent fix is to stream the response from requests and keep the pool size at three. This matches the async limit above and yields similar results.

from concurrent import futures
import os
import requests
import time

def stream_download(link, out_file):
    try:
        with requests.get(link, stream=True, headers={'User-Agent': 'Downloader/1.0'}) as resp:
            resp.raise_for_status()
            with open(out_file, 'wb') as sink:
                for block in resp.iter_content(chunk_size=8192):
                    # iter_content can yield empty keep-alive chunks; skip
                    # them instead of breaking, which would truncate the file.
                    if block:
                        sink.write(block)
    except Exception as exc:
        print(f"Error retrieving {link}: {exc}")

def to_dest(folder, link):
    name = link.split('/')[-1]
    return os.path.join(folder, name)

def run_pool():
    urls = [
        "https://images-assets.nasa.gov/image/PIA03149/PIA03149~orig.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/3/37/African_Bush_Elephant.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/9/97/The_Earth_seen_from_Apollo_17.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/2/29/%22Arte_Colonial%22.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/d/d2/%22CUT%22_June_1929_04.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/8/82/%22CUT%22_June_1929_05.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/b/b1/%22Council_of_the_Gods%22_in_Galleria_Borghese_%28Rome%29_ceiling.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/7/71/%22East_front_of_Court_House_and_Planter%27s_House.%22.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/b/b6/%22Greater_Germany%22._Major_administrative_divisions._July_1._1944._100_kms_-_btv1b531213280.jpg",
    ] * 2
    out_dir = 'threadpool_fixed'
    os.makedirs(out_dir, exist_ok=True)
    with futures.ThreadPoolExecutor(max_workers=3) as pool:
        pool.map(lambda u: stream_download(u, to_dest(out_dir, u)), urls)

if __name__ == '__main__':
    t0 = time.time()
    run_pool()
    t1 = time.time()
    print(f"ThreadPoolExecutor download time: {t1 - t0:.2f} seconds")

As an alternative async HTTP client, httpx reaches the same outcome under the same constraints. The structure is identical: limit concurrency, stream to disk in chunks, and surface HTTP errors.

from functools import wraps
import aiofiles
import asyncio
import httpx
import os
import time

def cap_parallelism(limit=10):
    # One shared semaphore per decorated function: at most `limit` calls run at once.
    gate = asyncio.Semaphore(limit)
    def decorator(fn):
        @wraps(fn)
        async def inner(*args, **kwargs):
            async with gate:
                return await fn(*args, **kwargs)
        return inner
    return decorator

@cap_parallelism(limit=3)
async def fetch_httpx(client: httpx.AsyncClient, link: str, dest: str):
    try:
        # client.get() buffers the whole body before iteration starts;
        # client.stream() keeps the transfer chunked end to end.
        async with client.stream('GET', link, headers={'User-Agent': 'Downloader/1.0'}) as resp:
            resp.raise_for_status()
            async with aiofiles.open(dest, 'wb') as out:
                async for chunk in resp.aiter_bytes(chunk_size=8192):
                    await out.write(chunk)
    except Exception as exc:
        print(f"Error retrieving {link}: {exc}")

def make_path(root: str, link: str) -> str:
    name = link.split('/')[-1]
    return os.path.join(root, name)

async def run_httpx():
    urls = [
        "https://images-assets.nasa.gov/image/PIA03149/PIA03149~orig.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/3/37/African_Bush_Elephant.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/9/97/The_Earth_seen_from_Apollo_17.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/2/29/%22Arte_Colonial%22.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/d/d2/%22CUT%22_June_1929_04.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/8/82/%22CUT%22_June_1929_05.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/b/b1/%22Council_of_the_Gods%22_in_Galleria_Borghese_%28Rome%29_ceiling.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/7/71/%22East_front_of_Court_House_and_Planter%27s_House.%22.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/b/b6/%22Greater_Germany%22._Major_administrative_divisions._July_1._1944._100_kms_-_btv1b531213280.jpg",
    ] * 2
    out_dir = 'httpx_async'
    os.makedirs(out_dir, exist_ok=True)
    async with httpx.AsyncClient() as client:
        jobs = [fetch_httpx(client, u, make_path(out_dir, u)) for u in urls]
        await asyncio.gather(*jobs)

if __name__ == '__main__':
    start = time.time()
    asyncio.run(run_httpx())
    stop = time.time()
    print(f"Asyncio download time: {stop - start:.2f} seconds")

Why this matters

The gap between “works locally” and “works reliably at scale” often hides in concurrency and I/O details. Allowing unbounded async requests can overwhelm remote servers or your own environment and lead to subtle failures such as truncated files. Matching concurrency between implementations and streaming responses addresses both correctness and throughput. In measured comparisons under these adjustments, the thread pool and the async solutions complete in similar time, which is a useful sanity check when choosing an approach.

Takeaways

When downloading large assets concurrently, treat network and disk as streaming pipelines. Cap concurrency intentionally, check for HTTP errors, and write incrementally to disk. If you stick to these principles, whether you use a small ThreadPoolExecutor, aiohttp, or httpx, you can expect stable behavior and comparable performance.

The article is based on a question from StackOverflow by Daniil Yefimov and an answer by Detlef.