2025, Sep 25 11:00

Prevent GraphRecursionError in LangGraph interview flows: pause with interrupt() and resume with Command(resume)

Hit a GraphRecursionError in a LangGraph + Streamlit interview flow? Pause self-looping nodes with interrupt() and resume with Command(resume) to proceed.

Building a conversational flow that collects user data and then runs an interview sounds straightforward until the execution engine keeps calling the same node without a natural pause. If you are combining Streamlit with LangGraph and the app crashes right after a Candidate ID is entered, the cause is a runaway loop: the graph never waits for the next user reply and ends with a GraphRecursionError.

Reproducing the issue

The flow below loops over a self-transition while collecting profile fields. The node marks the state as waiting for input and returns, but the scheduler continues to re-enter the node because the loop condition remains true and nothing blocks execution inside the graph.

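import uuid

import streamlit as st
from langchain_core.prompts import ChatPromptTemplate, HumanMessagePromptTemplate
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END

# InterviewState and CANDIDATE_INFORMATION_NEED are defined elsewhere in the project.


def build_workflow():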
    saver = MemorySaver()
    flow = StateGraph(InterviewState)

    flow.add_node("prompt_details", prompt_details)
    flow.add_node("craft_questions", craft_questions)
    flow.add_node("conduct_round", conduct_round)

    flow.add_edge(START, "prompt_details")

    flow.add_conditional_edges(
        "prompt_details",
        lambda st: "continue" if st["step_ptr"] < len(st["info_requirements"]) else "done",
        {
            "continue": "prompt_details",
            "done": "craft_questions",
        },
    )

    flow.add_conditional_edges(
        "craft_questions",
        lambda st: "continue" if st["topic_ptr"] < len(st["skills_pool"]) else "done",
        {
            "continue": "craft_questions",
            "done": "conduct_round",
        },
    )

    flow.add_conditional_edges(
        "conduct_round",
        lambda st: "continue" if st.get("q_ptr", 0) < len(
            st["per_skill_questions"].get(
                st["skills_pool"][st.get("topic_ptr", 0) - 1], []
            )
        ) else "done",
        {
            "continue": "conduct_round",
            "done": END,
        },
    )

    return flow.compile(checkpointer=saver)


class FlowRunner:
    def __init__(self):
        self.flow = build_workflow()

    async def begin(self, applicant_id: str):
        run_id = str(uuid.uuid4())
        boot_state = {
            "collected_profile": [],
            "skills_pool": [],
            "per_skill_questions": {},
            "info_requirements": CANDIDATE_INFORMATION_NEED,
            "step_ptr": 0,
            "topic_ptr": 0,
            "q_ptr": 0,
            "chat_log": [],
            "last_bot_message": None,
            "last_idx": None,
            "awaiting_reply": False,
            "pending_prompt": "",
            "thread_id": run_id,
        }
        conf = {"configurable": {"thread_id": run_id}}
        out = await self.flow.ainvoke(boot_state, conf)
        return run_id, out


async def prompt_details(sta: InterviewState) -> InterviewState:
    pos = sta.get("step_ptr", 0)
    llm = st.session_state.llm
    requested = sta["info_requirements"]

    if pos >= len(requested):
        sta["awaiting_reply"] = False
        return sta

    if sta.get("last_idx") != pos:
        qp = ChatPromptTemplate.from_messages([
            HumanMessagePromptTemplate.from_template(
                """
                You are an interview assistant named 'deep'.
                Politely ask the candidate for <data_to_ask>.
                If <information_index> == 2 (technical_skills), tell them to list their skills.

                <information_index>: {information_index}
                <data_to_ask>: {data_to_ask}
                """
            )
        ])

        prompt_text = (qp | llm).invoke({
            "data_to_ask": requested[pos],
            "information_index": pos,
        }).content

        sta["chat_log"].append({"role": "assistant", "content": prompt_text})
        sta["pending_prompt"] = prompt_text
        sta["last_idx"] = pos
        sta["awaiting_reply"] = True

    return sta

As soon as the flow starts, the application fails with an error similar to:

GraphRecursionError: Recursion limit of 25 reached without hitting a stop condition.

What actually goes wrong

The graph uses a self-loop on the node responsible for gathering profile data. The branching lambda checks whether the pointer is still within the list of required fields and, if so, routes back to the same node. The node itself flags the state as waiting for input and returns without advancing the pointer. Because the pointer does not change and there is no explicit pause in the graph, the scheduler immediately re-enters the node and repeats this cycle until it hits the recursion limit. A UI flag like awaiting_reply does not stop graph execution; the engine needs a concrete signal to suspend.
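The runaway loop can be reproduced without LangGraph. The sketch below is a toy model, not LangGraph's scheduler: run() and StepLimitReached are hypothetical stand-ins for the engine and for GraphRecursionError, and the field names in info_requirements are made up for illustration. The node and the routing condition mirror the ones in the flow above.

```python
class StepLimitReached(RuntimeError):
    """Stand-in for GraphRecursionError in this toy model."""


def prompt_details(state: dict) -> dict:
    # Mirrors the original node: sets a flag but never advances step_ptr.
    state["awaiting_reply"] = True
    return state


def route(state: dict) -> str:
    # Same condition as the conditional edge in build_workflow().
    if state["step_ptr"] < len(state["info_requirements"]):
        return "continue"
    return "done"


def run(state: dict, limit: int = 25) -> int:
    """Run the node, then the router, until 'done' or the step budget is spent."""
    steps = 0
    while True:
        if steps >= limit:
            raise StepLimitReached(f"limit of {limit} reached after {steps} steps")
        state = prompt_details(state)
        steps += 1
        if route(state) == "done":
            return steps


state = {
    "step_ptr": 0,
    "info_requirements": ["name", "email", "technical_skills"],  # illustrative values
    "awaiting_reply": False,
}

try:
    run(state)
    outcome = "finished"
except StepLimitReached as exc:
    outcome = str(exc)
```

The flag ends up set to True, yet the router never reads it, so nothing in the loop can change the outcome of the condition.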

The fix: pause the graph with interrupt() and resume with Command

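The correct way to stop the loop at the point where user input is expected is to call interrupt() inside the node. This yields control back to your UI layer. When the user replies, resume the graph with Command(resume=<user_input>). The loop condition remains intact, but execution halts between iterations until new data arrives. Two details matter: interrupt() needs a checkpointer, which the MemorySaver passed to compile() already provides, and on resume the node runs again from its first line, with interrupt() returning the resume value instead of pausing.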

from langgraph.types import interrupt

async def prompt_details(sta: InterviewState) -> InterviewState:
    pos = sta.get("step_ptr", 0)
    if pos >= len(sta["info_requirements"]):
        return sta

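    # build_prompt_for is a helper (not shown), assumed to wrap the
    # prompt-building code from the original node.
    asked = build_prompt_for(sta, pos)
    sta["chat_log"].append({"role": "assistant", "content": asked})

    reply = interrupt(asked)  # suspend here until a resume command is provided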

    sta["chat_log"].append({"role": "user", "content": reply})
    sta["collected_profile"].append(reply)
    sta["step_ptr"] = pos + 1
    return sta
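To make the pause-and-resume cycle concrete, here is a minimal stdlib-only sketch. It is a toy model under stated assumptions, not LangGraph's implementation: ToyRunner, Paused, and the calls counter are hypothetical names, and the runner handles a single node. It assumes the pause is signalled by aborting the node and that a resumed node starts again from the top.

```python
import copy


class Paused(Exception):
    """Toy signal that carries the question out to the caller."""

    def __init__(self, question: str):
        super().__init__(question)
        self.question = question


_NO_REPLY = object()  # sentinel: no resume value has been supplied


class ToyRunner:
    """Hypothetical single-node runner; not LangGraph's implementation."""

    def __init__(self, node, state: dict):
        self.node = node
        self.saved = state  # last committed state, playing the role of a checkpoint
        self._reply = _NO_REPLY

    def interrupt(self, question: str) -> str:
        # First pass: no reply yet, so abort the node and surface the question.
        if self._reply is _NO_REPLY:
            raise Paused(question)
        # Resumed pass: hand back the reply exactly once.
        reply, self._reply = self._reply, _NO_REPLY
        return reply

    def run(self, reply=_NO_REPLY):
        self._reply = reply
        working = copy.deepcopy(self.saved)  # the node works on a copy
        try:
            self.saved = self.node(working, self.interrupt)
        except Paused as pause:
            return pause.question  # paused: nothing was committed
        return None  # node finished and its result was committed


calls = {"prompt_builds": 0}  # counts how often the question is (re)built


def prompt_details(state: dict, interrupt) -> dict:
    calls["prompt_builds"] += 1  # stands in for the LLM call in the real node
    pos = state["step_ptr"]
    question = f"Please provide your {state['info_requirements'][pos]}."
    state["chat_log"].append({"role": "assistant", "content": question})
    reply = interrupt(question)
    state["chat_log"].append({"role": "user", "content": reply})
    state["collected_profile"].append(reply)
    state["step_ptr"] = pos + 1
    return state


runner = ToyRunner(prompt_details, {
    "step_ptr": 0,
    "info_requirements": ["name"],  # illustrative value
    "chat_log": [],
    "collected_profile": [],
})
asked = runner.run()          # pauses and returns the question
finished = runner.run("Ada")  # re-runs the node from the top, then completes
```

In this model the question is built twice, once per pass. That suggests keeping expensive or non-idempotent work, such as an LLM call, either after the interrupt or in a separate node.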

In the execution manager, continue the graph by streaming a resume command when the UI receives the next message.

from langgraph.types import Command

class FlowRunner:
    def __init__(self):
        self.flow = build_workflow()

    async def resume_with(self, run_id: str, user_text: str):
        conf = {"configurable": {"thread_id": run_id}}
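        # Drain the stream; the graph runs until the next interrupt() or END.
        async for _ in self.flow.astream(Command(resume=user_text), conf, stream_mode="updates"):
            pass
        # Read the latest checkpointed state for this thread.
        return self.flow.get_state(conf).values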
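After each run the UI still needs to know which question is pending. The helper below is a sketch that reads it from a state snapshot by duck typing. It assumes the snapshot exposes pending interrupts as tasks, then interrupts, then value, which matches my understanding of LangGraph's StateSnapshot but should be checked against the installed version. pending_question is a hypothetical name, and the SimpleNamespace objects are stand-ins for real snapshots.

```python
from types import SimpleNamespace
from typing import Any, Optional


def pending_question(snapshot: Any) -> Optional[Any]:
    """Return the payload of the first pending interrupt, or None if not paused."""
    for task in getattr(snapshot, "tasks", ()):
        for pending in getattr(task, "interrupts", ()):
            return pending.value
    return None


# Stand-ins shaped like a paused and a finished snapshot (illustrative only).
paused_snapshot = SimpleNamespace(
    tasks=(SimpleNamespace(interrupts=(SimpleNamespace(value="What is your name?"),)),)
)
finished_snapshot = SimpleNamespace(tasks=())

next_prompt = pending_question(paused_snapshot)
no_prompt = pending_question(finished_snapshot)
```

Under that assumption, resume_with could return pending_question(self.flow.get_state(conf)) alongside the state values, so the Streamlit layer can render the next question or detect that the collection step has finished.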

Why this matters

Relying on a state flag to signal “waiting” does not pause the graph; the scheduler needs an explicit interruption point. interrupt() creates a clear handoff between the graph and the user interface, preventing uncontrolled self-recursion on the same node. Command(resume=...) then delivers the user’s answer back into the suspended node, so the flow advances exactly one step per reply.

Takeaways

When implementing a self-looping node that collects inputs one by one, you must stop execution inside the node at the exact moment you ask a question. Using interrupt() ensures the graph halts, and resuming with Command(resume=...) keeps the loop safe and incremental. With this pattern in place, the information collection step completes as intended, and the interview can continue to question generation and beyond without hitting a recursion limit.

The article is based on a question from StackOverflow by naruto007 and an answer by Cp Gowtham.