Runtime/Core API (Refactored)¶
This page lists the current runtime/core API entry points and where they live.
Canonical workflow:
Pipeline / TemporalFlow → Pipeline.validate() → IR → (optional) Pipeline.build_execution() → execute_ir()
1) Public authoring surface (retriever.flow)¶
Import path:
from retriever.flow import (
Flow, Pipeline, PipelineBuilder, TemporalFlow,
io,
Rate, Tick, Trigger, Hybrid,
Latest, Hold, Window, Events,
EdgeConfig,
handle_service, call_service,
)
Key concepts:
- Flow[I, O]: user-defined node logic (step/reset/finalize, with run and init as compatibility aliases)
- @io classes: typed ports (each field is a port; use @io directly, not with @dataclass)
- flow @ clock: produces a TemporalFlow (node instance with execution config)
- Pipeline: explicit graph builder (recommended)
- PipelineBuilder: lower-level validation builder
- adapters (Latest/Hold/Window/Events): sampling policy for per-port buffers
- clocks (Rate/Tick/Trigger/Hybrid): scheduling + field sampling
Guide: docs/guide_flow.md.
2) Default-pipeline convenience API (Optional)¶
Import path:
This is a notebook/REPL convenience layer, not the primary shared-example surface.
Use it when you want a temporary thread-local graph without naming a Pipeline.
retriever.connect(src, dst, map=None, sync=None): connects twoTemporalFlows on the thread-local default pipelineretriever.default_pipeline(): returns the current thread-local pipeline, creating one lazily if neededretriever.clear_default_pipeline(): drops the current thread-local handleretriever.reset_default_pipeline(): eagerly creates a fresh empty default pipelineretriever.run(...): runs the thread-local default pipelineretriever.step(dt=0.1): manually steps the default pipeline in-processretriever.reset(): resets the default pipeline state
For scripts and shared examples, prefer explicit Pipeline(...) plus pipe.run(...),
pipe.step(...), and pipe.reset_stepper().
If you need a clean slate before wiring new notebook cells, prefer retriever.reset_default_pipeline().
If you only want to drop the current thread-local handle and let Retriever recreate it later, use retriever.clear_default_pipeline().
3) IR boundary (retriever.ir)¶
Import path:
Pipeline.validate() -> IR: converts an authored graph into backend-agnostic IR.Pipeline.build_execution() -> ExecutionGraph: creates a physical execution plan (partitioning + placement hints).
Guide: docs/guide_execution.md.
4) Runtime execution (retriever.rt)¶
Import path:
execute_ir(ir_or_graph, backend=..., duration=..., blocking=...): runs anIRor anExecutionGraphon a backend.
Backends:
- multiprocessing: (default)
- dora: High-performance, zero-copy (Rust)
- in-process: Debugging/recording (deterministic)
Architecture: docs/architecture.md.
5) Debugging / stepping (Pipeline surface)¶
Preferred entry points:
Pipeline.step(now=..., dt=...)— one in-process debug stepPipeline.reset_stepper()/Pipeline.close_stepper()- explicit stepper recording:
Pipeline.record(...)/Pipeline.replay(...) pipe.run(backend="in-process", record="file.mcap")remains available when you want a wall-clock-bounded in-process run that also persists artifacts
Implementation lives in:
- retriever/rt/stepper.py
Guide: docs/guides/debugging.md.
6) Pipelines registry + plugins (retriever.registry.pipeline)¶
Import path:
This enables external packages to register pipelines via entry points:
- group: retriever.plugins
Use build_ir(...) as the explicit execution-adjacent registry surface.
run_pipeline(...) still exists for compatibility, but it is not the primary
surface taught in these docs.
See: docs/architecture.md (“Registry + plugins”).