StreamBinding is an invisible runtime component. It subscribes the current client stream to a named runtime stream and converts matching events into client state updates.
Use it for live feeds that are not finite background jobs: runtime metrics, telemetry samples, live logs already modeled as stream events, or any bounded series that should update existing components through bindings.
Do not use it as a replacement for BackgroundTask. A BackgroundTask represents work with a lifecycle; StreamBinding represents a mounted subscription.
For module-owned continuous publishers, use a long_run_command to publish
events and let pages subscribe with StreamBinding. The command owns the
producer lifecycle; the page only decides how to map stream payloads into UI
state.
Runtime Flow¶
- The page renders a
StreamBinding. - The client asks the core runtime to subscribe that binding.
- The runtime listens to the configured source stream.
- Matching events are transformed into
stateUpdatemessages for the client stream. - Components bound to that page/global store update normally.
- When the component unmounts or the client disconnects, the runtime unsubscribes.
Python Contract¶
sdk.ui.StreamBinding(
id: str,
stream: str,
event: str = "",
target: dict | None = None,
transformer: str | dict | None = None,
)Supported Properties¶
| Property | Type | Required | Python | YAML | Notes |
|---|---|---|---|---|---|
id |
str |
yes | constructor | id |
Stable binding id. |
stream |
str |
yes | constructor | stream |
Runtime stream to subscribe to. |
event |
str |
no | constructor | event |
Matches payload event_name. Empty means all events. |
target.store |
page | global |
yes | target |
target.store |
Store scope to update. |
target.path |
str |
yes | target |
target.path |
Base store path. |
target.mode |
set | append_window |
no | target |
target.mode |
Update strategy. Default is set. |
target.window |
int |
no | target |
target.window |
Maximum list length for append_window. |
target.mappings |
dict[str, str] |
yes, unless transformer is used |
target |
target.mappings |
Maps store fields to payload selectors. |
transformer |
str | dict |
no | constructor | transformer |
Action called for each matching payload before writing store values. |
transformer.name |
str |
yes, when transformer is a dict |
transformer |
transformer.name |
Qualified action name. |
transformer.context |
dict |
no | transformer |
transformer.context |
Static configuration passed to the action. |
Selectors currently support direct payload fields with $.field or dotted object paths such as $.node.cpu_percent.
Transformer Actions¶
Use transformer when the stream payload must be filtered, grouped, merged with
current client state, or converted into multiple store paths.
The transformer action receives:
| Context key | Value |
|---|---|
stream_id / _stream_id |
Current client stream id. Use it with sdk.effects.ask_current_store_value(...) when the transformer needs the current page/global store value. |
binding_id |
Mounted StreamBinding id. |
source_stream |
Runtime stream being consumed. |
event |
Matched event name. |
payload |
Raw stream event payload. |
target |
Declared target dictionary. |
context |
Static transformer.context dictionary. |
Return one of these shapes:
return {"value": next_value}value is written to target.path.
return {
"values": {
"/path/one": value_one,
"/path/two": value_two,
}
}values writes explicit store paths in the target scope.
YAML Example¶
- kind: StreamBinding
id: gpu_usage_binding
stream: system.runtime.metrics.events
event: runtime.metrics.sample
target:
store: page
path: /runtime_metrics/gpu
mode: append_window
window: 60
mappings:
data: $.vram_used_percent
labels: $.timestamp
- kind: Chart
id: gpu_usage_chart
chart_type: line
data: {type: store, scope: page, path: /runtime_metrics/gpu/data}
labels: {type: store, scope: page, path: /runtime_metrics/gpu/labels}Runtime Metrics Example¶
The core runtime publishes node resource samples on:
system.runtime.metrics.eventswith event_name:
runtime.metrics.sampleThe sample payload includes node_id, timestamp, label, cpu_percent,
ram_used_percent, ram_used_mb, ram_total_mb, vram_used_percent,
vram_used_mb, vram_total_mb, and has_nvidia_gpu.
- kind: StreamBinding
id: node_vram_binding
stream: system.runtime.metrics.events
event: runtime.metrics.sample
target:
store: page
path: /runtime_metrics/vram
mode: append_window
window: 60
mappings:
data: $.vram_used_percent
labels: $.labelTo group samples by node before writing the page store, use a transformer:
- kind: StreamBinding
id: node_metrics_binding
stream: system.runtime.metrics.events
event: runtime.metrics.sample
transformer:
name: monitor.transform_runtime_metrics
context:
window: 60
target:
store: page
path: /runtime_metrics/by_nodeThe action can read the current client-side value and return the replacement:
from democrai.sdk.decorators import action
@action("transform_runtime_metrics")
async def transform_runtime_metrics(ctx: dict, module_sdk):
current = await module_sdk.effects.ask_current_store_value(
ctx["stream_id"],
ctx["target"]["path"],
ctx["target"].get("store", "page"),
)
next_value = dict(current or {})
next_value[ctx["payload"]["node_id"]] = ctx["payload"]
return {"value": next_value}Module Publisher Example¶
A module can publish a long-running feed through a command registered at module initialization:
import asyncio
from democrai.sdk.client import active_sdk as sdk
from democrai.sdk.decorators import long_run_command
@long_run_command("my_module.metrics_publisher")
async def metrics_publisher(stop_event=None):
while stop_event is None or not stop_event.is_set():
await sdk.effects.publish_to_stream(
"my_module.metrics",
{
"event_name": "my_module.metrics.sample",
"value": 42,
"label": "sample",
},
)
await asyncio.sleep(1)The page subscribes without knowing how the producer is scheduled:
- kind: StreamBinding
id: my_module_metrics_binding
stream: my_module.metrics
event: my_module.metrics.sample
target:
store: page
path: /my_module/metrics
mode: append_window
window: 60
mappings:
data: $.value
labels: $.label
- kind: Chart
id: my_module_metrics_chart
chart_type: line
data: {type: store, scope: page, path: /my_module/metrics/data}
labels: {type: store, scope: page, path: /my_module/metrics/labels}Python Example¶
builder.add(
sdk.ui.StreamBinding(
"gpu_usage_binding",
stream="system.runtime.metrics.events",
event="runtime.metrics.sample",
target={
"store": "page",
"path": "/runtime_metrics/gpu",
"mode": "append_window",
"window": 60,
"mappings": {
"data": "$.vram_used_percent",
"labels": "$.timestamp",
},
},
)
)