2025, Nov 29 00:02

Асинхронная загрузка частей и распаковка 7z на Python: asyncio и aiohttp

Пошаговое решение: как с asyncio и aiohttp параллелить загрузку частей и распаковку архивов 7z. TaskGroup, create_subprocess_exec, пример кода и типичные ошибки

Асинхронная загрузка и одновременная распаковка разбитых на части архивов 7z кажется простой задачей, пока цикл событий не замирает на блокирующих вызовах. В связке Python 3.12.3, asyncio и aiohttp ключевая сложность — согласованное управление двумя конвейерами сразу: получать части для file_N, пока распаковывается file_N−1, не «ставя на паузу» цикл.

Постановка задачи

На входе — плоский список частей 7z, сгруппированных по базовому имени архива. Синхронная схема «сначала скачать, затем распаковать» работает, но теряет время: распаковка блокирует цикл, а следующая партия простаивает.

Проблемный пример кода

Ниже — фрагмент, на котором видно, что идёт не так. Имена отличаются, но поведение повторяет исходный подход.

async def orchestrate(self, items: list):
    base_labels = list(set([re.sub(r'\.7z.*$', '', y) for y in items]))  # ["file_1", "file_2", ...]
    for label in base_labels:
        related = [y for y in items if f"{base_labels}.7z" in y]
        for shard in related:
            self.fetch_chunk(shard)
        self.unpack_sync(label)  # длительная операция, блокирует цикл


async def fetch_chunk(self, ref):
    url = f"{xxx}/file"
    async with self._session.get(url=url) as resp:
        with open(output_document, "wb") as fh:
            size = 64*1024
            async for buf in resp.content.iter_chunked(size):
                fh.write(buf)


async def unpack_sync(self, base) -> None:
    cmd = ['7z', 'x', f"{base}.7z.001", f"-o{save_dir}"]
    subprocess.run(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE).stdout.decode('utf-8')

Что на самом деле ломается

На этапе распаковки используется subprocess.run, и он блокирует цикл событий. Пока 7z занят, загрузки не двигаются. Плюс есть тонкая логическая ошибка в отборе частей: в f‑строке используется не та переменная, из‑за чего связанные части выбираются неверно. Наконец, корутина загрузки создаётся, но её не await-ят, поэтому цикл на самом деле не ждёт завершения ввода‑вывода.

Перестраиваем процесс

Решение держится на двух шагах. Во‑первых, преобразовать плоский список частей в список «пакетов», где каждый внутренний список содержит все части одного архива. Во‑вторых, использовать asyncio.TaskGroup: скачивать текущую партию, одновременно асинхронно распаковывая предыдущую. Распаковка не должна блокировать цикл событий — для этого подходит asyncio.create_subprocess_exec. Если у вас уже есть синхронная обёртка распаковки и она потокобезопасна, можно применить и asyncio.to_thread.

Вот как входные данные преобразуются из плоского списка частей в список списков.

# БЫЛО
payload = [
    { "part": "file_1.7z.001" },
    { "part": "file_1.7z.xxx" },

    { "part": "file_N.7z.001" },
    { "part": "file_N.7z.xxx" },
]

# СТАЛО
grouped = [
    [ "file_1.7z.001", "file_1.7z.xxx" ],
    [ "file_N.7z.001", "file_N.7z.xxx" ],
]

Рабочее решение с asyncio + aiohttp

Код ниже загружает части для индекса i, параллельно распаковывая индекс i−1, а после завершения цикла извлекает последнюю партию.

async def process_bundles(self, groups: list) -> None:
    for idx, group in enumerate(groups):
        if idx == 0:
            async with asyncio.TaskGroup() as tg:
                [
                    tg.create_task(
                        self.pull_from_gitlab(part, part)  # здесь url и путь сохранения привязаны к одной и той же ссылке на часть
                    ) for part in group
                ]

        if idx != 0:
            async with asyncio.TaskGroup() as tg:
                [
                    tg.create_task(
                        self.pull_from_gitlab(part, part)
                    ) for part in group
                ]

                argv = [
                    'x',
                    groups[idx-1][0],
                    output_dir
                ]
                tg.create_task(
                    self.spawn_7z("7z", argv)
                )

    argv = [
        'x',
        groups[-1][0],
        output_dir
    ]
    await self.spawn_7z("7z", argv)


async def pull_from_gitlab(self, url: str, output_path: str, limiter: int = 2) -> None:
    async with asyncio.Semaphore(limiter):  # по умолчанию 2 одновременные загрузки
        async with self._session.get(url=url) as resp:
            with open(output_path, "wb") as fp:
                chunk_size = 64*1024
                async for chunk in resp.content.iter_chunked(chunk_size):
                    fp.write(chunk)


async def spawn_7z(self, program: str, args: list[str]) -> None:
    proc = await asyncio.create_subprocess_exec(
        program,
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    await proc.communicate()
    print(f'{program} {" ".join(args)} exited with {proc.returncode}')

Почему это работает

Асинхронный I/O и CPU‑связанные или блокирующие задачи не уживаются, если блокирующие части не вынесены из цикла. Оставляя загрузки внутри цикла событий и запуская распаковку как асинхронный подпроцесс, оба конвейера движутся параллельно. TaskGroup на каждой итерации позволяет текущей партии попадать на диск, пока предыдущая распаковывается. Финальный await гарантирует, что последний архив извлечён после завершения цикла.

Выводы

Организуя загрузку и распаковку разбитых архивов 7z с asyncio и aiohttp, относитесь к подпроцессам как к полноценным асинхронным задачам. Сетевой I/O оставляйте в цикле событий, распаковку запускайте через asyncio.create_subprocess_exec и выравнивайте партии по индексу так, чтобы распаковка отставала на один шаг от загрузки. Проверьте логику отбора частей и не забывайте await корутины — эти мелочи легко упустить и дорого обходятся на практике.