2025, Sep 25 11:33
Streamlit–LangGraph इंटरव्यू फ्लो में GraphRecursionError का समाधान
Streamlit और LangGraph इंटरव्यू फ्लो में self-loop से आने वाला GraphRecursionError कैसे ठीक करें: interrupt() से रोकें और Command(resume) से आगे बढ़ाएँ—गाइड.
उपयोगकर्ता का डेटा एकत्र करके उसके बाद इंटरव्यू चलाने वाला संवादात्मक फ्लो बनाना कागज़ पर आसान लगता है—जब तक कि एक्ज़िक्यूशन इंजन बिना स्वाभाविक ठहराव के बार‑बार उसी नोड को कॉल करता नहीं रहता। यदि आप Streamlit को langgraph के साथ जोड़ रहे हैं और Candidate ID डालते ही ऐप क्रैश हो जाता है, तो अगला यूज़र जवाब आने का इंतज़ार करने के बजाय यह अनियंत्रित लूप GraphRecursionError पर जाकर रुकता है।
समस्या दोहराने का तरीका
नीचे दिया गया फ्लो प्रोफ़ाइल फ़ील्ड इकट्ठा करते समय एक self-transition पर लूप करता है। नोड स्टेट को "इनपुट का इंतज़ार" चिह्नित कर देता है और लौट आता है, लेकिन लूप की शर्त सच बनी रहती है और ग्राफ़ के भीतर एक्ज़िक्यूशन को रोकने वाला कुछ नहीं होने के कारण शेड्यूलर उसी नोड में बार‑बार प्रवेश करता रहता है।
def build_workflow():
    saver = MemorySaver()
    flow = StateGraph(InterviewState)
    flow.add_node("prompt_details", prompt_details)
    flow.add_node("craft_questions", craft_questions)
    flow.add_node("conduct_round", conduct_round)
    flow.add_edge(START, "prompt_details")
    flow.add_conditional_edges(
        "prompt_details",
        lambda st: "continue" if st["step_ptr"] < len(st["info_requirements"]) else "done",
        {
            "continue": "prompt_details",
            "done": "craft_questions",
        },
    )
    flow.add_conditional_edges(
        "craft_questions",
        lambda st: "continue" if st["topic_ptr"] < len(st["skills_pool"]) else "done",
        {
            "continue": "craft_questions",
            "done": "conduct_round",
        },
    )
    flow.add_conditional_edges(
        "conduct_round",
        lambda st: "continue" if st.get("q_ptr", 0) < len(
            st["per_skill_questions"].get(
                st["skills_pool"][st.get("topic_ptr", 0) - 1], []
            )
        ) else "done",
        {
            "continue": "conduct_round",
            "done": END,
        },
    )
    return flow.compile(checkpointer=saver)
class FlowRunner:
    def __init__(self):
        self.flow = build_workflow()
    async def begin(self, applicant_id: str):
        run_id = str(uuid.uuid4())
        boot_state = {
            "collected_profile": [],
            "skills_pool": [],
            "per_skill_questions": {},
            "info_requirements": CANDIDATE_INFORMATION_NEED,
            "step_ptr": 0,
            "topic_ptr": 0,
            "q_ptr": 0,
            "chat_log": [],
            "last_bot_message": None,
            "last_idx": None,
            "awaiting_reply": False,
            "pending_prompt": "",
            "thread_id": run_id,
        }
        conf = {"configurable": {"thread_id": run_id}}
        out = await self.flow.ainvoke(boot_state, conf)
        return run_id, out
async def prompt_details(sta: InterviewState) -> InterviewState:
    pos = sta.get("step_ptr", 0)
    llm = st.session_state.llm
    requested = sta["info_requirements"]
    if pos >= len(requested):
        sta["awaiting_reply"] = False
        return sta
    if sta.get("last_idx") != pos:
        qp = ChatPromptTemplate.from_messages([
            HumanMessagePromptTemplate.from_template(
                """
                You are an interview assistant named 'deep'.
                Politely ask the candidate for <data_to_ask>.
                If <information_index> == 2 (technical_skills), tell them to list their skills.
                <information_index>: {information_index}
                <data_to_ask>: {data_to_ask}
                """
            )
        ])
        prompt_text = (qp | llm).invoke({
            "data_to_ask": requested[pos],
            "information_index": pos,
        }).content
        sta["chat_log"].append({"role": "assistant", "content": prompt_text})
        sta["pending_prompt"] = prompt_text
        sta["last_idx"] = pos
        sta["awaiting_reply"] = True
    return staलॉन्च करते ही एप्लिकेशन कुछ इस तरह की त्रुटि फेंकता है:
