2025, Oct 22 21:34

google.adk के साथ Python में स्टेटफुल async चैट फ्लो: सही तरीका

इस गाइड में Python और google.adk के साथ async स्टेटफुल सत्र चैट फ्लो सेटअप, Runner/run_async, InMemorySessionService पर await, आम त्रुटियाँ और समाधान विस्तार से समझें.

Python में google.adk के साथ स्टेटफुल चैट फ्लो तैयार करते समय, सिंक्रोनस कंट्रोल फ्लो को async API के साथ मिलाना आसानी से उलझन भरी त्रुटियों तक ले जा सकता है। एक आम स्थिति यह होती है: InMemorySessionService से सत्र शुरू करना, question_answering_agent के जरिए उपयोगकर्ता का संदेश भेजना, और अंत में सत्र की अंतिम स्थिति निकालना। यदि आप coroutine-आधारित मेथड्स को await किए बिना कॉल करते हैं या किसी non-awaited async जेनरेटर पर इटरेट करते हैं, तो पूरे रन में विफलताएँ क्रम से उभरती चली जाएँगी।

लक्षण

सत्र इनिशियलाइज़ करना, एजेंट के साथ बातचीत करना, और फिर session.state पढ़ना—इन सबकी कोशिश ने एक साथ कई मुद्दे पैदा कर दिए।

AttributeError: 'coroutine' object has no attribute 'state' and ValueError: Session not found (with RuntimeWarning: coroutine '...' was never awaited)

समस्या का उदाहरण

नीचे दिया गया स्क्रिप्ट सत्र बनाता है और एक प्रश्न‑उत्तर एजेंट चलाता है, लेकिन async मेथड्स को ऐसे चलाता है जैसे वे सिंक्रोनस हों।

import uuid
from dotenv import load_dotenv
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
from question_answering_agent import question_answering_agent as qa_agent_node
load_dotenv()
# स्थिति को मेमोरी में संग्रहीत करें
store_backend = InMemorySessionService()
seed_state = {
    "user_name": "John Doe",
    "user_preferences": """
        I like to play Pickleball, Disc Golf, and Tennis.
        My favorite food is Mexican.
        My favorite TV show is Game of Thrones.
        Loves it when people like and subscribe to his YouTube channel.
    """,
}
# एक नया सत्र बनाएँ (गलत तरीके से, coroutine को await किए बिना)
APP_TITLE = "John Doe Bot"
ACCOUNT_ID = "john_doe"
TRACKING_ID = str(uuid.uuid4())
session_live = store_backend.create_session(
    app_name=APP_TITLE,
    user_id=ACCOUNT_ID,
    session_id=TRACKING_ID,
    state=seed_state,
)
print("CREATED NEW SESSION:")
print(f"\tSession ID: {TRACKING_ID}")
exec_engine = Runner(
    agent=qa_agent_node,
    app_name=APP_TITLE,
    session_service=store_backend,
)
payload_in = types.Content(
    role="user", parts=[types.Part(text="What is Johns favorite TV show?")]
)
# async जनरेटर के बजाय सिंक इटरेटर का उपयोग
for evt in exec_engine.run(
    user_id=ACCOUNT_ID,
    session_id=TRACKING_ID,
    new_message=payload_in,
):
    if evt.is_final_response():
        if evt.content and evt.content.parts:
            print(f"Final Response: {evt.content.parts[0].text}")
print("==== Session Event Exploration ====")
retrieved = store_backend.get_session(
    app_name=APP_TITLE, user_id=ACCOUNT_ID, session_id=TRACKING_ID
)
# अंतिम सत्र स्थिति लॉग करें (असफल होगा क्योंकि retrieved एक coroutine है, सत्र नहीं)
print("=== Final Session State ===")
for k, v in retrieved.state.items():
    print(f"{k}: {v}")

Runner द्वारा उपयोग किया गया एजेंट परिभाषा:

from google.adk.agents import Agent
qa_agent_node = Agent(
    name="qa_agent_node",
    model="gemini-2.0-flash",
    description="Question answering agent",
    instruction="""
    You are a helpful assistant that answers questions about the user's preferences.
    Here is some information about the user:
    Name: 
    {user_name}
    Preferences: 
    {user_preferences}
    """,
)

असल में गड़बड़ कहाँ हुई

मूल समस्या असिंक्रोनस फ्लो कंट्रोल की है। InMemorySessionService.create_session एक coroutine लौटाता है और उसे await करना ज़रूरी है। यही बात InMemorySessionService.get_session पर भी लागू होती है। रनर run_async के जरिए एक असिंक्रोनस जेनरेटर देता है, जिसे async for लूप से consume करना चाहिए। इन्हें सिंक्रोनस कॉल मान लेने पर पहले तो 'coroutine never awaited' वाली RuntimeWarning आती है, और उसके बाद .state को coroutine पर एक्सेस करने पर AttributeError तथा missing session वाली ValueError जैसी डाउनस्ट्रीम त्रुटियाँ दिखती हैं।

