2025, Dec 07 12:02

Параллельная multipart‑загрузка больших MP4 в Amazon S3 с aioboto3 и httpx

Почему асинхронная загрузка MP4 в Amazon S3 оказывается последовательной, и как параллельная multipart‑отправка частей через aioboto3 и httpx ускоряет процесс.

Быстрое перемещение MP4 на сотни мегабайт из внешних API в Amazon S3 — задача, ограниченная I/O. Сетевые обмены, HTTP‑стриминг и загрузки в S3 складываются, поэтому при строго последовательном исполнении на один файл уходят минуты. Асинхронный подход помогает, но только если создать достаточно полезной конкурентности. Ниже — наглядный разбор ситуации, когда загрузки выглядели «асинхронными», но вели себя последовательно, и как одновременная отправка частей в multipart‑загрузке улучшила реальное время выполнения.

Откуда берётся замедление

Изначально каждый исходный ролик стримился через httpx и загружался в S3 с помощью aioboto3 через multipart‑загрузку, но части отправлялись по одной. Это означало, что для файла в любой момент в полёте была только одна часть S3, а между файлами нередко казалось, будто S3 обрабатывает их последовательно. Конкурентность существовала на уровне задач, но внутри каждой задачи работа сериализовалась: ожидалась загрузка каждой части, прежде чем запускалась следующая.

Типичный паттерн: последовательная multipart‑загрузка внутри асинхронного процесса

Ниже приведён минимальный вариант паттерна, где части отправляются строго по порядку. Логика та же, идентификаторы переименованы для ясности. Скрипт обрабатывает несколько файлов параллельно, но части каждого файла всё равно уходят по одной.

import asyncio
import httpx
from botocore.exceptions import ClientError
import aioboto3
from typing import AsyncIterator
async def push_chunks_async(byte_stream: AsyncIterator[bytes], 
                            s3_object_key: str) -> None:
    """
    Асинхронно загружает видеоролики через aioboto3 с использованием multipart-загрузки,
    но части отправляются и ожидаются последовательно.
    """
    aio_session = aioboto3.Session()
    async with aio_session.client('s3') as s3_cli:
        init_meta = await s3_cli.create_multipart_upload(Bucket=bucket_label, Key=s3_object_key)
        multi_id = init_meta['UploadId']
        assembled_parts = []
        part_no = 1
        try:
            async for piece in byte_stream:
                part_reply = await s3_cli.upload_part(
                    Bucket=bucket_label,
                    Key=s3_object_key,
                    PartNumber=part_no,
                    UploadId=multi_id,
                    Body=piece
                )
                assembled_parts.append({
                    'ETag': part_reply['ETag'],
                    'PartNumber': part_no
                })
                part_no += 1
                print('Chunk done')
            await s3_cli.complete_multipart_upload(
                Bucket=bucket_label,
                Key=s3_object_key,
                UploadId=multi_id,
                MultipartUpload={'Parts': assembled_parts}
            )
            print('Upload Done')
        except Exception as err:
            await s3_cli.abort_multipart_upload(
                Bucket=bucket_label,
                Key=s3_object_key,
                UploadId=multi_id
            )
            print('An error occured', err)
async def fetch_and_ship(source_url):
    async with httpx.AsyncClient(timeout=httpx.Timeout(20, read=10)) as net:
        async with net.stream("GET", source_url) as resp:
            if resp.status_code == 200:
                try:
                    key_path = f'{dir_tag}/{file_tag}'
                    await push_chunks_async(resp.aiter_bytes(chunk_size=100*1024*1024), key_path)
                except ClientError as e:
                    print('Upload Failed', e)
            else:
                print('Bad Response', resp.status_code)
async def driver():
    endpoints = [
        'https://api.example.com/somevideo.mp4',
        'https://api.example.com/somevideo.mp4',
        'https://api.example.com/somevideo.mp4'
    ]
    jobs = [fetch_and_ship(u) for u in endpoints]
    await asyncio.gather(*jobs)
if __name__ == '__main__':
    bucket_label = 'Test-Bucket'
    dir_tag = 'Test'
    file_tag = 'Video.MP4'
    asyncio.run(driver())

