2025, Dec 22 09:02
Как развернуть иерархию «родитель–потомок» в PySpark с GraphFrames
Покажем, как развернуть иерархию родитель–потомок в PySpark без рекурсии: GraphFrames, motif-запросы и расчет RequiredQuantity/BatchSize для всех уровней.
Раскрутить иерархию «родитель–потомок» в PySpark кажется простым, пока не попытаешься протолкнуть рекурсию через тысячи узлов. Здесь задача — для каждого Material пройти по всем уровням Component и добавить строки для каждого нижестоящего компонента, при этом нормализуя RequiredQuantity по формуле RequiredQuantity / BatchSize * родительский RequiredQuantity. Наивная реализация через рекурсию на Python поверх Spark DataFrame быстро упирается в узкое место.
Постановка задачи и медленный подход
Иерархия задана двумя столбцами: Material — родитель, Component — потомок. Для каждого корневого материала нужно обойти всех потомков и посчитать нормализованное количество для каждой добавленной строки компонента.
Код ниже делает это, рекурсивно обходя потомков, но на практике он крайне медленный: около пяти минут на один материал на наборе почти из пяти тысяч материалов.
from pyspark.sql import functions as F
def walk_parts(bom_df, root_item):
if bom_df.isEmpty():
return bom_df
focus_df = bom_df.where(F.col("Material") == root_item)
lot_size = focus_df.select("BatchSize").first()["BatchSize"]
child_ids = (
focus_df.select("Component").rdd.flatMap(lambda r: r).collect()
)
for part_id in child_ids:
child_df = bom_df.where(F.col("Material") == part_id)
if not child_df.isEmpty():
req_qty = (
focus_df.where(F.col("Component") == part_id)
.select("RequiredQuantity")
.first()["RequiredQuantity"]
)
downstream = walk_parts(bom_df, part_id).withColumns(
{
"Material": F.lit(root_item),
"RequiredQuantity": F.col("RequiredQuantity")
* req_qty
/ F.col("BatchSize"),
"BatchSize": F.lit(lot_size),
}
)
focus_df = focus_df.union(downstream)
return focus_df
roots = bom_df.select("Material").distinct().rdd.flatMap(lambda r: r).collect()
final_df = ss.createDataFrame([], bom_df.schema)
for root in roots:
final_df = final_df.union(walk_parts(bom_df, root))
Почему это медленно
Внутри Python-циклов код снова и снова делает фильтры, collect и first. Каждое из этих действий запускает задания Spark, перетаскивает данные на драйвер и наращивает нагрузку рекурсии. Эффект многократно усиливается по всем материалам и компонентам. Хотя сама рекурсия корректна, такой способ исполнения противоположен тому, как Spark ожидает выполнять распределённые вычисления.
Масштабируемое решение с GraphFrames
Смоделируйте иерархию как граф и доверьте GraphFrames перечисление путей. Задайте вершины из каждого уникального узла (и Material, и Component), рёбра — из связей «родитель–потомок», затем проходите по длинам путей с помощью motif-запросов. На каждом шаге считайте накопленную RequiredQuantity вдоль пути через UDF, применяющий правило нормализации на каждом ребре.
from pyspark.sql import functions as F, types as T, SparkSession
from graphframes import GraphFrame
ss = SparkSession.builder.getOrCreate()
src_df = ss.createDataFrame(
[
("A", "A1", 1300, 1.0),
("A", "A2", 1300, 0.056),
("A", "A3", 1300, 2.78),
("A", "B", 1300, 1300.5),
("B", "B1", 1000, 1007.0),
("B", "B2", 1000, 3.5),
("B", "C", 1000, 9.0),
("C", "C1", 800, 806.4),
],
["Material", "Component", "BatchSize", "RequiredQuantity"],
)
verts = (
src_df.select(F.col("Material").alias("id"))
.union(src_df.select(F.col("Component").alias("id")))
.distinct()
)
links = (
src_df.select(
F.col("Material").alias("src"),
F.col("Component").alias("dst"),
"BatchSize",
"RequiredQuantity",
)
.union(
src_df.select(
"Component",
F.lit(None),
F.lit(None),
F.lit(None),
F.lit(None),
)
)
.distinct()
)
g = GraphFrame(verts, links)
@F.udf(T.DoubleType())
def fold_qty(*path_edges: list[dict]) -> float:
out = path_edges[0]["RequiredQuantity"]
for pe in path_edges[1:]:
out *= pe["RequiredQuantity"] / pe["BatchSize"]
return out
expanded = ss.createDataFrame([], src_df.schema)
hop = 1
while True:
motif = ";".join(f"(v{j})-[e{j}]->(v{j+1})" for j in range(hop))
step = g.find(motif)
if step.isEmpty():
break
expanded = expanded.union(
step.select(
F.col("v0")["id"],
F.col(f"v{hop}")["id"],
F.col("e0")["BatchSize"],
fold_qty(*(c for c in step.columns if c.startswith("e"))),
)
)
hop += 1
Этот подход дал полное раскрытие чуть больше чем за семь минут для всех материалов. Если вы запускаете в блокноте Fabric, заранее настройте сессию с зависимостью GraphFrames и установите Python-обёртку.
%%configure -f
{
"conf": {
"spark.jars.packages": "graphframes:graphframes:0.8.4-spark3.5-s_2.12"
}
}
%pip install graphframes-py
Почему это работает
Ключевая идея — превратить раскрытие иерархии в обход графа, который остаётся распределённым. Вершины и рёбра берутся напрямую из DataFrame. Motif-запрос по шагам увеличивает длину пути, а UDF складывает RequiredQuantity вдоль этого пути строго по нужной формуле. Вместо рекурсии на стороне драйвера вся работа выражена через операции DataFrame, которые Spark умеет распараллеливать.
Зачем это важно
Раскрытие связей «родитель–потомок» встречается регулярно. Тянущаяся рекурсия поверх DataFrame чревата тем, что кластерная задача превратится в последовательный цикл. Представив отношения как рёбра и применив GraphFrames, вы останетесь в модели исполнения Spark и при этом сохраните точные вычисления, требуемые предметной областью.
Итоги
Сохраняйте явное направление «родитель–потомок»: Material — родитель, Component — потомок, а обход переносите в GraphFrames вместо итераций на Python. Стройте вершины и рёбра из одной исходной таблицы, чтобы граф был полным, проходите по числу «hops» через find и считайте нормализованную RequiredQuantity вдоль пути. Так логика остаётся точной, а время выполнения — предсказуемым, даже когда иерархия разрастается.