This guide covers how to build workflows with duroxide-python. For architecture details and internals, see architecture.md.
```bash
pip install duroxide
```

| Concept | What It Is |
|---|---|
| Orchestration | A durable workflow (generator function). Survives restarts via replay. |
| Activity | A regular function for side effects (HTTP calls, DB writes). Runs once, result is cached. |
| Provider | Storage backend (SQLite or PostgreSQL). Stores orchestration state. |
| Client | Starts orchestrations, raises events, queries status. |
| Runtime | Runs the dispatchers that execute orchestrations and activities. |
```python
from duroxide import SqliteProvider, Client, Runtime

provider = SqliteProvider.in_memory()
client = Client(provider)
runtime = Runtime(provider)


@runtime.register_activity("SayHello")
def say_hello(ctx, name):
    return f"Hello, {name}!"


@runtime.register_orchestration("HelloWorkflow")
def hello_workflow(ctx, input):
    greeting = yield ctx.schedule_activity("SayHello", input["name"])
    return greeting


runtime.start()

client.start_orchestration("hello-1", "HelloWorkflow", {"name": "World"})
result = client.wait_for_orchestration("hello-1")
print(result.output)  # "Hello, World!"

runtime.shutdown()
```

Chain activities sequentially; each step receives the previous step's result:

```python
@runtime.register_orchestration("Pipeline")
def pipeline(ctx, input):
    step1 = yield ctx.schedule_activity("Extract", input)
    step2 = yield ctx.schedule_activity("Transform", step1)
    step3 = yield ctx.schedule_activity("Load", step2)
    return step3
```

Run multiple tasks in parallel and wait for all of them to complete. `ctx.all()` supports all task types: activities, timers, waits, and sub-orchestrations:
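```python
@runtime.register_orchestration("FanOut")
def fan_out(ctx, input):
    # Schedule one activity per item; none of them is awaited yet
    tasks = [ctx.schedule_activity("ProcessItem", item) for item in input["items"]]
    # Wait for all of them to finish
    results = yield ctx.all(tasks)
    return results
```

Mixed task types in `ctx.all()`: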
```python
@runtime.register_orchestration("MixedAll")
def mixed_all(ctx, input):
    act_result, _, event_data = yield ctx.all([
        ctx.schedule_activity("Work", input),
        ctx.schedule_timer(1000),        # timers return {"ok": None}
        ctx.wait_for_event("approval"),  # waits return {"ok": event_data}
    ])
    return act_result
```

Note: Nesting `ctx.all()` or `ctx.race()` inside each other is not supported; the runtime will reject it.
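The destructuring in `mixed_all` relies on `ctx.all()` returning results in the same order as the task list, not in completion order. The toy model below is plain Python, not duroxide code (`SimTask` and `simulate_all` are hypothetical names), and it assumes each task finishes at a known simulated time so the join contract can be seen directly:

```python
from dataclasses import dataclass
from typing import Any, List, Tuple


@dataclass
class SimTask:
    """A simulated task that finishes at `done_at` (ms) with `value`."""
    name: str
    done_at: int
    value: Any


def simulate_all(tasks: List[SimTask]) -> Tuple[List[Any], int]:
    """Toy join: wait for every task, then return results in input order.

    Returns (results, elapsed_ms); a join takes as long as its slowest task.
    """
    order = sorted(range(len(tasks)), key=lambda i: tasks[i].done_at)  # completion order
    finished = {i: tasks[i].value for i in order}
    results = [finished[i] for i in range(len(tasks))]                 # input order
    elapsed_ms = max((t.done_at for t in tasks), default=0)
    return results, elapsed_ms


results, elapsed_ms = simulate_all([
    SimTask("Work", done_at=300, value="work-result"),
    SimTask("timer", done_at=1000, value=None),
    SimTask("approval", done_at=50, value={"approved": True}),
])
act_result, _, event_data = results
print(act_result, event_data, elapsed_ms)  # work-result {'approved': True} 1000
```

Because a join waits for its slowest member, `MixedAll` cannot finish before its 1-second timer fires and the approval event arrives.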
Wait for the first of two tasks to complete. ctx.race() supports all task types:
```python
@runtime.register_orchestration("RaceExample")
def race_example(ctx, input):
    winner = yield ctx.race(
        ctx.schedule_activity("FastService", input),
        ctx.schedule_timer(5000),  # 5-second timeout
    )
    if winner["index"] == 0:
        return winner["value"]  # FastService completed first
    else:
        return "timed out"
```

`ctx.race()` supports exactly 2 tasks (it maps to Rust's `select2`). Nesting `ctx.all()` or `ctx.race()` inside each other is not supported.
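A similar toy model for `ctx.race()` follows. It is plain Python with hypothetical names, not duroxide's implementation: the earlier of the two simulated tasks wins, the winner carries the `index` and `value` fields that `race_example` reads, and the other task is reported as the loser to be cancelled. Breaking ties in favor of index 0 is an assumption of this model.

```python
from typing import Any, Dict, Tuple

# A simulated task is (done_at_ms, value); a timer's value is None here.
SimTask = Tuple[int, Any]


def simulate_race(first: SimTask, second: SimTask) -> Tuple[Dict[str, Any], int]:
    """Return ({"index", "value"} of the winner, index of the cancelled loser)."""
    tasks = (first, second)
    winner = min((0, 1), key=lambda i: (tasks[i][0], i))  # earliest wins; tie -> index 0
    return {"index": winner, "value": tasks[winner][1]}, 1 - winner


def handle(winner: Dict[str, Any]) -> Any:
    # Same branching as race_example above
    return winner["value"] if winner["index"] == 0 else "timed out"


fast, loser = simulate_race((120, "service-result"), (5000, None))
print(handle(fast), loser)  # service-result 1

slow, loser = simulate_race((8000, "service-result"), (5000, None))
print(handle(slow), loser)  # timed out 0
```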
Use a durable timer to pause before continuing; the wait survives process restarts:

```python
@runtime.register_orchestration("DelayedNotification")
def delayed_notification(ctx, input):
    yield ctx.schedule_timer(60 * 60 * 1000)  # wait 1 hour (survives restarts)
    yield ctx.schedule_activity("SendReminder", input)
    return "done"
```

Wait for a signal from outside the orchestration:
```python
@runtime.register_orchestration("ApprovalWorkflow")
def approval_workflow(ctx, input):
    yield ctx.schedule_activity("RequestApproval", input)
    ctx.trace_info("waiting for approval...")
    approval = yield ctx.wait_for_event("approval")
    if approval["approved"]:
        yield ctx.schedule_activity("Execute", input)
        return "approved"
    else:
        return "rejected"


# Raise the event from outside ("instance-1" is the ID the workflow was started with):
client.raise_event("instance-1", "approval", {"approved": True})
```

Compose workflows from smaller workflows:
```python
@runtime.register_orchestration("Parent")
def parent(ctx, input):
    child_result = yield ctx.schedule_sub_orchestration("Child", input)
    return {"parent_result": child_result}


@runtime.register_orchestration("Child")
def child(ctx, input):
    r = yield ctx.schedule_activity("DoWork", input)
    return r
```

With an explicit instance ID:
```python
# Inside an orchestration: give the child a caller-chosen instance ID
result = yield ctx.schedule_sub_orchestration_with_id(
    "Child", f"child-{input['id']}", input
)
```

Start another orchestration without waiting for it to complete:
```python
@runtime.register_orchestration("CreateInstance")
def create_instance(ctx, input):
    yield ctx.schedule_activity("ProvisionVM", input)
    # Launch the monitor; it runs independently of this orchestration
    yield ctx.start_orchestration(
        "InstanceMonitor",
        f"monitor-{input['instance_id']}",
        {"instance_id": input["instance_id"]},
    )
    return {"status": "provisioned"}
```

For long-running orchestrations that need a periodic refresh (e.g., monitoring loops), use `continue_as_new`:
```python
@runtime.register_orchestration("Monitor")
def monitor(ctx, input):
    state = input.get("state", {"check_count": 0})

    # Do periodic work
    health = yield ctx.schedule_activity("CheckHealth", input["target"])
    ctx.trace_info(f"health check #{state['check_count']}: {health['status']}")

    # Wait before the next check
    yield ctx.schedule_timer(30000)  # 30 seconds

    # Restart with updated state (keeps history from growing unbounded)
    yield ctx.continue_as_new({
        "target": input["target"],
        "state": {"check_count": state["check_count"] + 1},
    })
```

Use try/except around yielded operations:
```python
@runtime.register_orchestration("SafeWorkflow")
def safe_workflow(ctx, input):
    try:
        result = yield ctx.schedule_activity("RiskyCall", input)
        return result
    except Exception as e:
        # A failed activity surfaces here as an exception
        ctx.trace_error(f"activity failed: {e}")
        yield ctx.schedule_activity("Cleanup", {"error": str(e)})
        return {"status": "failed", "error": str(e)}
```

Retry transient failures automatically by passing a retry policy:

```python
result = yield ctx.schedule_activity_with_retry("FlakeyApi", input, {
    "max_attempts": 3,
    "backoff": "exponential",
    "timeout_ms": 5000,         # per-attempt timeout
    "total_timeout_ms": 30000,  # total timeout across all attempts
})
```

Sessions provide activity affinity: all activities scheduled with the same `session_id` are routed to the same worker slot on the same runtime instance. This is useful when activities need to share in-memory state (e.g., a database connection, a loaded ML model, or a stateful client).
Use schedule_activity_on_session or pass session_id as a keyword argument to schedule_activity:
```python
@runtime.register_orchestration("SessionWorkflow")
def session_workflow(ctx, input):
    # Explicit method
    r1 = yield ctx.schedule_activity_on_session("ProcessData", input, "my-session-1")
    # Keyword argument (equivalent)
    r2 = yield ctx.schedule_activity("ProcessData", input, session_id="my-session-1")
    return [r1, r2]
```

Both activities above run on the same worker slot because they share `session_id="my-session-1"`.
Activities can check their session ID via ctx.session_id. Regular (non-session) activities get None:
```python
@runtime.register_activity("MyActivity")
def my_activity(ctx, input):
    if ctx.session_id:
        print(f"Running in session: {ctx.session_id}")
    else:
        print("No session (regular activity)")
    return input
```

Configure session behavior via `RuntimeOptions`:
```python
from duroxide import Runtime, RuntimeOptions

runtime = Runtime(provider, RuntimeOptions(
    max_sessions_per_runtime=10,     # Max concurrent sessions (default: 10)
    session_idle_timeout_ms=300000,  # Idle timeout before releasing a slot (default: 5 min)
    worker_node_id="pod-abc-123",    # Stable worker identity (e.g., K8s pod name)
))
```

| Option | Default | Description |
|---|---|---|
| max_sessions_per_runtime | 10 | Maximum number of concurrent session slots per runtime instance |
| session_idle_timeout_ms | 300000 (5 min) | How long an idle session slot is held before being released |
| worker_node_id | Auto-generated | Stable identity for session ownership. Set this to a K8s pod name or hostname so sessions survive runtime restarts on the same node. |
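To see how `max_sessions_per_runtime` and `session_idle_timeout_ms` interact, here is a toy slot model in plain Python. It is not duroxide's scheduler and the class name is hypothetical; it assumes that a runtime whose slots are all held simply declines work for new sessions (leaving it queued) while continuing to serve the sessions it already owns.

```python
from typing import Dict


class SessionSlots:
    """Toy model of per-runtime session slots.

    - A session that already holds a slot keeps it (affinity).
    - A new session gets a slot only if fewer than `max_sessions` are held.
    - A slot idle for longer than `idle_timeout_ms` is released.
    """

    def __init__(self, max_sessions: int = 10, idle_timeout_ms: int = 300_000):
        self.max_sessions = max_sessions
        self.idle_timeout_ms = idle_timeout_ms
        self.last_used: Dict[str, int] = {}  # session_id -> last activity time (ms)

    def _release_idle(self, now_ms: int) -> None:
        expired = [s for s, t in self.last_used.items() if now_ms - t > self.idle_timeout_ms]
        for s in expired:
            del self.last_used[s]

    def try_run(self, session_id: str, now_ms: int) -> bool:
        """True if an activity for `session_id` may run on this runtime now."""
        self._release_idle(now_ms)
        if session_id in self.last_used or len(self.last_used) < self.max_sessions:
            self.last_used[session_id] = now_ms
            return True
        return False  # all slots held: in this model the work stays queued


slots = SessionSlots(max_sessions=2, idle_timeout_ms=60_000)
print(slots.try_run("batch-001", now_ms=0))       # True: first slot
print(slots.try_run("batch-002", now_ms=10))      # True: second slot
print(slots.try_run("batch-003", now_ms=20))      # False: both slots held
print(slots.try_run("batch-001", now_ms=30))      # True: affinity, same slot
print(slots.try_run("batch-003", now_ms=70_100))  # True: idle slots were released
```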
A complete example with a stateful activity that accumulates items per session:

```python
from duroxide import SqliteProvider, Client, Runtime, RuntimeOptions

provider = SqliteProvider.in_memory()
client = Client(provider)
runtime = Runtime(provider, RuntimeOptions(
    max_sessions_per_runtime=5,
    session_idle_timeout_ms=60000,
    worker_node_id="worker-1",
))

# Stateful activity: shares in-memory state within a session
session_state = {}


@runtime.register_activity("Accumulate")
def accumulate(ctx, input):
    sid = ctx.session_id
    if sid not in session_state:
        session_state[sid] = []
    session_state[sid].append(input)
    return session_state[sid]


@runtime.register_orchestration("BatchProcess")
def batch_process(ctx, input):
    result = []  # stays empty if there are no items
    for item in input["items"]:
        result = yield ctx.schedule_activity_on_session(
            "Accumulate", item, f"batch-{input['batch_id']}"
        )
    return result  # All items accumulated in order


runtime.start()

client.start_orchestration("batch-1", "BatchProcess", {
    "batch_id": "001",
    "items": ["a", "b", "c"],
})
result = client.wait_for_orchestration("batch-1")
print(result.output)  # ["a", "b", "c"]

runtime.shutdown()
```

Custom status lets orchestrations report progress that external clients can see. Status updates are fire-and-forget (no `yield` needed) and survive replays.
```python
@runtime.register_orchestration("ProvisionServer")
def provision_server(ctx, input):
    ctx.set_custom_status("validating configuration")
    yield ctx.schedule_activity("ValidateConfig", input)

    ctx.set_custom_status("creating VM")
    vm = yield ctx.schedule_activity("CreateVM", input)

    ctx.set_custom_status("installing software")
    yield ctx.schedule_activity("InstallSoftware", vm)

    ctx.reset_custom_status()  # clear status when done
    return {"vm_id": vm["id"]}
```

Use `client.wait_for_status_change()` for efficient status polling. It blocks until the status version changes or the timeout expires:
```python
# Start the orchestration (config is the orchestration input)
client.start_orchestration("prov-1", "ProvisionServer", config)

# Poll for status changes
last_version = 0
while True:
    result = client.wait_for_status_change("prov-1", last_version, 50, 10000)
    if result is None:
        break  # timeout: the orchestration may have completed
    print(f"Status: {result.custom_status}")
    last_version = result.custom_status_version
```

Parameters:

- `instance_id`: the orchestration instance to watch
- `last_seen_version`: the version you last saw (0 to start); the call returns when the version exceeds this
- `poll_interval_ms`: how often to poll the provider
- `timeout_ms`: maximum time to wait before returning `None`
`OrchestrationResult` fields:

- `result.custom_status`: the custom status string, or `None` if not set
- `result.custom_status_version`: a monotonically increasing version counter
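The polling contract can be modeled in a few lines of plain Python. In this sketch, `wait_for_version_change` and `FakeStatusSource` are hypothetical stand-ins, not part of duroxide: the function re-reads a status source every `poll_interval_ms` and returns the first snapshot whose version exceeds `last_seen_version`, or `None` once `timeout_ms` elapses.

```python
import time
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class StatusSnapshot:
    custom_status: Optional[str]
    custom_status_version: int


class FakeStatusSource:
    """Stand-in for the provider: each read applies at most one pending
    update, then keeps returning the latest snapshot."""

    def __init__(self, updates: List[StatusSnapshot]):
        self.pending = list(updates)
        self.current = StatusSnapshot(None, 0)

    def read(self) -> StatusSnapshot:
        if self.pending:
            self.current = self.pending.pop(0)
        return self.current


def wait_for_version_change(source: FakeStatusSource, last_seen_version: int,
                            poll_interval_ms: int, timeout_ms: int) -> Optional[StatusSnapshot]:
    deadline = time.monotonic() + timeout_ms / 1000
    while True:
        snap = source.read()
        if snap.custom_status_version > last_seen_version:
            return snap
        if time.monotonic() >= deadline:
            return None  # timed out with no newer version
        time.sleep(poll_interval_ms / 1000)


source = FakeStatusSource([
    StatusSnapshot("validating configuration", 1),
    StatusSnapshot("creating VM", 2),
])
seen = []
last_version = 0
while True:
    result = wait_for_version_change(source, last_version, 5, 50)
    if result is None:
        break
    seen.append(result.custom_status)
    last_version = result.custom_status_version
print(seen)  # ['validating configuration', 'creating VM']
```

One consequence of polling, shown by this model and presumably true of the real call as well, is that a status overwritten between two polls is never observed; only the newest version is returned.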
KV entries are durable metadata scoped to a single orchestration instance. They can be updated from inside the orchestration without yielding and read externally through the client.
```python
@runtime.register_orchestration("RequestServer")
def request_server(ctx, input):
    ctx.set_kv_value("status", "ready")  # no yield needed
    request = yield ctx.wait_for_event("request")
    response = request["command"][::-1]  # e.g., reverse the command string
    ctx.set_kv_value(f"response:{request['op_id']}", response)
    return "done"


# Client side: wait up to 10 s for "status" to appear, then read a response key
status = client.wait_for_kv_value("server-1", "status", 10000)
response = client.get_kv_value("server-1", "response:op-1")
```

`OrchestrationContext` methods:

- `ctx.set_kv_value(key, value)`: set or overwrite a key
- `ctx.get_kv_value(key)`: read the current value for a key in the active instance
- `ctx.get_kv_all_values()`: return a snapshot dict of all current KV entries
- `ctx.get_kv_all_keys()`: return the list of active keys
- `ctx.get_kv_length()`: return the number of active keys
- `ctx.clear_kv_value(key)`: remove a single key
- `ctx.clear_all_kv_values()`: clear all keys for the active instance
- `ctx.prune_kv_values_updated_before(cutoff_ms)`: prune keys whose last persisted update is older than the given cutoff
- `ctx.get_kv_value_from_instance(instance_id, key)`: read another instance's KV via the built-in syscall activity
Client methods:

- `client.get_kv_value(instance_id, key)`: read a key immediately
- `client.get_kv_value_typed(instance_id, key)`: read a key and JSON-decode it
- `client.wait_for_kv_value(instance_id, key, timeout_ms)`: block until the key exists or the timeout expires
- `client.wait_for_kv_value_typed(instance_id, key, timeout_ms)`: wait for a key and JSON-decode it

Limits:

- `MAX_KV_KEYS = 150`
- `MAX_KV_VALUE_BYTES = 65536`
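If you want to fail fast before hitting these limits, a pre-flight check is easy to write. The helper below is hypothetical (not part of duroxide) and rests on two assumptions: the byte limit applies to the UTF-8 encoding of the value, and overwriting an existing key does not count against `MAX_KV_KEYS`. Values are written as JSON text so that the `*_typed` client readers can decode them.

```python
import json
from typing import Dict, Optional

MAX_KV_KEYS = 150
MAX_KV_VALUE_BYTES = 65536


def kv_write_error(current: Dict[str, str], key: str, value: str) -> Optional[str]:
    """Return why writing `key` would exceed the documented limits, or None if it fits."""
    size = len(value.encode("utf-8"))  # assumption: the limit counts UTF-8 bytes
    if size > MAX_KV_VALUE_BYTES:
        return f"value for {key!r} is {size} bytes (max {MAX_KV_VALUE_BYTES})"
    if key not in current and len(current) >= MAX_KV_KEYS:  # assumption: overwrites are free
        return f"instance already holds {MAX_KV_KEYS} keys"
    return None


kv: Dict[str, str] = {}  # local mirror of what the orchestration has written
value = json.dumps({"op_id": "op-1", "result": "olleh"})
error = kv_write_error(kv, "response:op-1", value)
if error is None:
    kv["response:op-1"] = value  # in an orchestration: ctx.set_kv_value(key, value)
print(json.loads(kv["response:op-1"])["result"])  # olleh
```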
Event queues provide durable, ordered message passing between external clients and orchestrations. Unlike `wait_for_event()`, which waits for a single named event, event queues deliver multiple messages in FIFO order on named queues. Messages survive `continue_as_new`.
```python
import json


@runtime.register_orchestration("RequestProcessor")
def request_processor(ctx, input):
    # Block until a message arrives on the "requests" queue
    request_json = yield ctx.dequeue_event("requests")
    request = json.loads(request_json)  # messages are strings; decode JSON yourself
    result = yield ctx.schedule_activity("ProcessRequest", request)
    return result
```

Send a message from the client:

```python
client.enqueue_event("proc-1", "requests", json.dumps({
    "action": "process",
    "data": {"id": 42},
}))
```

Orchestrations can dequeue from different named queues:
```python
@runtime.register_orchestration("MultiQueue")
def multi_queue(ctx, input):
    # Each queue is independent: FIFO within each queue
    command = yield ctx.dequeue_event("commands")
    config = yield ctx.dequeue_event("config")
    return {"command": command, "config": config}
```

`schedule_activity_with_retry_on_session` combines retry policies with session affinity; all retry attempts are pinned to the same worker session:
```python
@runtime.register_orchestration("RetrySessionWorkflow")
def retry_session_workflow(ctx, input):
    result = yield ctx.schedule_activity_with_retry_on_session(
        "FlakeyGpuTask",
        input,
        {"max_attempts": 3, "backoff_strategy": "none"},
        "gpu-session-1",
    )
    return result
```

This is useful when the activity relies on in-memory state (e.g., a loaded ML model or GPU context) that would be lost if retries landed on a different worker.
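The guarantee can be modeled as a plain retry loop in which every attempt carries the same session key. This is a toy sketch, not duroxide's retry engine: `run_with_retry_on_session` and `flakey_gpu_task` are hypothetical, and, matching `"backoff_strategy": "none"`, the model retries immediately without any delay.

```python
from typing import Any, Callable, List, Tuple


def run_with_retry_on_session(
    activity: Callable[[Any, str], Any],
    payload: Any,
    max_attempts: int,
    session_id: str,
) -> Tuple[Any, List[str]]:
    """Call `activity` up to `max_attempts` times, always with the same session_id.

    Returns (result, session_ids_used_per_attempt); re-raises the last error.
    """
    attempts: List[str] = []
    last_error: Exception = RuntimeError("max_attempts must be >= 1")
    for _ in range(max_attempts):
        attempts.append(session_id)  # every attempt targets the same session
        try:
            return activity(payload, session_id), attempts
        except Exception as e:  # failed attempt: retry immediately (no backoff)
            last_error = e
    raise last_error


calls = {"n": 0}


def flakey_gpu_task(payload: Any, session_id: str) -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient GPU error")
    return f"{payload} done on {session_id}"


result, attempts = run_with_retry_on_session(flakey_gpu_task, "job-7", 3, "gpu-session-1")
print(result)    # job-7 done on gpu-session-1
print(attempts)  # ['gpu-session-1', 'gpu-session-1', 'gpu-session-1']
```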
This pattern combines event queues, custom status, and continue-as-new to build an interactive chat bot that processes messages one at a time:
```python
import json


@runtime.register_activity("Generate")
def generate(ctx, text):
    # Call an LLM or generate a response
    return f"Echo: {text}"


@runtime.register_orchestration("ChatBot")
def chat_bot(ctx, input):
    # Wait for the next message on the "inbox" queue
    msg_json = yield ctx.dequeue_event("inbox")
    msg = json.loads(msg_json)

    # Process the message
    response = yield ctx.schedule_activity("Generate", msg["text"])

    # Report the response via custom status
    ctx.set_custom_status(json.dumps({
        "state": "replied",
        "response": response,
        "seq": msg["seq"],
    }))

    # Exit on "bye", otherwise loop
    if "bye" in msg["text"].lower():
        return f"Done after {msg['seq']} msgs"

    # Restart with fresh history (queued messages survive continue-as-new)
    return (yield ctx.continue_as_new(""))


# --- Client side ---

# Start the chat bot
client.start_orchestration("chat-1", "ChatBot", "")

# Send a message
client.enqueue_event("chat-1", "inbox", json.dumps({"seq": 1, "text": "Hello!"}))

# Wait for the response
status = client.wait_for_status_change("chat-1", 0, 50, 10000)
reply = json.loads(status.custom_status)
print(reply["response"])  # "Echo: Hello!"

# End the conversation
client.enqueue_event("chat-1", "inbox", json.dumps({"seq": 2, "text": "Bye!"}))
```

Activities are regular functions that receive a context and an input:

```python
@runtime.register_activity("SendEmail")
def send_email(ctx, input):
    ctx.trace_info(f"sending to {input['to']}")
    # ... actual email sending ...
    return {"sent": True}
```

Activities can start other orchestrations or raise events:
```python
@runtime.register_activity("TriggerCleanup")
def trigger_cleanup(ctx, input):
    client = ctx.get_client()
    client.start_orchestration(
        f"cleanup-{input['id']}",
        "CleanupWorkflow",
        {"resource_id": input["id"]},
    )
    return {"triggered": True}
```

Raise an exception to mark the activity as failed:
```python
@runtime.register_activity("ValidateInput")
def validate_input(ctx, input):
    if not input.get("email"):
        raise Exception("email is required")
    return {"valid": True}
```

Activities can check for cancellation (e.g., when they lose a race):
```python
import time


@runtime.register_activity("LongRunning")
def long_running(ctx, input):
    for _ in range(100):
        if ctx.is_cancelled():
            ctx.trace_info("cancelled, cleaning up")
            return "cancelled"
        time.sleep(0.1)  # do one unit of work
    return "done"
```

Tracing calls are automatically suppressed during replay, so no duplicate log lines are emitted:
```python
@runtime.register_orchestration("Traced")
def traced(ctx, input):
    ctx.trace_info("[v1.0.0] starting workflow")
    ctx.trace_debug(f"input: {input}")
    result = yield ctx.schedule_activity("Work", input)
    ctx.trace_info(f"[v1.0.0] completed: {result}")
    return result
```

Activity traces include full structured metadata (activity name, ID, and worker ID):
```python
import requests  # third-party HTTP client


@runtime.register_activity("FetchData")
def fetch_data(ctx, input):
    ctx.trace_info(f"fetching data for {input['id']}")
    data = requests.get(input["url"], timeout=10).json()  # don't hang a worker forever
    ctx.trace_info(f"got {len(data)} records")
    return data
```

Control log verbosity with the `RUST_LOG` environment variable:

```bash
RUST_LOG=info pytest -s                           # INFO and above
RUST_LOG=duroxide::orchestration=debug pytest -s  # Orchestration debug
RUST_LOG=duroxide::activity=info pytest -s        # Activity info only
```

SQLite is good for development, testing, and single-node deployments:
```python
from duroxide import SqliteProvider

# File-backed (persistent)
provider = SqliteProvider.open("myapp.db")

# In-memory (ephemeral, great for tests)
provider = SqliteProvider.in_memory()
```

Use PostgreSQL for production multi-node deployments:
```python
from duroxide import PostgresProvider

provider = PostgresProvider.connect("postgresql://user:pass@host:5432/mydb")

# With schema isolation
provider = PostgresProvider.connect_with_schema(
    "postgresql://user:pass@host:5432/mydb",
    "duroxide_app",
)
```

Tune the runtime with `RuntimeOptions`:

```python
from duroxide import Runtime, RuntimeOptions

runtime = Runtime(provider, RuntimeOptions(
    orchestration_concurrency=4,      # Max concurrent orchestration dispatches
    worker_concurrency=8,             # Max concurrent activity workers
    dispatcher_poll_interval_ms=100,  # Polling interval in ms
    log_level="info",                 # Tracing log level
    log_format="pretty",              # "pretty" or "json"
    service_name="my-service",        # Service name for tracing metadata
    service_version="1.0.0",          # Service version for tracing metadata
    max_sessions_per_runtime=10,      # Max concurrent session slots
    session_idle_timeout_ms=300000,   # Session idle timeout (default: 5 min)
    worker_node_id="pod-name",        # Stable worker identity for sessions
))
```

Get a snapshot of runtime metrics (requires observability to be configured):
```python
snapshot = runtime.metrics_snapshot()
if snapshot is not None:
    print(f"Orchestrations started: {snapshot['orch_starts']}")
    print(f"Orchestrations completed: {snapshot['orch_completions']}")
    print(f"Activity successes: {snapshot['activity_success']}")
    print(f"Provider errors: {snapshot['provider_errors']}")
```

`metrics_snapshot()` returns `None` if observability is not enabled. The snapshot includes counters for orchestration starts, completions, and failures, activity results, dispatcher stats, and provider errors.
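Counters like these are usually most useful as per-interval changes. The helper below is hypothetical (not part of duroxide); it assumes the numeric snapshot values are cumulative counters since the runtime started, treats a `None` snapshot (observability off) as no data, and uses made-up numbers purely for illustration.

```python
from typing import Any, Dict, Optional

Snapshot = Optional[Dict[str, Any]]


def counter_deltas(before: Snapshot, after: Snapshot) -> Dict[str, float]:
    """Change in each numeric counter present in both snapshots."""
    if before is None or after is None:
        return {}
    deltas: Dict[str, float] = {}
    for key, new in after.items():
        old = before.get(key)
        if isinstance(new, (int, float)) and isinstance(old, (int, float)):
            deltas[key] = new - old
    return deltas


# In practice, take two snapshots some interval apart with runtime.metrics_snapshot().
before = {"orch_starts": 10, "orch_completions": 8, "provider_errors": 0}
after = {"orch_starts": 25, "orch_completions": 20, "provider_errors": 1}
print(counter_deltas(before, after))
# {'orch_starts': 15, 'orch_completions': 12, 'provider_errors': 1}
```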
The `Client` API at a glance:

```python
client = Client(provider)

# Start an orchestration
client.start_orchestration("id", "WorkflowName", input_data)
client.start_orchestration_versioned("id", "WorkflowName", input_data, "1.0.2")

# Wait for completion (with timeout in ms)
result = client.wait_for_orchestration("id", 30000)
# result.status: "Completed" | "Failed" | "Running" | "Terminated" | ...
# result.custom_status: custom status string or None
# result.custom_status_version: monotonically increasing version counter

# Cancel a running orchestration
client.cancel_instance("id", "reason")

# Raise an event
client.raise_event("id", "event_name", event_data)

# Enqueue an event on a named queue (FIFO, survives continue-as-new)
client.enqueue_event("id", "queue_name", data_string)

# Wait for custom status changes
status = client.wait_for_status_change("id", last_seen_version, poll_interval_ms, timeout_ms)

# Get status without waiting
status = client.get_status("id")

# Admin operations
client.delete_instance("id", force=False)
metrics = client.get_system_metrics()
depths = client.get_queue_depths()
```

Register multiple versions of an orchestration:
```python
@runtime.register_orchestration("MyWorkflow")
def my_workflow_v1(ctx, input):
    # v1.0.0: original
    r = yield ctx.schedule_activity("Work", input)
    return r


@runtime.register_orchestration_versioned("MyWorkflow", "1.0.1")
def my_workflow_v2(ctx, input):
    # v1.0.1: adds validation
    yield ctx.schedule_activity("Validate", input)
    r = yield ctx.schedule_activity("Work", input)
    return r
```

New orchestrations use the latest version. Running orchestrations stay on their original version until they complete or call `continue_as_new`.
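The selection rule can be pictured with a toy registry in plain Python. The names are hypothetical and this is not duroxide's code; it assumes "latest" means the highest semantic version, treats the unversioned registration as `1.0.0` (as the comment in `my_workflow_v1` suggests), and models `continue_as_new` as re-selecting the latest version.

```python
from typing import Callable, Dict, Tuple


def parse_version(v: str) -> Tuple[int, ...]:
    """'1.0.10' -> (1, 0, 10), so versions compare numerically, not as strings."""
    return tuple(int(part) for part in v.split("."))


class VersionRegistry:
    def __init__(self) -> None:
        self.versions: Dict[str, Dict[str, Callable]] = {}  # name -> {version: fn}
        self.pinned: Dict[str, Tuple[str, str]] = {}        # instance_id -> (name, version)

    def register(self, name: str, version: str, fn: Callable) -> None:
        self.versions.setdefault(name, {})[version] = fn

    def latest(self, name: str) -> str:
        return max(self.versions[name], key=parse_version)

    def start(self, instance_id: str, name: str) -> str:
        """New instances pin the latest registered version."""
        version = self.latest(name)
        self.pinned[instance_id] = (name, version)
        return version

    def continue_as_new(self, instance_id: str) -> str:
        """In this model, restarting an instance picks up the latest version."""
        name, _ = self.pinned[instance_id]
        return self.start(instance_id, name)


registry = VersionRegistry()
registry.register("MyWorkflow", "1.0.0", lambda ctx, inp: None)
print(registry.start("order-1", "MyWorkflow"))  # 1.0.0
registry.register("MyWorkflow", "1.0.1", lambda ctx, inp: None)
print(registry.start("order-2", "MyWorkflow"))  # 1.0.1
print(registry.pinned["order-1"][1])            # 1.0.0 (still on its original version)
print(registry.continue_as_new("order-1"))      # 1.0.1
```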
In Rust, arbitrary async blocks can be composed with join()/select(). In the Python SDK, all()/race() only accept single task descriptors — multi-step blocks must be wrapped as sub-orchestrations.
```python
# Pattern: wrap multi-step logic as a sub-orchestration
@runtime.register_orchestration("BlockA")
def block_a(ctx, input):
    first = yield ctx.schedule_activity("Step", "A1")
    if "step" in first:
        second = yield ctx.schedule_activity("Step", "A2")
        return f"A:[{first},{second}]"
    return "A:fallback"


@runtime.register_orchestration("BlockB")
def block_b(ctx, input):
    yield ctx.schedule_timer(5)  # 5 ms
    result = yield ctx.schedule_activity("Step", "B1")
    return f"B:[timer,{result}]"


# Parent: join/race sub-orchestration descriptors
@runtime.register_orchestration("Parent")
def parent(ctx, input):
    # Join multiple multi-step blocks
    a, b = yield ctx.all([
        ctx.schedule_sub_orchestration("BlockA", ""),
        ctx.schedule_sub_orchestration("BlockB", ""),
    ])
    return f"{a},{b}"
```

Use `all()` for joining (all tasks must complete) and `race()` for racing (the first to finish wins and the loser is cancelled). For races among three or more tasks, nest `race()` calls through sub-orchestrations (race a sub-orchestration that itself calls `race()`), since nesting `all()`/`race()` directly is rejected. See `test_async_blocks.py` for 12 examples covering join, race, nested chains, and timeout patterns.
Orchestration functions must be deterministic. The replay engine re-executes the generator from the beginning on every dispatch, feeding back cached results. If the sequence of yielded operations changes between runs, it no longer matches the recorded history and replay breaks.

Do:

- Use `yield ctx.utc_now()` for timestamps
- Use `yield ctx.new_guid()` for random IDs
- Use `ctx.trace_info()` for logging (auto-suppressed during replay)

Don't:

- Use `time.time()`, `random.random()`, or `uuid.uuid4()`
- Make HTTP calls or read files in orchestrations
- Use `print()` (it will duplicate on replay; use `ctx.trace_info()` instead)
- Read environment variables that might change between restarts

Activities have no such restrictions: their results are cached in history, so replay never re-executes them, and they can do anything.
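Why these rules matter is easiest to see with a toy replay driver. The sketch below is plain Python, not duroxide's engine, and every name in it is hypothetical: commands are `("activity", name, arg)` tuples, history is a list of `(command, result)` pairs, and the driver re-runs the generator from the start, feeding recorded results back and failing when the code yields something the history does not contain at that position.

```python
from typing import Any, Callable, Generator, List, Tuple

Command = Tuple[str, str, Any]        # ("activity", name, arg)
History = List[Tuple[Command, Any]]   # recorded (command, result) pairs


class NondeterminismError(Exception):
    pass


def replay(orchestrator: Callable[[], Generator[Command, Any, Any]],
           history: History,
           run_activity: Callable[[str, Any], Any]) -> Any:
    gen = orchestrator()
    send_value: Any = None
    step = 0
    while True:
        try:
            command = gen.send(send_value)  # run the orchestration to its next yield
        except StopIteration as done:
            return done.value               # the generator returned: orchestration output
        if step < len(history):
            recorded, result = history[step]
            if recorded != command:
                raise NondeterminismError(
                    f"step {step}: history has {recorded}, code yielded {command}")
            send_value = result             # replay: reuse the cached result
        else:
            _, name, arg = command
            send_value = run_activity(name, arg)  # new work: execute and record it
            history.append((command, send_value))
        step += 1


executed: List[str] = []


def run_activity(name: str, arg: Any) -> str:
    executed.append(name)
    return f"{name}({arg})"


def pipeline():
    a = yield ("activity", "Extract", "src")
    b = yield ("activity", "Transform", a)
    return b


history: History = []
print(replay(pipeline, history, run_activity))  # Transform(Extract(src))
print(replay(pipeline, history, run_activity))  # Transform(Extract(src)), served from history
print(executed)                                 # ['Extract', 'Transform']

# Changing the code path for an in-flight instance breaks replay:
add_validation = False


def evolving():
    if add_validation:
        yield ("activity", "Validate", "x")
    return (yield ("activity", "Work", "x"))


h: History = []
replay(evolving, h, run_activity)
add_validation = True
try:
    replay(evolving, h, run_activity)
except NondeterminismError as err:
    print(err)
```

The same mechanism is why `time.time()` or `random.random()` inside an orchestration is dangerous: a value that differs on replay can steer the generator down a different branch than the one recorded in history.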