Суть проблемы

Асинхронный I/O ускоряет работу только при наличии нескольких операций в полёте. В коде выше загрузка каждой части дожидается завершения перед стартом следующей, поэтому один файл продвигается последовательно. Отсюда ощущение, что S3 «делает по одному», хотя снаружи вы запускаете несколько URL через asyncio.gather. При крупных частях стоимость ожидания каждой отправки доминирует, и цикл событий не может перекрывать работу S3 для того же объекта.

Для больших объектов Amazon рекомендует multipart‑загрузку при размере файла свыше 100 МБ. Это создаёт основу для конкурентности, но чтобы максимизировать пропускную способность при сетевом лимите, части одного файла тоже нужно отправлять одновременно.

Что действительно помогает: параллельная отправка частей в multipart‑загрузке

Идея в том, чтобы запускать загрузку каждой части отдельной задачей и ожидать их вместе. Так части одного объекта уходят параллельно, при этом сохраняется конкурентность и между разными файлами. Ниже — доработанная процедура, которая создаёт задачи на части и собирает их результаты перед завершением мультизагрузки.

import asyncio
import aioboto3
from typing import AsyncIterator, Dict, Any
async def put_concurrent_parts(byte_source: AsyncIterator[bytes], 
                               target_key: str) -> None:
    """
    Асинхронно загружает видеоролики через aioboto3
    с параллельной отправкой частей в multipart-загрузке.
    """
    aio_ses = aioboto3.Session()
    async with aio_ses.client('s3') as s3c:
        init_result = await s3c.create_multipart_upload(Bucket=bucket_label, Key=target_key)
        upload_token = init_result['UploadId']
        part_no = 1
        pending = []
        async def ship_piece(seq_no: int, payload: bytes) -> Dict[str, Any]:
            reply = await s3c.upload_part(
                Bucket=bucket_label,
                Key=target_key,
                PartNumber=seq_no,
                UploadId=upload_token,
                Body=payload
            )
            return {
                'ETag': reply['ETag'],
                'PartNumber': part_no
            }
        try:
            part_no = 1
            async for blob in byte_source:
                job = asyncio.create_task(ship_piece(part_no, blob))
                pending.append(job)
                part_no += 1
            collected = await asyncio.gather(*pending)
            await s3c.complete_multipart_upload(
                Bucket=bucket_label,
                Key=target_key,
                UploadId=upload_token,
                MultipartUpload={'Parts': collected}
            )
            print('Upload Done')
        except Exception as exc:
            await s3c.abort_multipart_upload(
                Bucket=bucket_label,
                Key=target_key,
                UploadId=upload_token
            )
            print('An error occurred', exc)

Чего ожидать после включения параллелизма

Когда части загружаются одновременно, вы максимально используете полезную конкурентность и внутри файла, и между файлами. С этого момента общее время упирается в доступную полосу и задержки сети. Для видео на сотни мегабайт критичен быстрый сетевой путь. Запуск скрипта на облачном инстансе даёт ту пропускную способность, которой домашнему роутеру обычно не хватает, — и именно там вы увидите самый заметный выигрыш после внедрения параллельной multipart‑загрузки.

Почему это важно

Конвейеры приёма крупных объектов зависят от I/O. Если отправка частей идёт по очереди, пропускная способность искусственно ограничена, как бы «асинхронно» ни выглядел остальной код. Параллелизм на уровне multipart превращает простои в полезную работу и сокращает время на файл, пока узким местом не станет сеть. Это согласуется с рекомендацией Amazon использовать multipart для файлов свыше 100 МБ и делает поведение таким, как ожидается: «несколько файлов создаются и загружаются одновременно», а не «обрабатываются по одному».

Выводы

Стримьте данные из источника, применяйте multipart‑загрузку для больших файлов и запускайте несколько отправок частей S3 одновременно, чтобы держать цикл событий занятым. После этого главным ограничителем остаётся сеть. Если упираетесь в скорость исходящего канала локально, запуск того же кода в облаке — практичный способ сократить общее время.