Quickstart

This page teaches the core Retriever model with the smallest runnable graph. To see a visual sensor demo first, execute pixi run demo-webcam-detection, then come back here for the API shape.

The five ideas

  1. @io defines typed message envelopes.
  2. Flow[I, O] defines stateful node logic.
  3. flow @ clock declares when a node runs.
  4. Pipeline.connect(..., sync=...) declares how data crosses an edge.
  5. pipe.run(...) executes; pipe.step(...) debugs in-process.

Minimal Runnable Graph

from retriever.flow import Flow, Latest, Pipeline, Rate, Trigger, io


@io
class Number:
    value: int


@io
class Doubled:
    value: int


class Source(Flow[None, Number]):
    def __init__(self) -> None:
        super().__init__()
        self.count = 0

    def step(self, _):  # type: ignore[override]
        self.count += 1
        return Number(value=self.count)


class Double(Flow[Number, Doubled]):
    def step(self, input: Number) -> Doubled:
        return Doubled(value=input.value * 2)


pipe = Pipeline("quickstart")
source = Source() @ Rate(hz=2)
double = Double() @ Trigger("value")
pipe.connect(source, double, sync=Latest())

pipe.run(backend="multiprocessing", duration=1.0)

Read it as a timed graph: Source wakes at 2 Hz, Double wakes when a new value arrives, and Latest() says exactly which upstream event the downstream Flow consumes.
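To make the timing concrete, here is a plain-Python model of the same graph that does not import Retriever. It is only a sketch under stated assumptions: the first rate tick fires at t=0, ticks stop before the duration elapses, and the edge holds a single newest message. None of this is confirmed as how the backends actually schedule, and the simulate helper is hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Number:
    value: int


@dataclass
class Doubled:
    value: int


def simulate(hz: float, duration: float) -> list[tuple[float, Doubled]]:
    """Toy model of Source @ Rate(hz) feeding Double through a newest-only edge."""
    period = 1.0 / hz
    newest: Optional[Number] = None  # assumed single-slot edge
    fresh = False                    # True while the slot holds an unread message
    outputs: list[tuple[float, Doubled]] = []
    count = 0
    tick = 0
    # Assumption: first tick at t=0, and ticks stop before `duration`.
    while tick * period < duration:
        now = tick * period
        # Source wakes on its rate clock and publishes a new message.
        count += 1
        newest, fresh = Number(value=count), True
        # Double wakes only because a new value arrived on the edge.
        if fresh and newest is not None:
            outputs.append((now, Doubled(value=newest.value * 2)))
            fresh = False
        tick += 1
    return outputs


trace = simulate(hz=2, duration=1.0)
for t, out in trace:
    print(f"t={t:.1f}s -> {out}")
```

Under these assumptions a 2 Hz source over one second produces two wake-ups, at 0.0 s and 0.5 s, and the downstream node runs once per new value.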

Step Before You Scale

Use backend execution when you want real scheduling behavior:

pipe.run(backend="multiprocessing", duration=1.0)
pipe.run(backend="dora", duration=1.0)

Use the stepper when you want normal Python breakpoints and deterministic inspection:

result = pipe.step(dt=0.5)
print(result.executed)
pipe.close_stepper()

This split is intentional: debug the graph in one process first, then move the same graph to a backend.
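The value of stepping is that time becomes an explicit argument rather than wall-clock behavior. The toy class below illustrates that idea only. ToyStepper, its periods argument, and the returned list of names are hypothetical and are not Retriever's stepper or the real shape of result.executed.

```python
class ToyStepper:
    """Advances simulated time and runs nodes whose period has elapsed."""

    def __init__(self, periods: dict[str, float]) -> None:
        self.now = 0.0
        self.periods = periods
        # Every node is due immediately at the start.
        self.next_due = {name: 0.0 for name in periods}

    def step(self, dt: float) -> list[str]:
        """Move time forward by dt and return the names of nodes that ran."""
        self.now += dt
        executed = []
        for name, period in self.periods.items():
            # Run a node at most once per step, if its deadline has passed.
            if self.next_due[name] <= self.now:
                executed.append(name)
                self.next_due[name] = self.now + period
        return executed


def run_four_steps() -> list[list[str]]:
    stepper = ToyStepper({"source": 0.5, "slow_logger": 1.0})
    return [stepper.step(dt=0.5) for _ in range(4)]


history = run_four_steps()
print(history)
```

Because nothing depends on real time or process scheduling, repeating the run gives the same execution order, and a breakpoint inside step(...) does not change which nodes fire.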

First Things To Run

pixi run demo-webcam-detection

Runs camera -> detector -> display with a real webcam by default. If no camera is available, use the tutorial module with --camera-mode mock.

pixi run demo-basic-flow
pixi run demo-adapter-connection
pixi run demo-rt-execution
pixi run demo-stepper
pixi run demo-webcam-record
pixi run demo-webcam-replay-rrd

Rules Of Thumb

  • Define public Flow inputs and outputs with @io.
  • Keep Flow logic in step(...); put timing in clocks and edge sync policies.
  • Start with Rate, Trigger, and Latest; reach for advanced policies only when a concrete robot handoff needs them.
  • Use Pipeline.step(...) before debugging multiprocessing or dora execution.
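One way to see why a newest-value policy is a reasonable default is to compare it with a queue when the consumer is slower than the producer. This sketch is plain Python, not Retriever code. The "fifo" policy is a hypothetical contrast and not a named Retriever policy, and it assumes Latest() behaves like a newest-only slot.

```python
from collections import deque


def consume(policy: str, produced: list[int], consumer_every: int) -> list[int]:
    """Return what a slow consumer reads under a newest-only or FIFO edge."""
    fifo: deque[int] = deque()
    newest = None
    seen = []
    for i, value in enumerate(produced, start=1):
        # The producer publishes on every iteration.
        fifo.append(value)
        newest = value
        # The consumer only runs every `consumer_every` iterations.
        if i % consumer_every == 0:
            if policy == "latest":
                seen.append(newest)          # always the freshest message
            else:
                seen.append(fifo.popleft())  # oldest unread message
    return seen


produced = list(range(1, 9))
latest_seen = consume("latest", produced, consumer_every=2)
fifo_seen = consume("fifo", produced, consumer_every=2)
print("latest:", latest_seen)
print("fifo:  ", fifo_seen)
```

With the newest-only edge the consumer stays current and skips intermediate values. With the queue it reads every value but falls further behind the producer on each cycle, which is the kind of trade-off that would justify moving to a more advanced policy.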

Backend and visualization variants

Use these after the basic path is clear:

pixi run demo-webcam-detection-mp-rerun
pixi run demo-webcam-detection-dora-rerun
pixi run demo-webcam-window
pixi run demo-webcam-replay-mcap

The Dora demo tasks request a fresh runtime by default. If you run Dora manually and hit a stale coordinator or schema/version mismatch, restart Dora and rerun the task.