2025, Nov 19 21:00

Orchestrating split 7z archive downloads and extraction with Python asyncio, aiohttp, and async subprocesses

Learn how to download split 7z parts with aiohttp while extracting via an asyncio subprocess, using TaskGroup to avoid blocking and keep the event loop responsive

Async download and on-the-fly extraction of split 7z archives sounds straightforward until the event loop stalls on blocking calls. With Python 3.12.3, asyncio, and aiohttp, the core challenge is orchestrating two pipelines at once: fetching parts for file_N while extracting file_N−1 without freezing the loop.

Problem setup

The input starts as a flat list of 7z parts grouped by base archive name. A synchronous download-then-extract loop works, but it wastes time: while extraction runs it blocks the event loop, so the next batch of downloads cannot start.

Problematic code sample

The following snippet shows where things go wrong. The names are different, but the behavior mirrors the original approach.

async def orchestrate(self, items: list):
    base_labels = list(set([re.sub(r'\.7z.*$', '', y) for y in items]))  # ["file_1", "file_2", ...]
    for label in base_labels:
        related = [y for y in items if f"{base_labels}.7z" in y]  # bug: interpolates the whole list, not `label`
        for shard in related:
            self.fetch_chunk(shard)  # bug: coroutine created but never awaited
        self.unpack_sync(label)  # long-running and blocks the loop

async def fetch_chunk(self, ref):
    url = f"{xxx}/file"
    async with self._session.get(url=url) as resp:
        with open(output_document, "wb") as fh:
            size = 64*1024
            async for buf in resp.content.iter_chunked(size):
                fh.write(buf)

async def unpack_sync(self, base) -> None:
    cmd = ['7z', 'x', f"{base}.7z.001", f"-o{save_dir}"]
    subprocess.run(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE).stdout.decode('utf-8')  # blocking call inside a coroutine

What actually breaks

The extraction phase uses subprocess.run, which blocks the event loop: while 7z is extracting, no downloads progress. There is also a subtle logic bug in the selection of parts: the filter interpolates base_labels, the whole list, into the f-string instead of label, so no related parts are ever matched. Finally, self.fetch_chunk(shard) creates a coroutine that is never awaited, so the downloads never actually run; Python only emits a "coroutine was never awaited" warning.

Reshaping the workflow

The solution hinges on two simple moves. First, transform the flat list of parts into a list of batches, where each inner list contains all parts of a single archive. Second, use asyncio.TaskGroup to download the current batch while asynchronously extracting the previous one. Extraction must run without blocking the event loop; asyncio.create_subprocess_exec does exactly that. If a synchronous extraction wrapper is already in place and thread-safe, asyncio.to_thread is also a viable alternative.
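
If the to_thread route is taken, a minimal sketch could look like the following; unpack_blocking and save_dir are hypothetical names standing in for whatever synchronous wrapper already exists.

# Minimal sketch of the asyncio.to_thread alternative.
# unpack_blocking and save_dir are hypothetical, not part of the original code.
import asyncio
import subprocess

def unpack_blocking(base: str, save_dir: str) -> None:
    # blocking 7z call; safe here because it runs in a worker thread
    subprocess.run(
        ["7z", "x", f"{base}.7z.001", f"-o{save_dir}"],
        check=True,
        capture_output=True,
    )

async def extract_off_loop(base: str, save_dir: str) -> None:
    # the event loop keeps serving downloads while the thread extracts
    await asyncio.to_thread(unpack_blocking, base, save_dir)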

Here is the input restructuring from a flat list of parts to a list of lists.

# FROM
payload = [
    { "part": "file_1.7z.001" },
    { "part": "file_1.7z.xxx" },
    { "part": "file_N.7z.001" },
    { "part": "file_N.7z.xxx" },
]
# TO
grouped = [
    [ "file_1.7z.001", "file_1.7z.xxx" ],
    [ "file_N.7z.001", "file_N.7z.xxx" ],
]
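
One way to build grouped from payload is to bucket each part under its base name; this is a minimal sketch, assuming every part name matches the base.7z.suffix pattern shown above (group_parts is a hypothetical helper).

import re
from collections import defaultdict

def group_parts(payload: list[dict]) -> list[list[str]]:
    # bucket every part under its base archive name ("file_1", "file_N", ...)
    buckets: dict[str, list[str]] = defaultdict(list)
    for entry in payload:
        part = entry["part"]
        base = re.sub(r'\.7z.*$', '', part)
        buckets[base].append(part)
    # sort each bucket so the .001 part comes first, as 7z expects
    return [sorted(parts) for parts in buckets.values()]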

Working solution with asyncio + aiohttp

The following code downloads parts for index i while extracting index i−1, and finally extracts the last batch after the loop completes.

async def process_bundles(self, groups: list) -> None:
    for idx, group in enumerate(groups):
        async with asyncio.TaskGroup() as tg:
            for part in group:
                tg.create_task(
                    self.pull_from_gitlab(part, part)  # url and output path bound to the same part reference here
                )
            if idx != 0:
                # extract the previous batch while the current one downloads
                argv = [
                    'x',
                    groups[idx - 1][0],
                    f"-o{output_dir}"  # 7z takes the output directory via the -o switch
                ]
                tg.create_task(
                    self.spawn_7z("7z", argv)
                )
    # the last batch still needs extracting once the loop finishes
    argv = [
        'x',
        groups[-1][0],
        f"-o{output_dir}"
    ]
    await self.spawn_7z("7z", argv)

async def pull_from_gitlab(self, url: str, output_path: str) -> None:
    # the semaphore must be shared across calls to actually cap concurrency;
    # here it is assumed to live on the instance, e.g. self._dl_semaphore = asyncio.Semaphore(2)
    async with self._dl_semaphore:
        async with self._session.get(url=url) as resp:
            with open(output_path, "wb") as fp:
                chunk_size = 64*1024
                async for chunk in resp.content.iter_chunked(chunk_size):
                    fp.write(chunk)

async def spawn_7z(self, program: str, args: list[str]) -> None:
    proc = await asyncio.create_subprocess_exec(
        program,
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    await proc.communicate()
    print(f'{program} {" ".join(args)} exited with {proc.returncode}')
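
For completeness, here is a hypothetical driver wiring the pieces together; the class name, the shared session, and the shared semaphore are assumptions, since the original snippets only show the methods.

# Hypothetical wiring; ArchivePipeline and its attributes are assumed, not given.
import asyncio
import aiohttp

class ArchivePipeline:
    def __init__(self, session: aiohttp.ClientSession) -> None:
        self._session = session
        self._dl_semaphore = asyncio.Semaphore(2)  # shared cap on concurrent downloads

    # process_bundles, pull_from_gitlab and spawn_7z from above go here

async def main() -> None:
    grouped = [
        ["file_1.7z.001", "file_1.7z.xxx"],
        ["file_N.7z.001", "file_N.7z.xxx"],
    ]
    async with aiohttp.ClientSession() as session:
        await ArchivePipeline(session).process_bundles(grouped)

if __name__ == "__main__":
    asyncio.run(main())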

Why this works

Async I/O and CPU-bound or blocking tasks do not mix unless the blocking parts are moved off the loop. By keeping downloads inside the event loop and spawning extraction as an async subprocess, both pipelines make progress. The per-iteration TaskGroup lets the current batch of parts flow to disk while the previous batch decompresses in parallel. The final await ensures the last archive is extracted after the loop.
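
A quick way to verify that claim is a hypothetical heartbeat task running alongside the pipeline: if it keeps printing at roughly one-second intervals while 7z extracts, nothing is blocking the loop.

import asyncio
import time

async def heartbeat(interval: float = 1.0) -> None:
    # prints at a steady rate only while the event loop stays unblocked
    while True:
        print(f"loop alive at {time.monotonic():.1f}")
        await asyncio.sleep(interval)

async def run_with_heartbeat(pipeline, grouped) -> None:
    ticker = asyncio.create_task(heartbeat())
    try:
        await pipeline.process_bundles(grouped)
    finally:
        ticker.cancel()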

Conclusion

When orchestrating downloads and extraction for split 7z archives with asyncio and aiohttp, treat subprocesses as first-class async tasks. Keep network I/O on the event loop, execute extraction through asyncio.create_subprocess_exec, and align batches by index so that extraction lags one step behind download. Double-check part selection logic and remember to await coroutines; both details are easy to miss and costly at runtime.