2025, Oct 20 09:35

बड़ी इमेज फ़ाइलें तेज़ी से डाउनलोड करें: async बनाम थ्रेड‑पूल

Python में बड़ी फ़ाइलों का async डाउनलोड क्यों धीमा पड़ता है और फाइलें कटती हैं—समाधान: सीमित समांतरता, स्ट्रीमिंग I/O, त्रुटि जाँच. aiohttp, httpx व ThreadPool तुलना.

नेटवर्क पर बड़े मीडिया को डाउनलोड करना सुनने में आसान लगता है, लेकिन जैसे ही समांतरता (concurrency), स्ट्रीमिंग और I/O समन्वय आते हैं, तस्वीर बदल जाती है. एक आम गलती है “पूरी तरह async” कोड लिखना, जो दर्जनों अनुरोध एक साथ चला देता है—और फिर हैरानी होती है कि यह छोटे थ्रेड‑पूल से भी धीमा क्यों है या, इससे बुरा, सेव हुई फाइलें खराब क्यों निकलीं. आइए एक वास्तविक केस से गुज़रते हैं, समझते हैं कि async संस्करण क्यों अटकता है और अधूरी फाइलें बनाता है, और बिना समग्र वास्तुकला बदले इसे ठीक करते हैं.

समस्या की रूपरेखा

काम है कई URL से एक साथ इमेज प्राप्त करना. एक छोटे पूल वाला थ्रेड‑आधारित समाधान ठीक प्रदर्शन देता है. मगर async संस्करण अप्रत्याशित रूप से धीमा है और जाँच करने पर पता चलता है कि वह अधूरी फाइलें लिख देता है जो खुलती ही नहीं.

समस्या दिखाने वाला न्यूनतम async उदाहरण

नीचे दिया स्निपेट हर URL के लिए एक टास्क शुरू करता है, पूरे HTTP उत्तर को मेमोरी में पढ़ता है और एक बार में डिस्क पर लिख देता है. ऊपर‑ऊपर सब ठीक लगता है, लेकिन यहीं से सुस्ती और खराब आउटपुट दिखने लगते हैं.

import asyncio
import aiohttp
import aiofiles
import os
import time
async def fetch_image_raw(link, out_dir, sess):
    async with sess.get(link) as resp:
        data = await resp.read()
        name = link.split('/')[-1]
        dest = os.path.join(out_dir, name)
        async with aiofiles.open(dest, "wb") as fh:
            await fh.write(data)
async def run_all():
    async with aiohttp.ClientSession() as sess:
        tasks = [fetch_image_raw(u, "async_slow", sess) for u in links]
        await asyncio.gather(*tasks)
if __name__ == "__main__":
    links = [
        "https://images-assets.nasa.gov/image/PIA03149/PIA03149~orig.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/3/37/African_Bush_Elephant.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/9/97/The_Earth_seen_from_Apollo_17.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/2/29/%22Arte_Colonial%22.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/d/d2/%22CUT%22_June_1929_04.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/8/82/%22CUT%22_June_1929_05.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/b/b1/%22Council_of_the_Gods%22_in_Galleria_Borghese_%28Rome%29_ceiling.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/7/71/%22East_front_of_Court_House_and_Planter%27s_House.%22.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/b/b6/%22Greater_Germany%22._Major_administrative_divisions._July_1._1944._100_kms_-_btv1b531213280.jpg",
    ] * 2
    os.makedirs("async_slow", exist_ok=True)
    t0 = time.time()
    asyncio.run(run_all())
    t1 = time.time()
    print(f"Asyncio download time: {t1 - t0:.2f} seconds")

असल में हो क्या रहा है

दो अवलोकन इस व्यवहार को स्पष्ट करते हैं. पहला, फाइलें पूरी डाउनलोड नहीं हो रहीं; आउटपुट मौजूद है लेकिन खुलता नहीं, जो डिकोडिंग नहीं बल्कि डाउनलोड त्रुटि की ओर इशारा करता है. दूसरा, थ्रेड‑आधारित समाधान max_workers=3 के साथ समानांतरता को बांधता है, जबकि async संस्करण सभी एंडपॉइंट्स को एक साथ हिट करने की कोशिश करता है. async फ्लो में समकालिक कॉल्स की संख्या थ्रेड‑पूल के स्तर तक घटाने से यह असंगति खत्म हो जाती है. जब समांतरता बराबर कर दी जाती है और डाउनलोड ठीक से डिस्क पर स्ट्रीम होते हैं, तो दोनों दृष्टिकोण लगभग समान समय में पूर्ण होते हैं.

