2025, Oct 07 07:17

Потоковая упаковка больших объектов MinIO в zip с miniopy-async без лишней памяти

Решение для упаковки больших объектов MinIO в zip с miniopy-async: поток без буферизации в ОЗУ, запись во временный файл и загрузка обратно — при 4 ГБ ОЗУ

Сжать набор больших объектов MinIO в один архив кажется простым, пока не появляются цифры. Отдельные объекты могут достигать 40 ГБ, доступной оперативной памяти — 4 ГБ, а место на диске — около 240 ГБ. С miniopy-async в стеке единственный реалистичный путь — отказаться от буферизации целых данных в памяти и вести потоковую загрузку и создание архива.

Ловушка: сборка zip в памяти

Читать объекты целиком перед записью в zip — первая идея, к которой часто прибегают, и именно она ломается при жёстких ограничениях по памяти. Следующий пример показывает, почему такой подход небезопасен для многогигабайтных объектов.

import io
import zipfile
import asyncio
from miniopy_async import Minio
api = Minio("localhost:9000", access_key="xxx", secret_key="xxx", secure=False)
async def build_zip_in_ram(source_bucket, item_keys):
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_STORED) as bundle:
        for name in item_keys:
            res = await api.get_object(source_bucket, name)
            blob = await res.read()  # загружает весь объект в ОЗУ
            bundle.writestr(name, blob)
            await res.close()
    return buf.getvalue()  # теперь весь архив тоже находится в ОЗУ

Эта версия загружает каждый объект через read(), полностью хранит его в памяти и собирает архив в буфере BytesIO. При 4 ГБ ОЗУ и объектах до 40 ГБ она гарантированно упрётся в потолок.

Суть проблемы

Корень проблемы — буферизация. Получение крупных объектов через get_object и read() выделяет память, пропорциональную размеру объекта, а растущий zip в BytesIO многократно усиливает давление. Решение — перейти от буферизации в памяти к потоковой обработке на всём пути.

miniopy-async предоставляет асинхронный поток из get_object, позволяя обрабатывать данные по кускам по мере поступления. Модуль zipfile в Python работает с файловыми целями, но ожидает, что к ним можно обращаться с произвольным доступом (seek). Простой и надёжный обходной путь — писать архив во временный файл на диск, что соответствует условиям, когда диска существенно больше, чем ОЗУ.

Практичное потоковое решение с временным файлом

Решение ниже передаёт каждый объект из MinIO прямо в запись архива, выводит zip во временный файл на диске, а затем загружает этот файл обратно в MinIO как единый объект. На всём протяжении процесса в памяти не держатся ни целые объекты, ни весь архив.

import asyncio
import aiofiles
import tempfile
import zipfile
from miniopy_async import Minio
mc = Minio("localhost:9000", access_key="xxx", secret_key="xxx", secure=False)
async def pack_and_push(src_bucket, object_ids, dst_bucket, dst_name):
    # создаем временный файл для результирующего zip-архива
    with tempfile.NamedTemporaryFile(delete=False) as tf:
        spool_path = tf.name
    # передаем каждый объект в архив потоком, не загружая его целиком в ОЗУ
    with zipfile.ZipFile(spool_path, "w", compression=zipfile.ZIP_STORED) as archive:
        for obj_key in object_ids:
            stream = await mc.get_object(src_bucket, obj_key)
            with archive.open(obj_key, "w") as sink:
                async for fragment in stream.stream():
                    sink.write(fragment)
            await stream.close()
    # загружаем готовый zip-файл обратно в MinIO
    await mc.fput_object(dst_bucket, dst_name, spool_path)

Такой подход ограничивает потребление памяти размером одного передаваемого и записываемого фрагмента в текущий момент, а zipfile пишет на диск архив с произвольным доступом. С диском на 240 ГБ можно размещать крупные архивы, пока итоговый zip помещается на носителе.

Когда диска не хватает

Если суммарный объём N объектов превышает доступное место на диске, следующий шаг — стримить zip напрямую в MinIO с помощью multipart‑загрузки. Поскольку zipfile ожидает цель с поддержкой seek, потребуется потоковая реализация zip вместо стандартной библиотеки. Один из вариантов — библиотека вроде zipstream, умеющая формировать zip как поток.

Заметки из практики

Отмечу это как решение. Второй метод самый рациональный. Но есть ещё идея — «блокировать» io.BytesIO после .read(), чтобы ждать новые данные, это поможет последовательно загружать объект. Но это теоретически.

Мысль та же: не накапливать полный объём данных в памяти, а выбирать последовательный поток. На практике временный файл на диске остаётся самым простым и предсказуемым вариантом при обозначенных ограничениях.

Зачем это важно

Конвейеры обработки крупных объектов часто ломаются не из‑за полосы пропускания, а из‑за неограниченной буферизации. Потоковая схема минимизирует пиковое потребление памяти, сохраняет стабильность процесса при жёстких лимитах ОЗУ и предотвращает случайные OOM‑события при вроде бы простых операциях, таких как архивация. Использование временного файла с поддержкой seek позволяет обойти требования формата zip, не усложняя процесс загрузки.

Итоги

Если нужно упаковать в zip множество больших объектов MinIO с miniopy-async на хосте с ограниченной памятью, переводите всё на стриминг. Итерируйтесь по фрагментам из get_object вместо read(), записывайте записи архива прямо во временный файл на диске, затем отправляйте готовый файл через fput_object. Если итоговый архив не помещается на диске, переходите к потоковой реализации zip и multipart‑загрузке. Держитесь подальше от буферов в памяти для многогигабайтных объектов — так конвейер останется и быстрым, и предсказуемым.

Статья основана на вопросе на StackOverflow от TASK и ответе Aditya.