2025, Oct 06 21:31

multiprocessing.Event के साथ asyncio सर्वर को सही तरीके से बंद करें

जानें कैसे multiprocessing.Event को asyncio के साथ सुरक्षित रूप से जोड़कर TCP सर्वर को बिना पोलिंग के बंद करें: run_in_executor और थ्रेड ब्रिज तरीके।

multiprocessing.Event की मदद से किसी दूसरे प्रोसेस से asyncio-आधारित सर्वर को रोकना ऊपर से आसान लगता है, लेकिन क्रॉस‑प्रोसेस सिंक्रोनाइज़ेशन को async इवेंट लूप के साथ मिलाने पर यह अक्सर अविश्वसनीय पोलिंग लूप में बदल जाता है। मुख्य चुनौती यह है कि टर्मिनेशन सिग्नल पर ऐसे प्रतिक्रिया दें कि लूप रुके नहीं और समय‑संवेदनशील जाँचों पर निर्भरता न रहे।

समस्या विवरण

एक async TCP सर्वर पर विचार करें जो समय-समय पर multiprocessing.Event को पोल करके तय करता है कि कब बाहर निकलना है:

self._stop_signal = multiprocessing.Event()
tcp_srv = await asyncio.start_server(
    self.on_client,
    self.bind_host,
    self.bind_port,
    backlog=self.backlog_size
)
async with tcp_srv:
    while not self._stop_signal.is_set():
        await asyncio.sleep(0.01)

उम्मीद रहती है कि कोई दूसरा प्रोसेस इवेंट पर set() बुलाए, लूप उसे देख ले और सर्वर बंद हो जाए। व्यवहार में, यह पोलिंग डांवाडोल हो सकती है: कभी निकल जाता है, कभी नहीं।

क्या हो रहा है

asyncio लूप के भीतर sleep के साथ multiprocessing.Event को पोल करना स्वभावतः समय‑संवेदी है। लूप तय अंतराल पर जागता है और फ़्लैग जाँचता है, जिससे रेस कंडीशनों की गुंजाइश बनती है। इससे भी बढ़कर, साधारण नियम यह है कि asyncio में ब्लॉकिंग multiprocessing प्रिमिटिव्स को सीधे नहीं बुलाना चाहिए। इंतज़ार को किसी थ्रेड पर ऑफ़लोड करना या उसे asyncio.Event से जोड़ना, स्टेट को बार‑बार जाँचने से कहीं सुरक्षित तरीका है।

एक और बारीकी है। साधारण फ़्लैग चेक अपने आप में सिंक्रोनस और नॉन‑ब्लॉकिंग है, लेकिन sleep के साथ टाइट पोल पर भरोसा करना टर्मिनेशन सिग्नलिंग के लिए फिर भी नाज़ुक रहता है। ऐसा समर्पित इंतज़ार जो इवेंट लूप को ब्लॉक न करे, ज़्यादा स्वच्छ डिज़ाइन है।

एक भरोसेमंद तरीका

ब्लॉकिंग wait को किसी थ्रेड में चलाएँ, और asyncio से उस पर await करें। इससे प्रोसेस‑स्तरीय सिंक्रोनाइज़ेशन इवेंट लूप से अलग हो जाता है और ऐड‑हॉक पोलिंग की ज़रूरत खत्म होती है।

import asyncio
import multiprocessing
async def await_shutdown(stop_evt):
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, stop_evt.wait)
# In the server task:
async with tcp_srv:
    shutdown_task = asyncio.create_task(
        await_shutdown(self._stop_signal)
    )
    await shutdown_task

यह पैटर्न सुनिश्चित करता है कि थ्रेड प्रोसेस‑स्तरीय इवेंट का इंतज़ार करता रहे तब भी इवेंट लूप प्रतिक्रियाशील बना रहे। इवेंट सेट होते ही टास्क पूरा हो जाता है और आप शटडाउन लॉजिक आगे बढ़ा सकते हैं।

विकल्प: asyncio.Event से ब्रिज करना

दूसरा विकल्प यह है कि बैकग्राउंड थ्रेड की मदद से multiprocessing.Event को asyncio.Event से जोड़ दें। थ्रेड प्रोसेस इवेंट पर ब्लॉक रहता है, और फिर लूप‑सुरक्षित तरीके से async इवेंट को सिग्नल कर देता है।

class EventRelay:
    def __init__(self, proc_evt):
        self.proc_evt = proc_evt
        self.aio_evt = asyncio.Event()
    def launch_watch(self):
        def watcher():
            self.proc_evt.wait()
            asyncio.run_coroutine_threadsafe(
                self._notify_aio(), asyncio.get_running_loop()
            )
        threading.Thread(target=watcher, daemon=True).start()
    async def wait(self):
        await self.aio_evt.wait()

इससे ब्लॉकिंग सिंक्रोनाइज़ेशन और async रनटाइम के बीच विभाजन बना रहता है, जबकि बाकी सिस्टम asyncio‑नेटिव प्रिमिटिव का इस्तेमाल कर सकता है।

यह क्यों मायने रखता है

Async सर्वर एक प्रतिक्रियाशील इवेंट लूप पर निर्भर होते हैं। ब्लॉकिंग कॉल डालना या पोलिंग लूप पर निर्भर रहना इसे कमजोर कर देता है। इंतज़ार को थ्रेड में अलग करना या asyncio प्रिमिटिव्स से ब्रिज करना आपके लूप को स्वस्थ रखता है और बिनाकिसी मनमाने sleep अंतराल पर निर्भर हुए शटडाउन को निश्चित बनाता है।

निष्कर्ष

यदि आपको multiprocessing.Event के आधार पर asyncio कोड रोकना है, तो उसे लूप के भीतर पोल न करें। या तो run_in_executor (या समकक्ष हेल्पर) के ज़रिये इवेंट की wait() को किसी थ्रेड में चलाएँ, या बैकग्राउंड थ्रेड से उसे asyncio.Event तक ब्रिज करें। दोनों तरीके लूप को ब्लॉक होने से बचाते हैं और शटडाउन नियंत्रण से टाइमिंग की अनिश्चितता हटाते हैं।

यह लेख StackOverflow पर प्रश्न (लेखक: UnemployedBrat) और Mahrez BenHamad के उत्तर पर आधारित है।