2025, Nov 27 15:03

Как безопасно обновлять данные в Bokeh-сервере для нескольких сессий

Почему ломаются Bokeh-приложения с несколькими сессиями и как безопасно обновлять данные через add_next_tick_callback и partial из фонового потока в Python

Создавать просмотрщик данных в реальном времени поверх сервера Bokeh — естественный выбор, если вы уже работаете с Python. Сложности начинаются, когда вы пытаетесь в реальном времени отправлять один и тот же поток данных в несколько открытых сессий. Если наивно «раздавать» обновления из фонового потока в каждую сессию, первый клиент будет выглядеть нормально, но как только подключится вторая вкладка, можно столкнуться с непонятными ошибками, а исходные графики застывают до прихода новых данных. Разберёмся, почему так происходит и как аккуратно это исправить.

Воспроизводим проблему

Конфигурация ниже запускает рабочий поток сразу после старта сервера, периодически подтягивает новые данные и пытается обновлять все активные сессии через add_next_tick_callback. С одной сессией всё работает, но при подключении второй всё ломается, а первичный рендер может остаться устаревшим до следующего тика обновления.

# server_hooks.py
import time
import threading

import bokeh.server.contexts
from bokeh.plotting import Document
from bokeh.models import ColumnDataSource

run_flag = True
shared_series = {
    'x': [1, 2, 3, 4, 5],
    'y': [6, 7, 2, 4, 7]
}

def apply_update(doc: Document, payload: dict):
    renderer = doc.get_model_by_name("line")
    if renderer is None:
        print("Model not found in document")
        return
    assert isinstance(renderer, bokeh.models.renderers.glyph_renderer.GlyphRenderer)
    src: ColumnDataSource = renderer.data_source
    src.data = payload


def fetch_payload(k: int) -> dict[str, list[float]]:
    global shared_series
    time.sleep(1)
    shared_series['x'].append(k)
    shared_series['y'].append(k * 0.5 % 5)
    return shared_series


def when_server_boots(ctx: bokeh.server.contexts.BokehServerContext):
    def feeder():
        global run_flag
        n = 0
        while run_flag:
            print(f"Iteration {n + 1}")
            new_payload = fetch_payload(n)
            for sess in ctx.sessions:
                print(sess.destroyed, sess.expiration_requested)
                sess.document.add_next_tick_callback(lambda: apply_update(sess.document, new_payload))
            n += 1

    t = threading.Thread(target=feeder, daemon=True)
    t.start()


def when_server_stops(ctx: bokeh.server.contexts.BokehServerContext):
    global run_flag
    run_flag = False
# main.py
from bokeh.plotting import figure, curdoc
from bokeh.models import ColumnDataSource
from .server_hooks import shared_series

ds = ColumnDataSource(data=shared_series)

fig = figure(title="Simple Line Example", x_axis_label='x', y_axis_label='y')
fig.line(x="x", y="y", legend_label="My Value", line_width=2, source=ds, name="line")

curdoc().add_root(fig)

Когда подключается несколько сессий, сервер может выбрасывать runtime‑ошибку, связанную с отложенными записями и блокировками документа. Страница также показывает лишь начальное состояние до тех пор, пока не придёт первое фоновое обновление.

RuntimeError: _pending_writes should be non-None when we have a document lock, and we should have the lock when the document changes

Что на самом деле происходит

Одновременно всплывают две проблемы. Во‑первых, передавать работу из фонового потока в событийный цикл Bokeh нужно именно так, как ожидает сервер; иначе во время правок вы нарушите предположения о блокировке документа. Во‑вторых, если при создании сессии нет доступного общего состояния, новые документы не увидят актуальные данные до следующего тика от рабочего потока.

Документация рекомендует при передаче колбеков между потоками использовать вызываемый объект, связанный через partial(). Такой приём избегает проблем, возникающих с «встроенной» lambda. Для инициализации состояния сохраните ссылку в контексте сервера — тогда каждая новая сессия сразу получит доступ к одному и тому же контейнеру данных.

Чиним путь обновления и начальное состояние

