2025, Oct 22 21:18

Как правильно работать с google.adk в Python: await, run_async и состояние сессии

Разбираем ошибки async в Python с google.adk: await для create_session и get_session, async for с run_async, как избежать coroutine was never awaited ошибок.

Когда вы настраиваете состояние чат-потока с google.adk на Python, смешивание синхронной логики с асинхронными API — прямой путь к запутанным ошибкам. Типичный сценарий: вы создаёте сессию через InMemorySessionService, отправляете сообщение пользователя в question_answering_agent, а затем пытаетесь вывести итоговое состояние сессии. Если вы вызываете методы-корутины без await или итерируете асинхронный генератор, не дождавшись его, ошибки начнут сыпаться одна за другой.

Симптомы

Попытка инициализировать сессию, пообщаться с агентом и затем прочитать session.state приводит сразу к нескольким проблемам.

AttributeError: 'coroutine' object has no attribute 'state' and ValueError: Session not found (with RuntimeWarning: coroutine '...' was never awaited)

Пример проблемы

Ниже скрипт, который поднимает сессию и запускает агент для ответов на вопросы, но обращается к асинхронным методам так, будто они синхронные.

import uuid
from dotenv import load_dotenv
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
from question_answering_agent import question_answering_agent as qa_agent_node
load_dotenv()
# Хранение состояния в памяти
store_backend = InMemorySessionService()
seed_state = {
    "user_name": "John Doe",
    "user_preferences": """
        I like to play Pickleball, Disc Golf, and Tennis.
        My favorite food is Mexican.
        My favorite TV show is Game of Thrones.
        Loves it when people like and subscribe to his YouTube channel.
    """,
}
# Создание НОВОЙ сессии (ошибочно, без ожидания корутины)
APP_TITLE = "John Doe Bot"
ACCOUNT_ID = "john_doe"
TRACKING_ID = str(uuid.uuid4())
session_live = store_backend.create_session(
    app_name=APP_TITLE,
    user_id=ACCOUNT_ID,
    session_id=TRACKING_ID,
    state=seed_state,
)
print("CREATED NEW SESSION:")
print(f"\tSession ID: {TRACKING_ID}")
exec_engine = Runner(
    agent=qa_agent_node,
    app_name=APP_TITLE,
    session_service=store_backend,
)
payload_in = types.Content(
    role="user", parts=[types.Part(text="What is Johns favorite TV show?")]
)
# Использование синхронного итератора вместо асинхронного генератора
for evt in exec_engine.run(
    user_id=ACCOUNT_ID,
    session_id=TRACKING_ID,
    new_message=payload_in,
):
    if evt.is_final_response():
        if evt.content and evt.content.parts:
            print(f"Final Response: {evt.content.parts[0].text}")
print("==== Session Event Exploration ====")
retrieved = store_backend.get_session(
    app_name=APP_TITLE, user_id=ACCOUNT_ID, session_id=TRACKING_ID
)
# Вывод финального состояния сессии (падает, потому что retrieved — корутина, а не объект сессии)
print("=== Final Session State ===")
for k, v in retrieved.state.items():
    print(f"{k}: {v}")

Определение агента, которое использует раннер:

from google.adk.agents import Agent
qa_agent_node = Agent(
    name="qa_agent_node",
    model="gemini-2.0-flash",
    description="Question answering agent",
    instruction="""
    You are a helpful assistant that answers questions about the user's preferences.
    Here is some information about the user:
    Name: 
    {user_name}
    Preferences: 
    {user_preferences}
    """,
)

Что именно пошло не так

Суть сбоя — в управлении асинхронным потоком. InMemorySessionService.create_session возвращает корутину, и её нужно ожидать через await. То же относится к InMemorySessionService.get_session. Раннер предоставляет асинхронный генератор через run_async, итерироваться по нему следует с помощью async for. Если обращаться к этим вызовам как к синхронным, появится RuntimeWarning о «coroutine was never awaited», а затем — вторичные ошибки: AttributeError при попытке получить .state у корутины и ValueError о том, что сессия не найдена.

