2025, Oct 17 02:15
Фильтрация массивов структур в PySpark: filter и array_contains
Как отфильтровать массивы структур в PySpark построчно: используем Spark SQL filter и array_contains вместо оператора in. Примеры и разбор ошибок парсера.
Фильтрация столбцов‑массивов со структурами в PySpark — распространённая потребность: в каждой строке есть массив событий и сопутствующий массив идентификаторов, который отмечает, какие события сохранить. Сложность возникает, когда пытаешься применить оператор Python in внутри expr: вместо ожидаемого результата он выдаёт синтаксическую ошибку.
Постановка задачи
Представьте DataFrame со столбцом activities типа array<struct<id, time>> и ещё одним столбцом wanted_ids, который хранит список идентификаторов, подлежащих сохранению для каждой строки. Когда условие фильтрации ссылается на одиночный целочисленный id, всё работает без проблем.
from pyspark.sql import functions as Fn
dataset = dataset.withColumn(
    'kept_time',
    Fn.expr('filter(activities, it -> it.id == wanted_id)').getField('time')
)
Но как только wanted_id превращается в столбец-массив и вы пытаетесь заменить == на in, парсер выражений падает.
from pyspark.sql import functions as Fn
dataset = dataset.withColumn(
    'kept_events',
    Fn.expr('filter(activities, it -> it.id in wanted_ids)')
)
Почему это ломается
Функция expr не преобразует оператор Python in в корректную конструкцию Spark SQL для проверки принадлежности элементу массива. В Spark SQL идиоматичный способ проверить, есть ли элемент в массиве, — это array_contains. Использование in в таком контексте приводит к синтаксической ошибке, а не к валидному предикату.
Решение
Используйте array_contains внутри лямбда-функции filter в expr. Так логика остаётся построчной: каждый struct.id сравнивается с массивом в той же строке, и возвращаются только подходящие события.
from pyspark.sql import functions as Fn
dataset = dataset.withColumn(
    'kept_events',
    Fn.expr('filter(activities, it -> array_contains(wanted_ids, it.id))')
)
Если вам нужен только time из отфильтрованных событий, спроецируйте его из получившегося массива структур.
from pyspark.sql import functions as Fn
dataset = dataset.withColumn(
    'kept_times',
    Fn.expr('filter(activities, it -> array_contains(wanted_ids, it.id))').getField('time')
)
Почему это важно
array_contains — надёжный способ выразить проверку принадлежности для столбцов‑массивов внутри expr. Он избавляет от ошибок парсера, целиком держит логику в Spark SQL и аккуратно обрабатывает построчную фильтрацию данных array<struct>.
Итоги
Фильтруя массив структур по построчному списку идентификаторов, не используйте оператор Python in внутри expr. Полагайтесь на array_contains в связке с filter, чтобы задать условие напрямую на языке Spark SQL. Если нужно конкретное поле из отфильтрованных структур, получите его через getField у результата.