Queue Worker¶
Overview¶
This recipe demonstrates asynchronous background processing with an Azure Storage Queue trigger. A producer writes messages to a queue, and the worker function processes each message independently.
The paired runnable project is `examples/queue_worker`.
Primary use case¶
Use this pattern when you need:
- long-running operations moved out of HTTP request path
- burst handling through queued backlog
- retry-aware processing with dequeue counts
- producer/consumer separation between services
Architecture diagram (text)¶
```text
Producer (HTTP app / external system)
    |
    | enqueue JSON message
    v
Azure Storage Queue (work-items)
    |
    | trigger
    v
process_queue_message(msg)
    |- decode bytes to text
    |- parse JSON payload
    |- process task data
    '- log result and dequeue metadata
```

Failure path:

```text
processing error -> runtime retry -> poison queue after max dequeue count
```
Prerequisites¶
- Python 3.10+
- Azure Functions Core Tools v4
- Azurite for local queue emulation (recommended)
- `AzureWebJobsStorage` configured for the local environment
Install the Core Tools and Azurite, then start Azurite in another terminal before running the worker.
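The install and startup steps above might look like this (a sketch assuming an npm-based install; platform installers for Core Tools also exist):

```shell
# Install Azure Functions Core Tools v4 and Azurite globally via npm
npm install -g azure-functions-core-tools@4
npm install -g azurite

# Run Azurite in its own terminal; the queue service listens on 127.0.0.1:10001
azurite --silent --location ./azurite-data
```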
Step-by-step implementation guide¶
Implementation flow in `examples/queue_worker/function_app.py`:

- Define `app = func.FunctionApp()` and a module logger.
- Implement `_process_task(data)` for the business logic.
- Register a queue trigger for the `work-items` queue.
- Read the message id, dequeue count, and body bytes.
- Decode the bytes to UTF-8 text.
- Parse the JSON payload with error handling.
- Execute the processing function and write completion logs.
Complete code walkthrough¶
Worker setup and task processor¶
```python
import azure.functions as func
import json
import logging

app = func.FunctionApp()
logger = logging.getLogger(__name__)


def _process_task(data: dict) -> str:
    task_id = data.get("id", "unknown")
    action = data.get("action", "unknown")
    logger.info("Processing task %s: %s", task_id, action)
    return f"Completed task {task_id}"
```
`_process_task` is where the real domain work would be performed.
Queue trigger handler¶
```python
@app.queue_trigger(
    arg_name="msg",
    queue_name="work-items",
    connection="AzureWebJobsStorage",
)
def process_queue_message(msg: func.QueueMessage) -> None:
    message_id = msg.id
    body_bytes = msg.get_body()
    body_text = body_bytes.decode("utf-8")
    dequeue_count = msg.dequeue_count

    logger.info(
        "Received message %s (dequeue_count=%d): %s",
        message_id,
        dequeue_count,
        body_text,
    )

    try:
        data = json.loads(body_text)
    except json.JSONDecodeError:
        logger.error("Invalid JSON in message %s — skipping", message_id)
        return

    result = _process_task(data)
    logger.info("Message %s result: %s", message_id, result)
```
This is a practical baseline for safe queue payload handling.
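A dequeue-count guard complements this baseline: bad JSON is dropped immediately, while repeated failures are escalated before the host's poison-queue limit. This is a hedged sketch; `handle` and its return values are illustrative, with the threshold of 5 mirroring the suggested `maxDequeueCount`:

```python
import json

MAX_ATTEMPTS = 5  # mirrors maxDequeueCount in the suggested host settings


def handle(body_text: str, dequeue_count: int) -> str:
    """Classify one delivery: process it, skip bad payloads, or escalate repeats."""
    try:
        data = json.loads(body_text)
    except json.JSONDecodeError:
        return "skipped"  # permanent failure: retrying bad JSON never helps
    if dequeue_count >= MAX_ATTEMPTS:
        return f"escalated:{data.get('id')}"  # next failure lands in the poison queue
    return f"processed:{data.get('id')}"


print(handle('{"id": "job-001"}', 1))  # prints processed:job-001
print(handle("not json", 1))           # prints skipped
print(handle('{"id": "job-001"}', 5))  # prints escalated:job-001
```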
Running locally¶
1) Configure storage connection¶
Set local connection string to development storage in local settings.
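A typical `local.settings.json` pointing at development storage might look like this (a sketch; `UseDevelopmentStorage=true` resolves to the local emulator's well-known endpoints):

```json
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "python"
  }
}
```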
2) Start Azurite¶
3) Start worker¶
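With Azurite running, the worker starts with Core Tools from the project directory:

```shell
cd examples/queue_worker
func start
```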
4) Enqueue a test message¶
Use any queue client/tool to push JSON text into work-items:
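One way to enqueue from Python is the azure-storage-queue SDK (a sketch; the Functions host expects base64-encoded queue messages by default, hence the encode policy, and the connection string here is Azurite's well-known development account):

```python
import json

from azure.storage.queue import QueueClient, TextBase64EncodePolicy

# Azurite's well-known development-storage connection string (queue port 10001)
CONN_STR = (
    "DefaultEndpointsProtocol=http;"
    "AccountName=devstoreaccount1;"
    "AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;"
    "QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;"
)


def enqueue_task(task: dict) -> None:
    """Send a JSON task to work-items, base64-encoded for the Functions host."""
    client = QueueClient.from_connection_string(
        CONN_STR, "work-items", message_encode_policy=TextBase64EncodePolicy()
    )
    client.send_message(json.dumps(task))


# enqueue_task({"id": "job-001", "action": "thumbnail"})
```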
Expected output¶
Typical logs after message arrival:

```text
Received message <id> (dequeue_count=1): {"id":"job-001","action":"thumbnail"}
Processing task job-001: thumbnail
Message <id> result: Completed task job-001
```

Invalid payload log path:

```text
Invalid JSON in message <id> — skipping
```
Production considerations¶
- Keep handlers idempotent because retries can reprocess messages.
- Track dequeue count and alert on repeated failures.
- Route permanent failures to poison-queue remediation flow.
- Keep message schema compact and versioned.
- Put large payloads in blob storage and queue references only.
- Tune queue extension settings for throughput and latency.
- Add structured logging with message id and correlation id.
- Protect storage access with managed identity where possible.
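The idempotency point above can be sketched with a processed-id check (a minimal in-memory sketch; `process_once` is illustrative, and production code would persist seen ids in durable storage such as a table or cache):

```python
_processed: set[str] = set()


def process_once(task_id: str, work) -> bool:
    """Run work(task_id) only if this id has not been seen; return True if executed."""
    if task_id in _processed:
        return False  # duplicate delivery from a retry; safe to acknowledge and skip
    work(task_id)
    _processed.add(task_id)
    return True


results: list[str] = []
print(process_once("job-001", results.append))  # prints True
print(process_once("job-001", results.append))  # prints False (retry is a no-op)
print(results)                                  # prints ['job-001']
```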
Warning
Do not assume strict FIFO ordering with Azure Storage Queues.
Suggested host settings (from source recipe)¶
```json
{
  "version": "2.0",
  "extensions": {
    "queues": {
      "maxPollingInterval": "00:00:02",
      "visibilityTimeout": "00:00:30",
      "batchSize": 16,
      "maxDequeueCount": 5,
      "newBatchThreshold": 8
    }
  }
}
```
Related recipes¶
- For request-facing APIs, see HTTP API Basic.
- For scheduled processing without producer messages, see Timer Job.
Ecosystem links¶
- `azure-functions-logging` for traceable worker logs
- `azure-functions-validation` for payload schema checks
- `azure-functions-scaffold` for queue app bootstrapping