2026, Jan 01 15:00
Secure ZIP Uploads in Your REST API: Python Approach to Detect Overlapping Records and Zip Bombs
Learn how to validate ZIP uploads in Python by detecting overlapping records (Fifield-style zip bombs) via central directory parsing—secure your REST API.
Handling user-supplied ZIP uploads in a REST API sounds straightforward until you factor in crafted archives like Fifield-style zip bombs. These files exploit overlapping entry layouts in the ZIP structure, tricking naive extractors into doing expensive or unsafe work. If your goal is to validate ZIP files with standard Python tooling before extraction, you need a structural check that rejects archives with overlapping records.
Baseline: what a naive extractor looks like
The following snippet simply unpacks everything. It illustrates the gap: there is no validation that would block overlapping-entry bombs.
import zipfile
def naive_unpack(src_path, target_dir):
    """Extract every member of the archive at *src_path* into *target_dir*.

    Deliberately naive: the archive is handed straight to ``zipfile`` with
    no structural validation at all, which is exactly the gap that
    overlapping-record zip bombs exploit.
    """
    with zipfile.ZipFile(src_path) as archive:
        archive.extractall(path=target_dir)
What actually goes wrong
Fifield’s construction uses overlapping file records. The archive is invalid according to the ZIP format, yet many readers will still attempt to process it. The reliable way to detect this pattern is to parse the central directory, recover the local file entry ranges, and then verify that none of those ranges overlap. Any overlap means the ZIP is invalid and, in this context, likely a bomb. You can do this by locating the end-of-central-directory, following Zip64 records when present, reading each central directory entry, mapping local headers and data descriptors to byte spans, and finally checking the spans for collisions. If overlaps are found, reject the archive before extraction.
Solution: detect overlapping records before extraction
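The code below inspects a ZIP file for overlapping records. If it returns True, consider the file a zip bomb and refuse to extract. If it returns False, the file passed the overlap test. If it returns None, the file is not a ZIP archive, is malformed, or uses features the checker does not support (such as multi-disk archives); an upload endpoint should reject it as well rather than pass it on to extraction.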
import sys
import io
import os
import struct
def detect_bomb(zio, *, verbose=None):
    """Check a ZIP archive for overlapping records (Fifield-style zip bomb).

    The end-of-central-directory record is located (following the Zip64
    locator and Zip64 end record when the classic fields are saturated),
    every central directory entry is parsed, and the byte span of each
    local entry -- header, name, extra field, compressed data and optional
    data descriptor -- is computed.  Those spans, together with the spans
    of the directory structures themselves, are sorted and checked for
    collisions.

    Args:
        zio: Seekable binary stream containing the archive.  It is only
            read, never written; its position is changed.
        verbose: When true, report unused bytes (gaps before, between and
            after records) on stderr.  When None (the default), a
            module-level ``chatty`` flag is honoured if one exists,
            otherwise reporting is off.

    Returns:
        True if any two records overlap, False if none do, and None if
        the stream is not a ZIP file or is invalid or unsupported (for
        example a multi-disk archive or a truncated central directory).
    """
    if verbose is None:
        # The command-line driver communicates through a module global;
        # fall back to quiet operation when used as a library.
        verbose = bool(globals().get('chatty', False))
    # In-memory streams such as io.BytesIO have no ``name`` attribute.
    label = getattr(zio, 'name', '<stream>')

    def parse_end(payload, at):
        # Validate an end-of-central-directory candidate whose signature
        # sits at file offset ``at``; ``payload`` is the 18 bytes after
        # the signature.  Returns (entry count, directory length,
        # directory offset) on success, False otherwise.
        nonlocal marks
        hdr = struct.unpack('<HHHHLLH', payload)
        # Only single-disk archives are supported, and the trailing
        # comment must fit inside the file.
        if hdr[0] != 0 or hdr[1] != 0 or at + 22 + hdr[6] > fsz:
            return False
        # Spans are collected locally and published only when the whole
        # candidate validates, so a rejected look-alike (for instance
        # inside the archive comment) cannot leave stale spans behind.
        spans = [(at, at + 22 + hdr[6])]
        hdr = hdr[2:6]
        if (hdr[0] == 0xFFFF or hdr[1] == 0xFFFF
                or hdr[2] == 0xFFFFFFFF or hdr[3] == 0xFFFFFFFF):
            # Saturated fields: the real values live in the Zip64 end
            # record, reached through the 20-byte locator that
            # immediately precedes this record.
            if at < 20:
                return False
            zio.seek(at - 20, os.SEEK_SET)
            loc = struct.unpack('<LLQL', zio.read(20))
            if (loc[0] != 0x07064B50 or loc[1] != 0 or loc[3] != 1
                    or loc[2] + 56 > fsz):
                return False
            spans.append((at - 20, at))
            zio.seek(loc[2], os.SEEK_SET)
            z64 = struct.unpack('<LQHHLLQQQQ', zio.read(56))
            if (z64[0] != 0x06064B50 or z64[1] < 44
                    or z64[4] != 0 or z64[5] != 0):
                return False
            # The size field excludes the 12 bytes of signature + size.
            spans.append((loc[2], loc[2] + 12 + z64[1]))
            hdr = z64[6:10]
        # Entries on this disk must equal total entries, and the central
        # directory must lie inside the file.
        if hdr[0] != hdr[1] or hdr[3] + hdr[2] > fsz:
            return False
        spans.append((hdr[3], hdr[3] + hdr[2]))
        marks = spans[::-1]
        return hdr[1:]

    def locate_cd():
        # Scan backwards from the end of the file for a valid
        # end-of-central-directory record.  Returns what parse_end()
        # returns for the first candidate that validates, or False.
        signature = b'PK\x05\x06'
        block = 8192
        beg = fsz
        step = 22       # first read: the minimal record at the very end
        limit = 1       # number of candidate start positions in this read
        buf = b''
        while True:
            beg -= step
            if beg < 0:
                return False
            zio.seek(beg, os.SEEK_SET)
            # Keep 21 bytes of the previous chunk so records straddling
            # the chunk boundary can still be parsed in full.
            buf = zio.read(step) + buf[:21]
            pos = limit
            while True:
                # Highest signature start strictly below ``pos``.
                pos = buf.rfind(signature, 0, pos + 3)
                if pos < 0:
                    break
                got = parse_end(buf[pos + 4:pos + 22], beg + pos)
                if got:
                    return got
            # Subsequent reads end on a ``block`` boundary.
            step = ((beg - 1) & (block - 1)) + 1
            limit = step

    marks = []
    zio.seek(0, os.SEEK_END)
    fsz = zio.tell()
    cd = locate_cd()
    if not cd:
        return None

    cnt, cdlen, cdoff = cd
    zio.seek(cdoff, os.SEEK_SET)
    cdbuf = zio.read(cdlen)
    regions = []
    i = 0
    while cnt > 0:
        if i + 46 > cdlen:
            return None
        head = struct.unpack('<LHHHHHHLLLHHHHHLL', cdbuf[i:i + 46])
        i += 46
        if head[0] != 0x02014B50:
            return None
        # Name, extra field and comment follow the fixed 46-byte header.
        extra_skip = head[10] + head[11] + head[12]
        if i + extra_skip > cdlen:
            return None
        clen = head[8]
        ulen = head[9]
        disk = head[13]
        off = head[16]
        if (clen == 0xFFFFFFFF or ulen == 0xFFFFFFFF
                or disk == 0xFFFF or off == 0xFFFFFFFF):
            # Saturated fields are replaced from the Zip64 extended
            # information extra field (header ID 1), which must exist.
            i += head[10]
            x_end = i + head[11]
            while True:
                if i + 4 > x_end:
                    return None
                xtag, xsz = struct.unpack('<HH', cdbuf[i:i + 4])
                i += 4
                xstop = i + xsz
                if xstop > x_end:
                    return None
                if xtag == 1:
                    break
                i = xstop
            # Fields appear in this fixed order, each only if saturated.
            if ulen == 0xFFFFFFFF:
                if i + 8 > xstop:
                    return None
                ulen = struct.unpack('<Q', cdbuf[i:i + 8])[0]
                i += 8
            if clen == 0xFFFFFFFF:
                if i + 8 > xstop:
                    return None
                clen = struct.unpack('<Q', cdbuf[i:i + 8])[0]
                i += 8
            if off == 0xFFFFFFFF:
                if i + 8 > xstop:
                    return None
                off = struct.unpack('<Q', cdbuf[i:i + 8])[0]
                i += 8
            if disk == 0xFFFF:
                if i + 4 > xstop:
                    return None
                # This is the disk start number, not the header offset.
                disk = struct.unpack('<L', cdbuf[i:i + 4])[0]
                i += 4
            if i != xstop:
                return None
            i = x_end + head[12]
        else:
            i += extra_skip
        if disk != 0:
            return None
        if off + 30 > fsz:
            return None
        zio.seek(off, os.SEEK_SET)
        lhdr = struct.unpack('<LHHHHHLLLHH', zio.read(30))
        if lhdr[0] != 0x04034B50:
            return None
        # Name and extra lengths come from the local header; the
        # compressed size comes from the central directory.
        lend = off + 30 + lhdr[9] + lhdr[10] + clen
        if lend > fsz:
            return None
        if lhdr[2] & 8 != 0:
            # General-purpose flag bit 3: a data descriptor follows the
            # data.  It may or may not carry a signature and may use 32-
            # or 64-bit sizes, so try each layout, longest first.
            crc = head[7]
            zio.seek(lend, os.SEEK_SET)
            tail = zio.read(24)
            layouts = (
                ('<LLQQ', (0x08074B50, crc, clen, ulen)),
                ('<LQQ', (crc, clen, ulen)),
                ('<LLLL', (0x08074B50, crc, clen, ulen)),
                ('<LLL', (crc, clen, ulen)),
            )
            for fmt, expected in layouts:
                size = struct.calcsize(fmt)
                if (len(tail) >= size
                        and struct.unpack(fmt, tail[:size]) == expected):
                    lend += size
                    break
            else:
                return None
        regions.append((off, lend))
        cnt -= 1

    # The declared entries must consume the central directory exactly.
    if i != cdlen:
        return None

    regions += marks
    regions.sort()
    cur = regions[0]
    if verbose and cur[0] != 0:
        print(f'!! {label} has {cur[0]} unused byte' f'{"" if cur[0] == 1 else "s"} at the start', file=sys.stderr)
    for nxt in regions[1:]:
        if cur[1] > nxt[0]:
            return True
        elif verbose and cur[1] < nxt[0]:
            gap = nxt[0] - cur[1]
            print(f'!! {label} has {gap} unused byte' f'{"" if gap == 1 else "s"} between records', file=sys.stderr)
        cur = nxt
    if verbose and cur[1] != fsz:
        tailgap = fsz - cur[1]
        print(f'!! {label} has {tailgap} unused byte' f'{"" if tailgap == 1 else "s"} at the end', file=sys.stderr)
    return False
# Module-level verbosity flag read by detect_bomb(); switched on by ``-v``.
chatty = False


def main(argv):
    """Command-line driver: check each path in *argv* and print a verdict.

    Arguments starting with ``-`` are options and are all processed before
    any file is examined: ``-v`` turns on reporting of unused bytes,
    anything else is reported on stderr and otherwise ignored.  Every
    remaining argument is opened as a file and checked with detect_bomb().
    """
    global chatty
    paths = []
    for arg in argv:
        if not arg.startswith('-'):
            paths.append(arg)
        elif arg == '-v':
            chatty = True
        else:
            print(f'?? unknown option: {arg}', file=sys.stderr)
    for path in paths:
        with open(path, 'rb') as zf:
            verdict = detect_bomb(zf)
        if verdict is None:
            print(f'{path} is not a zip file or is invalid or unsupported')
        elif verdict:
            print(f'{path} is a zip bomb! ** do not extract **')
        else:
            print(f'{path} is good')


# Only act on the command line when run as a script, so that importing this
# module (e.g. from an API handler) has no side effects.
if __name__ == '__main__':
    main(sys.argv[1:])
How this protects your API
The detector reads the central directory to locate each entry, resolves Zip64 metadata when needed, computes the precise byte spans of all local file records including any data descriptors, and then checks whether those spans overlap. If they do, the file is invalid and likely a bomb. You can run this check prior to any extraction and decline suspicious uploads.
If you prefer external tooling, another practical approach is to run unzip with a bomb check before extraction; adapting that logic is also possible. A pragmatic safeguard on top of structural checks is to enforce a ceiling on the amount of extracted data your service will accept, which limits blast radius even if something slips through.
Why you want this in production
ZIP upload endpoints are a natural target because many servers trust the format and go straight to extraction. Structural validation closes that gap with a deterministic test that does not rely on heuristics or outdated libraries. By refusing archives with overlapping records, you reduce risk without changing your extraction workflow.
Practical wrap-up
Validate before you unpack. Run an overlap check against the central directory to detect Fifield-style bombs. If the file is flagged, reject it and move on. Complement this with a hard cap on total extracted size in your pipeline. This small layer of scrutiny pays off the first time a malicious archive hits your endpoint.