2025, Sep 30 11:32
PySpark DataFrame बैचिंग: थ्रेशोल्ड पर रीसेट होने वाला संचयी योग
PySpark DataFrame में थ्रेशोल्ड-आधारित बैचिंग सीखें: सीमा पार कराने वाली पंक्ति को उसी बैच में रखें, pandas_udf से रीसेट संचयी योग करें और स्थिर, पुनरुत्पादक समूहकरण पाएं.
PySpark DataFrame में रिकॉर्ड्स को बैचों में जमा करना, जब तक कि संचयी योग किसी थ्रेशोल्ड (सीमा) को पार न कर जाए, सुनने में आसान लगता है। लेकिन एक अहम बारीकी कई इम्प्लीमेंटेशनों को फँसा देती है: वह सीमा-निर्धारक पंक्ति जो कुल को सीमा से ऊपर धकेलती है, उसी बैच में बनी रहनी चाहिए। अगर आप उस रिकॉर्ड को हटा दें या अगले समूह में धकेल दें, तो बैच तय की गई परिभाषा से मेल नहीं खाएँगे।
समस्या
हमारे पास दो कॉलम—ID और Count—वाला एक DataFrame है। लक्ष्य यह है कि डेटा को क्रम से पढ़ते हुए ऐसे बैच (ID की सूचियाँ) बनाएँ जहाँ हम ID जोड़ते जाएँ, और जैसे ही पहली बार संचयी Count दिए गए थ्रेशोल्ड तक पहुँचे या उसे पार करे, उसी क्षण बैच बंद हो जाए—और वह सीमा पर पहुँचाने वाली ID भी उसी बैच में शामिल रहे।
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from pyspark.sql import Window
import pandas as pd
rows = [
    ("abc", 500),
    ("def", 300),
    ("ghi", 400),
    ("jkl", 200),
    ("mno", 1100),
    ("pqr", 900),
]
frame = spark.createDataFrame(rows, ["ID", "Count"])
threshold = 1000
1000 के थ्रेशोल्ड पर, बैच इस तरह दिखने चाहिए: [abc, def, ghi] जिनका योग 1200 है; फिर [jkl, pqr] जिनका योग 1100 है; और [mno] एक अलग बैच के रूप में, क्योंकि उसका Count पहले से ही थ्रेशोल्ड के बराबर या उससे अधिक है।
सीधी-सादी विधि क्यों चूक जाती है
अक्सर हम floor-आधारित समूहकरण या चलता हुआ योग और integer division का सहारा लेने की सोचते हैं। लेकिन ऐसा करने से वही पंक्ति छूट जाती है जो कुल को थ्रेशोल्ड से आगे ले जाती है, और समूहकरण गलत हो जाता है। मूल विचार यही है:
Floor और अन्य विधियाँ उस अंतिम ID को शामिल नहीं करतीं जो इसे थ्रेशोल्ड के पार ले जाती है। आपको किसी तरह का रीसेट होने वाला काउंटर चाहिए।
यहीं पर रीसेट होने वाला संचयी योग काम आता है: आप थ्रेशोल्ड तक जमा करते हैं, सीमा पर पहुँचते ही बैच बंद करते हैं—और सीमा-निर्धारक पंक्ति को उसी बैच में रखते हैं—फिर accumulator को रीसेट कर अगला बैच शुरू करते हैं।
समाधान
नीचे दी गई पद्धति दो चरणों में काम करती है। पहले, जिन पंक्तियों का Count पहले से थ्रेशोल्ड के बराबर या उससे अधिक है, उन्हें अलग कर दीजिए ताकि वे स्वतंत्र बैच बन जाएँ। फिर, बाकी पंक्तियों के लिए pandas_udf के जरिए लागू किए गए रीसेट होने वाले संचयी योग से बैच पहचानकर्ता (batch id) निकालिए। अंत में, स्वतंत्र पंक्तियों को अंतिम निकले बैच के बाद क्रमांकित कीजिए और परिणामों को जोड़ दीजिए।
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from pyspark.sql import Window
import pandas as pd
rows = [
    ("abc", 500),
    ("def", 300),
    ("ghi", 400),
    ("jkl", 200),
    ("mno", 1100),
    ("pqr", 900),
]
frame = spark.createDataFrame(rows, ["ID", "Count"])
# 1) जो पंक्तियाँ पहले से थ्रेशोल्ड तक पहुँच/पार कर चुकी हैं, उन्हें अलग करें
over_limit = frame.where(frame.Count >= 1000)
under_limit = frame.where(frame.Count < 1000)
# 2) बाद की नंबरिंग के लिए एकसमान प्रोसेसिंग क्रम बनाए रखने हेतु पंक्ति क्रमांक जोड़ें
under_limit = under_limit.withColumn(
    "idx", F.row_number().over(Window.orderBy(F.monotonically_increasing_id()))
)
over_limit = over_limit.withColumn(
    "idx", F.row_number().over(Window.orderBy(F.monotonically_increasing_id()))
)
# 3) रीसेट होने वाले संचयी योग से batch_id असाइन करें
@F.pandas_udf(IntegerType())
def tag_batches(cnt: pd.Series) -> pd.Series:
    tags = []
    acc = 0
    bucket_no = 0
    for val in cnt:
        acc += val
        tags.append(bucket_no)
        if acc >= 1000:
            bucket_no += 1
            acc = 0
    return pd.Series(tags)
