Data Pipeline Example¶
This example demonstrates the simplest graph topology: a sequential pipeline
with three nodes chained via next_node.
Overview¶
The data pipeline simulates an ETL (Extract, Transform, Load) workflow:
- Extract — fetches raw records from a source
- Transform — cleans and normalizes each record
- Load — writes the processed records to a destination
flowchart LR
A[extract] --> B[transform] --> C[load]
State Model¶
from pydantic import BaseModel, Field

class PipelineState(BaseModel):
    source_url: str
    raw_records: list[dict] = Field(default_factory=list)
    transformed_records: list[dict] = Field(default_factory=list)
    load_result: str | None = None
    record_count: int = 0
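The run input (see Start a run below) maps onto this model, and any fields not supplied fall back to their defaults. A quick sketch of that validation using plain Pydantic:

state = PipelineState.model_validate({"source_url": "https://example.com/data"})
assert state.raw_records == []   # unspecified fields take their defaults
assert state.record_count == 0
assert state.load_result is None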
Node Handlers¶
extract¶
Simulates fetching records from an external source:
def extract(state: PipelineState) -> dict:
    raw = [
        {"id": 1, "name": " Alice ", "score": "85"},
        {"id": 2, "name": " Bob ", "score": "92"},
        {"id": 3, "name": " Carol ", "score": "78"},
    ]
    return {"raw_records": raw, "record_count": len(raw)}
transform¶
Strips surrounding whitespace from names and casts score strings to integers:
def transform(state: PipelineState) -> dict:
    transformed = [
        {
            "id": r["id"],
            "name": str(r["name"]).strip(),
            "score": int(r["score"]),
        }
        for r in state.raw_records
    ]
    return {"transformed_records": transformed}
load¶
Simulates writing the processed records to the destination and reports the result:
def load(state: PipelineState) -> dict:
    count = len(state.transformed_records)
    return {
        "load_result": f"Loaded {count} records to {state.source_url}/output",
    }
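As with extract, the write is simulated. A sketch of a real load step that persists the records to a local JSON file (the output path is an arbitrary choice for illustration):

import json
from pathlib import Path

def load_to_file(state: PipelineState) -> dict:
    # Hypothetical variant: write the processed records to disk.
    out = Path("output.json")
    out.write_text(json.dumps(state.transformed_records, indent=2))
    count = len(state.transformed_records)
    return {"load_result": f"Loaded {count} records to {out.resolve()}"}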
Graph Definition¶
from azure_functions_durable_graph import ManifestBuilder
builder = ManifestBuilder(
    graph_name="data_pipeline",
    state_model=PipelineState,
    version="0.1.0",
    metadata={"example": True, "profile": "etl"},
)
builder.set_entrypoint("extract")
builder.add_node("extract", extract, next_node="transform")
builder.add_node("transform", transform, next_node="load")
builder.add_node("load", load, terminal=True)
registration = builder.build()
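Because the handlers are plain functions that return partial state updates, the whole pipeline can be unit-tested without the Durable runtime. A minimal sketch, assuming the runtime merges each returned dict into the state (mirrored here with Pydantic's model_copy):

state = PipelineState(source_url="https://example.com/data")
for handler in (extract, transform, load):
    state = state.model_copy(update=handler(state))

assert state.record_count == 3
assert state.transformed_records[0] == {"id": 1, "name": "Alice", "score": 85}
assert state.load_result == "Loaded 3 records to https://example.com/data/output"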
Running the Example¶
Wire it into your function_app.py:
from azure_functions_durable_graph import DurableGraphApp
from examples.data_pipeline.graph import registration
runtime = DurableGraphApp()
runtime.register_registration(registration)
app = runtime.function_app
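Then start the local Functions host with Azure Functions Core Tools (Durable Functions also need a storage backend; Azurite works for local development). By default it listens on port 7071, matching the URLs below:

func start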
Start a run¶
curl -X POST http://localhost:7071/api/graphs/data_pipeline/runs \
  -H "Content-Type: application/json" \
  -d '{"input": {"source_url": "https://example.com/data"}}'
Check status¶
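The start request presumably returns a run identifier you can poll with. The exact status route depends on the library, so treat this as a hypothetical sketch mirroring the start route:

curl http://localhost:7071/api/graphs/data_pipeline/runs/<run_id>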
Expected final state:
{
  "state": {
    "source_url": "https://example.com/data",
    "raw_records": [
      {"id": 1, "name": " Alice ", "score": "85"},
      {"id": 2, "name": " Bob ", "score": "92"},
      {"id": 3, "name": " Carol ", "score": "78"}
    ],
    "transformed_records": [
      {"id": 1, "name": "Alice", "score": 85},
      {"id": 2, "name": "Bob", "score": 92},
      {"id": 3, "name": "Carol", "score": 78}
    ],
    "load_result": "Loaded 3 records to https://example.com/data/output",
    "record_count": 3
  }
}
Key Patterns Demonstrated¶
- Sequential chaining: nodes connected with next_node for linear execution
- State accumulation: each node adds to the state without overwriting previous fields
- Terminal node: the final load node is marked terminal=True to end the graph