ETL Enrichment¶
Trigger: Blob / Timer | State: stateless | Guarantee: at-least-once | Difficulty: intermediate
Overview¶
The examples/data-and-pipelines/etl_enrichment/ sample shows an Extract-Transform-Load workflow
with an enrichment step in the middle. A blob-triggered Azure Function reads raw JSON records,
normalizes core fields, enriches each record with reference data such as region metadata or
geocoding-like lookups, and then writes the enriched rows to a database through
azure-functions-db-python.
This pattern is useful when source data arrives incomplete and must be joined with external context before it can be used for analysis. The enrichment logic is deterministic, so a retried blob execution produces the same enriched rows; the load step still needs deduplication at the destination, as noted under Production Considerations.
When to Use¶
- Raw files arrive in blob storage and must be processed asynchronously.
- Source records need reference-data lookups before loading into a reporting or serving store.
- You want a simple, function-based ETL stage without provisioning a larger orchestration system.
When NOT to Use¶
- You need exactly-once delivery and the destination cannot deduplicate replayed writes.
- Enrichment depends on slow, high-latency external APIs that should be decoupled behind queues.
- The workflow requires cross-file transactions or long-running stateful orchestration.
Architecture¶
flowchart LR
    blob[(raw-data blob)] --> extract[extract records]
    extract --> enrich[enrich with lookups]
    enrich --> load[load enriched rows]
    load --> db[(database)]
Behavior¶
sequenceDiagram
    participant Blob as Blob Storage
    participant Func as etl_enrich_blob
    participant Lookup as Lookup table
    participant DB as Database
    Blob->>Func: Trigger with raw JSON blob
    Func->>Func: Parse and validate records
    loop for each record
        Func->>Lookup: Resolve enrichment metadata
        Lookup-->>Func: Region / coordinates / tags
        Func->>Func: Build enriched row
    end
    Func->>DB: Batch insert enriched rows
    Func-->>Blob: Invocation completed
Prerequisites¶
- Python 3.10+
- Azure Functions Core Tools v4
- Azurite or an Azure Storage account for the blob trigger
- A database reachable through DB_URL
- azure-functions-db-python and azure-functions-logging-python
Project Structure¶
examples/data-and-pipelines/etl_enrichment/
|-- function_app.py
|-- host.json
|-- local.settings.json.example
|-- pyproject.toml
`-- README.md
Implementation¶
The blob trigger reads a JSON array from the raw-data container and logs the extraction stage.
Each input record is normalized into a consistent shape before enrichment.
@app.blob_trigger(
    arg_name="myblob",
    path="raw-data/{name}",
    connection="AzureWebJobsStorage",
)
@db.output("out", url="%DB_URL%", table="enriched_customers")
def etl_enrich_blob(myblob: func.InputStream, out: DbOut) -> None:
    raw_records = _load_raw_records(myblob.read())
    enriched_rows = [_enrich_record(record) for record in raw_records]
    out.set(enriched_rows)
The enrichment stage can call lookup helpers, cached reference tables, or SDK clients. In this example it derives region details and pseudo-geocode data from deterministic local mappings, so a replayed blob yields exactly the same enriched rows.
def _enrich_record(record: RawRecord) -> dict[str, str | float]:
    region = REGION_LOOKUPS.get(record["country_code"], FALLBACK_LOOKUP)
    return {
        "customer_id": record["customer_id"],
        "city": record["city"],
        "country_code": record["country_code"],
        "region": region["region"],
        "market_tier": region["market_tier"],
        "latitude": region["latitude"],
        "longitude": region["longitude"],
    }
Run Locally¶
cd examples/data-and-pipelines/etl_enrichment
python -m venv .venv
source .venv/bin/activate
pip install -e ".[dev]"
cp local.settings.json.example local.settings.json
func start
Upload a blob such as customers-2026-04-17.json to the raw-data container:
[
  {"customer_id": "cust-1001", "city": "Seattle", "country_code": "US"},
  {"customer_id": "cust-1002", "city": "Berlin", "country_code": "DE"}
]
]
Expected Output¶
[Information] ETL extraction started blob=raw-data/customers-2026-04-17.json size_bytes=126
[Information] ETL enrichment completed blob=raw-data/customers-2026-04-17.json input_count=2 enriched_count=2
[Information] Loaded enriched rows target_table=enriched_customers count=2
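Log lines in this key=value style could be emitted with the standard library `logging` module, as sketched below. The logger name and the `log_pipeline_stages` helper are hypothetical; the sample itself may rely on azure-functions-logging-python, whose API is not shown on this page. The `[Information]` prefix is not produced by this code.

```python
import logging

# Hypothetical logger name, chosen for this sketch.
logger = logging.getLogger("etl_enrichment")


def log_pipeline_stages(
    blob_name: str,
    size_bytes: int,
    input_count: int,
    enriched_count: int,
    target_table: str,
) -> None:
    """Emit one key=value log line per pipeline stage."""
    logger.info("ETL extraction started blob=%s size_bytes=%d", blob_name, size_bytes)
    logger.info(
        "ETL enrichment completed blob=%s input_count=%d enriched_count=%d",
        blob_name, input_count, enriched_count,
    )
    logger.info("Loaded enriched rows target_table=%s count=%d", target_table, enriched_count)
```

Comparing `input_count` with `enriched_count` in the second line makes dropped records visible without extra tooling.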
Production Considerations¶
- Scaling: blob-trigger concurrency affects downstream database pressure; size connection pools accordingly.
- Retries: blob-trigger replays can reinsert rows, so use upserts or natural keys in the destination.
- Idempotency: include source blob name, record key, or version fields to support deduplication.
- Observability: emit per-stage counts, enrichment misses, and load latency with structured logs.
- Reference data: cache slow lookups or materialize them locally when latency or rate limits matter.
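The retry and idempotency points above can be made concrete with a deterministic deduplication key derived from the source blob name and the record key. The sketch below is one way to do that under stated assumptions: the `dedup_key` and `source_blob` column names and both helper functions are hypothetical, and the destination table is assumed to enforce uniqueness (or upsert) on that key.

```python
import hashlib


def dedup_key(blob_name: str, customer_id: str) -> str:
    """Return a stable key for one record of one blob."""
    # A separator that is unlikely to appear in either part avoids
    # collisions such as ("ab", "c") versus ("a", "bc").
    raw = f"{blob_name}\x1f{customer_id}".encode("utf-8")
    return hashlib.sha256(raw).hexdigest()


def with_dedup_keys(rows: list[dict], blob_name: str) -> list[dict]:
    """Attach the source blob name and a dedup key to each enriched row."""
    return [
        {
            **row,
            "source_blob": blob_name,
            "dedup_key": dedup_key(blob_name, row["customer_id"]),
        }
        for row in rows
    ]
```

A replayed blob then produces rows with the same keys as the first attempt, so a unique constraint or upsert at the destination can discard or overwrite the duplicates.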