2025, Sep 29 11:16

Стабильный групповой индекс в PySpark без join: два способа — hash и concat

Как присвоить стабильный групповой индекс в PySpark без dropDuplicates и join: используйте hash или concat по classification, id_1 и id_2 — быстро и без шифлов.

Когда вы храните временные ряды или журналы событий в «длинной» таблице и вам нужен стабильный идентификатор для логической группы, построчные ID не подходят. Задача проста: присвоить один и тот же индекс каждой строке с одинаковой комбинацией classification, id_1 и id_2, причём в масштабе сотен миллионов строк в PySpark — без построения и присоединения отдельной таблицы соответствий.

Базовый, но тяжеловесный подход

Прямолинейный способ получить индекс для группы — выделить уникальные ключи, присвоить им ID и вернуть его обратно через join. Это работает, но на больших данных приводит к значительным перетасовкам.

from pyspark.sql import functions as S
# Original dataset
data = spark.table("some_source")
# Build a mapping of unique group keys to an index, then join it back
key_map = (
    data
    .select("classification", "id_1", "id_2")
    .dropDuplicates()
    .withColumn("grp_key", S.monotonically_increasing_id())
)
data_indexed = data.join(key_map, ["classification", "id_1", "id_2"]) 

Результат достигается, но связка distinct + join обходится дорого на масштабе миллиардов строк.

В чём суть проблемы?

Нужна одинаковая величина для всех строк, принадлежащих одной тройке ключей, а не уникальное значение на каждую строку. Это исключает monotonically_increasing_id(), который работает на уровне строк. Приём заключается в том, чтобы выводить единое значение прямо из самих группирующих столбцов, чтобы каждая строка группы вычисляла один и тот же результат без отдельной таблицы соответствий.

Два встроенных способа получить групповой индекс без join

Первый вариант — хеширование. Захешировав вместе classification, id_1 и id_2, вы получаете одно целое число, которое стабильно представляет группу. Используется встроенная функция Spark, дополнительных структур данных не требуется.

from pyspark.sql import functions as S
indexed_by_hash = (
    data
    .withColumn(
        "grp_idx",
        S.hash(S.col("classification"), S.col("id_1"), S.col("id_2"))
    )
) 

Второй вариант — конкатенация. Если нужен человекочитаемый индекс, склейте ключевые столбцы в одну строку, желательно с разделителем, чтобы значения не смешивались.

from pyspark.sql import functions as S
indexed_by_concat = (
    data
    .withColumn(
        "grp_idx",
        S.concat_ws("::", S.col("classification"), S.col("id_1"), S.col("id_2"))
    )
) 

Оба подхода вычисляют индекс прямо из столбцов для каждой строки, устраняя необходимость в dropDuplicates() и последующем join.

Почему это важно

Хеширование даёт компактный, «непрозрачный» ключ — отлично, когда нужен лишь стабильный идентификатор. Конкатенацию проще понять с первого взгляда, но она раздувает объём хранения, особенно при длинных исходных строках. Если важен размер данных, обычно практичнее выбрать хеш.

Итоги

Чтобы присвоить групповой индекс в PySpark на больших объёмах, выводите его из группирующих столбцов, а не стройте справочник с последующим join. Используйте hash(...) — когда нужен компактный целочисленный индекс, считаемый «на месте». Выбирайте concat_ws(...) — если важна читаемость и допустим строковый объём. Оба подхода сохраняют простую логику и опираются на нативные возможности Spark.

Статья основана на вопросе с StackOverflow от CopyOfA и ответе пользователя welp.