2025, Oct 17 02:15

Фильтрация массивов структур в PySpark: filter и array_contains

Как отфильтровать массивы структур в PySpark построчно: используем Spark SQL filter и array_contains вместо оператора in. Примеры и разбор ошибок парсера.

Фильтрация столбцов‑массивов со структурами в PySpark — распространённая потребность: в каждой строке есть массив событий и сопутствующий массив идентификаторов, который отмечает, какие события сохранить. Сложность возникает, когда пытаешься применить оператор Python in внутри expr: вместо ожидаемого результата он выдаёт синтаксическую ошибку.

Постановка задачи

Представьте DataFrame со столбцом activities типа array<struct<id, time>> и ещё одним столбцом wanted_ids, который хранит список идентификаторов, подлежащих сохранению для каждой строки. Когда условие фильтрации ссылается на одиночный целочисленный id, всё работает без проблем.

from pyspark.sql import functions as Fn
dataset = dataset.withColumn(
    'kept_time',
    Fn.expr('filter(activities, it -> it.id == wanted_id)').getField('time')
)

Но как только wanted_id превращается в столбец-массив и вы пытаетесь заменить == на in, парсер выражений падает.

from pyspark.sql import functions as Fn
dataset = dataset.withColumn(
    'kept_events',
    Fn.expr('filter(activities, it -> it.id in wanted_ids)')
)

Почему это ломается

Функция expr не преобразует оператор Python in в корректную конструкцию Spark SQL для проверки принадлежности элементу массива. В Spark SQL идиоматичный способ проверить, есть ли элемент в массиве, — это array_contains. Использование in в таком контексте приводит к синтаксической ошибке, а не к валидному предикату.

Решение

Используйте array_contains внутри лямбда-функции filter в expr. Так логика остаётся построчной: каждый struct.id сравнивается с массивом в той же строке, и возвращаются только подходящие события.

from pyspark.sql import functions as Fn
dataset = dataset.withColumn(
    'kept_events',
    Fn.expr('filter(activities, it -> array_contains(wanted_ids, it.id))')
)

Если вам нужен только time из отфильтрованных событий, спроецируйте его из получившегося массива структур.

from pyspark.sql import functions as Fn
dataset = dataset.withColumn(
    'kept_times',
    Fn.expr('filter(activities, it -> array_contains(wanted_ids, it.id))').getField('time')
)

Почему это важно

array_contains — надёжный способ выразить проверку принадлежности для столбцов‑массивов внутри expr. Он избавляет от ошибок парсера, целиком держит логику в Spark SQL и аккуратно обрабатывает построчную фильтрацию данных array<struct>.

Итоги

Фильтруя массив структур по построчному списку идентификаторов, не используйте оператор Python in внутри expr. Полагайтесь на array_contains в связке с filter, чтобы задать условие напрямую на языке Spark SQL. Если нужно конкретное поле из отфильтрованных структур, получите его через getField у результата.

Статья основана на вопросе с StackOverflow от Nourless и ответе от jei.