2025, Sep 30 15:33
asyncio में IO-बाउंड लूप्स को समानांतर करें: create_task, gather और Semaphore से 4 तक concurrency
Python asyncio में IO-बाउंड लूप्स तेज करें: create_task, asyncio.gather और Semaphore से अनुरोध समानांतर चलाएँ, concurrency को 4 तक सीमित करें, थ्रूपुट बढ़ाएँ.
asyncio में IO-आधारित लूप्स को समानांतर चलाना अक्सर कम मेहनत में बड़ा असर देने वाला अनुकूलन होता है। जब कोई coroutine किसी टाइट लूप में हर ऑपरेशन पर await करता है, तो वह स्वभावतः एक बार में सिर्फ एक आइटम ही प्रोसेस करता है। यदि काम नेटवर्क-बाउंड है, तो यह संभावित थ्रूपुट का बड़ा हिस्सा अनछुआ छोड़ देता है। नीचे एक केंद्रित वॉकथ्रू है: कई निर्देशांकों के लिए अनुरोधों को साथ‑साथ बैच में कैसे भेजें, और समानांतरता को अधिकतम चार चल रहे अनुरोधों तक कैसे सीमित करें।
बेसलाइन: एक‑एक कर await करना
शुरुआती संस्करण प्रत्येक इटरेशन में केवल एक निर्देशांक प्रोसेस करता है और परिणाम को एक dictionary में सहेजता है। यह पैनोरमा मेटाडेटा लाने के लिए aiohttp और find_panorama_async कॉल का उपयोग करता है। तर्क सरल है, लेकिन लूप अगले पर जाने से पहले हर कॉल का इंतजार (await) करता है।
import asyncio
from aiohttp import ClientSession
from tqdm.asyncio import tqdm
import pandas as pd
from streetlevel import streetview
geo_map = {
    1: {'country': 'AZE', 'longitude': 48.84328388521507, 'latitude': 39.75349231850633},
    2: {'country': 'AZE', 'longitude': 46.42568983461019, 'latitude': 41.74686028542068},
    3: {'country': 'AZE', 'longitude': 46.77391084838791, 'latitude': 41.05880534123746},
    4: {'country': 'AZE', 'longitude': 46.57032078287734, 'latitude': 39.312871420751485},
    5: {'country': 'AZE', 'longitude': 47.26319069021316, 'latitude': 41.28950907274436},
    6: {'country': 'AZE', 'longitude': 46.49956247696345, 'latitude': 40.96062005058899},
    7: {'country': 'AZE', 'longitude': 48.291171317357815, 'latitude': 38.939108897065445},
    8: {'country': 'AZE', 'longitude': 47.12081533506723, 'latitude': 39.681052295694656},
}
async def scan_points(geo_map):
    pano_map = {}
    async with ClientSession() as http:
        for ident, meta in tqdm(geo_map.items(), desc="Parsing coordinates", unit="coordinate"):
            snap = await streetview.find_panorama_async(meta['latitude'], meta['longitude'], http)
            if snap is not None:
                pano_map[ident] = {
                    'longitude': snap.lon,
                    'latitude': snap.lat,
                    'date': str(snap.date)
                }
            else:
                pano_map[ident] = {
                    'longitude': None,
                    'latitude': None,
                    'date': None
                }
    return pano_map
async def run_pipeline(geo_map):
    outcome = await scan_points(geo_map)
    frame = pd.DataFrame.from_dict(outcome, orient='index')
    frame.reset_index(inplace=True)
    frame.rename(columns={'index': 'uuid'}, inplace=True)
    frame.to_parquet('results.parquet', index=False)
asyncio.run(run_pipeline(geo_map))
गति क्यों धीमी पड़ती है
await तब तक coroutine को रोक देता है जब तक प्रतीक्षित ऑपरेशन पूरा नहीं हो जाता। लूप के अंदर इसका इस्तेमाल अन्य नेटवर्क कॉल्स को तब तक शुरू होने से रोक देता है, जब तक मौजूदा कॉल खत्म न हो जाए। यह सही है, लेकिन स्वभावतः क्रमिक (सीक्वेंशियल) है। थ्रूपुट बेहतर करने के लिए विचार यह है कि कई coroutines एक साथ शुरू किए जाएँ और उनके परिणाम बाद में await किए जाएँ।
create_task के साथ कई कार्य शुरू करें, परिणाम बाद में await करें
IO को समानांतर करने का एक तरीका है create_task से कई coroutines शेड्यूल करना और बाद में उन्हें await करना। इससे वे तुरंत शुरू हो जाते हैं और इवेंट लूप उनकी प्रगति को इंटरलीव कर पाता है।
async def scan_points_parallel_fire_and_wait(geo_map):
    results_map = {}
    async with ClientSession() as http:
        pending = []
        # सभी कार्य तुरंत शुरू करें
        for ident, meta in tqdm(geo_map.items(), desc="Parsing coordinates", unit="coordinate"):
            fut = asyncio.create_task(
                streetview.find_panorama_async(meta['latitude'], meta['longitude'], http)
            )
            pending.append(fut)
        # बाद में, हर कार्य का परिणाम await करें
        for idx, fut in enumerate(pending):
            snap = await fut
            if snap is not None:
                results_map[idx] = {
                    'longitude': snap.lon,
                    'latitude': snap.lat,
                    'date': str(snap.date)
                }
            else:
                results_map[idx] = {
                    'longitude': None,
                    'latitude': None,
                    'date': None
                }
    return results_map