Исправление: обернуть в asyncio и await нужные вызовы

Решение — выполнять код внутри событийного цикла, ожидать создание и получение сессии и итерироваться по асинхронному генератору раннера. Переменные окружения загружаются заранее, а одни и те же идентификаторы используются во всех вызовах. Ниже — полностью рабочая версия на asyncio.

import uuid
import asyncio
import os
from dotenv import load_dotenv
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
from google.genai.client import Client
from question_answering_agent import question_answering_agent as qa_agent_node
# Загрузка переменных окружения
load_dotenv()
api_key_value = os.getenv("GOOGLE_API_KEY")
async def boot():
    # Хранилище состояния для сессии
    store_backend = InMemorySessionService()
    # Исходное состояние, доступное агенту
    seed_state = {
        "user_name": "John Doe",
        "user_preferences": """
            I like to play Pickleball, Disc Golf, and Tennis.
            My favorite food is Mexican.
            My favorite TV show is Game of Thrones.
            Loves it when people like and subscribe to his YouTube channel.
        """,
    }
    # Создание НОВОЙ сессии
    APP_TITLE = "John Doe Bot"
    ACCOUNT_ID = "john_doe"
    TRACKING_ID = str(uuid.uuid4())
    # Ожидание корутины для фактического создания сессии
    created_session = await store_backend.create_session(
        app_name=APP_TITLE,
        user_id=ACCOUNT_ID,
        session_id=TRACKING_ID,
        state=seed_state,
    )
    print("CREATED NEW SESSION:")
    print(f"\tSession ID: {TRACKING_ID}")
    # Небольшая пауза перед запуском агента
    await asyncio.sleep(0.1)
    exec_engine = Runner(
        agent=qa_agent_node,
        app_name=APP_TITLE,
        session_service=store_backend,
    )
    payload_in = types.Content(
        role="user", parts=[types.Part(text="What is Johns favorite TV show?")]
    )
    # Чтение асинхронного потока событий
    async for evt in exec_engine.run_async(
        user_id=ACCOUNT_ID,
        session_id=TRACKING_ID,
        new_message=payload_in,
    ):
        if evt.is_final_response():
            if evt.content and evt.content.parts:
                print(f"Final Response: {evt.content.parts[0].text}")
    print("==== Session Event Exploration ====")
    # Ожидание получения сессии, чтобы посмотреть финальное состояние
    fetched_session = await store_backend.get_session(
        app_name=APP_TITLE, user_id=ACCOUNT_ID, session_id=TRACKING_ID
    )
    print("=== Final Session State ===")
    if fetched_session and hasattr(fetched_session, "state"):
        for k, v in fetched_session.state.items():
            print(f"{k}: {v}")
    else:
        print("Session or session state could not be retrieved.")
if __name__ == "__main__":
    asyncio.run(boot())

Почему это важно

Состоящая из шагов оркестрация с google.adk опирается на асинхронные границы. Игнорирование точек await не только ломает поток событий: с точки зрения вызывающего кода сессия остаётся неинициализированной, поэтому попытки прочитать session.state завершаются сбоем или выглядят так, будто сессии не существует. Когда управление потоком согласовано с асинхронной моделью библиотеки, агент получает контекст, выдаёт финальные ответы, а сессия остаётся согласованной и доступной для проверки.

Выводы

Запускайте весь процесс внутри event loop, добавляйте await для create_session и get_session и обходите поток run_async через async for. Держите один и тот же session_id от создания до обработки и убедитесь, что переменные окружения загружены до старта. При таком подходе сессия работает от начала до конца, а итоговое состояние доступно для логирования.

Статья основана на вопросе на StackOverflow от PinkBanter и ответе PinkBanter.