2025, Sep 25 11:22
Предотвращаем GraphRecursionError в LangGraph и Streamlit с interrupt() и Command
LangGraph и Streamlit падают с GraphRecursionError? Показываем, как остановить зацикливание: ставьте interrupt() и возобновляйте через Command(resume).
Построить диалоговый сценарий, который собирает данные пользователя, а затем проводит интервью, кажется просто — до тех пор, пока движок исполнения не начинает без паузы вызывать один и тот же узел. Если вы используете связку Streamlit и langgraph и видите падение сразу после ввода Candidate ID, неконтролируемый цикл заканчивается ошибкой GraphRecursionError вместо ожидания следующего ответа пользователя.
Как воспроизвести проблему
Ниже показан флоу, который зацикливается на самопереходе во время сбора полей профиля. Узел помечает состояние как ожидающее ввода и возвращается, но планировщик снова и снова входит в этот же узел, потому что условие цикла остаётся истинным, а внутри графа ничего не блокирует выполнение.
def build_workflow():
    saver = MemorySaver()
    flow = StateGraph(InterviewState)
    flow.add_node("prompt_details", prompt_details)
    flow.add_node("craft_questions", craft_questions)
    flow.add_node("conduct_round", conduct_round)
    flow.add_edge(START, "prompt_details")
    flow.add_conditional_edges(
        "prompt_details",
        lambda st: "continue" if st["step_ptr"] < len(st["info_requirements"]) else "done",
        {
            "continue": "prompt_details",
            "done": "craft_questions",
        },
    )
    flow.add_conditional_edges(
        "craft_questions",
        lambda st: "continue" if st["topic_ptr"] < len(st["skills_pool"]) else "done",
        {
            "continue": "craft_questions",
            "done": "conduct_round",
        },
    )
    flow.add_conditional_edges(
        "conduct_round",
        lambda st: "continue" if st.get("q_ptr", 0) < len(
            st["per_skill_questions"].get(
                st["skills_pool"][st.get("topic_ptr", 0) - 1], []
            )
        ) else "done",
        {
            "continue": "conduct_round",
            "done": END,
        },
    )
    return flow.compile(checkpointer=saver)
class FlowRunner:
    def __init__(self):
        self.flow = build_workflow()
    async def begin(self, applicant_id: str):
        run_id = str(uuid.uuid4())
        boot_state = {
            "collected_profile": [],
            "skills_pool": [],
            "per_skill_questions": {},
            "info_requirements": CANDIDATE_INFORMATION_NEED,
            "step_ptr": 0,
            "topic_ptr": 0,
            "q_ptr": 0,
            "chat_log": [],
            "last_bot_message": None,
            "last_idx": None,
            "awaiting_reply": False,
            "pending_prompt": "",
            "thread_id": run_id,
        }
        conf = {"configurable": {"thread_id": run_id}}
        out = await self.flow.ainvoke(boot_state, conf)
        return run_id, out
async def prompt_details(sta: InterviewState) -> InterviewState:
    pos = sta.get("step_ptr", 0)
    llm = st.session_state.llm
    requested = sta["info_requirements"]
    if pos >= len(requested):
        sta["awaiting_reply"] = False
        return sta
    if sta.get("last_idx") != pos:
        qp = ChatPromptTemplate.from_messages([
            HumanMessagePromptTemplate.from_template(
                """
                You are an interview assistant named 'deep'.
                Politely ask the candidate for <data_to_ask>.
                If <information_index> == 2 (technical_skills), tell them to list their skills.
                <information_index>: {information_index}
                <data_to_ask>: {data_to_ask}
                """
            )
        ])
        prompt_text = (qp | llm).invoke({
            "data_to_ask": requested[pos],
            "information_index": pos,
        }).content
        sta["chat_log"].append({"role": "assistant", "content": prompt_text})
        sta["pending_prompt"] = prompt_text
        sta["last_idx"] = pos
        sta["awaiting_reply"] = True
    return staПри запуске приложение падает с ошибкой вроде:
GraphRecursionError: Recursion limit of 25 reached without hitting a stop condition.
Что на самом деле происходит
Граф использует самопетлю на узле, который отвечает за сбор данных профиля. Ветвление (лямбда) проверяет, остаётся ли указатель в пределах списка обязательных полей и, если да, возвращает в тот же узел. Сам узел помечает состояние как «ожидание ответа» и выходит, не продвигая указатель. Поскольку указатель не меняется и в графе нет явной паузы, планировщик немедленно снова входит в этот узел и повторяет цикл, пока не достигнет лимита рекурсии. Флаг интерфейса вроде awaiting_reply не останавливает выполнение графа; движку нужен конкретный сигнал на приостановку.
Решение: приостановить граф через interrupt() и продолжить через Command
Корректный способ остановить цикл в момент, когда ожидается ввод пользователя, — вызвать interrupt() внутри узла. Это передаёт управление обратно вашему UI-слою. Когда пользователь ответит, возобновите граф через Command(resume=<user_input>). Условие цикла остаётся прежним, но выполнение приостанавливается между итерациями до поступления новых данных.
from langgraph.types import interrupt
async def prompt_details(sta: InterviewState) -> InterviewState:
    pos = sta.get("step_ptr", 0)
    if pos >= len(sta["info_requirements"]):
        return sta
    asked = build_prompt_for(sta, pos)
    sta["chat_log"].append({"role": "assistant", "content": asked})
    reply = interrupt(asked)  # приостановить выполнение здесь, пока не будет передана команда возобновления
    sta["chat_log"].append({"role": "user", "content": reply})
    sta["collected_profile"].append(reply)
    sta["step_ptr"] = pos + 1
    return staВ менеджере выполнения продолжайте граф, отправляя команду на возобновление, когда UI получает следующее сообщение.
from langgraph.types import Command
class FlowRunner:
    def __init__(self):
        self.flow = build_workflow()
    async def resume_with(self, run_id: str, user_text: str):
        conf = {"configurable": {"thread_id": run_id}}
        async for _ in self.flow.astream(Command(resume=user_text), conf, stream_mode="updates"):
            pass
        return self.flow.get_state(conf).valuesПочему это важно
Надеяться на флаг состояния, сигнализирующий «ожидание», не приостанавливает граф: планировщику нужна явная точка прерывания. interrupt() создаёт чёткую передачу управления между графом и интерфейсом, предотвращая неконтролируемую саморекурсию одного и того же узла. Затем Command(resume=...) возвращает ответ пользователя в приостановленный узел, чтобы поток предсказуемо продолжался.
Выводы
Когда вы реализуете самозацикливающийся узел, который по одному собирает ответы, необходимо останавливать выполнение внутри узла ровно в тот момент, когда задаёте вопрос. interrupt() гарантирует паузу графа, а возобновление через Command(resume=...) делает цикл безопасным и пошаговым. С таким шаблоном сбор информации завершается как задумано, и интервью без проблем переходит к генерации вопросов и дальше, не упираясь в лимит рекурсии.
Статья основана на вопросе с StackOverflow от naruto007 и ответе Cp Gowtham.