Flow Authoring Guide (Runtime/Core)
This guide describes the refactored Flow authoring surface used by the runtime/core:
Pipeline / TemporalFlow → Pipeline.validate() → IR → (optional) Pipeline.build_execution() → execute_ir()
Older pre-runtime-authoring material is not part of the public release docs in this repo.
Use the tutorial tracks and docs/handbook.md as the supported source of truth.
1) Define typed ports with @io
Prefer shared payloads or tuples first. Use @io when you need named reusable
boundary ports on a flow.
With @io, each annotated field becomes a named port.
```python
from retriever.flow import io

@io
class CameraOut:
    image: "np.ndarray"

@io
class DetectionsOut:
    detections: list
```
Notes:
- @io makes all fields Optional[...] with default None. The runtime sets only the fields present for a step.
- @io is standalone. Do not stack it with @dataclass.
- input._signals exists as a lower-level debugging/introspection surface. Do not treat it as the main authoring pattern.
2) Implement a Flow[I, O]
A Flow is a typed node. Implement step(...) and, optionally, these lifecycle hooks:
- __lazy_init__() for backend-local heavy resources
- reset() for per-run or per-stepper state initialization
- finalize() for cleanup
Keep module top-level code and __init__() import-safe and lightweight. Acquire
runtime-local resources in __lazy_init__() / reset().
```python
from retriever.flow import Flow, io

@io
class SrcOut:
    value: int

@io
class AddOut:
    value: int

class Source(Flow[None, SrcOut]):
    def step(self, _):  # type: ignore[override]
        return SrcOut(value=1)

class AddOne(Flow[SrcOut, AddOut]):
    def step(self, input: SrcOut) -> AddOut:
        return AddOut(value=input.value + 1)
```
- Override step(...) in new code. run(...) is the deprecated backwards-compatible alias, and forward(...) is a PyTorch-style alias for step(...).
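The hook and alias descriptions above can be modeled with a small stand-in base class. FlowSketch, drive, and Counter are hypothetical names. The call order in drive (lazy init once, reset at the start of a run, one step per input, finalize at the end) is an assumption made for illustration and does not describe Retriever's scheduler.

```python
import warnings


class FlowSketch:
    """Hypothetical stand-in for Flow (NOT retriever.flow.Flow)."""

    def __lazy_init__(self):
        """Acquire heavy, backend-local resources (models, devices, sockets)."""

    def reset(self):
        """Initialize per-run state."""

    def finalize(self):
        """Release resources."""

    def step(self, input):
        raise NotImplementedError("override step() in new code")

    def forward(self, input):
        # PyTorch-style alias that simply delegates to step().
        return self.step(input)

    def run(self, input):
        # Deprecated backwards-compatible alias: warn, then delegate to step().
        warnings.warn("run() is deprecated; override step()", DeprecationWarning, stacklevel=2)
        return self.step(input)


def drive(flow, inputs):
    """Assumed order: lazy init once, reset, one step per input, finalize."""
    flow.__lazy_init__()
    flow.reset()
    try:
        return [flow.step(x) for x in inputs]
    finally:
        flow.finalize()


class Counter(FlowSketch):
    def __init__(self):
        self.events = []  # cheap and import-safe; nothing heavy here

    def __lazy_init__(self):
        self.events.append("lazy_init")

    def reset(self):
        self.events.append("reset")
        self.total = 0

    def step(self, x):
        self.total += x
        return self.total

    def finalize(self):
        self.events.append("finalize")


counter = Counter()
print(drive(counter, [1, 2, 3]))  # [1, 3, 6]
```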
2.1) Optional Wrapper Factory (Torch/Gym)
If you want a quick bridge from an existing third-party object, such as a PyTorch module or a Gym environment, use retriever.lib.Wrapper.
This is convenient, but it is not the main authoring surface:
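```python
import gym  # or: import gymnasium as gym
from retriever.lib import Wrapper

# PyTorch module (MyModule is your own torch.nn.Module subclass)
model = Wrapper(MyModule())

# Gym environment (pass an instance or a factory)
env = Wrapper(lambda: gym.make("CartPole-v1"))
```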
3) Attach clocks: flow @ clock
Attach a clock to a flow instance to create a runnable node handle:
```python
from retriever.flow import Rate, Trigger, Tick

src = Source() @ Rate(hz=10)                # periodic, samples all inputs (default)
tick_only = Source() @ Tick(hz=10)          # periodic, samples no inputs
event_driven = AddOne() @ Trigger("value")  # runs when the "value" input arrives
```
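Python evaluates flow @ clock through the flow's __matmul__ method (or, failing that, the clock's __rmatmul__). The sketch below shows one plausible shape for this: the operator pairs a flow instance with a clock and returns a new handle, leaving the flow itself unchanged. All names here (RateSketch, TriggerSketch, Handle, FlowBaseSketch, SourceSketch) are hypothetical. Retriever's actual handle type carries more state.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RateSketch:
    hz: float


@dataclass(frozen=True)
class TriggerSketch:
    inputs: tuple


@dataclass(frozen=True)
class Handle:
    """A runnable node handle: the flow plus the clock that schedules it."""
    flow: object
    clock: object


class FlowBaseSketch:
    def __matmul__(self, clock):
        # `flow @ clock` lands here and returns a new handle object.
        return Handle(flow=self, clock=clock)


class SourceSketch(FlowBaseSketch):
    pass


src = SourceSketch() @ RateSketch(hz=10)
print(src.clock)  # RateSketch(hz=10)
```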
Scheduling vs sampling
Clocks decide when a node runs; adapters decide how each connected input buffer is sampled.
- Rate(hz=...) runs periodically and samples all connected inputs on each tick.
- Tick(hz=...) is the explicit tick-only clock and samples no inputs.
- Trigger("field", ...) runs on arrivals of the specified input fields.
- Hybrid(hz=..., trigger=[...]) combines periodic execution with immediate trigger-driven runs.
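The four clocks differ in when they fire and in whether a run samples inputs. The toy model below computes run times for each clock over a short horizon. fire_times and its arguments are hypothetical. Time is reduced to periodic ticks plus arrival timestamps, and the assumption that Hybrid's periodic runs sample inputs like Rate is made for illustration only. Real backends add jitter, lag handling, and adapter sampling on top.

```python
def fire_times(kind, duration, hz=None, arrivals=None, trigger=()):
    """Toy model of when a node runs.

    kind: "rate" | "tick" | "trigger" | "hybrid"
    arrivals: dict mapping input field name -> list of arrival timestamps (seconds)
    trigger: field names whose arrivals cause immediate runs ("trigger" / "hybrid")
    Returns a sorted list of (time, samples_inputs) pairs.
    """
    arrivals = arrivals or {}
    runs = []
    if kind in ("rate", "tick", "hybrid"):
        n_ticks = int(duration * hz) + 1  # ticks at t = 0, 1/hz, ..., up to duration
        for i in range(n_ticks):
            # Tick is the tick-only clock: it never samples inputs.
            runs.append((i / hz, kind != "tick"))
    if kind in ("trigger", "hybrid"):
        for name in trigger:
            runs.extend((t, True) for t in arrivals.get(name, []) if t <= duration)
    return sorted(runs)


print(fire_times("hybrid", 1.0, hz=2, arrivals={"obs": [0.25, 0.7]}, trigger=["obs"]))
# [(0.0, True), (0.25, True), (0.5, True), (0.7, True), (1.0, True)]
```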
For per-edge buffering and sampling behavior, configure adapters with sync=... and edge policies with edge_config=....
Use sync=... for the common case where one adapter policy applies to all inputs on an edge.
Use edge_config=... when you need per-port queue settings or per-port adapter overrides.
Example:
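```python
from retriever.flow import EdgeConfig, Hold, Latest

# Assumes `pipe` is a Pipeline and `camera` / `planner` are clocked handles in it.
pipe.connect(
    camera,
    planner,
    sync={"status": Hold(debounce=0.1)},  # adapter for the "status" port
    edge_config={
        "*": EdgeConfig(qsize=32, on_full="drop"),      # defaults for every port on this edge
        "frame": EdgeConfig(qsize=4, adapter=Latest()),  # per-port override for "frame"
    },
)
```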
Rules:
- {"*": EdgeConfig(...)} sets defaults for every port on that edge.
- A named port entry like "frame": EdgeConfig(...) overrides the wildcard defaults for that port.
- Do not specify an adapter in both sync and edge_config for the same port. Retriever rejects that configuration instead of guessing which one should win.
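The three rules can be expressed as a small resolution function. resolve_edge is hypothetical. Adapters and EdgeConfig values are represented as plain strings and dicts, and sync is modeled only in its per-port dict form. The merge order illustrates the stated rules and is not Retriever's code.

```python
def resolve_edge(ports, sync=None, edge_config=None):
    """Per-port settings for one edge (hypothetical model of the rules above).

    ports: input port names on the downstream node
    sync: dict port -> adapter
    edge_config: dict port-or-"*" -> dict of settings (qsize, on_full, adapter, ...)
    """
    sync = sync or {}
    edge_config = edge_config or {}
    defaults = edge_config.get("*", {})
    resolved = {}
    for port in ports:
        # A named port entry overrides the wildcard defaults.
        settings = {**defaults, **edge_config.get(port, {})}
        if port in sync:
            if "adapter" in settings:
                # Refuse to guess which adapter should win.
                raise ValueError(f"port {port!r}: adapter set in both sync and edge_config")
            settings["adapter"] = sync[port]
        resolved[port] = settings
    return resolved


edge = resolve_edge(
    ["frame", "status"],
    sync={"status": "Hold(debounce=0.1)"},
    edge_config={
        "*": {"qsize": 32, "on_full": "drop"},
        "frame": {"qsize": 4, "adapter": "Latest()"},
    },
)
print(edge["frame"])   # {'qsize': 4, 'on_full': 'drop', 'adapter': 'Latest()'}
print(edge["status"])  # {'qsize': 32, 'on_full': 'drop', 'adapter': 'Hold(debounce=0.1)'}
```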
Lag handling (Rate.on_lag=...)
If a node cannot keep up with its configured Rate(hz=...) (e.g. a large model is too slow),
Retriever needs an explicit policy for “missed ticks”.
Rate(..., on_lag=...) (and Hybrid(..., on_lag=...) for its periodic path) supports:
- on_lag="warn" (default): skip missed ticks and emit throttled warnings (keeps latency bounded, but makes lag visible)
- on_lag="drop": same as "warn", but without warnings (quiet best-effort)
- on_lag="error": raise if lagging by ≥ 1 tick (aliases: "panic", "raise", "strict")
- on_lag="catch_up": execute every tick eventually (simulation-style; latency can grow)
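One way to picture the four policies is as a decision made when a node wakes up late: count how many scheduled ticks are already overdue, then decide how many executions to perform. The function below is a hypothetical model of the behaviors listed above (plan_ticks and LagError are not Retriever APIs). Reading "lagging by ≥ 1 tick" as "at least one whole tick period overdue" is an assumption. Integer times (e.g. milliseconds) keep the arithmetic exact.

```python
import math


class LagError(RuntimeError):
    pass


ALIASES = {"panic": "error", "raise": "error", "strict": "error"}


def plan_ticks(on_lag, next_tick, now, period):
    """Return (runs_to_execute, new_next_tick, warned) for one wake-up.

    next_tick: scheduled time of the tick we are about to run
    now: actual current time (>= next_tick when we are late)
    """
    policy = ALIASES.get(on_lag, on_lag)
    missed = max(0, math.floor((now - next_tick) / period))  # whole ticks overdue
    if missed == 0:
        return 1, next_tick + period, False
    if policy == "catch_up":
        # Run the current tick plus every missed one; latency grows until we recover.
        return missed + 1, next_tick + (missed + 1) * period, False
    if policy == "error":
        raise LagError(f"lagging by {missed} tick(s)")
    if policy in ("warn", "drop"):
        # Skip the missed ticks: run once and realign to the schedule.
        return 1, next_tick + (missed + 1) * period, policy == "warn"
    raise ValueError(f"unknown on_lag policy: {on_lag!r}")


print(plan_ticks("catch_up", next_tick=100, now=135, period=10))  # (4, 140, False)
```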
See: docs/handbook.md (Rate lag policy section).
Quick demo (Dora, using the panic alias):
```bash
pixi run python -m examples.tutorial.d_closed_loop_state_feedback.01_closed_loop_env --env toy --backend dora --hz 50 --duration 2 --on-lag panic
```
Pipeline-wide default:
Pipeline("name", on_lag="error") or pipe.set_on_lag("error") applies a default to any node still using the library default (on_lag="warn").
Global Defaults (optional)
For scripts and libraries, prefer explicit sync=... on each pipe.connect(...).
If you are prototyping in a notebook or REPL, you can still set a global default adapter:
```python
import retriever
from retriever.flow import Latest

# Optional notebook/REPL convenience, not the preferred shared-example surface.
retriever.init(default_sync=Latest())
```
Use Pipeline(..., on_lag=...) or pipe.set_on_lag(...) to set lag policy defaults for a graph.
See also: docs/guide_temporal.md.
4) Connect nodes (edges)
Fan-in (Many-to-One)
You can connect multiple outputs to the same input port ("fan-in"). They share a single underlying buffer.
```python
from retriever.flow import Latest

# A, B, and C all feed into monitor's 'data' port
a.then(monitor, sync=Latest())
b.then(monitor, sync=Latest())
c.then(monitor, sync=Latest())
```
- With Latest(): the monitor runs whenever any of the sources emits a value (interleaved execution).
- With Window(agg="mean"): the monitor runs on the aggregated buffer of all inputs.
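A minimal model of fan-in: every upstream writes into one shared, timestamp-ordered buffer, and the downstream adapter decides what it sees. The sketch uses plain (timestamp, value) tuples and hypothetical helpers (fan_in, latest, window_mean). It is not Retriever's buffer implementation, and the inclusive window bounds are an assumption.

```python
import heapq
from statistics import mean


def fan_in(*streams):
    """Merge several time-ordered (timestamp, value) streams into one shared buffer."""
    return list(heapq.merge(*streams, key=lambda item: item[0]))


def latest(buffer):
    """Latest()-style sampling: the most recent value, whichever source wrote it."""
    return buffer[-1][1] if buffer else None


def window_mean(buffer, now, duration):
    """Window(agg="mean")-style sampling over the last `duration` seconds."""
    values = [v for t, v in buffer if now - duration <= t <= now]
    return mean(values) if values else None


a = [(0.0, 1.0), (0.3, 4.0)]
b = [(0.1, 2.0)]
c = [(0.2, 3.0)]
shared = fan_in(a, b, c)
print(shared)                          # [(0.0, 1.0), (0.1, 2.0), (0.2, 3.0), (0.3, 4.0)]
print(latest(shared))                  # 4.0
print(window_mean(shared, 0.3, 0.15))  # 3.5
```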
Explicit Pipeline (recommended)
Pipeline is the preferred authoring surface when you don’t want a global context manager.
```python
from retriever.flow import Pipeline, Rate, Latest

pipe = Pipeline("demo")
src = Source() @ Rate(hz=10)
add = AddOne() @ Rate(hz=10)
pipe.connect(src, add, sync=Latest())  # explicit adapter for this edge
```
then(...) and >>
TemporalFlow.then(...) and the >> operator also connect nodes, for example src.then(add, sync=Latest()) or src >> add.
Important: handles must belong to the same Pipeline. The easiest way to guarantee that is to use either
explicit pipe.connect(...) calls or with pipe: when using >>.
Port mapping (map=...)
Edges are port-level. By default (map={"*": "*"}), Retriever auto-connects fields by matching names.
For explicit wiring, pass your own port mapping to map=... instead of the wildcard.
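The default map={"*": "*"} behaves like name matching between upstream output fields and downstream input ports. The sketch below models that behavior and also accepts an explicit mapping. resolve_ports is hypothetical, and reading an explicit entry as {upstream_field: downstream_port} is an assumption made for illustration; check the tutorials for the direction Retriever actually expects.

```python
def resolve_ports(out_fields, in_fields, mapping=None):
    """Return a list of (upstream_field, downstream_port) wires for one edge."""
    mapping = mapping or {"*": "*"}
    wires = []
    for src, dst in mapping.items():
        if src == "*" and dst == "*":
            # Wildcard: connect every field whose name exists on both sides.
            wires.extend((name, name) for name in out_fields if name in in_fields)
        elif src in out_fields and dst in in_fields:
            wires.append((src, dst))
        else:
            raise KeyError(f"cannot wire {src!r} -> {dst!r}")
    return wires


print(resolve_ports(["value", "debug"], ["value"]))             # [('value', 'value')]
print(resolve_ports(["image"], ["frame"], {"image": "frame"}))  # [('image', 'frame')]
```

With the wildcard alone, differently named fields (image vs. frame) produce no wire, which is the case an explicit mapping covers.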
Adapters (sync=...)
Adapters define how a downstream node samples its input buffer:
- Latest() (default if configured globally)
- Hold(debounce=...)
- Window(duration=..., agg=..., buffer_size=...)
- Events(duration=..., include_timestamps=..., buffer_size=...)
- Chunking(dt=...)
- Linear()
Adapters live in retriever/flow/adapter.py. The underlying buffer type is:
retriever.flow.types.TimedBuffer[T] = list[(timestamp: float, value: T)]
This runtime buffer is distinct from retriever.types.data.EventBuffer, which is used for explicit event/data/export contracts.
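For reference, here is the TimedBuffer shape written as a typed Python alias, together with a sampler in the spirit of Events(duration=..., include_timestamps=..., buffer_size=...). The alias mirrors the definition above. events_sketch is hypothetical: the half-open window boundary and the choice to keep the newest buffer_size entries are assumptions, not Retriever's documented semantics.

```python
from typing import Optional, TypeVar

T = TypeVar("T")
TimedBuffer = list[tuple[float, T]]  # (timestamp, value) pairs, oldest first


def events_sketch(buffer: TimedBuffer, now: float, duration: float,
                  include_timestamps: bool = False,
                  buffer_size: Optional[int] = None) -> list:
    """Return every event in (now - duration, now], keeping at most the newest buffer_size."""
    recent = [(t, v) for t, v in buffer if now - duration < t <= now]
    if buffer_size is not None:
        recent = recent[max(0, len(recent) - buffer_size):]
    return recent if include_timestamps else [v for _, v in recent]


buf: TimedBuffer = [(0.0, "a"), (0.5, "b"), (0.9, "c"), (1.0, "d")]
print(events_sketch(buf, now=1.0, duration=0.6))                 # ['b', 'c', 'd']
print(events_sketch(buf, now=1.0, duration=0.6, buffer_size=2))  # ['c', 'd']
```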
See: docs/guide_temporal.md.
5) Run vs debug
Full execution on a backend
```python
pipe.run(backend="multiprocessing", duration=1.0)
pipe.run(backend="dora", duration=10.0)

# Record to file (uses in-process backend)
pipe.record("session.mcap", steps=50, dt=0.1)
```
In-process single-step debugging¶
Pipeline.step(...) runs the pipeline in the current Python process, so you can set breakpoints and use the VS Code debugger inside Flow.step(...).
See: docs/guides/debugging.md.
6) Service RPC flows (advanced)
Retriever supports request/response “RPC edges” using generator-based flows via decorators:
- @handle_service (service provider methods)
- @call_service(...) (service client flow that yields ServiceCall)
Example: examples/tutorial/b_ir_and_execution/07_request_response.py.
Notes:
- Service flows currently require a backend that supports the RPC wiring (the dora backend does).
- Pipeline.step(...) is a debug tool and currently does not support generator-based flows.
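The self-contained model below shows why service clients are written as generators. The client flow yields a request object, and when it resumes, the response arrives at that same point. ServiceCallSketch, provider, client_step, and drive_rpc are hypothetical names. Retriever's real ServiceCall, @call_service, and @handle_service wiring differ, and in Retriever a backend performs the dispatch rather than a plain loop.

```python
from dataclasses import dataclass


@dataclass
class ServiceCallSketch:
    service: str
    request: object


def provider(service, request):
    """Stand-in for a @handle_service method: answers 'double' requests."""
    if service == "double":
        return request * 2
    raise LookupError(f"no provider for {service!r}")


def client_step(x):
    """Stand-in for a @call_service flow: yield a call, resume with its response."""
    doubled = yield ServiceCallSketch("double", x)
    return doubled + 1


def drive_rpc(gen):
    """Run a generator-based step, answering each yielded call until it returns."""
    response = None
    try:
        while True:
            call = gen.send(response)  # first send(None) starts the generator
            response = provider(call.service, call.request)
    except StopIteration as stop:
        return stop.value


print(drive_rpc(client_step(20)))  # 41
```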
7) Closed loops (cycles)
Retriever allows feedback edges (cycles). Cycles are represented as SCC groups in IR.topology.groups.
Practical guidance:
- Avoid a “Trigger-only” cycle (Trigger(...) on every node): with no initial event, it may deadlock.
- Prefer a clocked plant/env and an event-driven controller:
  - env: Rate(hz=...) (steps periodically, samples the latest action)
  - controller: Trigger("obs") (runs when a new observation arrives)
This yields a stable distributed closed-loop where the env tick uses the most recent action.
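The toy simulation below contrasts the two designs. In the first, the plant ticks on a fixed schedule and always uses the latest action, while the controller runs only when an observation arrives. In the second, every node is trigger-only, and without a seed event nothing ever runs. The scalar plant, the proportional gain, and the function names are hypothetical and unrelated to the tutorial's toy env.

```python
def simulate_clocked_env(steps, gain=0.5, target=1.0):
    """Env on a Rate clock; proportional controller on Trigger("obs")."""
    state, latest_action, trace = 0.0, 0.0, []
    for _ in range(steps):
        # Env tick: advance the plant with whatever action is most recent.
        state += latest_action
        obs = state
        # The new observation arrives and triggers the controller.
        latest_action = gain * (target - obs)
        trace.append(round(state, 4))
    return trace


def simulate_trigger_only(steps, pending_events=()):
    """Every node on Trigger: each run needs an incoming event to start."""
    queue, runs = list(pending_events), 0
    for _ in range(steps):
        if not queue:
            break  # no event ever arrives, so the cycle never starts
        queue.pop(0)
        runs += 1
        queue.append("next")  # each run emits an event that feeds the other node
    return runs


print(simulate_clocked_env(4))   # [0.0, 0.5, 0.75, 0.875]
print(simulate_trigger_only(5))  # 0
```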
Example:
- examples/tutorial/d_closed_loop_state_feedback/01_closed_loop_env.py
- --env toy (no extra deps)
- --env pendulum (requires gymnasium or gym, MPC balancing loop)
Gym-style env wrapper notes:
- A Gym env is typically stateful and imperative; in Retriever you wrap it in a Flow:
  - reset() creates the env
  - step(Action) performs env.step(action) and returns an Observation (run(...) is the deprecated alias)
  - the flow can internally decide when to reset() (e.g. on done=True)
- The closed-loop becomes a normal pipeline cycle (env↔controller), which can run on
multiprocessing or Dora without changing user code.
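Here is a dependency-free sketch of the wrapper described above. ToyEnv imitates the Gym-style reset()/step(action) interface with an old-Gym-style (obs, reward, done, info) return. EnvFlowSketch plays the role of the Flow: reset() creates the env, step(action) advances it, and the flow resets the env internally when an episode ends. Both classes and the Observation fields are hypothetical. A real wrapper would subclass retriever.flow.Flow and use @io port types.

```python
from dataclasses import dataclass


class ToyEnv:
    """Tiny stand-in for a Gym env: counts up and ends the episode at `horizon`."""

    def __init__(self, horizon=3):
        self.horizon = horizon
        self.t = 0

    def reset(self):
        self.t = 0
        return self.t

    def step(self, action):
        # Old-Gym-style 4-tuple: (obs, reward, done, info).
        self.t += 1
        return self.t, float(action), self.t >= self.horizon, {}


@dataclass
class Observation:
    obs: int
    reward: float
    done: bool


class EnvFlowSketch:
    """Hypothetical Flow-like wrapper around a stateful env."""

    def reset(self):
        self.env = ToyEnv()  # create the env in reset(), not at import time
        self.env.reset()
        self.episodes = 0

    def step(self, action):
        obs, reward, done, _info = self.env.step(action)
        if done:
            # The flow decides internally when to reset the env.
            self.env.reset()
            self.episodes += 1
        return Observation(obs=obs, reward=reward, done=done)


flow = EnvFlowSketch()
flow.reset()
print([flow.step(1).obs for _ in range(4)])  # [1, 2, 3, 1]
```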