Refactored Runtime Guide (Canonical)

This guide documents the canonical Retriever runtime workflow:

Pipeline / TemporalFlow → Pipeline.validate() → IR → (optional) Pipeline.build_execution() → execute_ir()

It intentionally avoids the legacy Flow.from_module/LocalExecutor API.

1) Quickstart: build + run a tiny pipeline

from retriever.flow import Flow, Pipeline, Rate, Latest, io


@io
class SrcOut:
    value: int


@io
class AddOut:
    value: int


class Source(Flow[None, SrcOut]):
    def step(self, _):  # type: ignore[override]
        return SrcOut(value=1)


class AddOne(Flow[SrcOut, AddOut]):
    def step(self, input: SrcOut) -> AddOut:
        return AddOut(value=input.value + 1)


pipe = Pipeline("quickstart")
src = Source() @ Rate(hz=10)
add = AddOne() @ Rate(hz=10)
pipe.connect(src, add, sync=Latest())

pipe.run(backend="multiprocessing", duration=1.0)
# Or record deterministic local steps explicitly:
# pipe.record("log.mcap", steps=10, dt=0.1)

Async full run (non-blocking)

engine = pipe.run(backend="multiprocessing", blocking=False)
# ... do other work ...
engine.stop()

Single-step debugging (in-process)

Pipeline.step() runs the authored pipeline in the current Python process and advances it by one step:

res = pipe.step(dt=0.1)  # advances an internal logical clock by 0.1s
print(res.executed)
pipe.close_stepper()

More details: docs/guides/debugging.md.

Record + replay (stepper-first)

For deterministic persisted recording, prefer pipe.record(...).

If you use pipe.run(record=...), you must opt into the in-process backend explicitly:

pipe.run(backend="in-process", duration=1.0, record="log.mcap")

For explicit stepper-driven recording and replay, use:

pipe.record("logs/camera_recording.rrd", steps=10, dt=0.05)
pipe.replay(camera, path="logs/camera_recording.rrd")

2) Core concepts

@io types (ports)

Flows communicate with typed @io classes. Each annotated field becomes a port. Use @io directly. Do not stack it with @dataclass.

Flow[I, O] (node logic)

A Flow is a node that implements:

  • reset() (optional)
  • step(input: I) -> O
  • finalize() (optional)

Clocks (when a node executes)

Attach a clock using flow @ clock:

  • Rate(hz=...): periodic execution, sampling all connected inputs on each tick
  • Tick(hz=...): periodic tick-only execution, sampling no inputs
  • Trigger("field", ...): event-driven execution on one or more named input fields
  • Hybrid(hz=..., trigger=[...]): periodic execution plus immediate trigger-driven execution

Defaults:

  • Rate(hz=...) and the periodic path of Hybrid(...) sample all connected inputs.
  • Tick(hz=...) is the explicit tick-only clock.
  • Trigger(...) samples only the triggering input fields.
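The sampling defaults above can be restated as a small lookup. This is a plain-Python sketch, not Retriever code: the function name sampled_fields and the string labels for clock kinds are hypothetical, and only the rules stated in this section are modeled (the trigger path of Hybrid is left out because its sampling behavior is not described here).

```python
from typing import List, Optional


def sampled_fields(kind: str, connected: List[str],
                   triggers: Optional[List[str]] = None) -> List[str]:
    """Return the input fields one execution would sample (illustrative only)."""
    if kind in ("rate", "hybrid_periodic"):
        # Rate and the periodic path of Hybrid: all connected inputs.
        return list(connected)
    if kind == "tick":
        # Tick: tick-only execution, no inputs are sampled.
        return []
    if kind == "trigger":
        # Trigger: only the triggering input fields.
        wanted = set(triggers or [])
        return [name for name in connected if name in wanted]
    raise ValueError(f"unknown clock kind: {kind!r}")


connected = ["image", "pose"]  # hypothetical port names
print(sampled_fields("rate", connected))
print(sampled_fields("tick", connected))
print(sampled_fields("trigger", connected, triggers=["image"]))
```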

Wiring (edges)

Connect nodes either with explicit Pipeline.connect(...) calls, or with TemporalFlow.then(...) / >> inside a with pipe: block, where the Pipeline is the active owner.

Option A — explicit Pipeline.connect(...)

pipe = Pipeline("my_pipeline")
a = A() @ Rate(hz=10)
b = B() @ Rate(hz=10)
pipe.connect(a, b, sync=Latest())

Option B — with pipe: + handle chaining

pipe = Pipeline("my_pipeline")
with pipe:
    a = A() @ Rate(hz=10)
    b = B() @ Rate(hz=10)
    a >> b

sync is an Adapter that defines how the downstream node samples its input queue.

3) Execution model

Validate to IR

pipe.validate() turns the authored graph into an IR. This happens automatically during Pipeline.run(...), but you can call it directly for debugging or inspection.

Build execution graph (partitioning + placement)

pipe.build_execution() creates an ExecutionGraph, a physical graph used to decide:

  • which flows should run together (co-location / fusion)
  • where a partition should run (placement hints; currently informational)

Run with a backend

execute_ir(ir, backend=..., duration=..., blocking=...) runs the IR on one of the following backends:

  • multiprocessing: local Python multiprocessing backend
  • dora: dora-rs backend (requires compatible dora CLI + deps)
  • in-process: single-process wrapper for debugging/recording around a live Pipeline

execute_ir(...) accepts either an IR or an ExecutionGraph.

Dora backend config: native node overrides (Tier A.1)

The dora backend can optionally run selected nodes as native dora nodes (Rust binaries) rather than Python executors. This is controlled at execution time (no change to Flow authoring code) via backend_config["native_overrides"]:

pipe.run(
    backend="dora",
    backend_config={
        # Match by node.id, "module:Type", or Type (in that order).
        "native_overrides": {
            "CameraSource": "native:retriever-dora-camera",
        }
    },
    duration=10.0,
)

Native overrides are an advanced backend extension point: keep the public pipeline API stable, and place runtime-specific implementation behind the backend adapter.
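The match order noted in the snippet above (node.id, then "module:Type", then Type) can be pictured as a first-match lookup. The following is a minimal sketch of that rule only, not the backend's actual resolver: resolve_native_override, the "front_cam" node id, the "sensors.camera" module, and the second override value are all hypothetical.

```python
from typing import Mapping, Optional


def resolve_native_override(node_id: str, module: str, type_name: str,
                            overrides: Mapping[str, str]) -> Optional[str]:
    """Return the first matching override, trying the most specific key first."""
    for key in (node_id, f"{module}:{type_name}", type_name):
        if key in overrides:
            return overrides[key]
    return None  # no match: the node keeps running as a Python executor


overrides = {
    "CameraSource": "native:retriever-dora-camera",
    "front_cam": "native:hypothetical-front-camera",  # hypothetical entry
}

# A node id match wins over the bare type name.
print(resolve_native_override("front_cam", "sensors.camera", "CameraSource", overrides))
# Without an id or "module:Type" match, the type name is used.
print(resolve_native_override("rear_cam", "sensors.camera", "CameraSource", overrides))
```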

Backend config: buffer engine (Tier B.3)

The runtime hot path samples timestamped per-port buffers on every step. Backends expose a buffer_engine switch so that a Rust implementation can be swapped in later without changing user pipelines:

pipe.run(
    backend="dora",
    backend_config={"buffer_engine": "python"},  # default
    duration=10.0,
)

"native" is reserved for a future retriever_native extension. See examples/tutorial/c_debug_and_replay/05_buffer_engine_demo.py for a minimal runnable demo.

4) Event/time model (FRP vocabulary)

TimedBuffer

Each input port maintains a finite timestamped buffer:

retriever.flow.types.TimedBuffer[T] = list[tuple[float, T]]

This is what Subscriber.get_all() returns and what Adapters sample. For collection/replay/export contracts, use retriever.types.data.EventBuffer instead of the runtime tuple buffer.
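To make the shape concrete, here is a standalone sketch of a list[tuple[float, T]] buffer kept finite by dropping the oldest entries. It does not import Retriever; the push helper and the max_len bound are assumptions for illustration, and the real runtime may bound its buffers differently (for example by time rather than by count).

```python
from typing import Any, List, Tuple

# Same shape as the alias described above: (timestamp, value) pairs, oldest first.
TimedBuffer = List[Tuple[float, Any]]


def push(buf: TimedBuffer, t: float, value: Any, max_len: int = 4) -> None:
    """Append a timestamped value and drop the oldest entries beyond max_len."""
    buf.append((t, value))
    overflow = len(buf) - max_len
    if overflow > 0:
        del buf[:overflow]


buf: TimedBuffer = []
for i in range(6):
    push(buf, t=i * 0.5, value=i)

print(buf)  # only the four most recent (timestamp, value) pairs remain
```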

EventStream

EventStream[T] is a conceptual view over an event source. In the runtime:

  • each port is an EventStream
  • the runtime materializes only a finite TimedBuffer per port

EventStream.sample(adapter, now=...) applies an Adapter to its TimedBuffer.

Behavior

Behavior[T] is a continuous-time sampler: t -> value. In this runtime it is usually derived from an EventStream via an Adapter.

Use the methods on retriever.flow.types.EventStream and retriever.flow.types.Behavior for stream-style composition, including map, filter, merge, fold, snapshot, combine_latest, flat_map, Behavior.select(...), and Behavior.until(...).
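One way to picture a Behavior derived from an event buffer is a zero-order hold: at any time t, return the value of the most recent event at or before t. The sketch below is plain Python under that assumption and does not use retriever.flow.types; hold_behavior is a hypothetical helper name.

```python
import bisect
from typing import Any, Callable, List, Optional, Tuple


def hold_behavior(buf: List[Tuple[float, Any]]) -> Callable[[float], Optional[Any]]:
    """Turn a time-sorted event buffer into a function t -> value."""
    times = [t for t, _ in buf]

    def sample(t: float) -> Optional[Any]:
        # Index of the first event strictly after t; the one before it is held.
        i = bisect.bisect_right(times, t)
        return buf[i - 1][1] if i else None  # None before the first event

    return sample


behavior = hold_behavior([(0.0, "idle"), (1.0, "moving")])
print(behavior(0.5))  # between events: the earlier value is held
print(behavior(1.0))  # exactly at an event: that event's value
```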

Adapters (sampling policy)

Adapters decide how to interpret a TimedBuffer at time now:

  • Latest(): last value
  • Hold(debounce=...): zero-order hold with debounce
  • Window(duration=..., agg=...): time window aggregation
  • Events(duration=..., include_timestamps=...): return history/window (for “queue manipulation” flows)
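The policies in this list can be approximated with small functions over a (timestamp, value) list. This is an illustrative sketch only: the sample_* names are hypothetical, the window is assumed to be the closed interval [now - duration, now], and Hold is omitted because its debounce semantics are not specified in this guide.

```python
from typing import Any, Callable, List, Optional, Tuple

Buffer = List[Tuple[float, Any]]  # time-sorted, oldest first


def sample_latest(buf: Buffer, now: float) -> Optional[Any]:
    """Latest-style policy: the last value in the buffer, if any."""
    return buf[-1][1] if buf else None


def sample_window(buf: Buffer, now: float, duration: float,
                  agg: Callable[[List[Any]], Any]) -> Optional[Any]:
    """Window-style policy: aggregate the values inside the time window."""
    values = [v for t, v in buf if now - duration <= t <= now]
    return agg(values) if values else None


def sample_events(buf: Buffer, now: float, duration: float,
                  include_timestamps: bool = False) -> list:
    """Events-style policy: return the raw history inside the time window."""
    recent = [(t, v) for t, v in buf if now - duration <= t <= now]
    return recent if include_timestamps else [v for _, v in recent]


buf = [(0.0, 1), (1.0, 2), (2.0, 3), (3.0, 4)]
print(sample_latest(buf, now=3.0))
print(sample_window(buf, now=3.0, duration=1.5, agg=sum))
print(sample_events(buf, now=3.0, duration=1.5, include_timestamps=True))
```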

5) Pipelines registry + plugins

Register a pipeline factory

Use retriever.registry.pipeline.register_pipeline to register a pipeline builder that returns:

  • an IR, or
  • a PipelineBuilder / Pipeline (it will be validated automatically)

Plugin discovery

The runtime supports entry-point plugins (retriever.plugins) so system packages can register pipelines without living in the runtime repo.
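To see which plugins are installed in an environment, the standard library can list entry points by group. This sketch assumes "retriever.plugins" is the entry-point group name, which the text above suggests but does not state outright; discover_plugins is a hypothetical helper, and it only lists entry points without importing them.

```python
from importlib.metadata import entry_points
from typing import Dict


def discover_plugins(group: str = "retriever.plugins") -> Dict[str, str]:
    """Map entry-point names to their 'module:attr' targets for one group."""
    eps = entry_points()
    if hasattr(eps, "select"):
        # Newer Python versions expose a selectable collection.
        selected = eps.select(group=group)
    else:
        # Older versions return a plain dict keyed by group name.
        selected = eps.get(group, [])
    return {ep.name: ep.value for ep in selected}


# Prints an empty dict when no package registers entry points in this group.
print(discover_plugins())
```

Calling ep.load() on an entry point would import the plugin module; the sketch deliberately stops short of that so it has no side effects.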

6) Notes

  • This guide uses retriever.flow.Pipeline. The top-level retriever.Pipeline re-exports the same class.
  • retriever.rt.signal.Signal is an internal per-step helper used by executors (sample → transform → publish). It is not an EventStream.