2025, Nov 18 23:00

How to Stream Live Data to Multiple Bokeh Server Sessions Safely with add_next_tick_callback and shared state

Push real-time data to multiple Bokeh server sessions while avoiding lock errors. Use add_next_tick_callback with partial and shared state to fix stale renders.

Building a live data viewer on top of a Bokeh server is a natural choice when you already speak Python. The challenge starts when you try to push the same data stream into multiple open sessions in real time. If you naïvely fan out updates from a background thread to each session, the first client will look fine, but as soon as a second tab joins, you may hit cryptic errors and see initial plots stuck until new data arrive. Let’s walk through what causes this and how to fix it cleanly.

Reproducing the problem

The setup below starts a worker thread as soon as the server boots, pulls new data periodically, and attempts to update all active sessions using add_next_tick_callback. It works for a single session, but breaks when a second session connects and can leave the initial render stale until the next update ticks in.
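The snippets assume a directory-format Bokeh app, since that is what provides lifecycle hooks: Bokeh looks for the fixed function names on_server_loaded and on_server_unloaded in a file named server_lifecycle.py next to main.py. The directory name liveapp below is just a placeholder:

liveapp/
    main.py                # builds a fresh document for each session
    server_lifecycle.py    # lifecycle hooks and the background feeder

Start it single-process with bokeh serve liveapp --show.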

# server_lifecycle.py
# Bokeh discovers lifecycle hooks by fixed function names in this file,
# so the module name and the hook names below follow that convention.
import time
import threading
import bokeh.server.contexts
from bokeh.document import Document
from bokeh.models import ColumnDataSource, GlyphRenderer
run_flag = True
shared_series = {
    'x': [1, 2, 3, 4, 5],
    'y': [6, 7, 2, 4, 7]
}
def apply_update(doc: Document, payload: dict):
    # Runs on the event loop that owns `doc`; find the line renderer by name.
    renderer = doc.get_model_by_name("line")
    if renderer is None:
        print("Model not found in document")
        return
    assert isinstance(renderer, GlyphRenderer)
    src: ColumnDataSource = renderer.data_source
    src.data = payload
def fetch_payload(k: int) -> dict[str, list[float]]:
    global shared_series
    time.sleep(1)
    shared_series['x'].append(k)
    shared_series['y'].append(k * 0.5 % 5)
    return shared_series
def on_server_loaded(ctx: bokeh.server.contexts.BokehServerContext):
    def feeder():
        global run_flag
        n = 0
        while run_flag:
            print(f"Iteration {n + 1}")
            new_payload = fetch_payload(n)
            for sess in ctx.sessions:
                print(sess.destroyed, sess.expiration_requested)
                # BUG: the lambda looks up sess and new_payload when it runs,
                # not when it is created, so every queued callback ends up
                # targeting the last session's document.
                sess.document.add_next_tick_callback(lambda: apply_update(sess.document, new_payload))
            n += 1
    t = threading.Thread(target=feeder, daemon=True)
    t.start()
def on_server_unloaded(ctx: bokeh.server.contexts.BokehServerContext):
    global run_flag
    run_flag = False
# main.py
from bokeh.plotting import figure, curdoc
from bokeh.models import ColumnDataSource
from server_lifecycle import shared_series  # local import; the app directory is on sys.path
ds = ColumnDataSource(data=shared_series)
fig = figure(title="Simple Line Example", x_axis_label='x', y_axis_label='y')
fig.line(x="x", y="y", legend_label="My Value", line_width=2, source=ds, name="line")
curdoc().add_root(fig)

When multiple sessions connect, the server can throw a runtime error related to pending writes and document locks. The page also shows only the initial state until the first background update lands.

RuntimeError: _pending_writes should be non-None when we have a document lock, and we should have the lock when the document changes

What’s actually going wrong

Two issues surface at once. First, the inline lambda closes over the loop variable sess and over new_payload by reference. Python resolves those names when the callback runs, not when it is created, so by the time the event loop gets around to the queued callbacks, every one of them sees the last session from the loop. A callback scheduled on one document then patches models belonging to a different document whose lock it does not hold, which is exactly the invariant the RuntimeError complains about. Second, without shared state that is reachable at session creation time, a new document only sees the module-level initial data and stays stale until the worker thread dispatches another tick.

The Bokeh documentation recommends binding callback arguments with functools.partial when handing work from a thread to the event loop. partial captures its arguments at creation time, so each callback keeps the document it was scheduled for. For seeding initial state, storing a reference on the server context lets each new session read the same data container the moment its document is created.
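The late-binding half of this is easy to see outside Bokeh. A minimal, Bokeh-free sketch:

from functools import partial
lambdas = [lambda: print(i) for i in range(3)]
partials = [partial(print, i) for i in range(3)]
for cb in lambdas:
    cb()   # prints 2, 2, 2 -- each lambda looks up i after the loop has finished
for cb in partials:
    cb()   # prints 0, 1, 2 -- partial froze its argument at creation time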

Fixing the update path and the initial state

The first adjustment is to replace the lambda passed to add_next_tick_callback with functools.partial, as documented for thread-to-IOLoop updates. The second is to place a reference to the shared data on the Bokeh server context when the server loads, and to read that reference in each newly created document.

# server_lifecycle.py
import time
import threading
from functools import partial
import bokeh.server.contexts
from bokeh.document import Document
from bokeh.models import ColumnDataSource, GlyphRenderer
run_flag = True
shared_series = {
    'x': [1, 2, 3, 4, 5],
    'y': [6, 7, 2, 4, 7]
}
def apply_update(doc: Document, payload: dict):
    # Scheduled via add_next_tick_callback, so this runs while the event
    # loop holds the lock for exactly this document.
    renderer = doc.get_model_by_name("line")
    if renderer is None:
        print("Model not found in document")
        return
    assert isinstance(renderer, GlyphRenderer)
    src: ColumnDataSource = renderer.data_source
    # src.stream(payload, rollover=100)  # append-only alternative, sketched below
    src.data = payload
def fetch_payload(k: int) -> dict[str, list[float]]:
    global shared_series
    time.sleep(1)
    shared_series['x'].append(k)
    shared_series['y'].append(k * 0.5 % 5)
    return shared_series
def on_server_loaded(ctx: bokeh.server.contexts.BokehServerContext):
    global shared_series
    # Stash the shared container on the server context so every new
    # session can seed its document with the current data.
    ctx.global_data = {"data": shared_series}
    def feeder():
        global run_flag
        n = 0
        while run_flag:
            print(f"Iteration {n + 1}, doc count: {len(ctx.sessions)}")
            new_payload = fetch_payload(n)
            for sess in ctx.sessions:
                if sess.destroyed or sess.expiration_requested:
                    continue
                try:
                    doc = sess.document
                except AttributeError:
                    continue
                else:
                    # partial binds doc and new_payload at creation time
                    doc.add_next_tick_callback(partial(apply_update, doc, new_payload))
            n += 1
    t = threading.Thread(target=feeder, daemon=True)
    t.start()
def on_server_unloaded(ctx: bokeh.server.contexts.BokehServerContext):
    global run_flag
    run_flag = False
# main.py
from bokeh.plotting import figure, curdoc
from bokeh.models import ColumnDataSource
# The server context is reachable from any session through its document's session context.
srv_ctx = curdoc().session_context.server_context
if hasattr(srv_ctx, "global_data"):
    global_bucket: dict = srv_ctx.global_data
else:
    raise RuntimeError("global_data not found in server context")
series = global_bucket["data"]
print("data id", id(series))
source = ColumnDataSource(data=series)
fig = figure(title="Simple Line Example", x_axis_label='x', y_axis_label='y')
fig.line(x="x", y="y", legend_label="My Value", line_width=2, source=source, name="line")
curdoc().add_root(fig)

This preserves a single background producer that fans out updates to every live session safely, and it ensures that a newly opened page immediately reflects the current dataset before any fresh ticks arrive.
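As an aside, the commented-out src.stream(...) line hints at a lighter-weight variant: when updates are append-only, streaming just the new points sends a small patch instead of replacing every column. A sketch of that style, written as a sibling of apply_update in server_lifecycle.py; the per-point signature and the rollover value are illustrative, not from the original:

def apply_stream_update(doc: Document, new_x: float, new_y: float):
    renderer = doc.get_model_by_name("line")
    if renderer is None:
        return
    src: ColumnDataSource = renderer.data_source
    # stream() appends the new columns and trims history to `rollover` points
    src.stream({'x': [new_x], 'y': [new_y]}, rollover=100)

The feeder would then schedule partial(apply_stream_update, doc, x_val, y_val) for each session instead of shipping the full dict every tick.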

There is a practical note about processes. State stashed on the server context lives in a single process: if the server ran with multiple worker processes, each process would get its own context, its own feeder thread, and its own copy of the data, so sessions served by different processes could diverge. In this scenario that is acceptable because on Windows, with the Bokeh version in question, running more than one process is not supported anyway.
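Concretely, the single-process assumption matches the default invocation; the --num-procs flag is what would break it (liveapp is the placeholder app directory from earlier):

bokeh serve liveapp                  # one process: one context, one feeder, one shared dict
bokeh serve liveapp --num-procs 4    # four processes, each with its own context and copy of the data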

Why this matters for multi-session live apps

Live dashboards often start as a single-view prototype, then grow into multi-session tools. The moment you introduce a second tab or user, both correctness and latency become visible. Properly scheduling document updates from worker threads avoids hard-to-debug race conditions and lock errors, and seeding each document from a shared source of truth spares late joiners the awkward "stale until the next tick" experience.

Takeaways

If you drive Bokeh documents from a background thread, schedule UI changes with add_next_tick_callback using a callable created via partial. When multiple sessions must see the same evolving dataset, store a reference on the server context and pick it up during document creation so every client renders the current state immediately. Alternatives like per-session subscribers or HTTP polling exist, but if you want centralized updates with minimal delay, this pattern keeps things simple and responsive within the constraints described.
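For contrast, the per-session flavor can be sketched with no lifecycle fan-out at all: each document polls the shared container on its own periodic callback. A sketch, assuming the same shared_series dict is importable from server_lifecycle.py and picking an arbitrary 500 ms period:

# main.py -- per-session polling variant (sketch)
from bokeh.plotting import figure, curdoc
from bokeh.models import ColumnDataSource
from server_lifecycle import shared_series

doc = curdoc()
# Copy the lists so Bokeh's equality check sees a change when they grow.
source = ColumnDataSource(data={k: list(v) for k, v in shared_series.items()})

def poll():
    # Runs on this session's own event loop, so no cross-thread scheduling is needed.
    source.data = {k: list(v) for k, v in shared_series.items()}

fig = figure(title="Simple Line Example", x_axis_label='x', y_axis_label='y')
fig.line(x="x", y="y", legend_label="My Value", line_width=2, source=source, name="line")
doc.add_root(fig)
doc.add_periodic_callback(poll, 500)  # poll every 500 ms

This trades a little latency and some redundant copying for never having to touch another session's document.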