2025, Nov 10 03:03

Синхронизация асинхронных потоков в pandas без циклов

Как в pandas векторно выровнять события A относительно последнего B: ключ по cumsum, transform('first') и расчёт смещений без циклов, быстро и понятно.

Когда два потока данных обновляются с разной частотой, синхронизировать их бывает неожиданно сложно. Представьте смешанную последовательность строк из источников A и B, и для каждого события A нужно вычислить разницу во времени до последнего предшествующего B. Данные не строго периодичны: присутствует джиттер, поэтому фиксированный шаг или простой сдвиг не спасут. Решение должно обходиться без итераций и опираться на векторизованные операции pandas.

Пример набора данных

На входе — строки из двух источников с временными метками. Для каждой строки A требуется сдвиг относительно последнего B, который произошёл раньше.

src,idx,ts
B,1,20
A,1,100
A,2,200
A,3,300
B,2,320
A,4,400
A,5,500
A,6,600
B,3,620

Ожидаемый результат оставляет только строки A и показывает ts как смещение относительно предыдущего B:

src,idx,ts
A,1,80
A,2,180
A,3,280
A,4,80
A,5,180
A,6,280

Почему это нетривиально

Нужно посчитать построчную разницу, где каждая A ссылается на метку времени ближайшего предыдущего B. Из‑за джиттера нельзя полагаться на фиксированные окна или позиционные вычисления. На практике требуется разбить последовательность на смежные участки, которые начинаются на каждом B и продолжаются до следующего B (не включая его), затем вычесть первую метку времени участка из всех меток внутри этого участка. В завершение убрать строки B, чтобы остались только смещения для A.

Векторизованный подход в pandas

Главное — построить ключ группировки, который увеличивается всякий раз, когда встречается B. Это достигается сравнением src с 'B' и последующим накопительным суммированием. Сгруппируйте по этому ключу, возьмите первую метку времени в каждой группе и вычтите её из всех меток в той же группе. Так шкала времени обнуляется на каждом B и даёт нужные смещения для строк A. Никаких явных циклов Python, никакого apply.

Воспроизводимый код

Ниже приведён фрагмент, который создаёт демонстрационный DataFrame, выполняет группировку, считает смещения и отбрасывает строки B.

import pandas as pd

records = pd.DataFrame(
    {
        "src": ["B", "A", "A", "A", "B", "A", "A", "A", "B"],
        "idx": [1, 1, 2, 3, 2, 4, 5, 6, 3],
        "ts": [20, 100, 200, 300, 320, 400, 500, 600, 620],
    }
)

bucket = records["src"].eq("B").cumsum()
records["ts"] = records["ts"] - records.groupby(bucket)["ts"].transform("first")
result = records.loc[records["src"].ne("B"), ["src", "idx", "ts"]]

print(result)

Как это работает

Выражение src.eq('B').cumsum() создаёт целочисленный Series, который увеличивается на единицу при каждом B и остаётся постоянным для всех последующих строк до следующего B. Группировка по этому Series разбивает данные на участки, начинающиеся на B и заканчивающиеся сразу перед следующим B. transform('first') распространяет первую метку времени каждого участка на все строки внутри него. Вычитание её из ts переводит метки во время относительно последнего B. Фильтрация строк, где src равен 'B', оставляет только нужные смещения A.

Почему это важно

В инженерии временных рядов и потоковой обработке часто требуется считать смещения относительно маркеров синхронизации или управляющих событий. Выполнение этой задачи с помощью векторизованных операций pandas и лаконично, и масштабируемо. Такой подход избавляет от построчных итераций и накладных расходов apply, оставаясь при этом понятным и удобным в сопровождении.

Выводы

Выравнивая асинхронные потоки в pandas, мыслите участками, определёнными сигнальными событиями. Накопительная сумма по булевой маске даёт простой и надёжный ключ группировки. Внутри каждого участка transform('first') позволяет вычесть опорную метку времени, не покидая векторизованный путь. А если в итоге нужны только отдельные строки, отфильтруйте их после вычислений, не усложняя логику группировки.

Статья основана на вопросе на StackOverflow от Matt и ответе mcsoini.