GraphRecursionError: Recursion limit of 25 reached without hitting a stop condition.
असल में क्या गलत हो रहा है
ग्राफ़ प्रोफ़ाइल डेटा जुटाने वाले नोड पर self‑loop का इस्तेमाल करता है। ब्रांचिंग lambda यह जाँचती है कि पॉइंटर अब भी आवश्यक फ़ील्ड्स की सूची के भीतर है या नहीं; हाँ होने पर वह फिर उसी नोड पर रूट कर देता है। नोड स्वयं स्टेट को इनपुट की प्रतीक्षा में चिह्नित करता है और पॉइंटर बढ़ाए बिना लौट आता है। चूँकि पॉइंटर बदलता नहीं है और ग्राफ़ में कहीं भी स्पष्ट विराम नहीं है, शेड्यूलर तुरंत फिर उसी नोड में प्रवेश करता है और यह चक्र तब तक चलता है जब तक रिकर्शन सीमा नहीं छू जाती। awaiting_reply जैसी UI फ्लैग ग्राफ़ के निष्पादन को नहीं रोकती; इंजन को निलंबित करने के लिए एक ठोस सिग्नल चाहिए।
समाधान: interrupt() से ग्राफ़ रोकें और Command से फिर शुरू करें
जिस बिंदु पर यूज़र इनपुट अपेक्षित है, वहाँ लूप को रोकने का सही तरीका है—नोड के अंदर interrupt() कॉल करना। इससे नियंत्रण UI लेयर को लौट जाता है। जब यूज़र जवाब दे, तो Command(resume=<user_input>) के साथ ग्राफ़ को फिर चालू करें। लूप की शर्त जस‑की‑तस रहती है, पर नई जानकारी आने तक हर इटरेशन के बीच निष्पादन रुका रहता है।
from langgraph.types import interrupt
async def prompt_details(sta: InterviewState) -> InterviewState:
    pos = sta.get("step_ptr", 0)
    if pos >= len(sta["info_requirements"]):
        return sta
    asked = build_prompt_for(sta, pos)
    sta["chat_log"].append({"role": "assistant", "content": asked})
    reply = interrupt(asked)  # यहाँ तब तक रोकें जब तक resume कमांड न दी जाए
    sta["chat_log"].append({"role": "user", "content": reply})
    sta["collected_profile"].append(reply)
    sta["step_ptr"] = pos + 1
    return staExecution मैनेजर में, जैसे ही UI को अगला संदेश मिले, resume कमांड स्ट्रीम करके ग्राफ़ को आगे बढ़ाएँ।
from langgraph.types import Command
class FlowRunner:
    def __init__(self):
        self.flow = build_workflow()
    async def resume_with(self, run_id: str, user_text: str):
        conf = {"configurable": {"thread_id": run_id}}
        async for _ in self.flow.astream(Command(resume=user_text), conf, stream_mode="updates"):
            pass
        return self.flow.get_state(conf).valuesयह क्यों महत्वपूर्ण है
सिर्फ़ "waiting" जैसा स्टेट फ्लैग ग्राफ़ को नहीं रोकता; शेड्यूलर को स्पष्ट रुकावट बिंदु चाहिए। interrupt() ग्राफ़ और यूज़र इंटरफ़ेस के बीच साफ़‑सुथरा हैंडऑफ़ बनाता है, जिससे उसी नोड पर अनियंत्रित self‑recursion रुकती है। इसके बाद Command(resume=...) यूज़र का उत्तर निलंबित नोड तक पहुँचा देता है ताकि फ्लो नियत ढंग से आगे बढ़ सके।
मुख्य बातें
जब आप ऐसा self‑looping नोड बनाते हैं जो इनपुट एक‑एक करके लेता है, तो सवाल पूछते ही नोड के भीतर एक्ज़िक्यूशन रोकना ज़रूरी है। interrupt() ग्राफ़ को सुरक्षित रूप से थाम देता है, और Command(resume=...) के साथ दोबारा शुरू करने से लूप नियंत्रित और क्रमिक रहता है। इस पैटर्न के साथ जानकारी एकत्र करने का चरण उम्मीद के मुताबिक पूरा होता है, और इंटरव्यू बिना रिकर्शन सीमा से टकराए प्रश्न निर्माण व आगे की ओर सहजता से बढ़ सकता है।
यह लेख StackOverflow पर प्रश्न (लेखक: naruto007) और Cp Gowtham के उत्तर पर आधारित है।