Refactored Runtime Guide (Canonical)
This guide documents the canonical Retriever runtime workflow:
Pipeline / TemporalFlow → Pipeline.validate() → IR → (optional) Pipeline.build_execution() → execute_ir()
It intentionally avoids the legacy Flow.from_module/LocalExecutor API.
1) Quickstart: build + run a tiny pipeline
```python
from retriever.flow import Flow, Pipeline, Rate, Latest, io

@io
class SrcOut:
    value: int

@io
class AddOut:
    value: int

class Source(Flow[None, SrcOut]):
    # A source has no upstream input, so the step argument is ignored.
    def step(self, _):  # type: ignore[override]
        return SrcOut(value=1)

class AddOne(Flow[SrcOut, AddOut]):
    def step(self, input: SrcOut) -> AddOut:
        return AddOut(value=input.value + 1)

pipe = Pipeline("quickstart")
# Attach a 10 Hz clock to each node with `flow @ clock`.
src = Source() @ Rate(hz=10)
add = AddOne() @ Rate(hz=10)
# AddOne samples the most recent SrcOut whenever it runs.
pipe.connect(src, add, sync=Latest())

# Run for 1 s on the local multiprocessing backend.
pipe.run(backend="multiprocessing", duration=1.0)

# Or record deterministic local steps explicitly:
# pipe.record("log.mcap", steps=10, dt=0.1)
```
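Async full run (non-blocking)
To start a full run without blocking the caller, pass blocking=False to execute_ir(...), described under Run with a backend.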
Single-step debugging (in-process)
Pipeline.step() runs the authored pipeline in the current Python process and advances it by one step:
```python
res = pipe.step(dt=0.1)  # advances an internal logical clock by 0.1s
print(res.executed)      # inspect what ran during this step
pipe.close_stepper()     # shut down the in-process stepper when done
```
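Because the clock is logical, stepping is deterministic: the same sequence of dt values always yields the same schedule. The toy stepper below sketches that idea under stated assumptions. ToyStepper is hypothetical. Its rule is also an assumption and is not retriever's scheduler: advance the clock by dt, then run each periodic node whose next deadline has passed, at most once per step.

```python
class ToyStepper:
    """Hypothetical deterministic stepper (not retriever's Pipeline.step).

    Each call advances a logical clock by dt and runs every node whose next
    periodic deadline has been reached, at most once per step.
    """

    def __init__(self, rates_hz: dict) -> None:
        self.now = 0.0
        self.period = {name: 1.0 / hz for name, hz in rates_hz.items()}
        # First deadline of each node is one period after t = 0.
        self.next_due = dict(self.period)

    def step(self, dt: float) -> list:
        self.now += dt
        executed = []
        for name, due in list(self.next_due.items()):
            if due <= self.now + 1e-9:  # tolerance for float accumulation
                executed.append(name)
                self.next_due[name] = due + self.period[name]
        return executed


stepper = ToyStepper({"fast": 10.0, "slow": 5.0})
for _ in range(4):
    print(stepper.step(dt=0.1))
# ['fast']
# ['fast', 'slow']
# ['fast']
# ['fast', 'slow']
```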
More details: docs/guides/debugging.md.
Record + replay (stepper-first)
For deterministic persisted recording, prefer pipe.record(...).
If you record with pipe.run(record=...) instead, you must opt into the in-process backend explicitly by passing backend="in-process".
For explicit stepper-driven recording and replay, use:
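```python
# Record 10 logical steps of 0.05 s each (0.5 s in total), then replay the log.
pipe.record("logs/camera_recording.rrd", steps=10, dt=0.05)
# `camera` is a node handle in `pipe`.
pipe.replay(camera, path="logs/camera_recording.rrd")
```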
2) Core concepts
@io types (ports)
Flows communicate with typed @io classes. Each annotated field becomes a port.
Use @io directly. Do not stack it with @dataclass.
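The toy model below makes "each annotated field becomes a port" concrete. It reads a class's annotations with the standard typing.get_type_hints. It does not use @io itself. CameraOut and ports_of are hypothetical names, and retriever may attach more per-port metadata than a name and a type.

```python
from typing import get_type_hints


class CameraOut:
    """Hypothetical output type; in retriever you would decorate it with @io."""

    frame_id: int
    exposure_ms: float


def ports_of(cls: type) -> dict:
    """Toy model: one port per annotated field, keyed by field name."""
    return dict(get_type_hints(cls))


print(ports_of(CameraOut))
# {'frame_id': <class 'int'>, 'exposure_ms': <class 'float'>}
```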
Flow[I, O] (node logic)
A Flow is a node that implements:
- reset() (optional)
- step(input: I) -> O
- finalize() (optional)
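The call order of these hooks is not spelled out here. A plausible reading is reset() once, step() once per execution, and finalize() once at shutdown. The sketch below drives a toy stand-in for Flow[I, O] in that assumed order. ToyFlow, RunningSum and drive are hypothetical names.

```python
from typing import Generic, Iterable, List, TypeVar

I = TypeVar("I")
O = TypeVar("O")


class ToyFlow(Generic[I, O]):
    """Hypothetical stand-in for Flow[I, O] with the three lifecycle hooks."""

    def reset(self) -> None:  # optional: (re)initialize state
        pass

    def step(self, input: I) -> O:  # required: one input -> one output
        raise NotImplementedError

    def finalize(self) -> None:  # optional: release resources
        pass


class RunningSum(ToyFlow[int, int]):
    def reset(self) -> None:
        self.total = 0

    def step(self, input: int) -> int:
        self.total += input
        return self.total

    def finalize(self) -> None:
        print("final total:", self.total)


def drive(flow: ToyFlow[I, O], inputs: Iterable[I]) -> List[O]:
    """Assumed order: reset once, step per input, finalize once (even on error)."""
    flow.reset()
    try:
        return [flow.step(x) for x in inputs]
    finally:
        flow.finalize()


print(drive(RunningSum(), [1, 2, 3]))
# final total: 6
# [1, 3, 6]
```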
Clocks (when a node executes)
Attach a clock using flow @ clock:
- Rate(hz=...): periodic execution, sampling all connected inputs on each tick
- Tick(hz=...): periodic tick-only execution, sampling no inputs
- Trigger("field", ...): event-driven execution on one or more named input fields
- Hybrid(hz=..., trigger=[...]): periodic execution plus immediate trigger-driven execution
Defaults:
- Rate(hz=...) and the periodic path of Hybrid(...) sample all connected inputs.
- Tick(hz=...) is the explicit tick-only clock.
- Trigger(...) samples only the triggering input fields.
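The sampling defaults above can be modeled as a function. It takes a clock, the latest value per connected input, and the set of fields that fired, and returns the inputs a node receives. Every name below is hypothetical (ToyRate, ToyTick, ToyTrigger, ToyHybrid, sampled_inputs). One assumption goes beyond the text: Hybrid's trigger path is treated like Trigger(...), because only its periodic path is specified.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyRate:
    hz: float


@dataclass(frozen=True)
class ToyTick:
    hz: float


@dataclass(frozen=True)
class ToyTrigger:
    fields: tuple


@dataclass(frozen=True)
class ToyHybrid:
    hz: float
    trigger: tuple


def sampled_inputs(clock, latest: dict, fired: frozenset = frozenset()) -> dict:
    """Inputs a node sees when its clock fires; an empty `fired` means a periodic tick."""
    if isinstance(clock, ToyRate):
        return dict(latest)  # every connected input
    if isinstance(clock, ToyTick):
        return {}  # tick-only: no inputs
    if isinstance(clock, ToyTrigger):
        return {k: latest[k] for k in clock.fields if k in fired}
    if isinstance(clock, ToyHybrid):
        if not fired:
            return dict(latest)  # periodic path: every connected input
        # Assumption: the trigger path behaves like Trigger(...).
        return {k: latest[k] for k in clock.trigger if k in fired}
    raise TypeError(f"unknown clock: {clock!r}")


latest = {"image": "frame-7", "imu": 0.12}
print(sampled_inputs(ToyTrigger(("image",)), latest, frozenset({"image"})))
# {'image': 'frame-7'}
```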
Wiring (edges)
Connect nodes either with explicit Pipeline.connect(...) calls, or by chaining handles with
TemporalFlow.then(...) or the >> operator while a Pipeline is active as the owner.
Option A — explicit Pipeline.connect(...)
```python
pipe = Pipeline("my_pipeline")
# A and B stand for your own Flow subclasses.
a = A() @ Rate(hz=10)
b = B() @ Rate(hz=10)
pipe.connect(a, b, sync=Latest())
```
Option B — with pipe: + handle chaining
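Enter the pipeline as a context manager (with pipe:) and chain node handles with TemporalFlow.then(...) or >>; the active pipeline owns the resulting edges.

sync is an Adapter that defines how the downstream node samples its input queue.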
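The owner mechanism behind Option B can be modeled in a few lines of plain Python. This is a toy sketch and not retriever's implementation. ToyPipeline and ToyHandle are hypothetical names, and the sketch assumes that the innermost active with block owns any edge created by chaining.

```python
from typing import List, Tuple


class ToyPipeline:
    _active: List["ToyPipeline"] = []  # stack of pipelines currently entered via `with`

    def __init__(self, name: str) -> None:
        self.name = name
        self.edges: List[Tuple[str, str]] = []

    def __enter__(self) -> "ToyPipeline":
        ToyPipeline._active.append(self)
        return self

    def __exit__(self, *exc_info) -> None:
        ToyPipeline._active.pop()


class ToyHandle:
    def __init__(self, name: str) -> None:
        self.name = name

    def then(self, other: "ToyHandle") -> "ToyHandle":
        if not ToyPipeline._active:
            raise RuntimeError("chaining needs an active pipeline (use `with pipe:`)")
        ToyPipeline._active[-1].edges.append((self.name, other.name))
        return other  # returning the downstream handle lets chains continue

    __rshift__ = then  # a >> b is the same as a.then(b)


pipe = ToyPipeline("my_pipeline")
with pipe:
    ToyHandle("a") >> ToyHandle("b") >> ToyHandle("c")
print(pipe.edges)  # [('a', 'b'), ('b', 'c')]
```

Returning the downstream handle from then() is what makes a >> b >> c read left to right as two edges.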
3) Execution model
Validate to IR
pipe.validate() turns the authored graph into an IR. This happens automatically during Pipeline.run(...),
but you can call it directly for debugging or inspection.
Build execution graph (partitioning + placement)
pipe.build_execution() creates an ExecutionGraph, a physical graph used to decide:
- which flows should run together (co-location / fusion)
- where a partition should run (placement hints; currently informational)
Run with a backend
execute_ir(ir, backend=..., duration=..., blocking=...) runs the IR on:
- multiprocessing: local Python multiprocessing backend
- dora: dora-rs backend (requires a compatible dora CLI and dependencies)
- in-process: single-process wrapper around a live Pipeline, for debugging and recording
execute_ir(...) accepts either an IR or an ExecutionGraph.
Dora backend config: native node overrides (Tier A.1)
The dora backend can optionally run selected nodes as native dora nodes (Rust binaries) rather than Python executors.
This is controlled at execution time (no change to Flow authoring code) via backend_config["native_overrides"]:
```python
pipe.run(
    backend="dora",
    backend_config={
        # Match by node.id, "module:Type", or Type (in that order).
        "native_overrides": {
            "CameraSource": "native:retriever-dora-camera",
        }
    },
    duration=10.0,
)
```
Native overrides are an advanced backend extension point: keep the public pipeline API stable, and place runtime-specific implementation behind the backend adapter.
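The lookup order in the comment (node.id, then "module:Type", then Type) amounts to a first-match search. The sketch below rests on two assumptions. First, module is taken to be the Python module that defines the flow class. Second, a miss is taken to mean the node keeps its Python executor. resolve_native_override and my_pkg.sensors are hypothetical names.

```python
from typing import Dict, Optional


def resolve_native_override(
    node_id: str, module: str, type_name: str, overrides: Dict[str, str]
) -> Optional[str]:
    """Return the native target for a node, or None to keep the Python executor.

    Keys are tried in the documented order: node.id, then "module:Type", then Type.
    """
    for key in (node_id, f"{module}:{type_name}", type_name):
        if key in overrides:
            return overrides[key]
    return None


overrides = {"CameraSource": "native:retriever-dora-camera"}
print(resolve_native_override("cam0", "my_pkg.sensors", "CameraSource", overrides))
# native:retriever-dora-camera
```

Trying the most specific key first means one camera instance can be overridden without affecting every CameraSource.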
Backend config: buffer engine (Tier B.3)
The runtime hot path samples timestamped per-port buffers on every step. Backends expose a
buffer_engine switch so that a Rust implementation can be swapped in later without changing
user pipelines.
The value "native" is reserved for a future retriever_native extension.
See examples/tutorial/c_debug_and_replay/05_buffer_engine_demo.py for a minimal runnable demo.
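A switch like buffer_engine is essentially a registry of interchangeable buffer implementations selected by name. The sketch below assumes the setting is read from backend_config["buffer_engine"]. It uses "python" as a hypothetical default engine name. ListBuffer, ENGINES and make_buffer are illustrative and are not retriever's API. The reserved "native" entry fails loudly until an implementation exists.

```python
from typing import Callable, Dict


class ListBuffer:
    """Pure-Python per-port buffer (hypothetical default engine)."""

    def __init__(self) -> None:
        self.items: list = []

    def push(self, t: float, value: object) -> None:
        self.items.append((t, value))


def _native_engine() -> object:
    raise NotImplementedError('"native" is reserved for a future retriever_native extension')


# Hypothetical registry: engine name -> factory for a per-port buffer.
ENGINES: Dict[str, Callable[[], object]] = {
    "python": ListBuffer,
    "native": _native_engine,
}


def make_buffer(backend_config: dict) -> object:
    """Pick the buffer implementation from config; pipeline code never changes."""
    name = backend_config.get("buffer_engine", "python")
    if name not in ENGINES:
        raise ValueError(f"unknown buffer_engine: {name!r}")
    return ENGINES[name]()


print(type(make_buffer({})).__name__)  # ListBuffer
```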
4) Event/time model (FRP vocabulary)
TimedBuffer
Each input port maintains a finite timestamped buffer:
retriever.flow.types.TimedBuffer[T] = list[tuple[float, T]]
This is what Subscriber.get_all() returns and what Adapters sample.
For collection/replay/export contracts, use retriever.types.data.EventBuffer instead of the runtime tuple buffer.
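A finite per-port buffer can be sketched with collections.deque(maxlen=...), which evicts the oldest sample when full. ToySubscriber is a hypothetical stand-in for a Subscriber. Two details are assumptions, because the real eviction policy is not described here: the capacity is a sample count, and timestamps must be non-decreasing.

```python
from __future__ import annotations

from collections import deque
from typing import Generic, TypeVar

T = TypeVar("T")


class ToySubscriber(Generic[T]):
    """Hypothetical per-port buffer that keeps only the newest `maxlen` samples."""

    def __init__(self, maxlen: int = 4) -> None:
        self._buf: deque[tuple[float, T]] = deque(maxlen=maxlen)

    def push(self, t: float, value: T) -> None:
        # Keep timestamps non-decreasing so adapters can scan the buffer in order.
        if self._buf and t < self._buf[-1][0]:
            raise ValueError("timestamps must be non-decreasing")
        self._buf.append((t, value))  # deque drops the oldest sample when full

    def get_all(self) -> list[tuple[float, T]]:
        """Return a copy in the TimedBuffer shape: list[tuple[float, T]]."""
        return list(self._buf)


sub: ToySubscriber[int] = ToySubscriber(maxlen=3)
for i, t in enumerate([0.0, 0.1, 0.2, 0.3]):
    sub.push(t, i)
print(sub.get_all())  # [(0.1, 1), (0.2, 2), (0.3, 3)]
```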
EventStream
EventStream[T] is a conceptual view over an event source. In the runtime:
- each port is an EventStream
- the runtime materializes only a finite TimedBuffer per port
EventStream.sample(adapter, now=...) applies an Adapter to its TimedBuffer.
Behavior
Behavior[T] is a continuous-time sampler: t -> value. In this runtime it is usually derived from an
EventStream via an Adapter.
Use the methods on retriever.flow.types.EventStream and retriever.flow.types.Behavior for stream-style composition, including map, filter, merge, fold, snapshot, combine_latest, flat_map, Behavior.select(...), and Behavior.until(...).
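The relationship between the two views fits in a small model. An event stream is a time-sorted list of (timestamp, value) pairs. Holding its latest value (a zero-order hold) turns it into a Behavior, which is a function of time. ToyStream, ToyBehavior and hold are hypothetical names. The method names overlap with retriever's (map, filter, merge), but the signatures here are illustrative only.

```python
from __future__ import annotations

import bisect
from dataclasses import dataclass
from typing import Callable, Generic, TypeVar

A = TypeVar("A")
B = TypeVar("B")


@dataclass
class ToyStream(Generic[A]):
    events: list[tuple[float, A]]  # sorted by timestamp

    def map(self, f: Callable[[A], B]) -> ToyStream[B]:
        return ToyStream([(t, f(v)) for t, v in self.events])

    def filter(self, pred: Callable[[A], bool]) -> ToyStream[A]:
        return ToyStream([(t, v) for t, v in self.events if pred(v)])

    def merge(self, other: ToyStream[A]) -> ToyStream[A]:
        return ToyStream(sorted(self.events + other.events, key=lambda e: e[0]))

    def hold(self, initial: A) -> ToyBehavior[A]:
        """Zero-order hold: value of the last event at or before t."""
        times = [t for t, _ in self.events]
        values = [v for _, v in self.events]

        def at(t: float) -> A:
            i = bisect.bisect_right(times, t)
            return values[i - 1] if i else initial

        return ToyBehavior(at)


@dataclass
class ToyBehavior(Generic[A]):
    at: Callable[[float], A]  # continuous-time sampler: t -> value

    def map(self, f: Callable[[A], B]) -> ToyBehavior[B]:
        return ToyBehavior(lambda t: f(self.at(t)))


speed = ToyStream([(0.0, 1.0), (1.0, 3.0), (2.0, -1.0)])
held = speed.filter(lambda v: v > 0).map(lambda v: v * 2).hold(initial=0.0)
print([held.at(t) for t in (-0.5, 0.5, 1.5, 2.5)])  # [0.0, 2.0, 6.0, 6.0]
```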
Adapters (sampling policy)
Adapters decide how to interpret a TimedBuffer at time now:
- Latest(): last value
- Hold(debounce=...): zero-order hold with debounce
- Window(duration=..., agg=...): time-window aggregation
- Events(duration=..., include_timestamps=...): return the history/window (for “queue manipulation” flows)
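Three of these adapters can be sketched as plain functions over a TimedBuffer. Two behaviors are assumptions: the window is half-open, (now - duration, now], and an empty selection returns None. Hold is omitted because its debounce rule is not specified here. latest, window and events are hypothetical stand-ins for Latest(), Window(...) and Events(...).

```python
from typing import Any, Callable, List, Optional, Tuple

Buf = List[Tuple[float, Any]]  # same shape as a TimedBuffer


def latest(buf: Buf, now: float) -> Optional[Any]:
    """Latest(): the most recent value at or before `now`."""
    past = [v for t, v in buf if t <= now]
    return past[-1] if past else None


def window(buf: Buf, now: float, duration: float, agg: Callable[[list], Any]) -> Optional[Any]:
    """Window(): aggregate values whose timestamps fall in (now - duration, now]."""
    vals = [v for t, v in buf if now - duration < t <= now]
    return agg(vals) if vals else None


def events(buf: Buf, now: float, duration: float, include_timestamps: bool = False) -> list:
    """Events(): the raw history inside the window, optionally with timestamps."""
    sel = [(t, v) for t, v in buf if now - duration < t <= now]
    return sel if include_timestamps else [v for _, v in sel]


buf = [(0.0, 1), (0.5, 2), (1.0, 3), (1.5, 4)]
print(latest(buf, now=1.2))                                         # 3
print(window(buf, now=1.5, duration=1.0, agg=sum))                 # 7
print(events(buf, now=1.5, duration=1.0, include_timestamps=True))  # [(1.0, 3), (1.5, 4)]
```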
5) Pipelines registry + plugins
Register a pipeline factory
Use retriever.registry.pipeline.register_pipeline to register a pipeline builder that returns either:
- an IR, or
- a PipelineBuilder/Pipeline (it will be validated automatically)
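The register-then-resolve pattern looks roughly like the sketch below. toy_register_pipeline and toy_resolve are hypothetical and do not reproduce register_pipeline's real signature. The automatic validation step is replaced by an assumed duck-typed check: if the factory returns an object with a .validate() method, that method is called; otherwise the result is treated as IR.

```python
from typing import Any, Callable, Dict

_REGISTRY: Dict[str, Callable[[], Any]] = {}


def toy_register_pipeline(name: str) -> Callable:
    """Hypothetical decorator that stores a zero-argument pipeline factory under `name`."""
    def decorator(factory: Callable[[], Any]) -> Callable[[], Any]:
        if name in _REGISTRY:
            raise ValueError(f"pipeline {name!r} is already registered")
        _REGISTRY[name] = factory
        return factory
    return decorator


def toy_resolve(name: str) -> Any:
    """Call the factory; validate builders/pipelines, pass IR through unchanged."""
    result = _REGISTRY[name]()
    validate = getattr(result, "validate", None)
    return validate() if callable(validate) else result


class FakePipeline:
    def validate(self) -> dict:
        return {"ir": "validated"}


@toy_register_pipeline("demo")
def build_demo() -> FakePipeline:
    return FakePipeline()


print(toy_resolve("demo"))  # {'ir': 'validated'}
```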
Plugin discovery
The runtime supports entry-point plugins (retriever.plugins) so system packages can register pipelines without
living in the runtime repo.
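Entry-point plugins are normally discovered with the standard importlib.metadata.entry_points. The minimal sketch below loads everything in the retriever.plugins group. It does not say what each entry point must resolve to (a module, a registration function, or something else), so it only loads the objects. discover_plugins is a hypothetical name.

```python
from importlib.metadata import entry_points
from typing import Any, Dict


def discover_plugins(group: str = "retriever.plugins") -> Dict[str, Any]:
    """Load every installed entry point in `group`, keyed by entry-point name."""
    try:
        eps = entry_points(group=group)  # Python 3.10+ selection API
    except TypeError:
        eps = entry_points().get(group, [])  # Python 3.8/3.9 return a dict of groups
    return {ep.name: ep.load() for ep in eps}


print(sorted(discover_plugins()))  # names of installed retriever plugins, if any
```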
6) Notes
- This guide uses retriever.flow.Pipeline. The top-level retriever.Pipeline re-exports the same class.
- retriever.rt.signal.Signal is an internal per-step helper used by executors (sample → transform → publish). It is not an EventStream.