यह तरीका tqdm में दिखने वाली दर को नाटकीय रूप से बढ़ा देता है। ध्यान देने की बात है कि प्रदर्शित स्पीड लूप के इटरेशन की रफ्तार दिखाती है, न कि पूरा एंड‑टू‑एंड रनटाइम। फिर भी फर्क साफ दिखता है: एक रन में प्रोग्रेस बार पर 27.03 coordinate/s से बढ़कर छह अंकों तक।
gather के साथ सबको एकसाथ शुरू करना
create_task के बिना भी आप coroutines को एक सूची में इकट्ठा कर सकते हैं और asyncio.gather के साथ सबको एकसाथ चला सकते हैं; यह परिणाम उसी क्रम में लौटाता है।
async def scan_points_parallel_gather(geo_map):
    results_map = {}
    async with ClientSession() as http:
        batch = []
        # सभी coroutines तैयार करें
        for ident, meta in tqdm(geo_map.items(), desc="Parsing coordinates", unit="coordinate"):
            coro = streetview.find_panorama_async(meta['latitude'], meta['longitude'], http)
            batch.append(coro)
        # सबको एकसाथ चलाएँ और परिणाम एकत्र करें
        payloads = await asyncio.gather(*batch)
        for idx, snap in enumerate(payloads):
            if snap is not None:
                results_map[idx] = {
                    'longitude': snap.lon,
                    'latitude': snap.lat,
                    'date': str(snap.date)
                }
            else:
                results_map[idx] = {
                    'longitude': None,
                    'latitude': None,
                    'date': None
                }
    return results_map
एक और रन में रिपोर्ट की गई दर और भी बढ़ गई। फिर भी, यह काउंटर कुल रनटाइम का माप नहीं है; यह बस दिखाता है कि IO ओवरलैप करके कितना idle समय बचा।
Semaphore के साथ समानांतरता चार तक सीमित करना
असीमित concurrency हमेशा वांछनीय नहीं होती। एक समय में केवल चार अनुरोध चालू रखने के लिए हर coroutine को एक semaphore से लपेटें। यह निष्पादन को नियंत्रित करता है और सुनिश्चित करता है कि अधिकतम एक तय संख्या में ही कार्य समांतर चलें।
async def within_limit(lock, label, job):
    async with lock:
        return await job
async def scan_points_bounded(geo_map, max_parallel=4):
    results_map = {}
    async with ClientSession() as http:
        token = asyncio.Semaphore(max_parallel)
        batch = []
        for ident, meta in tqdm(geo_map.items(), desc="Parsing coordinates", unit="coordinate"):
            job = within_limit(
                token,
                ident,
                streetview.find_panorama_async(meta['latitude'], meta['longitude'], http)
            )
            batch.append(job)
        final = await asyncio.gather(*batch)
        for idx, snap in enumerate(final):
            if snap is not None:
                results_map[idx] = {
                    'longitude': snap.lon,
                    'latitude': snap.lat,
                    'date': str(snap.date)
                }
            else:
                results_map[idx] = {
                    'longitude': None,
                    'latitude': None,
                    'date': None
                }
    return results_map
यह पैटर्न concurrency को नियंत्रण में रखता है और ठीक चार coordinate lookup समानांतर चलाने की जरूरत पूरी करता है। प्रयोगों के दौरान दृश्यता के लिए कृत्रिम sleep जोड़ सकते हैं, लेकिन वास्तविक निष्पादन के लिए यह जरूरी नहीं है।
क्रम बनाम पूरा होना
जब क्रम महत्वपूर्ण हो, तो asyncio.gather प्रस्तुत किए गए coroutines का मूल क्रम बनाए रखता है। जबकि जैसे ही कोई भी काम पूरा हो, आप परिणाम प्रोसेस करना चाहते हों, तो asyncio.as_completed इसके बजाय परिणामों को उनके पूरा होने के क्रम में देता है। आउटपुट कैसे समेटना है, उसके अनुसार मोड चुनें। प्रगति ट्रैक करने के लिए gather को async‑अनुकूल तरीके से tqdm के साथ जोड़ना भी संभव है।
यह क्यों मायने रखता है
लूप के भीतर await वाले पैटर्न IO को सीरियलाइज़ कर देते हैं और उपलब्ध concurrent क्षमता का कम उपयोग करते हैं। कई नेटवर्क‑बाउंड ऑपरेशन्स को एक साथ शुरू करके आप idle गैप हटाते हैं और वॉल‑क्लॉक समय घटाते हैं। व्यवहार में, सिर्फ create_task या gather पर स्विच करने से भी प्रोग्रेस काउंटर तेज़ी से उछलता दिखता है—जो स्पष्ट संकेत है कि काम कतारबद्ध होने के बजाय ओवरलैप हो रहा है। एक semaphore जोड़ने से आप बाहरी सेवाओं पर लोड नियंत्रित रखते हैं, बिना बाकी पाइपलाइन बदले।
निष्कर्ष
IO कॉल करने वाले लूप्स में await का उपयोग सीमित रखें। समानांतर अनुरोध चाहिए तो या तो create_task से coroutines शेड्यूल करें और बाद में await करें, या उन्हें इकट्ठा कर gather के साथ एकसाथ चलाएँ। जब कड़ी सीमा चाहिए हो, तो Semaphore को वांछित चौड़ाई (जैसे चार) पर सेट कर कॉल्स को रैप करें। परिणामों का क्रम चाहिए तो gather उसे बनाए रखता है; सबसे पहले पूरा होने वाले परिणामों को संभालना हो तो as_completed सही टूल है। ये पैटर्न आपके मौजूदा session और DataFrame चरणों के साथ आसानी से फिट हो जाते हैं और तुरंत दिखने वाले गति सुधार देते हैं।