Service Bus Worker¶
Overview¶
The examples/servicebus/servicebus_worker/ recipe consumes messages from the tasks queue using
@app.service_bus_queue_trigger. It extracts delivery metadata (correlation_id, delivery_count),
parses JSON safely, and routes valid payloads to a processing helper.
Service Bus is appropriate when you need durable messaging, lock-based processing, dead-lettering, and enterprise integration features. This sample focuses on operational visibility and failure handling patterns that are important for production background workers.
When to Use¶
- You need reliable queue processing with built-in delivery semantics.
- You want message metadata for tracing, retries, and diagnostics.
- You require dead-letter queues for poison message isolation.
Architecture¶
+----------------------+ send task JSON +----------------------+
| Producer / API | -----------------> | Service Bus queue |
| sets correlation_id | | tasks |
+----------------------+ +----------+-----------+
|
v
+----------+-----------+
| Azure Functions Host |
| service_bus trigger |
+----------+-----------+
|
v
+----------+-----------+
| process_service_bus_ |
| message |
| - parse JSON |
| - log delivery_count |
+----------+-----------+
|
v
+----------------------+
| DLQ on poison flow |
+----------------------+
Prerequisites¶
- Python 3.10+
- Azure Functions Core Tools v4
- Azure Service Bus namespace with queue
tasks ServiceBusConnectionapp setting (connection string or identity-based config)
Project Structure¶
examples/servicebus/servicebus_worker/
|-- function_app.py
|-- host.json
|-- local.settings.json.example
|-- requirements.txt
`-- README.md
Implementation¶
The function binds directly to tasks and reads both payload and transport metadata before doing
business logic. This makes retries and diagnostics explicit.
@app.service_bus_queue_trigger(
arg_name="msg",
queue_name="tasks",
connection="ServiceBusConnection",
)
def process_service_bus_message(msg: func.ServiceBusMessage) -> None:
raw_body = msg.get_body().decode("utf-8", errors="replace")
correlation_id = getattr(msg, "correlation_id", None)
delivery_count = int(getattr(msg, "delivery_count", 1))
The parser guards against invalid JSON and logs context that operators need to evaluate whether the message should be fixed and replayed from dead-letter storage.
try:
payload: dict[str, Any] = json.loads(raw_body)
except json.JSONDecodeError:
logger.error(
"Invalid Service Bus JSON (correlation_id=%s delivery_count=%d): %s",
correlation_id,
delivery_count,
raw_body,
)
return
outcome = _process_service_bus_message(payload)
logger.info("Service Bus processing complete: %s", outcome)
delivery_count should inform alerting and remediation. When it approaches your max delivery policy,
the message may dead-letter and require manual or automated replay.
Run Locally¶
Expected Output¶
[Information] Service Bus message received correlation_id=req-142 delivery_count=1
[Information] Service Bus processing complete: task=generate_report priority=high status=processed
[Error] Invalid Service Bus JSON (correlation_id=req-188 delivery_count=5): {bad payload}
[Warning] Message likely to dead-letter after max delivery attempts
Production Considerations¶
- Scaling: increase instance count and
maxConcurrentCallsfor I/O-bound handlers. - Retries: rely on Service Bus lock + redelivery; avoid swallowing transient exceptions silently.
- Idempotency: use message ID or business key to avoid duplicate side effects on redelivery.
- Observability: always log
correlation_id,delivery_count, and processing outcome. - Security: prefer managed identity with least-privilege
Azure Service Bus Data Receiverrole.