2025, Sep 29 11:31
PySpark में classification, id_1, id_2 के समूह इंडेक्स बिना join
PySpark में classification, id_1, id_2 पर आधारित समूह इंडेक्स बिना distinct+join: hash(...) से कॉम्पैक्ट key या concat_ws(...) से पठनीय key। अरब-पंक्ति स्केल पर तेज़।
जब आप टाइम सीरीज़ या इवेंट लॉग्स को एक लंबी तालिका में रखते हैं और हर तार्किक समूह के लिए एक स्थिर पहचानकर्ता चाहिए, तो पंक्ति-स्तरीय IDs काम नहीं आएंगी। मकसद सरल है: PySpark में सैकड़ों मिलियन पंक्तियों के पैमाने पर classification, id_1 और id_2 के एक ही संयोजन को साझा करने वाली हर पंक्ति को समान इंडेक्स देना—वह भी अलग मैपिंग टेबल बना कर उसे जॉइन किए बिना।
आधारभूत तरीका, जो भारी लगता है
प्रति‑समूह इंडेक्स पाने का सीधा रास्ता है कि पहले विशिष्ट कुंजियाँ निकाली जाएँ, उन्हें एक ID दी जाए और फिर मूल डेटा से जॉइन कर दिया जाए। यह चलता है, पर बड़े डेटा पर यह बहुत शफ़लिंग करा देता है।
from pyspark.sql import functions as S
# मूल डेटासेट
data = spark.table("some_source")
# यूनिक ग्रुप कीज़ का इंडेक्स मैप बनाएं, फिर उसे वापस जॉइन करें
key_map = (
    data
    .select("classification", "id_1", "id_2")
    .dropDuplicates()
    .withColumn("grp_key", S.monotonically_increasing_id())
)
data_indexed = data.join(key_map, ["classification", "id_1", "id_2"]) यह अपेक्षित परिणाम दे देता है, लेकिन distinct + join का यह अतिरिक्त रास्ता अरब-पंक्ति स्तर पर महंगा साबित हो सकता है।
समस्या का मूल क्या है?
आपको प्रति पंक्ति अलग मान नहीं, बल्कि उसी key triple से जुड़ी सभी पंक्तियों के लिए एक जैसा मान चाहिए। इसलिए monotonically_increasing_id() बाहर हो जाता है—यह पंक्ति-स्तर पर काम करता है। तरकीब यह है कि समूह बनाने वाले कॉलमों से ही सीधे एक मान निकाला जाए, ताकि उसी समूह की हर पंक्ति बिना अलग मैपिंग बनाए वही परिणाम निकाले।
जॉइन के बिना प्रति‑समूह इंडेक्स निकालने के दो नैटिव तरीके
पहला विकल्प है hashing। classification, id_1 और id_2 को साथ में हैश करने से आपको एक ऐसा integer मिलता है जो उस समूह का स्थिर प्रतिनिधित्व करता है। यह Spark की बिल्ट‑इन फ़ंक्शन का उपयोग करता है और किसी अतिरिक्त डेटा संरचना की ज़रूरत नहीं पड़ती।
from pyspark.sql import functions as S
indexed_by_hash = (
    data
    .withColumn(
        "grp_idx",
        S.hash(S.col("classification"), S.col("id_1"), S.col("id_2"))
    )
) दूसरा विकल्प concatenation है। अगर आप इंसान‑पठनीय इंडेक्स पसंद करते हैं, तो कुंजी कॉलमों को एक ही स्ट्रिंग में जोड़ दें—बेहतर होगा कि एक delimiter रखें ताकि मान आपस में घुलें नहीं।
from pyspark.sql import functions as S
indexed_by_concat = (
    data
    .withColumn(
        "grp_idx",
        S.concat_ws("::", S.col("classification"), S.col("id_1"), S.col("id_2"))
    )
) दोनों तरीक़े हर पंक्ति पर इन्हीं कॉलमों से इंडेक्स निकालते हैं, इसलिए dropDuplicates() और उसके बाद के join की ज़रूरत खत्म हो जाती है।
यह चुनाव क्यों मायने रखता है
Hashing एक कॉम्पैक्ट, अपारदर्शी कुंजी देता है, जो तब ठीक बैठता है जब आपको सिर्फ़ एक स्थिर पहचानकर्ता चाहिए। Concatenation एक नज़र में समझना आसान है, पर स्टोरेज बढ़ा सकता है—खासकर तब जब मूल स्ट्रिंग्स लंबी हों। अगर payload का आकार चिंता है, तो hashing आम तौर पर व्यावहारिक डिफ़ॉल्ट है।
सारांश
PySpark में बड़े पैमाने पर समूह‑स्तरीय इंडेक्स देने के लिए, लुकअप बनाकर जॉइन करने के बजाय इसे सीधे समूह कॉलमों से निकालें। कॉम्पैक्ट, नैटिव integer इंडेक्स चाहिए तो hash(...) अपनाएँ, जो यहीं पर गणना हो जाए। पठनीयता प्राथमिकता हो और स्ट्रिंग का आकार स्वीकार्य हो, तो concat_ws(...) का उपयोग करें। दोनों रास्ते तर्क को सरल रखते हैं और Spark की नैटिव क्षमताओं के अनुरूप रहते हैं।