समाधान: प्रतिक्रियाएँ स्ट्रीम करें, त्रुटियाँ संभालें और समांतरता सीमित करें

सुधारा गया async संस्करण इन‑फ्लाइट अनुरोधों की संख्या सीमित करने के लिए सेमाफोर का प्रयोग करता है, डाटा को टुकड़ों में डिस्क पर लिखता है, और अनुरोध त्रुटियों की जाँच करता है. इससे डाउनलोड स्थिर रहते हैं और अधूरी फाइलों की समस्या समाप्त हो जाती है.

from functools import wraps
import aiofiles
import aiohttp
import asyncio
import os
import time
def cap_parallelism(limit=10):
    gate = asyncio.Semaphore(limit)
    def decorator(fn):
        @wraps(fn)
        async def inner(*args, **kwargs):
            async with gate:
                return await fn(*args, **kwargs)
        return inner
    return decorator
@cap_parallelism(limit=3)
async def fetch_streaming(sess: aiohttp.ClientSession, link: str, dest_path: str):
    try:
        async with sess.get(link, headers={'User-Agent': 'Downloader/1.0'}) as res:
            assert res.status == 200
            async with aiofiles.open(dest_path, mode='wb') as out:
                async for chunk in res.content.iter_chunked(8192):
                    await out.write(chunk)
    except Exception as exc:
        print(f"Error retrieving {link}: {exc}")
def to_path(root: str, link: str) -> str:
    name = link.split('/')[-1]
    return os.path.join(root, name)
async def run_fixed():
    urls = [
        "https://images-assets.nasa.gov/image/PIA03149/PIA03149~orig.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/3/37/African_Bush_Elephant.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/9/97/The_Earth_seen_from_Apollo_17.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/2/29/%22Arte_Colonial%22.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/d/d2/%22CUT%22_June_1929_04.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/8/82/%22CUT%22_June_1929_05.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/b/b1/%22Council_of_the_Gods%22_in_Galleria_Borghese_%28Rome%29_ceiling.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/7/71/%22East_front_of_Court_House_and_Planter%27s_House.%22.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/b/b6/%22Greater_Germany%22._Major_administrative_divisions._July_1._1944._100_kms_-_btv1b531213280.jpg",
    ] * 2
    target_dir = 'async_fixed'
    os.makedirs(target_dir, exist_ok=True)
    async with aiohttp.ClientSession() as sess:
        jobs = [fetch_streaming(sess, u, to_path(target_dir, u)) for u in urls]
        await asyncio.gather(*jobs)
if __name__ == '__main__':
    start = time.time()
    asyncio.run(run_fixed())
    stop = time.time()
    print(f"Asyncio download time: {stop - start:.2f} seconds")

यदि आप थ्रेड‑आधारित रूपांतर पसंद करते हैं, तो समान सुधार है—requests से प्रतिक्रिया को स्ट्रीम करें और पूल का आकार तीन रखें. यह ऊपर दिए async सीमा से मेल खाता है और समान परिणाम देता है.

from concurrent import futures
import os
import requests
import time
def stream_download(link, out_file):
    try:
        with requests.get(link, stream=True, headers={'User-Agent': 'Downloader/1.0'}) as resp:
            resp.raise_for_status()
            with open(out_file, 'wb') as sink:
                for block in resp.iter_content(chunk_size=8192):
                    if not block:
                        break
                    sink.write(block)
    except Exception as exc:
        print(f"Error retrieving {link}: {exc}")
def to_dest(folder, link):
    name = link.split('/')[-1]
    return os.path.join(folder, name)
def run_pool():
    urls = [
        "https://images-assets.nasa.gov/image/PIA03149/PIA03149~orig.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/3/37/African_Bush_Elephant.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/9/97/The_Earth_seen_from_Apollo_17.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/2/29/%22Arte_Colonial%22.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/d/d2/%22CUT%22_June_1929_04.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/8/82/%22CUT%22_June_1929_05.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/b/b1/%22Council_of_the_Gods%22_in_Galleria_Borghese_%28Rome%29_ceiling.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/7/71/%22East_front_of_Court_House_and_Planter%27s_House.%22.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/b/b6/%22Greater_Germany%22._Major_administrative_divisions._July_1._1944._100_kms_-_btv1b531213280.jpg",
    ] * 2
    out_dir = 'threadpool_fixed'
    os.makedirs(out_dir, exist_ok=True)
    with futures.ThreadPoolExecutor(max_workers=3) as pool:
        pool.map(lambda u: stream_download(u, to_dest(out_dir, u)), urls)
