2025, Nov 27 09:03
Как стримить tar.gz в FastAPI/Starlette без tarfile-блокировок
Пошаговая потоковая выдача tar.gz в FastAPI/Starlette: асинхронный стример без tarfile, корректный backpressure, gzip/zlib и тесты для крупных архивов.
Стриминг архива tar.gz напрямую из FastAPI/Starlette кажется простой задачей, пока не сталкиваешься с несоответствием между синхронной записью tarfile и асинхронным HTTP‑ответом. Если попытаться направить вывод tarfile прямо в StreamingResponse, быстро выясняется, что TarFile пишет в файловые объекты через синхронный вызов write, а Starlette ожидает асинхронный генератор или async send. Именно этот конфликт срывает упаковку «на лету» для крупных файлов, получаемых с другого бэкенда.
Воспроизведение проблемы
Идея ниже — переопределить метод stream_response у ответа, создать внутренний файловый «приёмник» и позволить tarfile отправлять в него куски данных. Приёмник затем пересылает эти куски в ASGI send. На первый взгляд всё логично, но упирается в две вещи: TarFile работает синхронно, а tarfile.open не поддерживает async with.
import asyncio
import os
import tarfile
from pathlib import Path
from typing import Mapping
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.testclient import TestClient
from starlette.background import BackgroundTask
from starlette.types import Send
ARCHIVE_PATH = Path("archive.tar.gz")
SLICE_BYTES = 1024
api = FastAPI()
INPUT_ITEMS = [("tarfile.py", Path(tarfile.__file__)), ("os.py", Path(os.__file__))]
class TarStreamingResponse(StreamingResponse):
def __init__(
self,
paths_for_tar: list[tuple[str, Path]],
status_code: int = 200,
headers: Mapping[str, str] | None = None,
media_type: str | None = None,
background: BackgroundTask | None = None,
) -> None:
self.paths_for_tar = paths_for_tar
self.status_code = status_code
self.media_type = self.media_type if media_type is None else media_type
self.background = background
self.init_headers(headers)
async def stream_response(self, send: Send) -> None:
await send(
{
"type": "http.response.start",
"status": self.status_code,
"headers": self.raw_headers,
}
)
class ChunkSink:
def write(self, buffer):
print(f"Really sending {len(buffer)} bytes")
asyncio.run_coroutine_threadsafe(
send(
{
"type": "http.response.body",
"body": buffer,
"more_body": True,
}
),
asyncio.get_running_loop(),
)
async with tarfile.open(
mode="w|gz", fileobj=ChunkSink(), bufsize=SLICE_BYTES
) as tar_fp:
for name, src in self.paths_for_tar:
await tar_fp.addfile(tarfile.TarInfo(name), src.open("rb"))
await send({"type": "http.response.body", "body": b"", "more_body": False})
@api.get("/")
def deliver_tar() -> StreamingResponse:
return TarStreamingResponse(
paths_for_tar=INPUT_ITEMS,
media_type="application/tar+gzip",
headers={"Content-Disposition": f'attachment; filename="{ARCHIVE_PATH.name}"'},
)
Попытка пойти этим путём приводит к ошибкам при использовании tarfile.open как асинхронного контекстного менеджера. Вы увидите примерно такую ошибку:
TypeError: 'TarFile' object does not support the asynchronous context manager protocol
Даже если убрать async with, останется базовое несоответствие: синхронный TarFile отправляет байты в файловый .write, тогда как ASGI send — это awaitable. Планирование send через run_coroutine_threadsafe не даёт нужного бэкпрешсura и гарантий порядка, поэтому данные могут так и не быть сброшены, как вы ожидаете.
Почему это не работает
Суть трения — в архитектуре. В потоковом режиме (w|gz) TarFile синхронно пишет в файловый объект. А StreamingResponse в Starlette строится вокруг асинхронных генераторов или async send, работающих в петле событий asyncio. TarFile не «понимает» асинхронность и не может быть ожидаем, а tarfile.open не является асинхронным контекстным менеджером. Смешивание синхронной записи и асинхронной отправки без правильного моста либо блокирует, либо ломает управление потоком.
Добавление await перед API TarFile или обёртка tarfile.open в async with проблему не решат. Переделывать сам TarFile под асинхронность — слишком затратно и нецелесообразно.
Рабочий подход
Надёжный способ — управлять потоком tar и gzip самостоятельно и предоставить асинхронный интерфейс, который будет кормить ASGI send. Вместо того чтобы просить TarFile писать в приёмник, соберите небольшой «стример» tar+gzip, который выдаёт сжатые куски через асинхронный колбэк. Ответ просто ждёт этот колбэк для каждого фрагмента, и сетевой бэкпрешcure соблюдается.
Полная реализация
Код ниже читает набор файлов порциями, формирует tar‑поток, на лету сжимает его через zlib и отдаёт через собственный подкласс StreamingResponse. Для построения заголовков и выравнивания используются элементы из tarfile.TarFile и tarfile._Stream, при этом корректно ведутся кадрирование gzip, CRC и учёт размеров.
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "fastapi",
# "httpx",
# "pytest",
# "uvicorn",
# ]
# ///
"""A script to read files by chunks, archive them, compress them and send them as a streaming response on the fly.
To run it, use the uv package manager and run `uv run main.py`.
"""
import os
import struct
import tarfile
import time
import zlib
from pathlib import Path
from typing import Callable, Generator, Mapping
import pytest
import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.testclient import TestClient
from starlette.background import BackgroundTask
from starlette.types import Send
### ПАРАМЕТРЫ ###
# Список файлов для чтения, упаковки в tar-gz и потоковой передачи
SOURCE_LIST = [("tarfile.py", Path(tarfile.__file__)), ("os.py", Path(os.__file__))]
# Размер фрагмента, отправляемого в ответе
SLICE_BYTES = 1024
# Режим: pytest или uvicorn
RUN_MODE = "pytest"
##
## АДАПТЕР
##
def iter_file_bytes(file_path: Path, window: int) -> Generator[bytes, None, None]:
"""Read a file and return a generator with bytes."""
with open(file_path, "rb") as fh:
while chunk := fh.read(window):
yield chunk
class GzipTarEmitter:
"""An object to read files and generate tar-gz chunks.
This is written using pieces of `tarfile.TarFile` and `tarfile._Stream`.
"""
def __init__(
self,
items_to_pack: list[tuple[str, Path]],
push: Callable[[bytes], None],
window: int = SLICE_BYTES,
level: int = 9,
out_name: str = "archive.tar.gz",
) -> None:
self.items_to_pack = items_to_pack
self.push = push
self.window = window
self.out_name = out_name
self.accum = b""
self.wrote = 0
self.level = level
self.exception = zlib.error
self.hash_crc = zlib.crc32(b"")
async def put(self, chunk: bytes) -> None:
self.hash_crc = zlib.crc32(chunk, self.hash_crc)
self.wrote += len(chunk)
chunk = self.deflater.compress(chunk)
await self._drip(chunk)
async def _drip(self, chunk: bytes) -> None:
self.accum += chunk
while len(self.accum) > self.window:
await self.push(self.accum[: self.window])
self.accum = self.accum[self.window :]
async def start_gzip(self) -> None:
self.deflater = zlib.compressobj(
self.level,
zlib.DEFLATED,
-zlib.MAX_WBITS,
zlib.DEF_MEM_LEVEL,
0,
)
timestamp = struct.pack("<L", int(time.time()))
await self._drip(b"\037\213\010\010" + timestamp + b"\002\377")
if self.out_name.endswith(".gz"):
self.out_name = self.out_name[:-3]
self.out_name = os.path.basename(self.out_name)
await self._drip(self.out_name.encode("iso-8859-1", "replace") + tarfile.NUL)
async def emit_tar_stream(self) -> None:
cursor = 0
window = SLICE_BYTES
for name, input_path in self.items_to_pack:
info = tarfile.TarInfo(name)
info.size = input_path.stat().st_size
info.type = tarfile.REGTYPE
header = info.tobuf()
await self.put(header)
cursor += len(header)
for piece in iter_file_bytes(input_path, window):
await self.put(piece)
cursor += info.size
blocks, remainder = divmod(info.size, tarfile.BLOCKSIZE)
if remainder > 0:
await self.put(tarfile.NUL * (tarfile.BLOCKSIZE - remainder))
blocks += 1
cursor += blocks * tarfile.BLOCKSIZE
await self.put(tarfile.NUL * (tarfile.BLOCKSIZE * 2))
cursor += tarfile.BLOCKSIZE * 2
blocks, remainder = divmod(cursor, tarfile.RECORDSIZE)
if remainder > 0:
await self.put(tarfile.NUL * (tarfile.RECORDSIZE - remainder))
async def finish(self) -> None:
self.accum += self.deflater.flush()
await self.push(self.accum)
await self.push(struct.pack("<L", self.hash_crc))
await self.push(struct.pack("<L", self.wrote & 0xFFFFFFFF))
class StreamedArchiveResponse(StreamingResponse):
"""An extension of StreamingResponse to archive and compress files on the fly."""
def __init__(
self,
items_to_pack: list[tuple[str, Path]],
out_name: str = "archive.tar.gz",
status_code: int = 200,
headers: Mapping[str, str] | None = None,
media_type: str | None = None,
background: BackgroundTask | None = None,
) -> None:
self.items_to_pack = items_to_pack
self.out_name = out_name
self.status_code = status_code
self.media_type = self.media_type if media_type is None else media_type
self.background = background
self.init_headers(headers)
async def stream_response(self, send: Send) -> None:
await send(
{
"type": "http.response.start",
"status": self.status_code,
"headers": self.raw_headers,
}
)
async def emit(buf: bytes):
await send(
{
"type": "http.response.body",
"body": buf,
"more_body": True,
}
)
archiver = GzipTarEmitter(
self.items_to_pack, emit, window=SLICE_BYTES, out_name=self.out_name
)
await archiver.start_gzip()
await archiver.emit_tar_stream()
await archiver.finish()
await send({"type": "http.response.body", "body": b"", "more_body": False})
###
### ПРИЛОЖЕНИЕ FASTAPI
###
api = FastAPI()
@api.get("/")
def download_tar_stream() -> StreamingResponse:
return StreamedArchiveResponse(
items_to_pack=SOURCE_LIST,
media_type="application/tar+gzip",
headers={"Content-Disposition": 'attachment; filename="archive.tar.gz"'},
)
##
## ТЕСТЫ
##
TEST_ARCHIVE_PATH = Path("archive.tar.gz")
http_client = TestClient(api)
def test_archive_stream():
if TEST_ARCHIVE_PATH.exists():
os.remove(TEST_ARCHIVE_PATH)
resp = http_client.get("/")
resp.raise_for_status()
with TEST_ARCHIVE_PATH.open("wb") as fh:
for chunk in resp.iter_bytes(SLICE_BYTES):
fh.write(chunk)
with tarfile.open(TEST_ARCHIVE_PATH, "r:gz") as tf:
names = [ti.name for ti in tf.getmembers()]
expected = [p[0] for p in SOURCE_LIST]
assert set(names) == set(expected)
if __name__ == "__main__":
match RUN_MODE:
case "pytest":
pytest.main([__file__, "-v"])
case "uvicorn":
uvicorn.run("app_module:api", port=8080, reload=True)
case _:
print("Nothing to execute")
Почему это важно
При потоковой передаче больших архивов нельзя позволять себе полностью буферизовать их на диске или в памяти. Цикл событий должен оставаться отзывчивым, а клиент — непрерывно получать данные. Понимание того, что tarfile работает синхронно, а конвейер ответов Starlette ориентирован на асинхронность, помогает избежать дедлоков и потерь данных. Небольшой эмиттер tar+gzip с асинхронной записью даёт предсказуемый бэкпрешcure и аккуратно встраивается в ASGI.
Итоги
Если вам нужен tar.gz «на лету» в FastAPI/Starlette, не пытайтесь заставить tarfile работать в асинхронном контексте и напрямую кормить его ASGI send. Управляйте потоком сами: формируйте tar‑заголовки, копируйте байты файла порциями, добивайте блоки до выравнивания, сжимайте по ходу и отправляйте каждый сжатый кусок через awaitable. Держите размер чанка под контролем, задайте корректные Content-Disposition и media_type и проверьте результат простым тестом. Такой подход экономит память и обеспечивает правильный поток ответа.