File Processing Pipeline¶
Trigger: Blob (Event Grid) | State: stateless | Guarantee: at-least-once | Difficulty: intermediate
Overview¶
The examples/data-and-pipelines/file_processing_pipeline/ sample shows a multi-stage blob processing
pipeline where an uploaded CSV or JSON file triggers validation, transformation, and persistence to a
database. The function uses Event Grid-backed blob notifications for low-latency activation,
azure-functions-logging-python for structured telemetry, and azure-functions-db-python to write the processed
result to a downstream table.
This recipe is a strong fit for ingestion workflows where files arrive asynchronously and need a clear, repeatable pipeline before they become queryable application data.
When to Use¶
- You ingest CSV or JSON files into a storage container and need automated processing.
- You want lightweight, stateless stage execution inside a single Azure Function invocation.
- You need structured logs plus a normalized database record for each successfully processed file.
- Your downstream writes can be made replay-safe for at-least-once delivery.
When NOT to Use¶
- Processing requires human approval or long-running orchestration between stages.
- Files are too large for single-invocation parsing and should be chunked or fan-out processed.
- The downstream database cannot tolerate duplicate writes or replay without idempotency keys.
- You need synchronous client feedback before persistence completes.
Architecture¶
flowchart LR
upload[Upload CSV / JSON file] --> blob[Blob Storage container]
blob --> trigger[Blob Trigger<br/>source=Event Grid]
trigger --> validate[Validate schema and values]
validate --> transform[Transform into normalized records]
transform --> persist[Persist with azure-functions-db-python]
persist --> db[(Application DB)]
trigger -. telemetry .-> logging[azure-functions-logging-python]
Behavior¶
sequenceDiagram
participant U as Uploader
participant B as Blob Storage
participant F as file_processing_pipeline
participant D as Database
U->>B: Upload CSV / JSON file
B->>F: Fire Event Grid-backed blob trigger
F->>F: Read payload and detect file type
F->>F: Validate required fields and value types
F->>F: Transform rows into normalized output
F->>D: Persist processed summary and rows
F-->>B: Invocation completes
Implementation¶
The example uses a blob trigger configured with source=func.BlobSource.EVENT_GRID so the runtime is
activated by Event Grid notifications instead of blob polling.
@app.blob_trigger(
arg_name="myblob",
path="incoming/{name}",
connection="AzureWebJobsStorage",
source=func.BlobSource.EVENT_GRID,
)
@db.output("out", url="%DB_URL%", table="processed_files")
def process_uploaded_file(myblob: func.InputStream, out: DbOut) -> None:
...
Inside the invocation, the pipeline executes three explicit stages:
- Validate: parse CSV or JSON, require
id,category, andamount, and reject empty/invalid files. - Transform: normalize categories, coerce numeric amounts, and compute file-level summary fields.
- Persist: write one processed-file record to the configured database table.
Structured logging surrounds the stages so each invocation emits searchable metadata such as blob name, record counts, and persisted row identifiers.
Project Structure¶
examples/data-and-pipelines/file_processing_pipeline/
|-- function_app.py
|-- host.json
|-- local.settings.json.example
|-- pyproject.toml
`-- README.md
Config¶
Set these values in local.settings.json when running locally:
| Variable | Purpose | Example |
|---|---|---|
AzureWebJobsStorage |
Storage account used by the blob trigger | UseDevelopmentStorage=true |
FUNCTIONS_WORKER_RUNTIME |
Azure Functions language worker | python |
DB_URL |
Connection string or URL consumed by azure-functions-db-python |
postgresql://postgres:postgres@localhost:5432/appdb |
Use an incoming blob container for uploaded files such as orders.csv or orders.json.
Run Locally¶
cd examples/data-and-pipelines/file_processing_pipeline
python -m venv .venv
source .venv/bin/activate
pip install -e ".[dev]"
cp local.settings.json.example local.settings.json
func start
Then upload a file into the incoming container.
Example JSON payload:
[
{"id": "order-1001", "category": "Retail", "amount": 19.95},
{"id": "order-1002", "category": "Wholesale", "amount": 40.0}
]
Expected Output¶
[Information] Blob pipeline started blob_name=incoming/orders.json size_bytes=118
[Information] File validated blob_name=incoming/orders.json record_count=2
[Information] File transformed blob_name=incoming/orders.json normalized_count=2 total_amount=59.95
[Information] Blob pipeline completed blob_name=incoming/orders.json persisted_id=<uuid>
Production Considerations¶
- Idempotency: include blob name, ETag, or content hash in the persisted record to avoid duplicate side effects.
- Schema evolution: version file formats explicitly when adding or renaming columns.
- Large files: move to chunked parsing or queue-based fan-out when files exceed single-invocation memory limits.
- Observability: log validation failures, transformation counts, and persistence latency with consistent fields.
- Security: restrict storage and database access with managed identity, private networking, and least privilege.