2025, Nov 18 19:00

On-the-fly tar.gz streaming in FastAPI/Starlette: a working async pattern with backpressure

Learn to stream tar.gz archives from FastAPI/Starlette with an async pattern that respects ASGI backpressure: instead of piping tarfile's synchronous writes into the response, drive the tar and gzip framing yourself through a StreamingResponse subclass.

Streaming a tar.gz archive straight from FastAPI/Starlette sounds simple until you meet the mismatch between a synchronous tarfile writer and an asynchronous HTTP response. If your code tries to pipe tarfile output directly into StreamingResponse, you quickly discover that TarFile writes to file-like objects via a synchronous write call, while Starlette expects an async generator or an async send. That friction is exactly what derails on-the-fly archiving for large files fetched from another backend.

Reproducing the issue

The idea below is to override the response’s stream_response, create an inner file-like sink, and let tarfile push chunks into it. The sink then forwards those chunks to the ASGI send. It looks reasonable, but it runs into two problems: TarFile is synchronous, and tarfile.open does not support async with.

import asyncio
import os
import tarfile
from pathlib import Path
from typing import Mapping

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from starlette.background import BackgroundTask
from starlette.types import Send

ARCHIVE_PATH = Path("archive.tar.gz")
SLICE_BYTES = 1024

api = FastAPI()

INPUT_ITEMS = [("tarfile.py", Path(tarfile.__file__)), ("os.py", Path(os.__file__))]


class TarStreamingResponse(StreamingResponse):
    def __init__(
        self,
        paths_for_tar: list[tuple[str, Path]],
        status_code: int = 200,
        headers: Mapping[str, str] | None = None,
        media_type: str | None = None,
        background: BackgroundTask | None = None,
    ) -> None:
        self.paths_for_tar = paths_for_tar
        self.status_code = status_code
        self.media_type = self.media_type if media_type is None else media_type
        self.background = background
        self.init_headers(headers)

    async def stream_response(self, send: Send) -> None:
        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self.raw_headers,
            }
        )

        class ChunkSink:
            def write(self, buffer):
                print(f"Really sending {len(buffer)} bytes")
                asyncio.run_coroutine_threadsafe(
                    send(
                        {
                            "type": "http.response.body",
                            "body": buffer,
                            "more_body": True,
                        }
                    ),
                    asyncio.get_running_loop(),
                )

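        # The next line raises: TarFile only implements the synchronous
        # context manager protocol, not __aenter__/__aexit__.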
        async with tarfile.open(
            mode="w|gz", fileobj=ChunkSink(), bufsize=SLICE_BYTES
        ) as tar_fp:
            for name, src in self.paths_for_tar:
                await tar_fp.addfile(tarfile.TarInfo(name), src.open("rb"))

        await send({"type": "http.response.body", "body": b"", "more_body": False})


@api.get("/")
def deliver_tar() -> StreamingResponse:
    return TarStreamingResponse(
        paths_for_tar=INPUT_ITEMS,
        media_type="application/tar+gzip",
        headers={"Content-Disposition": f'attachment; filename="{ARCHIVE_PATH.name}"'},
    )

Trying this approach fails immediately: tarfile.open cannot be used as an async context manager, and you will see this error:

TypeError: 'TarFile' object does not support the asynchronous context manager protocol

Even if you remove async with, the fundamental mismatch remains: a synchronous TarFile pushes bytes into a file-like .write, while the ASGI send is an awaitable. Worse, this code calls run_coroutine_threadsafe from the event-loop thread itself, so it merely schedules the send coroutines; the loop is blocked inside the synchronous tarfile code, and none of them can run until the handler returns. There is no backpressure, no ordering guarantee, and data may never be flushed when you expect.

Why this fails

The core friction is architectural. In stream mode (w|gz), TarFile writes synchronously to a file-like object. Starlette’s StreamingResponse, however, is designed around async generators or an async send callable inside an asyncio-driven event loop. TarFile is not async-aware and cannot be awaited, and tarfile.open is not an async context manager. Mixing sync writes and an async send without a proper bridge either blocks or loses the expected flow control.

Adding await in front of TarFile APIs or wrapping tarfile.open in async with won’t fix it. Rewriting TarFile itself to be async-aware would be far more work than it’s worth.
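
A workable bridge does exist if you want to keep TarFile: run the archiver in a worker thread and hand its chunks to the event loop through a bounded queue, so the thread blocks whenever the client reads slowly. The sketch below illustrates the idea (tar_gz_chunks is a name invented for this example, and error handling is omitted); the rest of this article takes a different route and avoids the extra thread entirely.

import asyncio
import tarfile
from pathlib import Path
from typing import AsyncIterator


async def tar_gz_chunks(
    items: list[tuple[str, Path]], chunk_size: int = 1024
) -> AsyncIterator[bytes]:
    loop = asyncio.get_running_loop()
    # Bounded queue: the worker thread blocks when it is full, which is the
    # backpressure the naive run_coroutine_threadsafe version was missing.
    queue: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=8)

    class Sink:
        def write(self, buf: bytes) -> int:
            # Called from the worker thread; .result() blocks that thread
            # until the event loop has accepted the chunk.
            asyncio.run_coroutine_threadsafe(queue.put(bytes(buf)), loop).result()
            return len(buf)

    def produce() -> None:
        with tarfile.open(mode="w|gz", fileobj=Sink(), bufsize=chunk_size) as tar:
            for name, path in items:
                tar.add(path, arcname=name)
        asyncio.run_coroutine_threadsafe(queue.put(None), loop).result()

    producer = loop.run_in_executor(None, produce)
    while (chunk := await queue.get()) is not None:
        yield chunk
    await producer  # propagate any exception raised in the worker thread

An async generator like this can be passed directly to StreamingResponse. It works, but it costs a thread per download and keeps TarFile's buffering opaque, which is why the implementation below drives the stream by hand instead.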

The working approach

The reliable way forward is to drive the tar and gzip stream yourself and surface an async interface that can feed the ASGI send. Instead of asking TarFile to push into a sink, build a tiny tar+gzip streamer that emits compressed chunks via an async callback. The response simply awaits that callback for each piece, and the network backpressure is respected.

Complete implementation

The following code reads a set of files in chunks, writes a tar stream, compresses it on the fly with zlib, and streams it through a custom StreamingResponse subclass. It uses pieces from tarfile.TarFile and tarfile._Stream to construct headers and padding while managing gzip framing, CRC, and size accounting.

# /// script
# requires-python = ">=3.12"
# dependencies = [
#     "fastapi",
#     "httpx",
#     "pytest",
#     "uvicorn",
# ]
# ///
"""A script to read files by chunks, archive them, compress them and send them as a streaming response on the fly.

To run it, use the uv package manager and run `uv run main.py`.
"""

import os
import struct
import tarfile
import time
import zlib
from pathlib import Path
from typing import Awaitable, Callable, Generator, Mapping

import pytest
import uvicorn
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.testclient import TestClient
from starlette.background import BackgroundTask
from starlette.types import Send

### PARAMETERS ###

# The list of file paths to be read and tar-gz and streamed
SOURCE_LIST = [("tarfile.py", Path(tarfile.__file__)), ("os.py", Path(os.__file__))]

# The size of a chunk to be sent as a response
SLICE_BYTES = 1024

# Mode: pytest or uvicorn
RUN_MODE = "pytest"

##
## ADAPTER
##


def iter_file_bytes(file_path: Path, window: int) -> Generator[bytes, None, None]:
    """Read a file and return a generator with bytes."""
    with open(file_path, "rb") as fh:
        while chunk := fh.read(window):
            yield chunk


class GzipTarEmitter:
    """An object to read files and generate tar-gz chunks.

    This is written using pieces of `tarfile.TarFile` and `tarfile._Stream`.
    """

    def __init__(
        self,
        items_to_pack: list[tuple[str, Path]],
        push: Callable[[bytes], Awaitable[None]],
        window: int = SLICE_BYTES,
        level: int = 9,
        out_name: str = "archive.tar.gz",
    ) -> None:
        self.items_to_pack = items_to_pack
        self.push = push
        self.window = window
        self.out_name = out_name

        self.accum = b""
        self.wrote = 0

        self.level = level

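        # Running CRC-32 of the uncompressed data, needed for the gzip trailer.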
        self.hash_crc = zlib.crc32(b"")

    async def put(self, chunk: bytes) -> None:
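        # Account for the raw bytes (CRC and total length) before compressing.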
        self.hash_crc = zlib.crc32(chunk, self.hash_crc)
        self.wrote += len(chunk)

        chunk = self.deflater.compress(chunk)
        await self._drip(chunk)

    async def _drip(self, chunk: bytes) -> None:
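        # Buffer compressed output and forward it in window-sized slices.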
        self.accum += chunk
        while len(self.accum) > self.window:
            await self.push(self.accum[: self.window])
            self.accum = self.accum[self.window :]

    async def start_gzip(self) -> None:
        self.deflater = zlib.compressobj(
            self.level,
            zlib.DEFLATED,
            -zlib.MAX_WBITS,
            zlib.DEF_MEM_LEVEL,
            0,
        )
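        # Negative wbits gives a raw deflate stream, so the gzip framing is
        # written by hand (header here, CRC/size trailer in finish()), the
        # same way tarfile._Stream does it.
        # \037\213 is the gzip magic, \010 the deflate method, \010 the FNAME flag.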
        timestamp = struct.pack("<L", int(time.time()))
        await self._drip(b"\037\213\010\010" + timestamp + b"\002\377")

        if self.out_name.endswith(".gz"):
            self.out_name = self.out_name[:-3]
        self.out_name = os.path.basename(self.out_name)
        await self._drip(self.out_name.encode("iso-8859-1", "replace") + tarfile.NUL)

    async def emit_tar_stream(self) -> None:
        cursor = 0
        window = self.window

        for name, input_path in self.items_to_pack:
            info = tarfile.TarInfo(name)
            info.size = input_path.stat().st_size
            info.type = tarfile.REGTYPE

            header = info.tobuf()
            await self.put(header)
            cursor += len(header)

            for piece in iter_file_bytes(input_path, window):
                await self.put(piece)
            cursor += info.size

            blocks, remainder = divmod(info.size, tarfile.BLOCKSIZE)
            if remainder > 0:
                await self.put(tarfile.NUL * (tarfile.BLOCKSIZE - remainder))
                blocks += 1
            cursor += blocks * tarfile.BLOCKSIZE

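        # A tar archive ends with two zero blocks, then padding up to a full
        # record (RECORDSIZE = 20 blocks = 10240 bytes).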
        await self.put(tarfile.NUL * (tarfile.BLOCKSIZE * 2))
        cursor += tarfile.BLOCKSIZE * 2

        blocks, remainder = divmod(cursor, tarfile.RECORDSIZE)
        if remainder > 0:
            await self.put(tarfile.NUL * (tarfile.RECORDSIZE - remainder))

    async def finish(self) -> None:
        self.accum += self.deflater.flush()
        await self.push(self.accum)
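        # gzip trailer: CRC-32 of the uncompressed data, then its size mod 2**32.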
        await self.push(struct.pack("<L", self.hash_crc))
        await self.push(struct.pack("<L", self.wrote & 0xFFFFFFFF))


class StreamedArchiveResponse(StreamingResponse):
    """An extension of StreamingResponse to archive and compress files on the fly."""

    def __init__(
        self,
        items_to_pack: list[tuple[str, Path]],
        out_name: str = "archive.tar.gz",
        status_code: int = 200,
        headers: Mapping[str, str] | None = None,
        media_type: str | None = None,
        background: BackgroundTask | None = None,
    ) -> None:
        self.items_to_pack = items_to_pack
        self.out_name = out_name
        self.status_code = status_code
        self.media_type = self.media_type if media_type is None else media_type
        self.background = background
        self.init_headers(headers)

    async def stream_response(self, send: Send) -> None:
        await send(
            {
                "type": "http.response.start",
                "status": self.status_code,
                "headers": self.raw_headers,
            }
        )

        async def emit(buf: bytes):
            await send(
                {
                    "type": "http.response.body",
                    "body": buf,
                    "more_body": True,
                }
            )

        archiver = GzipTarEmitter(
            self.items_to_pack, emit, window=SLICE_BYTES, out_name=self.out_name
        )
        await archiver.start_gzip()
        await archiver.emit_tar_stream()
        await archiver.finish()

        await send({"type": "http.response.body", "body": b"", "more_body": False})


###
### THE FASTAPI APPLICATION
###

api = FastAPI()


@api.get("/")
def download_tar_stream() -> StreamingResponse:
    return StreamedArchiveResponse(
        items_to_pack=SOURCE_LIST,
        media_type="application/tar+gzip",
        headers={"Content-Disposition": 'attachment; filename="archive.tar.gz"'},
    )


##
## TESTS
##

TEST_ARCHIVE_PATH = Path("archive.tar.gz")
http_client = TestClient(api)


def test_archive_stream():
    if TEST_ARCHIVE_PATH.exists():
        os.remove(TEST_ARCHIVE_PATH)

    resp = http_client.get("/")
    resp.raise_for_status()

    with TEST_ARCHIVE_PATH.open("wb") as fh:
        for chunk in resp.iter_bytes(SLICE_BYTES):
            fh.write(chunk)

    with tarfile.open(TEST_ARCHIVE_PATH, "r:gz") as tf:
        names = [ti.name for ti in tf.getmembers()]

    expected = [p[0] for p in SOURCE_LIST]
    assert set(names) == set(expected)


if __name__ == "__main__":
    match RUN_MODE:
        case "pytest":
            pytest.main([__file__, "-v"])
        case "uvicorn":
            uvicorn.run("main:api", port=8080, reload=True)  # assumes the file is saved as main.py
        case _:
            print("Nothing to execute")

Why this matters

When you stream large archives, you can’t afford to buffer them fully on disk or in memory. The event loop must stay responsive and the client must receive data continuously. Understanding that tarfile operates synchronously and that Starlette’s response pipeline is async-focused helps avoid deadlocks and missing data. Building a small tar+gzip emitter that exposes an async write gives you predictable backpressure and a clean integration point with ASGI.
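
If you run the script in uvicorn mode, a quick client-side check with httpx makes the continuous flow visible. The sketch below streams the archive to disk slice by slice and assumes the app is being served on port 8080, as configured above.

import httpx

# Stream the body to disk one slice at a time; nothing is buffered in full.
# Assumes the uvicorn mode above is serving the app on localhost:8080.
with httpx.stream("GET", "http://localhost:8080/") as resp:
    resp.raise_for_status()
    with open("archive.tar.gz", "wb") as out:
        for chunk in resp.iter_bytes(1024):
            out.write(chunk)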

Wrap-up

If you need on-the-fly tar.gz over FastAPI/Starlette, don’t try to coerce tarfile into an async context or feed it an ASGI send directly. Drive the stream yourself: generate tar headers, copy file bytes in chunks, pad to blocks, compress as you go, and forward each compressed chunk via an awaitable. Keep chunk size under control, set the correct Content-Disposition and media_type, and verify the resulting archive with a quick test. This approach keeps memory overhead low and the response flow correct.