2026, Jan 04 17:00
Time-Series Gap Detection in Python: Align streams fast with Numpy vectorization and np.searchsorted
Learn how to align time-series and flag timestamp gaps fast in Python using Numpy vectorization and binary search with np.searchsorted. Cut O(n·m) to O(m log n)
When you align time-series from multiple instruments, you often need to flag stretches in the primary stream that have no counterpart in a secondary stream within a tolerance window. The naïve approach — a nested scan that checks each timestamp in A against all timestamps in B — quickly collapses at scale. With hundreds of thousands of rows, an O(n·m) pass is simply too slow. Below is a practical walkthrough that keeps the logic intact but replaces the bottlenecks with more efficient Numpy-based strategies.
Baseline: clear but slow
The baseline scans the main stream and, for each item, computes the minimum distance to any timestamp in the test stream. If the minimum exceeds a threshold, it accumulates a gap span to flag later.
from datetime import datetime, timedelta
import time
# Synthetic data window
start_dt = datetime(2025, 4, 25, 0, 0, 0)
end_dt = datetime(2025, 4, 25, 2, 0, 0)
step = timedelta(seconds=1)
# Main dataset (uniform per-second timestamps)
series_A = []
cursor = start_dt
while cursor <= end_dt:
    cursor += step
    series_A.append(cursor)  # advances first, so timestamps run 00:00:01 .. 02:00:01 (7201 points)
# Secondary dataset with gaps inserted
series_B = series_A.copy()
del series_B[30:300]      # removes 00:00:31 .. 00:05:00 (270 points)
del series_B[3000:3300]   # removes 00:54:31 .. 00:59:30 (another 300 points)
def scan_gaps_baseline(seqA, seqB):
    spans = []
    span_start = -1
    acc = 0
    idx, span_end = 0, 0
    window = timedelta(seconds=30)
    for idx, tA in enumerate(seqA):
        # Distance from tA to every timestamp in the comparison set: O(len(seqB)) per item
        diffs = [abs(tB - tA) for tB in seqB]
        if min(diffs) > window:
            acc += 1
            if span_start == -1:
                span_start = idx
            span_end = idx
        else:
            if span_start != -1:
                # A gap just ended: record and report the span, then reset
                pair = [seqA[span_start], seqA[span_end]]
                msg = f' Flag data from {pair[0]} to {pair[1]}'
                print(msg)
                spans.append(pair)
                span_start = -1
    if span_start != -1 and span_end == idx:  # trailing bad span
        pair = [seqA[span_start], seqA[span_end]]
        spans.append(pair)
    return spans
# Demo timing
bad_ranges = None
tic = time.process_time()
bad_ranges = scan_gaps_baseline(series_A, series_B)
print(f'Loops: {len(series_A)} x {len(series_B)} = {len(series_A)*len(series_B)}')
print(f'Elapsed time: {time.process_time() - tic:.3f} s')
This produces output like:
Flag data from 2025-04-25 00:01:01 to 2025-04-25 00:04:30
Flag data from 2025-04-25 00:55:01 to 2025-04-25 00:59:00
Loops: 7201 x 6631 = 47749831
Elapsed time: 5.167 s
Why this is slow
The per-item scan over the entire comparison set gives an O(n) cost for each element in the main series, which turns the overall runtime into O(n·m). With hundreds of thousands of timestamps, the nested work dominates. The structure for collecting spans is fine; the inner distance computation is the choke point.
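For the demo data that means 7201 x 6631 = 47,749,831 pairwise distance computations, whereas a binary-search approach needs roughly 7201 x log2(6631), or about 91,000 comparisons, plus one sort of the comparison set.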
Speedup 1: vectorize the distance check with Numpy
A first practical win is to replace Python’s inner loop with a vectorized operation. Convert the comparison set to a Numpy array once, then compute absolute differences in a single step. The outer logic stays the same.
import numpy as np
def scan_gaps_numpy(seqA, seqB):
    spans = []
    span_start = -1
    acc = 0
    idx, span_end = 0, 0
    window = timedelta(seconds=30)
    arrB = np.array(seqB, dtype=np.datetime64)  # convert the comparison set once
    for idx, tA in enumerate(seqA):
        tA64 = np.datetime64(tA)
        diffs = np.abs(arrB - tA64)  # vectorized distances to every timestamp in B
        if np.min(diffs) > window:
            acc += 1
            if span_start == -1:
                span_start = idx
            span_end = idx
        else:
            if span_start != -1:
                pair = [seqA[span_start], seqA[span_end]]
                msg = f' Flag data from {pair[0]} to {pair[1]}'
                print(msg)
                spans.append(pair)
                span_start = -1
    if span_start != -1 and span_end == idx:  # trailing bad span
        pair = [seqA[span_start], seqA[span_end]]
        spans.append(pair)
    return spans
This is 20 times faster and returns the same flagged intervals. You keep the outer loop for span aggregation while removing the inner O(n) comprehension.
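The claim that the flagged intervals are identical is easy to verify in the demo script; a minimal check, reusing series_A, series_B, and bad_ranges from the baseline run above:
bad_ranges_np = scan_gaps_numpy(series_A, series_B)
assert bad_ranges_np == bad_ranges  # same [start, end] pairs as the baseline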
Speedup 2: binary search on sorted data with np.searchsorted
There is an even larger gain by changing the algorithmic approach. Instead of scanning all items in the test series for each point in the main series, sort the test series once and use binary search per element to find the nearest neighbors. The per-item cost drops to O(log n), so the total becomes O(m log n). Thanks to Numpy’s np.searchsorted, you can do this in a vectorized way.
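To make the mechanics concrete before wiring it into the scan, here is a minimal illustration on arbitrary integers: np.searchsorted returns the insertion index for each query, and the nearest value is one of the elements adjacent to that index.
sorted_vals = np.array([10, 20, 40, 80])
queries = np.array([12, 39, 95])
ins = np.searchsorted(sorted_vals, queries, side='right')  # insertion indices: [1 2 4]
left = np.maximum(ins - 1, 0)                   # candidate at or below each query
right = np.minimum(ins, sorted_vals.size - 1)   # candidate above, clamped to the last index
nearest = np.minimum(np.abs(sorted_vals[left] - queries),
                     np.abs(sorted_vals[right] - queries))
print(nearest)  # [ 2  1 15]
The full routine below applies the same neighbor check to the datetime64 arrays.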
def scan_gaps_search(seqA, seqB):
    spans = []
    span_start = -1
    acc = 0
    idx, span_end = 0, 0
    window = timedelta(seconds=30)
    npB = np.array(seqB, dtype=np.datetime64)
    npB.sort()  # binary search requires sorted data
    npA = np.array(seqA, dtype=np.datetime64)
    pos = np.searchsorted(npB, npA, side='right')
    # Check neighbors around the insertion point (indices clamped to the valid range)
    p1 = np.maximum(pos - 1, 0)
    p2 = np.minimum(pos, npB.size - 1)
    p3 = np.minimum(pos + 1, npB.size - 1)
    d1 = np.abs(npB[p1] - npA)
    d2 = np.abs(npB[p2] - npA)
    d3 = np.abs(npB[p3] - npA)
    m12 = np.minimum(d1, d2)
    dmin = np.minimum(m12, d3)  # distance to the nearest B timestamp, per element of A
    for idx in range(len(npA)):
        if dmin[idx] > window:
            acc += 1
            if span_start == -1:
                span_start = idx
            span_end = idx
        else:
            if span_start != -1:
                pair = [seqA[span_start], seqA[span_end]]
                msg = f' Flag data from {pair[0]} to {pair[1]}'
                print(msg)
                spans.append(pair)
                span_start = -1
    if span_start != -1 and span_end == idx:  # trailing bad span
        pair = [seqA[span_start], seqA[span_end]]
        spans.append(pair)
    return spans
This approach is about 90 times faster. The span-building logic is unchanged; only the nearest-neighbor check is replaced by a sorted lookup.
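A small harness makes the comparison concrete; the 20x and 90x figures quoted here will vary with data size and hardware, so it is worth timing on your own streams:
for fn in (scan_gaps_baseline, scan_gaps_numpy, scan_gaps_search):
    tic = time.process_time()
    fn(series_A, series_B)
    print(f'{fn.__name__}: {time.process_time() - tic:.3f} s')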
What the profiling tells us and how to squeeze more
Most of the time is actually spent in converting Python lists of datetimes into Numpy arrays. If that can be amortized or precomputed, the speedup grows further (greater than 200 times). Past the conversions, the remaining overhead primarily comes from the Python loop that stitches contiguous indices into ranges.
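The stitching loop itself can also be replaced with array operations. A minimal sketch, assuming dmin and seqA are available as in scan_gaps_search (this variant is not part of the original code):
window64 = np.timedelta64(30, 's')
bad = np.flatnonzero(dmin > window64)          # indices of A with no B match within the window
if bad.size:
    breaks = np.flatnonzero(np.diff(bad) > 1)  # positions where one contiguous run ends
    starts = np.concatenate(([bad[0]], bad[breaks + 1]))
    ends = np.concatenate((bad[breaks], [bad[-1]]))
    spans = [[seqA[s], seqA[e]] for s, e in zip(starts, ends)]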
If that still does not meet the target, the conversions and any remaining Python-level loop can be pushed down to compiled code. Parallelizing the conversions across threads and moving the hot paths to Cython are both viable, and they target exactly the remaining costs: building the typed arrays and stitching indices into ranges. Even then, much of the runtime stays tied to reading pure-Python datetime objects.
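As a rough sketch of the threading idea (an assumption about how one might chunk the work, not code from the original; any gain depends on how much of the conversion releases the GIL, so measure before adopting it):
from concurrent.futures import ThreadPoolExecutor

def to_datetime64_parallel(seq, chunks=4):
    # Convert slices of the list in parallel, then stitch the arrays back together
    size = max(1, len(seq) // chunks)
    parts = [seq[i:i + size] for i in range(0, len(seq), size)]
    with ThreadPoolExecutor(max_workers=chunks) as pool:
        arrays = list(pool.map(lambda part: np.array(part, dtype='datetime64[us]'), parts))
    return np.concatenate(arrays)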
There is also a way to cut array creation costs: extract the timestamps as floats first, then cast the float array to datetime64. This reduces conversion overhead significantly; on one test setup it is about 6 times faster, with roughly 90% of the remaining time spent just calling the timestamp() method. Float seconds keep precision on the order of microseconds for present-day epochs, and the cast to datetime64[s] truncates to whole seconds, which is enough for per-second data.
stamps = [t.timestamp() for t in seqB]  # this call dominates the remaining cost
npB = np.array(np.array(stamps, np.float64), dtype='datetime64[s]')  # float seconds -> datetime64 at second resolution
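One caveat to check on your setup: timestamp() interprets naive datetimes as local time, while the direct np.datetime64 conversion keeps the wall-clock fields as-is, so the two paths can differ by your UTC offset. Since only time differences matter here, applying the same conversion to both streams keeps the gap detection unaffected; a small sketch (to_dt64_fast is an illustrative helper, not from the original):
def to_dt64_fast(seq):
    stamps = [t.timestamp() for t in seq]  # dominant remaining cost
    return np.array(np.array(stamps, np.float64), dtype='datetime64[s]')

npA_fast = to_dt64_fast(series_A)
npB_fast = to_dt64_fast(series_B)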
Why this matters
In real pipelines, you rarely compare only a few hundred points. Aligning streams from different instruments means millions of checks if you do it naïvely. Moving from O(n·m) scans to O(m log n) searches turns a painful post-processing step into a routine operation. The thresholding and span aggregation logic remains straightforward and auditable, while the bottleneck is handled by well-optimized primitives.
Note that the example focuses on two streams, even though the scenario involves multiple instruments. If you plan to extend it, consider how you will pair or fan out the comparisons across the relevant streams.
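A simple way to fan out is to keep one primary stream and scan it against each secondary stream in turn; the sketch below assumes the secondaries are collected in a dict keyed by instrument name (an organizational choice, not part of the original example):
secondary_streams = {'instrument_B': series_B}  # add further named streams here
gaps_by_instrument = {
    name: scan_gaps_search(series_A, stream)
    for name, stream in secondary_streams.items()
}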
Takeaways
- Start with a clear definition of the tolerance window for matching timestamps.
- Keep the span aggregation logic as is; it's simple and reliable.
- Replace the inner per-item scan with vectorized operations.
- If vectorization alone is not enough, sort and use np.searchsorted to move to logarithmic lookups.
- Be mindful that list-to-array conversions can dominate runtime; precompute or accelerate them when possible.
With these steps, you preserve correctness while achieving order-of-magnitude speed improvements.