if __name__ == '__main__':
    t0 = time.time()
    run_pool()
    t1 = time.time()
    print(f"ThreadPoolExecutor download time: {t1 - t0:.2f} seconds")

एक वैकल्पिक async HTTP क्लाइंट के रूप में, httpx भी इन्हीं सीमाओं में वही परिणाम देता है. संरचना समान है: समांतरता सीमित रखें, चंक्स में डिस्क पर स्ट्रीम करें, और HTTP त्रुटियाँ सतह पर लाएँ.

from functools import wraps
import aiofiles
import asyncio
import httpx
import os
import time
def cap_parallelism(limit=10):
    gate = asyncio.Semaphore(limit)
    def decorator(fn):
        @wraps(fn)
        async def inner(*args, **kwargs):
            async with gate:
                return await fn(*args, **kwargs)
        return inner
    return decorator
@cap_parallelism(limit=3)
async def fetch_httpx(client: httpx.AsyncClient, link: str, dest: str):
    try:
        resp = await client.get(link, headers={'User-Agent': 'Downloader/1.0'})
        resp.raise_for_status()
        async with aiofiles.open(dest, 'wb') as out:
            for chunk in resp.iter_bytes(chunk_size=8192):
                await out.write(chunk)
    except Exception as exc:
        print(f"Error retrieving {link}: {exc}")
def make_path(root: str, link: str) -> str:
    name = link.split('/')[-1]
    return os.path.join(root, name)
async def run_httpx():
    urls = [
        "https://images-assets.nasa.gov/image/PIA03149/PIA03149~orig.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/3/37/African_Bush_Elephant.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/9/97/The_Earth_seen_from_Apollo_17.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/2/29/%22Arte_Colonial%22.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/d/d2/%22CUT%22_June_1929_04.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/8/82/%22CUT%22_June_1929_05.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/b/b1/%22Council_of_the_Gods%22_in_Galleria_Borghese_%28Rome%29_ceiling.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/7/71/%22East_front_of_Court_House_and_Planter%27s_House.%22.jpg",
        "https://upload.wikimedia.org/wikipedia/commons/b/b6/%22Greater_Germany%22._Major_administrative_divisions._July_1._1944._100_kms_-_btv1b531213280.jpg",
    ] * 2
    out_dir = 'httpx_async'
    os.makedirs(out_dir, exist_ok=True)
    async with httpx.AsyncClient() as client:
        jobs = [fetch_httpx(client, u, make_path(out_dir, u)) for u in urls]
        await asyncio.gather(*jobs)
if __name__ == '__main__':
    start = time.time()
    asyncio.run(run_httpx())
    stop = time.time()
    print(f"Asyncio download time: {stop - start:.2f} seconds")

यह क्यों मायने रखता है

“लोकली काम करता है” और “स्केल पर भरोसेमंद काम करता है” के बीच का फासला अक्सर समांतरता और I/O की बारीकियों में छिपा रहता है. अनियंत्रित async अनुरोध दूरस्थ सर्वरों या आपके अपने परिवेश को ओवरलोड कर सकते हैं और सूक्ष्म विफलताएँ जन्म दे सकते हैं, जैसे कटे‑फटे (truncated) फाइलें. इम्प्लीमेंटेशनों के बीच समांतरता को मिलाना और प्रतिक्रियाओं को स्ट्रीम करना, शुद्धता और थ्रूपुट—दोनों—को संबोधित करता है. इन समायोजनों के तहत की गई मापी गई तुलना में, थ्रेड‑पूल और async समाधान लगभग समान समय में पूर्ण होते हैं, जो तरीका चुनते समय एक उपयोगी व्यावहारिक जाँच है.

मुख्य निष्कर्ष

बड़े एसेट्स को साथ‑साथ डाउनलोड करते समय, नेटवर्क और डिस्क को स्ट्रीमिंग पाइपलाइन की तरह समझें. समांतरता सोच‑समझकर सीमित करें, HTTP त्रुटियों की जाँच करें, और डिस्क पर क्रमशः लिखें. इन सिद्धांतों का पालन करें—चाहे छोटा ThreadPoolExecutor हो, aiohttp हो या httpx—तो स्थिर व्यवहार और तुलनीय प्रदर्शन की अपेक्षा कर सकते हैं.

यह लेख StackOverflow पर प्रश्न (लेखक: Daniil Yefimov) और Detlef के उत्तर पर आधारित है.