under_limit = under_limit.withColumn("batch_id", tag_batches(F.col("Count")))
# 4) थ्रेशोल्ड से कम वाली पंक्तियों के अंतिम बैच के बाद, स्वतंत्र पंक्तियों की नंबरिंग आगे से जारी रखें
over_limit = over_limit.withColumn(
    "batch_id",
    F.col("idx") + under_limit.agg({"batch_id": "max"}).collect()[0][0] + 1,
)
# 5) परिणामों को मिलाएँ और देखें
result_ds = under_limit.unionByName(over_limit, allowMissingColumns=True)
result_ds.select("ID", "Count", "batch_id").show()
अंदर से क्या हो रहा है
मुख्य कड़ी वह pandas_udf है जो Count मानों पर चलते हुए एक चलायमान accumulator बनाए रखती है। जब भी accumulator थ्रेशोल्ड तक पहुँचता है या उसे पार करता है, उसी पंक्ति के लिए मौजूदा batch id दर्ज होती है, फिर accumulator रीसेट होता है और batch id बढ़ा दी जाती है। इससे यह सुनिश्चित होता है कि कुल को थ्रेशोल्ड से आगे ले जाने वाली पंक्ति उसी बैच में रहे, अगली में न टले। जो पंक्तियाँ पहले से थ्रेशोल्ड पूरी कर चुकी हैं, उन्हें अलग संभाला जाता है और उनके batch id वहीं से आगे बढ़ते हैं जहाँ पिछली श्रृंखला रुकी थी। monotonically increasing id पर बनाए गए row numbers इस नंबरिंग चरण के लिए एक नियत क्रम बनाए रखने में मदद करते हैं।
यह क्यों मायने रखता है
जब बैच आगे के जॉब्स, संसाधन योजना या SLA विंडो का आधार बनते हैं, तब सीमा-निर्धारक पंक्तियों को सुरक्षित रखना अनिवार्य है। अगर मोड़ लेने वाला रिकॉर्ड अगले बैच में छलाँग लगा दे, तो बैच का आकार और संख्या दोनों बिगड़ जाते हैं, जिससे प्रोसेसिंग असमान हो सकती है और समस्या खोज पाना कठिन हो जाता है। रीसेट होने वाला संचयी योग अपनाने से समूहकरण परिभाषा के अनुरूप और सुसंगत रहता है।
मुख्य बातें
अगर समूहकरण में उस पहली पंक्ति को शामिल करना अनिवार्य है जो संचयी योग को थ्रेशोल्ड से ऊपर ले जाती है, तो floor जैसी तरकीबों के बजाय रीसेट होने वाला संचयी योग उपयोग करें। जो आइटम पहले से थ्रेशोल्ड पर या उससे ऊपर हैं, उन्हें स्वतंत्र बैच मानें और उनकी नंबरिंग छोटे बैचों के बाद जारी रखें। पुनरुत्पादक समूहकरण के लिए एकसमान प्रोसेसिंग क्रम सुनिश्चित करें, फिर परिणामों को जोड़ें और आगे की प्रोसेसिंग करें।