2026, Jan 01 15:00

Secure ZIP Uploads in Your REST API: Python Approach to Detect Overlapping Records and Zip Bombs

Learn how to validate ZIP uploads in Python by detecting overlapping records (Fifield-style zip bombs) via central directory parsing—secure your REST API.

Handling user-supplied ZIP uploads in a REST API sounds straightforward until you factor in crafted archives such as Fifield-style zip bombs. These files overlap entries inside the ZIP structure so that many entries reuse the same compressed data, and a small upload can expand into an enormous amount of output when a naive extractor processes it. If your goal is to validate ZIP files with standard Python tooling before extraction, you need a structural check that rejects archives with overlapping records.

Baseline: what a naive extractor looks like

The following snippet simply unpacks everything. It illustrates the gap: there is no validation that would block overlapping-entry bombs.

import zipfile

def naive_unpack(src_path, target_dir):
    with zipfile.ZipFile(src_path) as z:
        z.extractall(target_dir)

What actually goes wrong

Fifield’s construction uses overlapping file records. Such an archive is invalid according to the ZIP format, yet many readers will still attempt to process it. The reliable way to detect this pattern is to parse the central directory, recover the byte range of each local file record, and verify that none of those ranges overlap. Any overlap means the ZIP is invalid and, in this context, likely a bomb. In practice that means locating the end-of-central-directory record, following the Zip64 records when present, reading each central directory entry, mapping each local header, its compressed data, and any data descriptor to a byte span, and finally checking the spans for collisions. If overlaps are found, reject the archive before extraction.
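The final step, checking spans for collisions, can be sketched on its own. This is a minimal illustration, assuming each record has already been reduced to a half-open (start, end) byte range; the name first_overlap is hypothetical and not part of the detector shown later.

```python
def first_overlap(spans):
    """Return the first pair of overlapping (start, end) spans, or None.

    Spans are half-open byte ranges: start is inclusive, end is exclusive.
    """
    ordered = sorted(spans)
    # After sorting by start offset, any overlap in the set shows up
    # between two neighbours, so one linear pass is enough.
    for prev, nxt in zip(ordered, ordered[1:]):
        if prev[1] > nxt[0]:
            return prev, nxt
    return None
```

Sorting first keeps the check at O(n log n) instead of comparing every pair of records.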

Solution: detect overlapping records before extraction

The code below inspects a ZIP file for overlapping records. If it returns True, consider the file a zip bomb and refuse to extract. If it returns False, the file passed the overlap test. If it returns None, the file is not a ZIP file, or is invalid or unsupported in this context (for example, a multi-disk archive); an upload validator should reject that case as well.

import sys
import io
import os
import struct


def detect_bomb(zio):
    def parse_end(payload, at):
        hdr = struct.unpack('<HHHHLLH', payload)
        if hdr[0] != 0 or hdr[1] != 0 or at + 22 + hdr[6] > fsz:
            return False
        nonlocal marks
        marks.append((at, at + 22 + hdr[6]))
        hdr = hdr[2:6]
        if hdr[0] == 0xFFFF or hdr[1] == 0xFFFF or hdr[2] == 0xFFFFFFFF or hdr[3] == 0xFFFFFFFF:
            if at < 20:
                return False
            zio.seek(at - 20, os.SEEK_SET)
            loc = struct.unpack('<LLQL', zio.read(20))
            if loc[0] != 0x07064B50 or loc[1] != 0 or loc[3] != 1 or loc[2] + 56 > fsz:
                return False
            marks.append((at - 20, at))
            zio.seek(loc[2], os.SEEK_SET)
            z64 = struct.unpack('<LQHHLLQQQQ', zio.read(56))
            if z64[0] != 0x06064B50 or z64[1] < 44 or z64[4] != 0 or z64[5] != 0:
                return False
            marks.append((loc[2], loc[2] + 12 + z64[1]))
            hdr = z64[6:10]
        if hdr[0] != hdr[1] or hdr[3] + hdr[2] > fsz:
            return False
        marks.append((hdr[3], hdr[3] + hdr[2]))
        marks.reverse()
        return hdr[1:]

    def locate_cd():
        block = 8192
        beg = fsz
        back = 22
        buf = b''
        i = 1
        while True:
            beg -= back
            if beg < 0:
                return False
            zio.seek(beg, os.SEEK_SET)
            buf = zio.read(back) + buf[:21]
            while i > 0:
                i -= 1
                if buf[i] == 0x50 and buf[i + 1] == 0x4B and buf[i + 2] == 5 and buf[i + 3] == 6:
                    got = parse_end(buf[i + 4:i + 22], beg + i)
                    if got:
                        return got
            back = ((beg - 1) & (block - 1)) + 1
            i = back

    marks = []
    zio.seek(0, os.SEEK_END)
    fsz = zio.tell()
    cd = locate_cd()
    if not cd:
        return None

    regions = []
    (cnt, cdlen, cdoff) = cd
    zio.seek(cdoff, os.SEEK_SET)
    cdbuf = zio.read(cdlen)
    i = 0
    while cnt > 0:
        if i + 46 > cdlen:
            break
        head = struct.unpack('<LHHHHHHLLLHHHHHLL', cdbuf[i:i + 46])
        i += 46
        if head[0] != 0x02014B50:
            break
        extra_skip = head[10] + head[11] + head[12]
        if i + extra_skip > cdlen:
            break
        clen = head[8]
        ulen = head[9]
        disk = head[13]
        off = head[16]
        if clen == 0xFFFFFFFF or ulen == 0xFFFFFFFF or disk == 0xFFFF or off == 0xFFFFFFFF:
            ok = False
            i += head[10]
            x_end = i + head[11]
            while i + 4 <= x_end:
                (xtag, xsz) = struct.unpack('<HH', cdbuf[i:i + 4])
                i += 4
                if i + xsz > x_end:
                    break
                xstop = i + xsz
                if xtag == 1:
                    if ulen == 0xFFFFFFFF:
                        if i + 8 > xstop:
                            break
                        ulen = struct.unpack('<Q', cdbuf[i:i + 8])[0]
                        i += 8
                    if clen == 0xFFFFFFFF:
                        if i + 8 > xstop:
                            break
                        clen = struct.unpack('<Q', cdbuf[i:i + 8])[0]
                        i += 8
                    if off == 0xFFFFFFFF:
                        if i + 8 > xstop:
                            break
                        off = struct.unpack('<Q', cdbuf[i:i + 8])[0]
                        i += 8
                    if disk == 0xFFFF:
                        if i + 4 > xstop:
                            break
                        disk = struct.unpack('<L', cdbuf[i:i + 4])[0]
                        i += 4
                    if i != xstop:
                        break
                    ok = True
                    break
                else:
                    i = xstop
            if not ok:
                break
            i = x_end + head[12]
        else:
            i += extra_skip
        if disk != 0:
            break
        if off + 30 > fsz:
            break
        zio.seek(off, os.SEEK_SET)
        lhdr = struct.unpack('<LHHHHHLLLHH', zio.read(30))
        if lhdr[0] != 0x04034B50:
            break
        lend = off + 30 + lhdr[9] + lhdr[10] + clen
        if lend > fsz:
            break
        if lhdr[2] & 8 != 0:
            crc = head[7]
            zio.seek(lend, os.SEEK_SET)
            tail = zio.read(24)
            d24 = struct.unpack('<LLQQ', tail[:24]) if len(tail) == 24 else ()
            d20 = struct.unpack('<LQQ', tail[:20]) if len(tail) >= 20 else ()
            d16 = struct.unpack('<LLLL', tail[:16]) if len(tail) >= 16 else ()
            d12 = struct.unpack('<LLL', tail[:12]) if len(tail) >= 12 else ()
            if len(tail) == 24 and d24[0] == 0x08074B50 and d24[1] == crc and d24[2] == clen and d24[3] == ulen:
                lend += 24
            elif len(tail) >= 20 and d20[0] == crc and d20[1] == clen and d20[2] == ulen:
                lend += 20
            elif len(tail) >= 16 and d16[0] == 0x08074B50 and d16[1] == crc and d16[2] == clen and d16[3] == ulen:
                lend += 16
            elif len(tail) >= 12 and d12[0] == crc and d12[1] == clen and d12[2] == ulen:
                lend += 12
            else:
                break
        regions.append((off, lend))
        cnt -= 1
    else:
        if i == cdlen:
            regions += marks
            regions.sort()
            cur = regions[0]
            if chatty and cur[0] != 0:
                print(f'!! {zio.name} has {cur[0]} unused byte' f'{"" if cur[0] == 1 else "s"} at the start', file=sys.stderr)
            for nxt in regions[1:]:
                if cur[1] > nxt[0]:
                    return True
                elif chatty and cur[1] < nxt[0]:
                    gap = nxt[0] - cur[1]
                    print(f'!! {zio.name} has {gap} unused byte' f'{"" if gap == 1 else "s"} between records', file=sys.stderr)
                cur = nxt
            if chatty and cur[1] != fsz:
                tailgap = fsz - cur[1]
                print(f'!! {zio.name} has {tailgap} unused byte' f'{"" if tailgap == 1 else "s"} at the end', file=sys.stderr)
            return False
    return None


chatty = False  # module-level flag read by detect_bomb(); -v turns it on

if __name__ == '__main__':
    # Guarded so that importing this file from an API handler does not run the CLI loop.
    for arg in sys.argv[1:]:
        if arg.startswith('-'):
            if arg == '-v':
                chatty = True
            else:
                print(f'?? unknown option: {arg}', file=sys.stderr)

    for arg in sys.argv[1:]:
        if arg.startswith('-'):
            continue
        with open(arg, 'rb') as zf:
            verdict = detect_bomb(zf)
            if verdict is None:
                print(f'{arg} is not a zip file or is invalid or unsupported')
            elif verdict:
                print(f'{arg} is a zip bomb! ** do not extract **')
            else:
                print(f'{arg} is good')
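In an API the upload usually arrives as bytes rather than as a path on disk. The sketch below shows one way a handler might wrap the check, assuming the request body is already in memory. RejectedUpload and check_upload are hypothetical names, and the detector is passed in as a parameter so the wrapper does not depend on where detect_bomb lives.

```python
import io


class RejectedUpload(Exception):
    """Hypothetical exception a request handler could map to an HTTP 4xx response."""


def check_upload(data, detector):
    """Run `detector` on in-memory upload bytes; raise RejectedUpload unless it passes.

    `detector` is expected to behave like detect_bomb: True for overlapping
    records, False for a clean archive, None for invalid or unsupported input.
    """
    verdict = detector(io.BytesIO(data))
    if verdict is None:
        raise RejectedUpload('not a ZIP file, or invalid or unsupported')
    if verdict:
        raise RejectedUpload('overlapping records (possible zip bomb)')
```

A handler would call check_upload(body, detect_bomb) before touching the archive. As listed, detect_bomb reads a module-level chatty flag and, in verbose mode, zio.name, which io.BytesIO does not provide, so keep verbose output off when checking in-memory uploads.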

How this protects your API

The detector reads the central directory to locate each entry, resolves Zip64 metadata when needed, computes the precise byte spans of all local file records including any data descriptors, and then checks whether those spans overlap one another or the central directory and end records. If they do, the file is invalid and likely a bomb. You can run this check prior to any extraction and decline suspicious uploads.
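To make the first of those steps concrete, here is a simplified sketch of reading the end-of-central-directory record from an archive held in memory. It is an illustration only: find_eocd is a hypothetical helper, it ignores Zip64, and it trusts the last signature match, whereas the detector above keeps scanning backwards when a candidate fails validation.

```python
import io
import struct
import zipfile

EOCD_SIG = b'PK\x05\x06'


def find_eocd(data):
    """Return (offset, entries, cd_size, cd_offset) from the last EOCD record, or None.

    Simplified: no Zip64 handling, and only the last signature match is tried.
    """
    at = data.rfind(EOCD_SIG)
    if at < 0 or at + 22 > len(data):
        return None
    (disk, cd_disk, here, total, cd_size, cd_offset, comment_len) = struct.unpack(
        '<HHHHLLH', data[at + 4:at + 22])
    # Multi-disk archives are out of scope, as in the detector above.
    if disk != 0 or cd_disk != 0 or here != total:
        return None
    if at + 22 + comment_len > len(data) or cd_offset + cd_size > len(data):
        return None
    return at, total, cd_size, cd_offset


# Build a small archive in memory and inspect it.
buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w') as z:
    z.writestr('a.txt', 'hello')
    z.writestr('b.txt', 'world')
info = find_eocd(buf.getvalue())
```

The returned central directory offset and size are what the entry-by-entry walk starts from.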

If you prefer external tooling, another practical approach is to run the archive through an unzip build that includes a bomb check before extraction; adapting that logic is also possible. A pragmatic safeguard on top of structural checks is to enforce a ceiling on the amount of extracted data your service will accept, which limits the blast radius even if something slips through.
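A ceiling on extracted data could look like the following sketch, which uses the standard zipfile module and counts bytes as they are decompressed rather than trusting the sizes declared in the archive. extract_with_cap and ExtractionLimitExceeded are hypothetical names, and the chunk size is an arbitrary choice.

```python
import os
import zipfile


class ExtractionLimitExceeded(Exception):
    """Hypothetical error raised when an archive expands past the allowed size."""


def extract_with_cap(src, target_dir, max_total_bytes, chunk_size=64 * 1024):
    """Extract `src` into `target_dir`, aborting once output exceeds the cap.

    Returns the number of bytes written when the archive fits under the cap.
    """
    written = 0
    root = os.path.realpath(target_dir)
    with zipfile.ZipFile(src) as z:
        for info in z.infolist():
            dest = os.path.realpath(os.path.join(root, info.filename))
            # Refuse entries that would land outside the target directory.
            if os.path.commonpath([root, dest]) != root:
                raise ValueError(f'unsafe path in archive: {info.filename}')
            if info.is_dir():
                os.makedirs(dest, exist_ok=True)
                continue
            os.makedirs(os.path.dirname(dest), exist_ok=True)
            with z.open(info) as reader, open(dest, 'wb') as out:
                while True:
                    chunk = reader.read(chunk_size)
                    if not chunk:
                        break
                    written += len(chunk)
                    if written > max_total_bytes:
                        raise ExtractionLimitExceeded(
                            f'more than {max_total_bytes} bytes extracted')
                    out.write(chunk)
    return written
```

When the limit trips, partially written files remain in target_dir, so the caller should extract into a scratch directory and remove it on failure.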

Why you want this in production

ZIP upload endpoints are a natural target because many servers trust the format and go straight to extraction. Structural validation closes that gap with a deterministic test that does not rely on heuristics or outdated libraries. By refusing archives with overlapping records, you reduce risk without changing your extraction workflow.

Practical wrap-up

Validate before you unpack. Run an overlap check against the central directory to detect Fifield-style bombs. If the file is flagged, reject it and move on. Complement this with a hard cap on total extracted size in your pipeline. This small layer of scrutiny pays off the first time a malicious archive hits your endpoint.