2025, Sep 29 11:16
Стабильный групповой индекс в PySpark без join: два способа — hash и concat
Как присвоить стабильный групповой индекс в PySpark без dropDuplicates и join: используйте hash или concat по classification, id_1 и id_2 — быстро и без шифлов.
Когда вы храните временные ряды или журналы событий в «длинной» таблице и вам нужен стабильный идентификатор для логической группы, построчные ID не подходят. Задача проста: присвоить один и тот же индекс каждой строке с одинаковой комбинацией classification, id_1 и id_2, причём в масштабе сотен миллионов строк в PySpark — без построения и присоединения отдельной таблицы соответствий.
Базовый, но тяжеловесный подход
Прямолинейный способ получить индекс для группы — выделить уникальные ключи, присвоить им ID и вернуть его обратно через join. Это работает, но на больших данных приводит к значительным перетасовкам.
from pyspark.sql import functions as S
# Original dataset
data = spark.table("some_source")
# Build a mapping of unique group keys to an index, then join it back
key_map = (
    data
    .select("classification", "id_1", "id_2")
    .dropDuplicates()
    .withColumn("grp_key", S.monotonically_increasing_id())
)
data_indexed = data.join(key_map, ["classification", "id_1", "id_2"]) Результат достигается, но связка distinct + join обходится дорого на масштабе миллиардов строк.
В чём суть проблемы?
Нужна одинаковая величина для всех строк, принадлежащих одной тройке ключей, а не уникальное значение на каждую строку. Это исключает monotonically_increasing_id(), который работает на уровне строк. Приём заключается в том, чтобы выводить единое значение прямо из самих группирующих столбцов, чтобы каждая строка группы вычисляла один и тот же результат без отдельной таблицы соответствий.
Два встроенных способа получить групповой индекс без join
Первый вариант — хеширование. Захешировав вместе classification, id_1 и id_2, вы получаете одно целое число, которое стабильно представляет группу. Используется встроенная функция Spark, дополнительных структур данных не требуется.
from pyspark.sql import functions as S
indexed_by_hash = (
    data
    .withColumn(
        "grp_idx",
        S.hash(S.col("classification"), S.col("id_1"), S.col("id_2"))
    )
) Второй вариант — конкатенация. Если нужен человекочитаемый индекс, склейте ключевые столбцы в одну строку, желательно с разделителем, чтобы значения не смешивались.
from pyspark.sql import functions as S
indexed_by_concat = (
    data
    .withColumn(
        "grp_idx",
        S.concat_ws("::", S.col("classification"), S.col("id_1"), S.col("id_2"))
    )
) Оба подхода вычисляют индекс прямо из столбцов для каждой строки, устраняя необходимость в dropDuplicates() и последующем join.
Почему это важно
Хеширование даёт компактный, «непрозрачный» ключ — отлично, когда нужен лишь стабильный идентификатор. Конкатенацию проще понять с первого взгляда, но она раздувает объём хранения, особенно при длинных исходных строках. Если важен размер данных, обычно практичнее выбрать хеш.
Итоги
Чтобы присвоить групповой индекс в PySpark на больших объёмах, выводите его из группирующих столбцов, а не стройте справочник с последующим join. Используйте hash(...) — когда нужен компактный целочисленный индекс, считаемый «на месте». Выбирайте concat_ws(...) — если важна читаемость и допустим строковый объём. Оба подхода сохраняют простую логику и опираются на нативные возможности Spark.