Во‑первых, замените lambda, передаваемую в add_next_tick_callback, на functools.partial — как и описано для обновлений из потока в IOLoop. Во‑вторых, при загрузке сервера поместите ссылку на общие данные в контекст Bokeh‑сервера и считывайте её при создании каждого документа.

# server_hooks.py
import time
import threading
from functools import partial

import bokeh.server.contexts
from bokeh.plotting import Document
from bokeh.models import ColumnDataSource

run_flag = True
shared_series = {
    'x': [1, 2, 3, 4, 5],
    'y': [6, 7, 2, 4, 7]
}

def apply_update(doc: Document, payload: dict):
    renderer = doc.get_model_by_name("line")
    if renderer is None:
        print("Model not found in document")
        return
    assert isinstance(renderer, bokeh.models.renderers.glyph_renderer.GlyphRenderer)
    src: ColumnDataSource = renderer.data_source
    # src.stream(payload, rollover=100)
    src.data = payload


def fetch_payload(k: int) -> dict[str, list[float]]:
    global shared_series
    time.sleep(1)
    shared_series['x'].append(k)
    shared_series['y'].append(k * 0.5 % 5)
    return shared_series


def when_server_boots(ctx: bokeh.server.contexts.BokehServerContext):
    global shared_series
    ctx.global_data = {"data": shared_series}

    def feeder():
        global run_flag
        n = 0
        while run_flag:
            print(f"Iteration {n + 1}, doc count: {len(ctx.sessions)}")
            new_payload = fetch_payload(n)
            for sess in ctx.sessions:
                if sess.destroyed or sess.expiration_requested:
                    continue
                try:
                    doc = sess.document
                except AttributeError:
                    continue
                else:
                    doc.add_next_tick_callback(partial(apply_update, doc, new_payload))
            n += 1

    t = threading.Thread(target=feeder, daemon=True)
    t.start()


def when_server_stops(ctx: bokeh.server.contexts.BokehServerContext):
    global run_flag
    run_flag = False
# main.py
from bokeh.plotting import figure, curdoc
from bokeh.models import ColumnDataSource

srv_ctx = curdoc().session_context.server_context
if hasattr(srv_ctx, "global_data"):
    global_bucket: dict = srv_ctx.global_data
else:
    raise RuntimeError("global_data not found in server context")

series = global_bucket["data"]
print("data id", id(series))
source = ColumnDataSource(data=series)

fig = figure(title="Simple Line Example", x_axis_label='x', y_axis_label='y')
fig.line(x="x", y="y", legend_label="My Value", line_width=2, source=source, name="line")

curdoc().add_root(fig)

Так сохраняется единый фоновый «поставщик», который безопасно распространяет обновления во все активные сессии, а новая вкладка сразу показывает текущее состояние набора данных ещё до прихода очередного тика.

Практическая заметка о процессах. Хранение состояния в контексте сервера привязано к одному процессу. Если сервер запущен в нескольких процессах, такой подход может оказаться проблемным. В данном сценарии это допустимо, поскольку в Windows с указанной версией Bokeh многопроцессный режим не поддерживается.

Почему это важно для многосессионных live‑приложений

Живые дашборды часто начинаются как прототип для одного экрана, а затем превращаются в инструмент для нескольких сессий. Как только появляется вторая вкладка или пользователь, становятся заметны и корректность, и задержки. Правильное планирование обновлений документа из рабочих потоков предотвращает сложные для отладки гонки и ошибки блокировок. Предварительная инициализация каждого документа общим источником истины избавляет опоздавших от неудобного эффекта «пусто до следующего тика».

Выводы

Если вы обновляете документы Bokeh из фонового потока, планируйте изменения интерфейса через add_next_tick_callback, передавая вызываемый объект, созданный с помощью partial. Когда нескольким сессиям нужно видеть один и тот же изменяющийся набор данных, храните ссылку в контексте сервера и считывайте её при создании документа — так каждый клиент сразу отрисует актуальное состояние. Альтернативы вроде подписчиков на сессию или HTTP‑опроса существуют, но если вам нужны централизованные обновления с минимальной задержкой, описанный приём остаётся простым и отзывчивым в обозначенных рамках.