2025, Dec 05 09:00

Expand a PySpark parent–child BOM with GraphFrames and compute normalized RequiredQuantity

Learn how to scale parent–child BOM expansion in PySpark by replacing driver-side recursion with GraphFrames and computing the normalized RequiredQuantity across paths, for a much faster run.

Expanding a parent–child hierarchy in PySpark sounds straightforward until you try to push recursion across thousands of nodes. The task here is to start from each Material, walk down all Component levels, and add a row for every downstream component while normalizing its RequiredQuantity as RequiredQuantity / BatchSize * parent RequiredQuantity, where BatchSize belongs to the intermediate material that consumes the component. Doing this naively with Python recursion over Spark DataFrames quickly becomes a bottleneck.
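
To make the rule concrete, here is a small numeric sketch using figures that reappear in the sample data further down; the variable names are only illustrative.

# Hand calculation of the normalization rule. Suppose material B (batch size 1000)
# needs 9.0 units of component C, and material A needs 1300.5 units of B.
# The quantity of C rolled up to A is then:
parent_required_qty = 1300.5   # A -> B edge
child_required_qty = 9.0       # B -> C edge
child_batch_size = 1000        # batch size of B, the intermediate material
rolled_up = child_required_qty / child_batch_size * parent_required_qty
print(rolled_up)  # 11.7045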

Problem setup and a slow approach

The hierarchy is defined by two columns: Material is the parent, Component is the child. For every root material, we need to traverse all descendants and compute the normalized quantity for each newly added component row.

The following PySpark code accomplishes this by recursively visiting children, but in practice it is extremely slow, taking roughly five minutes per material on a dataset with almost five thousand materials.

from pyspark.sql import SparkSession, functions as F
ss = SparkSession.builder.getOrCreate()
# bom_df is assumed to be the source BOM with columns
# Material, Component, BatchSize, RequiredQuantity.
def walk_parts(bom_df, root_item):
    if bom_df.isEmpty():
        return bom_df
    # Rows where root_item is the parent material.
    focus_df = bom_df.where(F.col("Material") == root_item)
    lot_size = focus_df.select("BatchSize").first()["BatchSize"]
    # Pull the child ids back to the driver so Python can loop over them.
    child_ids = (
        focus_df.select("Component").rdd.flatMap(lambda r: r).collect()
    )
    for part_id in child_ids:
        child_df = bom_df.where(F.col("Material") == part_id)
        if not child_df.isEmpty():
            # Quantity of this child required directly by root_item.
            req_qty = (
                focus_df.where(F.col("Component") == part_id)
                .select("RequiredQuantity")
                .first()["RequiredQuantity"]
            )
            # Recurse into the child, then rescale its subtree onto root_item:
            # RequiredQuantity / BatchSize * parent RequiredQuantity.
            downstream = walk_parts(bom_df, part_id).withColumns(
                {
                    "Material": F.lit(root_item),
                    "RequiredQuantity": F.col("RequiredQuantity")
                    * req_qty
                    / F.col("BatchSize"),
                    "BatchSize": F.lit(lot_size),
                }
            )
            focus_df = focus_df.union(downstream)
    return focus_df
roots = bom_df.select("Material").distinct().rdd.flatMap(lambda r: r).collect()
final_df = ss.createDataFrame([], bom_df.schema)
for root in roots:
    final_df = final_df.union(walk_parts(bom_df, root))

Why this is slow

The logic repeatedly filters, collects, and calls first() inside Python loops. Each of those operations launches a separate Spark job and pulls data back to the driver, and the recursion stacks more of them on top of each other. The effect multiplies across materials and components: the recursion is correct, but its execution pattern is the opposite of how Spark wants to run distributed workloads.
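
A partial mitigation is to cache the source DataFrame so each filter does not rescan it, but that only softens the problem; the sketch below still leaves every .isEmpty(), .first(), and .collect() call as a separate, driver-blocking Spark action.

# Partial mitigation only: caching avoids re-reading the source on every filter,
# but the traversal in walk_parts remains a serial loop on the driver.
bom_df = bom_df.cache()
bom_df.count()  # materialize the cache once up front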

A scalable fix with GraphFrames

Model the hierarchy as a graph and let GraphFrames enumerate paths for you. Define vertices from every distinct node (both Material and Component), define edges from parent–child links, and then iterate through path lengths with motif queries. At each hop, compute the cumulative RequiredQuantity along the path using a UDF that applies the normalization rule across edges.

from pyspark.sql import functions as F, types as T, SparkSession
from graphframes import GraphFrame
ss = SparkSession.builder.getOrCreate()
# Small sample BOM: Material is the parent, Component the child, BatchSize the
# parent's batch size, and RequiredQuantity the quantity of the child per batch.
src_df = ss.createDataFrame(
    [
        ("A", "A1", 1300, 1.0),
        ("A", "A2", 1300, 0.056),
        ("A", "A3", 1300, 2.78),
        ("A", "B", 1300, 1300.5),
        ("B", "B1", 1000, 1007.0),
        ("B", "B2", 1000, 3.5),
        ("B", "C", 1000, 9.0),
        ("C", "C1", 800, 806.4),
    ],
    ["Material", "Component", "BatchSize", "RequiredQuantity"],
)
# Vertices: every distinct node, whether it appears as a parent or as a child.
verts = (
    src_df.select(F.col("Material").alias("id"))
    .union(src_df.select(F.col("Component").alias("id")))
    .distinct()
)
# Edges: one row per parent-child link, carrying the parent's BatchSize and the
# child's RequiredQuantity. The second half adds placeholder rows with a null dst
# for each component; a null dst never joins to a vertex, so these rows do not
# affect the motif search.
links = (
    src_df.select(
        F.col("Material").alias("src"),
        F.col("Component").alias("dst"),
        "BatchSize",
        "RequiredQuantity",
    )
    .union(
        src_df.select(
            F.col("Component").alias("src"),
            F.lit(None).alias("dst"),
            F.lit(None).alias("BatchSize"),
            F.lit(None).alias("RequiredQuantity"),
        )
    )
    .distinct()
)
g = GraphFrame(verts, links)
# Fold the per-edge quantities along a path: start from the first edge's
# RequiredQuantity, then for each further hop apply RequiredQuantity / BatchSize
# of that edge. Each edge struct arrives in the UDF as a Row.
@F.udf(T.DoubleType())
def fold_qty(*path_edges) -> float:
    out = path_edges[0]["RequiredQuantity"]
    for pe in path_edges[1:]:
        out *= pe["RequiredQuantity"] / pe["BatchSize"]
    return out
# Grow the path one hop at a time: hop 1 reproduces the direct links, hop 2 adds
# grandchildren, and so on, until no longer paths exist.
expanded = ss.createDataFrame([], src_df.schema)
hop = 1
while True:
    motif = ";".join(f"(v{j})-[e{j}]->(v{j+1})" for j in range(hop))
    step = g.find(motif)
    if step.isEmpty():
        break
    expanded = expanded.union(
        step.select(
            F.col("v0")["id"].alias("Material"),
            F.col(f"v{hop}")["id"].alias("Component"),
            F.col("e0")["BatchSize"].alias("BatchSize"),
            fold_qty(*(c for c in step.columns if c.startswith("e"))).alias(
                "RequiredQuantity"
            ),
        )
    )
    hop += 1
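
For the sample data above, the multi-hop rows can be checked by hand against the normalization rule; the values in the comments below are a hand calculation, not captured output.

# Expected multi-hop rows for the sample BOM, alongside the eight original
# single-hop rows:
#   A -> B1: 1300.5 * 1007 / 1000            = 1309.6035
#   A -> B2: 1300.5 * 3.5 / 1000             = 4.55175
#   A -> C : 1300.5 * 9 / 1000               = 11.7045
#   A -> C1: 1300.5 * (9/1000) * (806.4/800) = 11.798136
#   B -> C1: 9 * 806.4 / 800                 = 9.072
expanded.orderBy("Material", "Component").show()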

This approach produced the full expansion in a little over seven minutes for all materials. If you are running in a Fabric notebook, preconfigure the session to include GraphFrames and install the Python wrapper.

%%configure -f
{
    "conf": {
        "spark.jars.packages": "graphframes:graphframes:0.8.4-spark3.5-s_2.12"
    }
}
%pip install graphframes-py
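
Once the session is up, an optional sanity check (assuming ss is the SparkSession created earlier) is to build a trivial graph; if both the JVM package and the Python wrapper are available, this runs without errors.

# Minimal two-vertex, one-edge graph; id, src, and dst are the column names
# GraphFrame expects.
from graphframes import GraphFrame
v = ss.createDataFrame([("A",), ("B",)], ["id"])
e = ss.createDataFrame([("A", "B")], ["src", "dst"])
GraphFrame(v, e).edges.show()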

Why this works

The key is to convert the hierarchical expansion into a graph traversal that stays distributed. Vertices and edges come directly from the DataFrame. The motif query grows the path length step by step, and the UDF composes RequiredQuantity along that path using exactly the required formula. Instead of recursive driver-side loops, the work is expressed as DataFrame operations that Spark can parallelize.
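
As a concrete illustration of what a single iteration sees, the sketch below runs the hop-2 motif on the g built earlier and pulls out the path fields the UDF folds over; the aliases are only for readability.

# For hop == 2 the generated motif string is "(v0)-[e0]->(v1);(v1)-[e1]->(v2)".
# Each matched row carries the full vertex and edge structs for that path.
two_hop = g.find("(v0)-[e0]->(v1);(v1)-[e1]->(v2)")
two_hop.select(
    F.col("v0")["id"].alias("root"),
    F.col("v2")["id"].alias("leaf"),
    F.col("e0")["RequiredQuantity"].alias("first_hop_qty"),
    F.col("e1")["RequiredQuantity"].alias("second_hop_qty"),
    F.col("e1")["BatchSize"].alias("second_hop_batch"),
).show()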

Why you should care

Parent–child expansions like bills of materials show up all over manufacturing and supply-chain data. When you reach for recursion over a DataFrame, you risk turning a cluster job into a serial loop on the driver. Representing relationships as edges and using GraphFrames keeps everything within Spark's execution model while preserving the exact arithmetic your domain requires.

Takeaways

Keep the parent–child direction explicit by treating Material as the parent and Component as the child, and push the traversal into GraphFrames rather than iterating in Python. Build vertices and edges from the same source table so the graph is complete, iterate by hop count with find, and compute the normalized RequiredQuantity along the path. This keeps the logic accurate and the runtime predictable even as the hierarchy grows.