Debugging Pipelines: Pipeline.run() vs Pipeline.step()
Retriever intentionally exposes two execution surfaces:
- Pipeline.run(...): execute a validated pipeline on a runtime backend (multiprocessing / dora).
- Pipeline.step(...): execute a pipeline in-process, one discrete step at a time, for debugging.
This note explains the design, semantics, and limitations so the behavior is predictable.
1) Pipeline.run(...) (backend execution)
Pipeline.run(...) is the production-facing API:
- validates the authored graph to IR internally (no explicit validate(...) needed in user code)
- starts a backend ExecutionEngine
Blocking run
Async/non-blocking run
Notes:
- blocking=False returns immediately; you must stop the engine manually to avoid orphan processes.
- Pipeline.run(...) always executes the validated IR. Use pipe.build_execution(...) separately when you want to inspect grouping / placement decisions.
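A non-blocking run leaves processes alive after the call returns, so the caller is responsible for shutting them down. The sketch below shows that ownership pattern with a plain child process standing in for the backend engine. start_fake_backend and stop_fake_backend are hypothetical helpers, not Retriever APIs, and the real engine's stop call may have a different name.

```python
import subprocess
import sys


def start_fake_backend() -> subprocess.Popen:
    """Hypothetical stand-in for a non-blocking run: spawn a worker and return at once."""
    # The child just sleeps, like a backend worker waiting for data.
    return subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])


def stop_fake_backend(proc: subprocess.Popen, timeout: float = 5.0) -> int:
    """Terminate the worker and reap it so it cannot linger as an orphan."""
    if proc.poll() is None:  # still running
        proc.terminate()
        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            proc.kill()  # escalate if terminate() was ignored
            proc.wait()
    return proc.returncode


proc = start_fake_backend()
try:
    pass  # interact with the running backend here
finally:
    # Always stop the backend, even if the code above raises.
    code = stop_fake_backend(proc)
```

The try/finally is the important part. The stop path runs even when the code in between raises, and that is what prevents orphaned workers.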
2) Pipeline.step(...) (in-process debugging)
Pipeline.step(...) is a debug tool, not a backend:
- runs the pipeline inside the current Python process
- advances one discrete step of the runtime semantics: sample → step → publish
- returns a StepResult with what executed and snapshots of inputs/outputs
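```python
# pipe is an authored Pipeline; each call runs exactly one debug tick.
res = pipe.step(dt=0.1)
print(res.executed)  # list of node_ids executed in this step
pipe.close_stepper()  # calls Flow.finalize() and drops the stepper
```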
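The sample → step → publish cycle can be made concrete with a dependency-free toy model of a single tick. ToyNode, ToyStepResult and run_one_tick are hypothetical names. This model does not cover the real StepResult fields, the stepper's node ordering, clocks, or adapters.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class ToyNode:
    """Hypothetical node: named input/output channels plus a step function."""
    node_id: str
    inputs: list[str]
    outputs: list[str]
    fn: Callable[..., dict[str, Any]]


@dataclass
class ToyStepResult:
    executed: list[str] = field(default_factory=list)
    inputs: dict[str, dict[str, Any]] = field(default_factory=dict)
    outputs: dict[str, dict[str, Any]] = field(default_factory=dict)


def run_one_tick(nodes: list[ToyNode], channels: dict[str, list[Any]]) -> ToyStepResult:
    """Run sample -> step -> publish once for every node, in list order."""
    result = ToyStepResult()
    for node in nodes:
        # sample: read the newest value on each input channel (None if empty)
        sampled = {name: (channels[name][-1] if channels[name] else None) for name in node.inputs}
        # step: run the node's logic on the sampled inputs
        produced = node.fn(**sampled)
        # publish: append outputs so downstream nodes see them in this same tick
        for name in node.outputs:
            channels[name].append(produced[name])
        result.executed.append(node.node_id)
        result.inputs[node.node_id] = sampled
        result.outputs[node.node_id] = produced
    return result


state = {"n": 0}


def counter() -> dict[str, Any]:
    state["n"] += 1
    return {"count": state["n"]}


channels: dict[str, list[Any]] = {"count": [], "doubled": []}
nodes = [
    ToyNode("counter", [], ["count"], counter),
    ToyNode("doubler", ["count"], ["doubled"], lambda count: {"doubled": count * 2}),
]
res = run_one_tick(nodes, channels)
print(res.executed)  # ['counter', 'doubler']
```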
What the stepper actually builds
The in-process stepper:
- Validates the pipeline to IR (same validator as Pipeline.run).
- Builds in-memory channels for each data edge.
- Loads per-edge adapters from IR (so buffer sizes match runtime).
- Uses the same Flow instances you authored (no re-import/re-instantiation).
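Here is a minimal sketch of "one in-memory channel per data edge, sized by its adapter". It assumes a bounded deque is a fair stand-in for the stepper's buffers. ToyEdge and build_channels are hypothetical, and the actual channel type in retriever/rt/stepper.py may differ.

```python
from collections import deque
from dataclasses import dataclass


@dataclass(frozen=True)  # frozen -> hashable, so edges can key a dict
class ToyEdge:
    """Hypothetical data edge with the buffer size its adapter asks for."""
    src: str
    src_port: str
    dst: str
    dst_port: str
    buffer_size: int


def build_channels(edges: list[ToyEdge]) -> dict[ToyEdge, deque]:
    """One bounded channel per data edge; maxlen mirrors the adapter's buffer_size."""
    return {edge: deque(maxlen=edge.buffer_size) for edge in edges}


edges = [
    ToyEdge("camera", "frame", "detector", "frame", buffer_size=1),  # like Latest(buffer_size=1)
    ToyEdge("imu", "sample", "fusion", "imu", buffer_size=3),  # like Window(buffer_size=3, ...)
]
channels = build_channels(edges)
for t in range(5):
    for edge, chan in channels.items():
        chan.append((float(t), f"{edge.src}-{t}"))  # (timestamp, value) pairs

print([len(chan) for chan in channels.values()])  # [1, 3]
```

Because maxlen evicts the oldest entry, a Latest-sized channel always holds only the newest (timestamp, value) pair, while a Window-sized channel keeps the last N.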
Implementation lives in:
- retriever/rt/stepper.py
- retriever/flow/pipeline.py (Pipeline.step / reset_stepper / close_stepper)
Flow lifecycle in the stepper
- The stepper calls Flow.reset() lazily on the first step().
- Pipeline.reset_stepper() calls Flow.reset() and clears all buffers.
- Pipeline.close_stepper() calls Flow.finalize() and drops the stepper.
Use close_stepper() when you’re done to release resources (e.g., camera handles).
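A small toy class can model the lifecycle rules above. CountingFlow and ToyStepper are hypothetical. One detail is an assumption of this sketch: after reset_stepper(), the next step() does not run reset() again. The real stepper may behave differently.

```python
class CountingFlow:
    """Hypothetical flow that records lifecycle calls."""

    def __init__(self) -> None:
        self.resets = 0
        self.finalized = False

    def reset(self) -> None:
        self.resets += 1

    def step(self) -> None:
        pass

    def finalize(self) -> None:
        self.finalized = True


class ToyStepper:
    """Sketch of the rules: lazy reset, explicit reset + buffer clear, finalize on close."""

    def __init__(self, flows: list) -> None:
        self.flows = flows
        self.buffers: dict[str, list] = {"edge": []}
        self._started = False

    def step(self) -> None:
        if not self._started:  # reset() runs lazily on the first step()
            for flow in self.flows:
                flow.reset()
            self._started = True
        for flow in self.flows:
            flow.step()
        self.buffers["edge"].append("x")  # pretend something was published

    def reset_stepper(self) -> None:
        for flow in self.flows:
            flow.reset()
        for buf in self.buffers.values():
            buf.clear()
        self._started = True  # assumption: no second reset on the next step()

    def close_stepper(self) -> None:
        for flow in self.flows:
            flow.finalize()  # release resources such as device handles
        self.flows = []  # drop the stepper's state


flow = CountingFlow()
stepper = ToyStepper([flow])
stepper.step()
stepper.step()
print(flow.resets)  # 1
stepper.reset_stepper()
print(flow.resets, stepper.buffers["edge"])  # 2 []
stepper.close_stepper()
print(flow.finalized)  # True
```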
3) Clock semantics in Pipeline.step(...)
Pipeline.step() is not real-time. It’s “one debug tick”.
For each node:
Rate(hz=...) / Tick(hz=...)
- executes once per step()
- Rate(hz=...) samples all connected inputs
- Tick(hz=...) samples no inputs
Trigger(...)
- executes only if a new arrival is observed on one of its trigger fields
- if multiple trigger fields have new arrivals, the first one in Trigger.fields wins for that step
Hybrid(hz=..., trigger=[...])
- if a trigger field has a new arrival: executes as a trigger step (sampling that field)
- otherwise executes once per step and samples all connected inputs
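The per-node rules for Rate, Tick, Trigger and Hybrid fit in one decision function. The Toy* classes and plan_step are hypothetical stand-ins, not the library's clock types. The text above only specifies a tie-break for Trigger, so when a Hybrid node sees several new trigger arrivals, this sketch assumes the first field in its trigger list wins.

```python
from dataclasses import dataclass, field


@dataclass
class ToyRate:
    hz: float


@dataclass
class ToyTick:
    hz: float


@dataclass
class ToyTrigger:
    fields: list[str]


@dataclass
class ToyHybrid:
    hz: float
    trigger: list[str] = field(default_factory=list)


def plan_step(clock: object, connected_inputs: list[str], new_arrivals: set[str]) -> tuple[bool, list[str]]:
    """Return (executes, sampled_inputs) for one step() under the rules above."""
    if isinstance(clock, ToyRate):
        return True, list(connected_inputs)  # every step, sample all inputs
    if isinstance(clock, ToyTick):
        return True, []  # every step, sample nothing
    if isinstance(clock, ToyTrigger):
        for name in clock.fields:  # declaration order breaks ties
            if name in new_arrivals:
                return True, [name]
        return False, []  # no new arrival: skip this step
    if isinstance(clock, ToyHybrid):
        for name in clock.trigger:  # assumed: first listed trigger wins
            if name in new_arrivals:
                return True, [name]  # trigger step samples only that field
        return True, list(connected_inputs)  # otherwise behave like Rate
    raise TypeError(f"unknown clock: {clock!r}")


print(plan_step(ToyTrigger(["b", "a"]), ["a", "b"], {"a", "b"}))  # (True, ['b'])
print(plan_step(ToyHybrid(10.0, ["cmd"]), ["cmd", "state"], set()))  # (True, ['cmd', 'state'])
```

Note that hz is carried but never read. This is consistent with step() being one debug tick rather than a real-time schedule.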
Time parameters:
- now=...: pins the step timestamp.
- dt=...: advances an internal logical clock (useful for deterministic unit tests).
- if neither is provided, the stepper uses time.time().
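Here is a minimal model of how now=, dt= and the wall-clock fallback could combine. LogicalClock is hypothetical. Three details are assumptions of this sketch: the starting value of 0.0, now taking precedence over dt, and a pinned now leaving the logical clock unchanged.

```python
import time
from typing import Optional


class LogicalClock:
    """Hypothetical model of timestamp selection for one step()."""

    def __init__(self, start: float = 0.0) -> None:
        self._t = start  # assumed starting point of the logical clock

    def timestamp(self, now: Optional[float] = None, dt: Optional[float] = None) -> float:
        if now is not None:
            return now  # pinned: use exactly this timestamp
        if dt is not None:
            self._t += dt  # deterministic: advance the logical clock by dt
            return self._t
        return time.time()  # neither given: wall-clock time


clock = LogicalClock()
print([clock.timestamp(dt=0.5) for _ in range(3)])  # [0.5, 1.0, 1.5]
print(clock.timestamp(now=42.0))  # 42.0
```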
4) Event buffers, adapters, and sampling
Retriever’s runtime model is:
- each port is a discrete-time EventStream
- concretely stored as a finite retriever.flow.types.TimedBuffer[T] = list[(timestamp, value)]
- for collection/replay/export semantics, use retriever.types.data.EventBuffer instead of this runtime buffer
- adapters sample buffers at time now to produce a value for the Flow input
In Pipeline.step(...), buffers are in-process and the buffer size is derived from the adapter:
- Latest(buffer_size=1) keeps only the last item
- Window(buffer_size=N, duration=...) keeps N items and filters by timestamps
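Assuming a TimedBuffer is a time-ordered list of (timestamp, value) pairs, the two adapters can be sketched as plain functions. sample_latest and sample_window are hypothetical. This sketch reads "filters by timestamps" as two assumptions: entries newer than now are ignored, and duration defines the closed window [now - duration, now].

```python
from typing import Any, Optional

# (timestamp, value) pairs, oldest first
TimedBuffer = list[tuple[float, Any]]


def sample_latest(buf: TimedBuffer, now: float) -> Optional[Any]:
    """Latest-style: newest value whose timestamp is not after `now` (None if there is none)."""
    for ts, value in reversed(buf):
        if ts <= now:
            return value
    return None


def sample_window(buf: TimedBuffer, now: float, buffer_size: int, duration: float) -> list[Any]:
    """Window-style: keep the last `buffer_size` items, then those within `duration` of `now`."""
    if buffer_size <= 0:
        return []  # guard: buf[-0:] would return the whole list
    recent = buf[-buffer_size:]
    return [value for ts, value in recent if now - duration <= ts <= now]


buf = [(0.0, "a"), (1.0, "b"), (2.0, "c"), (3.0, "d")]
print(sample_latest(buf, now=2.5))  # c
print(sample_window(buf, now=3.0, buffer_size=3, duration=1.5))  # ['c', 'd']
```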
5) Limitations (current)
These are intentional constraints for the first version:
- Generator-based flows / services are not supported in Pipeline.step(...) yet.
- The dora executor supports generators for RPC; the stepper currently raises an error if Flow.step() yields.
- Service edges (_request_out, _response_in/...) are ignored by the stepper for now.
- Cycles are executed once per step using the IR’s SCC groups order; this is a debug approximation.
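A synchronous stepper can detect a generator-based step by checking what Flow.step() returned. The classes and call_step_or_reject below are hypothetical. The real stepper's error type and message, and whether it checks before or after calling step(), may differ.

```python
import inspect


class PlainFlow:
    def step(self):
        return {"out": 1}


class GeneratorFlow:
    def step(self):
        # A generator-based step (e.g. waiting on an RPC reply) yields instead of returning.
        reply = yield {"request": "ping"}
        return {"out": reply}


def call_step_or_reject(flow):
    """Call flow.step() and refuse generator results, mirroring the current limitation."""
    result = flow.step()
    if inspect.isgenerator(result):
        result.close()  # don't leave the generator suspended
        raise TypeError(f"{type(flow).__name__}.step() yields; not supported in-process")
    return result


print(call_step_or_reject(PlainFlow()))  # {'out': 1}
```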
If you need “real execution semantics”, use Pipeline.run(...) on a backend.
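Cycle handling is an approximation, and ordering by strongly connected components (SCCs) shows why. A common way to linearize a graph with cycles is to collapse each SCC and order the components topologically. The sketch below does this with Kosaraju's algorithm. It is a generic illustration, not the IR's grouping code. Within each group, this sketch orders members alphabetically, which is an arbitrary choice.

```python
from collections import defaultdict


def scc_groups_in_order(nodes: list[str], edges: list[tuple[str, str]]) -> list[list[str]]:
    """Kosaraju's algorithm: SCCs in topological order of the condensed graph."""
    graph: dict[str, list[str]] = defaultdict(list)
    reverse: dict[str, list[str]] = defaultdict(list)
    for src, dst in edges:
        graph[src].append(dst)
        reverse[dst].append(src)

    # Pass 1: DFS on the graph, recording nodes by finish time.
    visited: set[str] = set()
    finish_order: list[str] = []

    def dfs_forward(node: str) -> None:
        visited.add(node)
        for nxt in graph[node]:
            if nxt not in visited:
                dfs_forward(nxt)
        finish_order.append(node)

    for node in nodes:
        if node not in visited:
            dfs_forward(node)

    # Pass 2: DFS on the reversed graph in decreasing finish time; each tree is one SCC.
    assigned: set[str] = set()
    groups: list[list[str]] = []

    def dfs_reverse(node: str, group: list[str]) -> None:
        assigned.add(node)
        group.append(node)
        for prev in reverse[node]:
            if prev not in assigned:
                dfs_reverse(prev, group)

    for node in reversed(finish_order):
        if node not in assigned:
            group: list[str] = []
            dfs_reverse(node, group)
            groups.append(sorted(group))
    return groups


nodes = ["camera", "detector", "controller", "planner"]
edges = [
    ("camera", "detector"),
    ("detector", "controller"),
    ("controller", "planner"),
    ("planner", "controller"),  # feedback edge creates a cycle
]
groups = scc_groups_in_order(nodes, edges)
print(groups)  # [['camera'], ['detector'], ['controller', 'planner']]
```

In this model, controller and planner each run once per step in group order. A value that planner publishes therefore reaches controller only on the next step, instead of the loop iterating to a fixed point. The recursive DFS is fine for small pipeline graphs.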
6) VS Code debugging workflow (recommended)
Because backend execution runs flows in separate processes, the simplest way to use the VS Code debugger
to step through Flow logic is to run in-process with Pipeline.step().
Minimal example
Use: examples/tutorial/c_debug_and_replay/01_debug_stepper.py
What to do:
- Open examples/tutorial/c_debug_and_replay/01_debug_stepper.py
- Set a breakpoint inside DebugFlow.step() (or any Flow.step() you want to inspect)
- Start the VS Code debugger (F5) using the provided launch config (see .vscode/launch.json)
Breaking on exceptions
The example can optionally raise an exception when its counter reaches a configured value.
In VS Code, enable “Break on exceptions” to stop exactly where the exception is raised inside Flow.step().
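The tutorial script's exact code is not reproduced here. The class below is a hypothetical flow in the same spirit: it raises once its counter reaches fail_at, which gives the debugger a deterministic place to stop.

```python
from typing import Optional


class CounterFlow:
    """Hypothetical debug flow: counts steps and can fail on purpose."""

    def __init__(self, fail_at: Optional[int] = None) -> None:
        self.count = 0
        self.fail_at = fail_at  # None disables the deliberate failure

    def step(self) -> dict:
        self.count += 1
        if self.fail_at is not None and self.count == self.fail_at:
            # With exception breakpoints enabled, the debugger stops on this raise.
            raise RuntimeError(f"counter reached {self.count}")
        return {"count": self.count}


flow = CounterFlow(fail_at=3)
try:
    for _ in range(5):
        flow.step()
except RuntimeError as exc:
    print(exc)  # counter reached 3
```

The error is caught here so the snippet runs to completion. If the debugger breaks on raised exceptions, not only uncaught ones, it still stops at the raise.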
Debugging the perception detector (no camera)
If you want to debug the real ColorDetector logic from the dora perception demo without starting dora or a camera, use:
examples/tutorial/c_debug_and_replay/02_debug_perception_stepper.py
It generates synthetic red/blue frames in-process and runs:
SyntheticCamera → ColorDetector → PrintDetections
Set breakpoints inside ColorDetector.step() / _detect_from_mask() and run under the debugger.
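The sketch below shows the same idea in pure Python without the tutorial files. It builds a synthetic frame and recovers a bounding box from a color mask. synthetic_frame, red_mask and bbox_from_mask are hypothetical helpers and the RGB thresholds are arbitrary. The real ColorDetector may use a different color space, thresholds and output format.

```python
from typing import Optional

Pixel = tuple[int, int, int]
Frame = list[list[Pixel]]
Box = tuple[int, int, int, int]  # (x0, y0, x1, y1), end-exclusive


def synthetic_frame(width: int, height: int, box: Box, color: Pixel) -> Frame:
    """Black frame with one filled rectangle in `color`."""
    x0, y0, x1, y1 = box
    return [
        [color if (x0 <= x < x1 and y0 <= y < y1) else (0, 0, 0) for x in range(width)]
        for y in range(height)
    ]


def red_mask(frame: Frame, min_red: int = 150, max_other: int = 80) -> list[list[bool]]:
    """True where a pixel is 'reddish' under simple per-channel thresholds."""
    return [[r >= min_red and g <= max_other and b <= max_other for (r, g, b) in row] for row in frame]


def bbox_from_mask(mask: list[list[bool]]) -> Optional[Box]:
    """Bounding box of True pixels, or None if nothing matched."""
    xs = [x for row in mask for x, hit in enumerate(row) if hit]
    ys = [y for y, row in enumerate(mask) if any(row)]
    if not xs:
        return None
    return min(xs), min(ys), max(xs) + 1, max(ys) + 1


red = synthetic_frame(8, 6, box=(2, 1, 5, 4), color=(255, 0, 0))
blue = synthetic_frame(8, 6, box=(2, 1, 5, 4), color=(0, 0, 255))
print(bbox_from_mask(red_mask(red)))  # (2, 1, 5, 4)
print(bbox_from_mask(red_mask(blue)))  # None
```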
Debugging the perception workflow (real camera)
If you want to debug the perception demo with an actual camera (while still staying in-process for VS Code breakpoints), use:
examples/tutorial/c_debug_and_replay/03_debug_perception_stepper_real_camera.py
This runs:
CameraSource (real) → ColorDetector → DisplayFlow
Notes:
- By default it prints to stdout without opening an OpenCV window.
- Pass --show-window to enable the GUI window.
- Defaults to a short run (10 steps); override with --steps / --sleep.
- dt is optional and only affects timestamps (not scheduling). Pass --dt to force a fixed logical step.
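The split between --dt (timestamps) and --sleep (pacing) can be sketched with argparse. The flag names and the 10-step default come from the notes above. parse_args and run are hypothetical. The --sleep default of 0.0 and the choice that the first logical timestamp is dt are placeholders, not the script's actual behavior.

```python
import argparse
import time
from typing import Callable, Optional, Sequence


def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Step a pipeline in-process (sketch)")
    parser.add_argument("--show-window", action="store_true", help="open a GUI window instead of printing")
    parser.add_argument("--steps", type=int, default=10, help="number of steps to run")
    parser.add_argument("--sleep", type=float, default=0.0, help="pause between steps (placeholder default)")
    parser.add_argument("--dt", type=float, default=None, help="fixed logical step; affects timestamps only")
    return parser.parse_args(argv)


def run(args: argparse.Namespace, step: Callable[[float], object]) -> list:
    """Call step(timestamp) args.steps times; --dt shapes timestamps, --sleep shapes pacing."""
    results = []
    for i in range(args.steps):
        ts = (i + 1) * args.dt if args.dt is not None else time.time()
        results.append(step(ts))
        if args.sleep > 0:
            time.sleep(args.sleep)  # wall-clock pacing only; timestamps are unaffected
    return results


args = parse_args(["--steps", "3", "--dt", "0.5"])
print(run(args, lambda ts: ts))  # [0.5, 1.0, 1.5]
```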
Interpreter note (Pixi)
If you use Pixi, the interpreter usually lives at:
./.pixi/envs/default/bin/python
Make sure VS Code is using that interpreter (or run the module via the launch config).
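To confirm which interpreter is active, for example from the VS Code debug console, inspect sys.executable. looks_like_pixi_env is a hypothetical helper that assumes the .pixi/envs/<env>/ layout shown above.

```python
import sys
from pathlib import Path


def looks_like_pixi_env(executable: str, env: str = "default") -> bool:
    """True if the interpreter path contains .pixi/envs/<env> (the usual Pixi layout)."""
    parts = Path(executable).parts
    for i in range(len(parts) - 2):
        if parts[i] == ".pixi" and parts[i + 1] == "envs" and parts[i + 2] == env:
            return True
    return False


print(sys.executable, looks_like_pixi_env(sys.executable))
```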
7) Recording + replay (rosbag-like workflow)
The stepper is useful for “record once, debug many times” workflows:
- record a short input sequence from real hardware
- replay it later in-process so you can set breakpoints inside Flow.step()
Library helpers (stepper-first):
- Preferred: Pipeline.record(path, steps=..., dt=...) and Pipeline.replay(handle, path=...).
- Legacy stream-only helper: Pipeline.record_to(handle, path, ...).
- Low-level: retriever.rt.stepper.EventStreamRecorder, save_timed_buffer / load_timed_buffer, replay_flow.
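The record-once, replay-many idea is small on its own. The sketch below stores a TimedBuffer-style list as JSON Lines and feeds it back to a step function in timestamp order. save_buffer_jsonl, load_buffer_jsonl and replay_into are hypothetical. They do not reproduce save_timed_buffer / load_timed_buffer or the .rrd/.mcap formats, and they only handle JSON-serializable values.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Callable


def save_buffer_jsonl(path: Path, buf: list[tuple[float, Any]]) -> None:
    """Write one {"t": timestamp, "v": value} record per line."""
    with path.open("w", encoding="utf-8") as f:
        for ts, value in buf:
            f.write(json.dumps({"t": ts, "v": value}) + "\n")


def load_buffer_jsonl(path: Path) -> list[tuple[float, Any]]:
    with path.open(encoding="utf-8") as f:
        return [(rec["t"], rec["v"]) for rec in map(json.loads, f)]


def replay_into(step: Callable[[float, Any], Any], buf: list[tuple[float, Any]]) -> list[Any]:
    """Feed recorded values to `step` in timestamp order; set breakpoints inside `step`."""
    return [step(ts, value) for ts, value in sorted(buf, key=lambda item: item[0])]


recorded = [(0.0, 1), (0.1, 2), (0.2, 3)]
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "counter.jsonl"
    save_buffer_jsonl(path, recorded)
    loaded = load_buffer_jsonl(path)

outputs = replay_into(lambda ts, v: v * 10, loaded)
print(outputs)  # [10, 20, 30]
```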
Perception example:
- examples/tutorial/c_debug_and_replay/04_record_replay_perception.py:
  - Record: python -m examples.tutorial.c_debug_and_replay.04_record_replay_perception record ...
  - Replay: python -m examples.tutorial.c_debug_and_replay.04_record_replay_perception replay --recording logs/perception.rrd --visualize stdout
The tutorial record path writes logs/perception.rrd by default and mirrors the same run to logs/perception.mcap.
Replay accepts either artifact.