2025, Oct 15 23:17
Как обновлять данные и синхронизировать сессии в Taipy
Как в Taipy настроить глобальное состояние, on_init и broadcast_callback, чтобы дашборд обновлялся по кнопке и расписанию из Postgres у всех пользователей.
Обновление живых данных в многостраничном приложении Taipy кажется простым, пока UI не отказывается обновляться. Типичная конфигурация с шапкой, KPI-блоками и отдельными маршрутами Analysis и Overview часто загружается один раз и «застывает», оставляя кнопку “Refresh Now” без видимого эффекта во всех сессиях. Задача проста: по расписанию и по запросу через кнопку подтягивать свежие строки из Postgres и чтобы каждый подключенный пользователь видел одно и то же обновленное состояние.
Постановка задачи и паттерн без обновления
Приложение загружает данные для KPI и двух контентных страниц из общего модуля. Пример ниже показывает схему, при которой интерфейс не обновляется надежно для всех пользователей и страниц.
# data/get_data.py
import psycopg2
import pandas as pd
from datetime import datetime, timedelta
PG_HOST = "localhost"
PG_PORT = "5432"
PG_DBNAME = "test"
PG_USER = "pg_db"
PG_PASSWORD = "Admin"
df_cache = None  # предполагается общим для всех страниц
day_slice = None
def pull_dataset():
    conn = psycopg2.connect(
        host=PG_HOST,
        port=PG_PORT,
        dbname=PG_DBNAME,
        user=PG_USER,
        password=PG_PASSWORD
    )
    query = "SELECT * FROM prod_db"
    df_cache = pd.read_sql(query, conn)
    df_cache['date'] = pd.to_datetime(df_cache['date'])
    df_cache.columns = ['Date', 'Location', 'Tower Name', 'Zone', 'Stage Label', 'Class Stage Label']
    yesterday = (datetime.now() - timedelta(days=1)).date()
    day_slice = df_cache[df_cache['Date'].dt.date == yesterday]
    day_slice = day_slice.sort_values(by='Class Stage Label', ascending=False)
    conn.close()
    return df_cache, day_slice
def refresh_store(state=None):
    global df_cache, day_slice
    df_cache, day_slice = pull_dataset()
    if state:
        state.df_cache = df_cache
        state.day_slice = day_slice
# main.py
from taipy.gui import Gui
import threading
import time
from data.get_data import refresh_store
from pages import root
from Analysis import analysis
from Overview import overview
# фоновый цикл
def tick_loop(state):
    while True:
        time.sleep(300)
        refresh_store(state)
# попытка запустить при инициализации GUI
def boot_refresh(state):
    refresh_store(state)
    threading.Thread(target=tick_loop, args=(state,), daemon=True).start()
routes = {
    "/": home_view,
    "Analysis": analysis_view,
    "Overview": overview_view
}
Gui(pages=routes).run(on_init=boot_refresh)
# pages/root.py
from data.get_data import day_slice, refresh_store
import taipy.gui.builder as tgb
with tgb.Page() as home_view:
    tgb.text('## Performance Insights', mode='md')
    tgb.button('Refresh Now', on_action=refresh_store)
    with tgb.layout("1 1 1 1 1 1"):
        with tgb.part():
            tgb.text('### Date', mode='md')
            tgb.text("#### {','.join(day_slice['Date'].dt.strftime('%Y-%m-%d').unique())}", mode='md')
        with tgb.part():
            tgb.text('### Total', mode='md')
            tgb.text("#### {int(day_slice['Class Stage Label'].count())}", mode='md')
        with tgb.part():
            tgb.text('### Normal Stage', mode='md')
            tgb.text("#### {int(day_slice[day_slice['Class Stage Label']=='Normal'].shape[0])}", mode='md')
        with tgb.part():
            tgb.text('### Stage One', mode='md')
            tgb.text("#### {int(day_slice[day_slice['Class Stage Label']=='Stage1'].shape[0])}", mode='md')
        with tgb.part():
            tgb.text('### Stage Two', mode='md')
            tgb.text("#### {int(day_slice[day_slice['Class Stage Label']=='Stage2'].shape[0])}", class_name="lblink", mode='md')
        with tgb.part():
            tgb.text('### Stage Three', mode='md')
            tgb.text("#### {int(day_slice[day_slice['Class Stage Label']=='Stage3'].shape[0])}", class_name="blink", mode='md')
    tgb.navbar()
    with tgb.expandable(title='Data', expanded=False):
        tgb.table("{day_slice}")
Что на самом деле идёт не так
Объекты данных загружаются и даже переназначаются, но не находятся там, где Taipy ожидает видеть общее состояние приложения. Пока они не лежат в главном модуле, где создается Gui, их не воспринимают как глобальное состояние, разделяемое всеми страницами. Цикл обновления также меняет контекст лишь одной сессии вместо рассылки изменений. И, наконец, попытка передать on_init в Gui.run не сработает: этот параметр не поддерживается; Taipy ищет функцию с именем on_init в главном модуле или допускает присвоение через gui.on_init.
Решение: сделать состояние глобальным для приложения и рассылать обновления
Корректный подход — импортировать общие переменные в главный модуль, чтобы Taipy считал их глобальным состоянием, инициализировать их при подключении сессии через on_init и рассылать плановые обновления всем пользователям с помощью broadcast_callback. Кнопка по‑прежнему вызывает ту же функцию обновления данных.
# main.py
from taipy.gui import Gui
import threading
import time
from data.get_data import refresh_store, df_cache, day_slice
from pages import root
from Analysis import analysis
from Overview import overview
# инициализация данных при подключении новой сессии
def on_init(state):
    refresh_store(state)
# периодически рассылать обновление во все сессии
def auto_refresher(gui: Gui):
    while True:
        time.sleep(300)
        gui.broadcast_callback(refresh_store)
routes = {
    "/": home_view,
    "Analysis": analysis_view,
    "Overview": overview_view
}
gui = Gui(pages=routes)
thread = threading.Thread(target=auto_refresher, args=(gui,))
thread.start()
gui.run()
Такая схема решает сразу три задачи. Во‑первых, df_cache и day_slice живут в области видимости главного модуля, поэтому все страницы используют одни и те же объекты. Во‑вторых, on_init запускается для каждой новой сессии и сразу заполняет состояние. В‑третьих, фоновый поток не трогает отдельную сессию: он вызывает broadcast_callback и раз в пять минут выполняет функцию обновления сразу для всех.
Почему это важно для дашбордов Taipy
Дашборды, собирающие данные на нескольких страницах, должны оставаться согласованными между сессиями пользователей. Централизация общего состояния в главном модуле и рассылка обновлений гарантируют, что ручное нажатие “Refresh Now” и плановый рефреш приводят к одному и тому же, общему для приложения результату. Это предотвращает расхождения между пользователями и избавляет от «тихих» обновлений, которые не запускают перерисовку UI.
Итоги и практические рекомендации
Держите общие структуры данных в главном модуле рядом с Gui, чтобы они стали глобальным состоянием Taipy. Инициализируйте каждую сессию через функцию on_init — Taipy обнаружит её автоматически. Используйте Gui.broadcast_callback, чтобы раздавать периодические обновления всем подключенным пользователям. С такой связкой ваши KPI и таблицы будут предсказуемо обновляться и по кнопке, и по расписанию — на всех страницах и во всех сессиях.
Статья основана на вопросе на StackOverflow от user3219244 и ответе Zaccheus Sia.