समाधान: asyncio में लपेटें और सही कॉल्स को await करें

उपाय यह है कि पूरा निष्पादन इवेंट लूप के भीतर करें, सत्र बनाते और पढ़ते समय await लगाएँ, और रनर के असिंक्रोनस जेनरेटर को async for से इटरेट करें। वातावरण वेरिएबल्स पहले लोड किए जाते हैं, और सभी कॉल्स में वही identifiers इस्तेमाल होते हैं। नीचे asyncio का एक पूरा, काम करने वाला संस्करण दिया है।

import uuid
import asyncio
import os
from dotenv import load_dotenv
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
from google.genai.client import Client
from question_answering_agent import question_answering_agent as qa_agent_node
# वातावरण वेरिएबल्स लोड करें
load_dotenv()
api_key_value = os.getenv("GOOGLE_API_KEY")
async def boot():
    # सत्र के लिए स्टेटफुल स्टोरेज
    store_backend = InMemorySessionService()
    # एजेंट के साथ साझा की जाने वाली प्रारंभिक स्थिति
    seed_state = {
        "user_name": "John Doe",
        "user_preferences": """
            I like to play Pickleball, Disc Golf, and Tennis.
            My favorite food is Mexican.
            My favorite TV show is Game of Thrones.
            Loves it when people like and subscribe to his YouTube channel.
        """,
    }
    # एक नया सत्र बनाएँ
    APP_TITLE = "John Doe Bot"
    ACCOUNT_ID = "john_doe"
    TRACKING_ID = str(uuid.uuid4())
    # सत्र को वास्तव में बनाने के लिए coroutine को await करें
    created_session = await store_backend.create_session(
        app_name=APP_TITLE,
        user_id=ACCOUNT_ID,
        session_id=TRACKING_ID,
        state=seed_state,
    )
    print("CREATED NEW SESSION:")
    print(f"\tSession ID: {TRACKING_ID}")
    # एजेंट चलाने से पहले छोटा विराम
    await asyncio.sleep(0.1)
    exec_engine = Runner(
        agent=qa_agent_node,
        app_name=APP_TITLE,
        session_service=store_backend,
    )
    payload_in = types.Content(
        role="user", parts=[types.Part(text="What is Johns favorite TV show?")]
    )
    # असिंक्रोनस इवेंट स्ट्रीम को उपभोग करें
    async for evt in exec_engine.run_async(
        user_id=ACCOUNT_ID,
        session_id=TRACKING_ID,
        new_message=payload_in,
    ):
        if evt.is_final_response():
            if evt.content and evt.content.parts:
                print(f"Final Response: {evt.content.parts[0].text}")
    print("==== Session Event Exploration ====")
    # अंतिम स्थिति देखने के लिए सत्र प्राप्ति को await करें
    fetched_session = await store_backend.get_session(
        app_name=APP_TITLE, user_id=ACCOUNT_ID, session_id=TRACKING_ID
    )
    print("=== Final Session State ===")
    if fetched_session and hasattr(fetched_session, "state"):
        for k, v in fetched_session.state.items():
            print(f"{k}: {v}")
    else:
        print("Session or session state could not be retrieved.")
if __name__ == "__main__":
    asyncio.run(boot())

यह क्यों मायने रखता है

google.adk के साथ स्टेटफुल ऑर्केस्ट्रेशन असिंक्रोनस सीमाओं पर निर्भर करता है। await बिंदुओं की अनदेखी न सिर्फ इवेंट स्ट्रीम तोड़ती है; कॉलर के दृष्टिकोण से सत्र अनइनिशियलाइज़्ड भी रह जाता है, इसलिए session.state पढ़ने की कोई भी कोशिश विफल होती है या ऐसा लगता है मानो सत्र मौजूद ही नहीं। अपने कंट्रोल फ्लो को लाइब्रेरी के async मॉडल के अनुरूप रखने से एजेंट को संदर्भ मिलता है, वह final responses देता है, और सत्र निरीक्षण के लिए संगत बना रहता है।

मुख्य निष्कर्ष

पूरा फ्लो इवेंट लूप के अंदर चलाएँ, create_session और get_session को await करें, और रनर के run_async स्ट्रीम पर async for से इटरेट करें। निर्माण और उपभोग के दौरान एक ही session_id रखें, और शुरू करने से पहले वातावरण वेरिएबल्स लोड कर लें। इन बातों के साथ, स्टेटफुल सत्र एंड‑टू‑एंड चलता है और अंतिम स्थिति लॉगिंग के लिए उपलब्ध रहती है।

यह लेख StackOverflow पर प्रश्न PinkBanter द्वारा और उसी उपयोगकर्ता PinkBanter के उत्तर पर आधारित है।