Skip to content

API Reference

azure_functions_db

SqlAlchemySource(*, url, table=None, schema=None, query=None, cursor_column, pk_columns, where=None, parameters=None, strategy='cursor', operation_mode='upsert_only', engine_provider=None)

SQLAlchemy-based source adapter for cursor-based change polling.

Implements the SourceAdapter protocol defined in azure_functions_db.trigger.runner.

Parameters

url: SQLAlchemy connection URL. Supports %VAR% env-var substitution.
table: Table name to poll. Mutually exclusive with query.
schema: Optional schema qualifier for table.
query: Raw SQL query to poll. Mutually exclusive with table.
cursor_column: Column used for cursor-based ordering.
pk_columns: Primary-key column(s) for tie-breaking within the same cursor value.
where: Optional extra SQL WHERE clause fragment (appended with AND).
parameters: Optional bind parameters for where or query.
strategy: Polling strategy. Only "cursor" is supported.
operation_mode: Operation mode. Only "upsert_only" is supported.

Source code in src/azure_functions_db/adapter/sqlalchemy.py
def __init__(
    self,
    *,
    url: str,
    table: str | None = None,
    schema: str | None = None,
    query: str | None = None,
    cursor_column: str,
    pk_columns: list[str],
    where: str | None = None,
    parameters: dict[str, object] | None = None,
    strategy: str = "cursor",
    operation_mode: str = "upsert_only",
    engine_provider: EngineProvider | None = None,
) -> None:
    """Validate the polling configuration and prepare lazy engine state.

    Raises :class:`SourceConfigurationError` when the configuration is
    inconsistent: empty ``url``, both or neither of ``table``/``query``,
    empty ``cursor_column``/``pk_columns``, or an unsupported
    ``strategy``/``operation_mode``.
    """
    # Fail fast on configuration errors; nothing below touches the DB.
    if not url:
        raise SourceConfigurationError("url must not be empty")
    if table and query:
        raise SourceConfigurationError(
            "Exactly one of 'table' or 'query' must be provided, not both"
        )
    if not (table or query):
        raise SourceConfigurationError("Exactly one of 'table' or 'query' must be provided")
    if not cursor_column:
        raise SourceConfigurationError("cursor_column must not be empty")
    if not pk_columns:
        raise SourceConfigurationError("pk_columns must not be empty")
    if strategy != "cursor":
        raise SourceConfigurationError(
            f"Unsupported strategy: '{strategy}'. Only 'cursor' is supported."
        )
    if operation_mode != "upsert_only":
        raise SourceConfigurationError(
            f"Unsupported operation_mode: '{operation_mode}'. Only 'upsert_only' is supported."
        )

    # Static configuration.  Caller-owned containers are copied so later
    # mutation by the caller cannot leak into this adapter.
    self._url = _resolve_env_vars(url)
    self._table_name = table
    self._schema = schema
    self._query = query
    self._cursor_column = cursor_column
    self._pk_columns = list(pk_columns)
    self._where = where
    self._parameters = dict(parameters or {})
    self._strategy = strategy
    self._operation_mode = operation_mode
    self._engine_provider = engine_provider
    self._owns_engine = False
    self._db_config = DbConfig(connection_url=self._url)

    # Engine and table reflection happen lazily on first use.
    self._engine: Engine | None = None
    self._table: Table | None = None
    self._initialized = False

    # Stable identity of this source for checkpointing / diagnostics.
    self._fingerprint = self._compute_fingerprint()
    self._descriptor = SourceDescriptor(
        name=self._compute_name(),
        kind="sqlalchemy",
        fingerprint=self._fingerprint,
    )

fetch(cursor, batch_size)

Fetch a batch of records newer than cursor.

Returns an empty sequence when no new records are available.

Source code in src/azure_functions_db/adapter/sqlalchemy.py
def fetch(self, cursor: CursorValue | None, batch_size: int) -> Sequence[RawRecord]:
    """Return up to *batch_size* records newer than *cursor*.

    An empty sequence signals that no new records are available.
    Unexpected database failures are wrapped in :class:`FetchError`.
    """
    self._ensure_initialized()
    assert self._engine is not None  # noqa: S101  # nosec B101

    try:
        statement = self._build_query(cursor, batch_size)
        with self._engine.connect() as connection:
            rows = connection.execute(statement, self._parameters)
            return [dict(row._mapping) for row in rows]
    except (SourceConfigurationError, FetchError):
        # Already-classified errors pass through untouched.
        raise
    except Exception as exc:
        raise FetchError("Failed to fetch records") from exc

dispose()

Dispose the underlying engine and release connection pool.

Source code in src/azure_functions_db/adapter/sqlalchemy.py
def dispose(self) -> None:
    """Dispose the underlying engine and release connection pool.

    The engine is only disposed when this adapter created it; a shared
    engine is left alive and the local references are simply dropped.
    """
    if self._engine is None:
        return
    if self._owns_engine:
        self._engine.dispose()
    self._engine = None
    self._table = None
    self._initialized = False
    self._owns_engine = False

DbReader(*, url, table=None, schema=None, engine_provider=None)

Imperative input binding for reading database rows.

Provides get() for single-row lookup by primary key and query() for arbitrary SQL queries. Uses SQLAlchemy Core under the hood and integrates with :class:EngineProvider for shared connection pooling.

Thread Safety

Instances are not safe to share across concurrent threads or async invocations. Create a separate DbReader per function invocation, or use a with block to scope the lifecycle.

Parameters

url: SQLAlchemy connection URL. Supports %VAR% env-var substitution.
table: Table name for get() operations. Optional if only query() is used.
schema: Optional schema qualifier for table.
engine_provider: Optional shared :class:EngineProvider. When provided, the reader uses a pooled engine instead of creating its own.

Source code in src/azure_functions_db/binding/reader.py
def __init__(
    self,
    *,
    url: str,
    table: str | None = None,
    schema: str | None = None,
    engine_provider: EngineProvider | None = None,
) -> None:
    """Configure the reader; no database connection is made yet.

    Parameters
    ----------
    url:
        SQLAlchemy connection URL.  Supports ``%VAR%`` env-var substitution.
    table:
        Table name for ``get()`` operations.  Optional if only ``query()``
        is used.
    schema:
        Optional schema qualifier for *table*.
    engine_provider:
        Optional shared :class:`EngineProvider`.  When provided, the
        reader uses a pooled engine instead of creating its own.

    Raises
    ------
    ConfigurationError
        If *url* is empty or env-var resolution fails.
    """
    if not url:
        msg = "url must not be empty"
        raise ConfigurationError(msg)

    # resolve_env_vars raises ConfigurationError itself on failure; the
    # former try/except that only re-raised the same exception was a
    # no-op, so let it propagate directly.
    self._url = resolve_env_vars(url)

    self._table_name = table
    self._schema = schema
    self._engine_provider = engine_provider
    self._db_config = DbConfig(connection_url=self._url)

    # Engine and table reflection happen lazily on first use.
    self._engine: Engine | None = None
    self._table: Table | None = None
    self._owns_engine = False
    self._initialized = False

get(*, pk)

Look up a single row by primary key.

Requires table to have been set in the constructor.

Parameters

pk: Mapping of primary-key column name to value. All keys must be actual primary key columns of the table.

Returns

dict or None The matching row as a dict, or None if no row matches.

Raises

ConfigurationError If table was not set, or pk contains unknown columns. QueryError If more than one row matches the provided key.

Source code in src/azure_functions_db/binding/reader.py
def get(self, *, pk: dict[str, object]) -> dict[str, object] | None:
    """Fetch one row by primary key, or ``None`` when absent.

    Requires *table* to have been set in the constructor.

    Parameters
    ----------
    pk:
        Mapping of primary-key column name to value.  Every key must be
        an actual primary-key column of the table.

    Returns
    -------
    dict or None
        The matching row, or ``None`` if no row matches.

    Raises
    ------
    ConfigurationError
        If *table* was not set, or *pk* contains unknown columns.
    QueryError
        If more than one row matches the provided key.
    """
    if self._table_name is None:
        msg = "get() requires 'table' to be set in DbReader constructor"
        raise ConfigurationError(msg)

    self._ensure_initialized()
    assert self._engine is not None  # noqa: S101  # nosec B101
    assert self._table is not None  # noqa: S101  # nosec B101

    self._validate_pk_columns(pk)

    try:
        # LIMIT 2 is enough to distinguish "exactly one" from "many"
        # without pulling the whole table.
        where_clause = and_(*(self._table.c[name] == value for name, value in pk.items()))
        stmt: Any = select(self._table).where(where_clause).limit(2)

        with self._engine.connect() as connection:
            matches = [dict(row._mapping) for row in connection.execute(stmt)]
    except (ConfigurationError, QueryError):
        raise
    except Exception as exc:
        raise QueryError("Failed to execute get() query") from exc

    if not matches:
        return None

    if len(matches) > 1:
        msg = f"get() expected at most 1 row but found multiple matches for pk={pk}"
        raise QueryError(msg)

    return matches[0]

query(sql, *, params=None)

Execute a raw SQL query and return all matching rows.

Always use :name parameter placeholders and params instead of string formatting to prevent SQL injection. True read-only enforcement should be done at the database role/permission level.

Parameters

sql: SQL query string. Use :name placeholders for parameters. params: Optional mapping of parameter names to values.

Returns

list[dict] List of rows, each as a dict. Empty list if no rows match.

Raises

QueryError If the query execution fails.

Source code in src/azure_functions_db/binding/reader.py
def query(
    self,
    sql: str,
    *,
    params: dict[str, object] | None = None,
) -> list[dict[str, object]]:
    """Run a raw SQL statement and return every matching row as a dict.

    Always bind values through ``:name`` placeholders and *params*
    rather than string formatting, to prevent SQL injection.  True
    read-only enforcement should be done at the database
    role/permission level.

    Parameters
    ----------
    sql:
        SQL text with optional ``:name`` placeholders.
    params:
        Values for the placeholders, if any.

    Returns
    -------
    list[dict]
        All matching rows; empty list when nothing matches.

    Raises
    ------
    QueryError
        If the query execution fails.
    """
    self._ensure_initialized()
    assert self._engine is not None  # noqa: S101  # nosec B101

    try:
        statement = text(sql)
        with self._engine.connect() as connection:
            cursor = (
                connection.execute(statement, params)
                if params
                else connection.execute(statement)
            )
            return [dict(row._mapping) for row in cursor]
    except (ConfigurationError, QueryError):
        raise
    except Exception as exc:
        raise QueryError("Failed to execute query") from exc

close()

Release resources held by this reader.

If the reader owns its engine (no engine_provider was given), the engine is disposed. If using a shared engine via engine_provider, only the reader's internal state is reset.

Source code in src/azure_functions_db/binding/reader.py
def close(self) -> None:
    """Release resources held by this reader.

    Disposes the engine only when this reader created it (no
    ``engine_provider`` was given); a shared engine is left alive and
    only the reader's internal state is reset.
    """
    if self._engine is None:
        return
    if self._owns_engine:
        self._engine.dispose()
    self._engine = None
    self._table = None
    self._initialized = False
    self._owns_engine = False

DbWriter(*, url, table, schema=None, engine_provider=None)

Imperative output binding for writing database rows.

Provides insert(), upsert(), update(), delete() for single-row operations and insert_many() / upsert_many() for batch operations. Uses SQLAlchemy Core under the hood.

Thread Safety

Instances are not safe to share across concurrent threads or async invocations. Create a separate DbWriter per function invocation, or use a with block to scope the lifecycle.

Source code in src/azure_functions_db/binding/writer.py
def __init__(
    self,
    *,
    url: str,
    table: str,
    schema: str | None = None,
    engine_provider: EngineProvider | None = None,
) -> None:
    """Configure the writer; no database connection is made yet.

    Parameters
    ----------
    url:
        SQLAlchemy connection URL.  Supports ``%VAR%`` env-var substitution.
    table:
        Target table name for all write operations.
    schema:
        Optional schema qualifier for *table*.
    engine_provider:
        Optional shared :class:`EngineProvider`.  When provided, the
        writer uses a pooled engine instead of creating its own.

    Raises
    ------
    ConfigurationError
        If *url* or *table* is empty, or env-var resolution fails.
    """
    if not url:
        msg = "url must not be empty"
        raise ConfigurationError(msg)

    if not table:
        msg = "table must not be empty"
        raise ConfigurationError(msg)

    # resolve_env_vars raises ConfigurationError itself on failure; the
    # former try/except that only re-raised the same exception was a
    # no-op, so let it propagate directly.
    self._url = resolve_env_vars(url)

    self._table_name = table
    self._schema = schema
    self._engine_provider = engine_provider
    self._db_config = DbConfig(connection_url=self._url)

    # Engine and table reflection happen lazily on first use.
    self._engine: Engine | None = None
    self._table: Table | None = None
    self._owns_engine = False
    self._initialized = False

insert(*, data)

Insert a single row.

Raises :class:WriteError on constraint violations or other database errors.

Source code in src/azure_functions_db/binding/writer.py
def insert(self, *, data: dict[str, object]) -> None:
    """Insert a single row into the bound table.

    Raises :class:`WriteError` on constraint violations or other
    database errors.
    """
    self._ensure_initialized()
    assert self._engine is not None  # noqa: S101  # nosec B101
    assert self._table is not None  # noqa: S101  # nosec B101

    self._validate_data_columns(data)

    try:
        statement = self._table.insert().values(**data)
        with self._engine.begin() as connection:
            connection.execute(statement)
    except (ConfigurationError, WriteError):
        raise
    except Exception as exc:
        raise WriteError("Failed to insert row") from exc

insert_many(*, rows)

Insert multiple rows in a single transaction (all-or-nothing).

Source code in src/azure_functions_db/binding/writer.py
def insert_many(self, *, rows: list[dict[str, object]]) -> None:
    """Insert every row in *rows* within one transaction (all-or-nothing).

    An empty *rows* list is a no-op.
    """
    if not rows:
        return

    self._ensure_initialized()
    assert self._engine is not None  # noqa: S101  # nosec B101
    assert self._table is not None  # noqa: S101  # nosec B101

    # Validate everything up front so no partial work is started.
    for entry in rows:
        self._validate_data_columns(entry)

    try:
        with self._engine.begin() as connection:
            connection.execute(self._table.insert(), rows)
    except (ConfigurationError, WriteError):
        raise
    except Exception as exc:
        raise WriteError("Failed to insert rows") from exc

upsert(*, data, conflict_columns)

Insert or update a single row using dialect-specific upsert.

Supported dialects: PostgreSQL, SQLite, MySQL. Other dialects raise :class:ConfigurationError.

Source code in src/azure_functions_db/binding/writer.py
def upsert(
    self,
    *,
    data: dict[str, object],
    conflict_columns: list[str],
) -> None:
    """Insert *data*, or update the existing row on key conflict.

    Supported dialects: PostgreSQL, SQLite, MySQL.  Other dialects
    raise :class:`ConfigurationError`.
    """
    self._ensure_initialized()
    assert self._engine is not None  # noqa: S101  # nosec B101
    assert self._table is not None  # noqa: S101  # nosec B101

    self._validate_data_columns(data)
    self._validate_conflict_columns(conflict_columns)

    try:
        statement = self._build_upsert_stmt(data, conflict_columns)
        with self._engine.begin() as connection:
            connection.execute(statement)
    except (ConfigurationError, WriteError):
        raise
    except Exception as exc:
        raise WriteError("Failed to upsert row") from exc

upsert_many(*, rows, conflict_columns)

Upsert multiple rows in a single transaction (all-or-nothing).

Source code in src/azure_functions_db/binding/writer.py
def upsert_many(
    self,
    *,
    rows: list[dict[str, object]],
    conflict_columns: list[str],
) -> None:
    """Upsert every row in *rows* within one transaction (all-or-nothing).

    An empty *rows* list is a no-op.
    """
    if not rows:
        return

    self._ensure_initialized()
    assert self._engine is not None  # noqa: S101  # nosec B101
    assert self._table is not None  # noqa: S101  # nosec B101

    # Validate everything up front so no partial work is started.
    for entry in rows:
        self._validate_data_columns(entry)
    self._validate_conflict_columns(conflict_columns)

    try:
        with self._engine.begin() as connection:
            # One statement per row, all inside the same transaction.
            for entry in rows:
                connection.execute(self._build_upsert_stmt(entry, conflict_columns))
    except (ConfigurationError, WriteError):
        raise
    except Exception as exc:
        raise WriteError("Failed to upsert rows") from exc

update(*, data, pk)

Update a single row identified by primary key.

This is a no-op if no row matches the given pk (idempotent).

Source code in src/azure_functions_db/binding/writer.py
def update(self, *, data: dict[str, object], pk: dict[str, object]) -> None:
    """Update the single row identified by *pk* with the values in *data*.

    Idempotent: when no row matches *pk*, nothing happens.
    """
    self._ensure_initialized()
    assert self._engine is not None  # noqa: S101  # nosec B101
    assert self._table is not None  # noqa: S101  # nosec B101

    self._validate_data_columns(data)
    self._validate_pk_columns(pk)

    try:
        where_clause = and_(*(self._table.c[name] == value for name, value in pk.items()))
        stmt: Any = update(self._table).where(where_clause).values(**data)
        with self._engine.begin() as connection:
            connection.execute(stmt)
    except (ConfigurationError, WriteError):
        raise
    except Exception as exc:
        raise WriteError("Failed to update row") from exc

delete(*, pk)

Delete a single row identified by primary key.

This is a no-op if no row matches the given pk (idempotent).

Source code in src/azure_functions_db/binding/writer.py
def delete(self, *, pk: dict[str, object]) -> None:
    """Delete the single row identified by *pk*.

    Idempotent: when no row matches *pk*, nothing happens.
    """
    self._ensure_initialized()
    assert self._engine is not None  # noqa: S101  # nosec B101
    assert self._table is not None  # noqa: S101  # nosec B101

    self._validate_pk_columns(pk)

    try:
        where_clause = and_(*(self._table.c[name] == value for name, value in pk.items()))
        stmt: Any = delete(self._table).where(where_clause)
        with self._engine.begin() as connection:
            connection.execute(stmt)
    except (ConfigurationError, WriteError):
        raise
    except Exception as exc:
        raise WriteError("Failed to delete row") from exc

close()

Release resources held by this writer.

Source code in src/azure_functions_db/binding/writer.py
def close(self) -> None:
    """Release resources held by this writer.

    Disposes the engine only when this writer created it; a shared
    engine is left alive and only the local state is reset.
    """
    if self._engine is None:
        return
    if self._owns_engine:
        self._engine.dispose()
    self._engine = None
    self._table = None
    self._initialized = False
    self._owns_engine = False

DbBindings

Azure Functions-style decorator API for database integration.

Provides trigger, input, output, inject_reader, and inject_writer decorator methods that wrap the imperative API (PollTrigger, DbReader, DbWriter) in an Azure Functions-native decorator experience.

Data injection (input / output): input injects query results into handler parameters. output injects a :class:DbOut instance; call .set() to write data explicitly.

Client injection (inject_reader / inject_writer): Handlers receive DbReader / DbWriter instances for imperative control.

Decorator order contract

Decorator composition rules:

- Azure decorators outermost, db decorators closest to the function
- trigger + output can be combined (process events and write results)
- trigger + inject_writer can be combined (imperative write in trigger handler)
- input + output can be combined (read data, write results)
- input and inject_reader are mutually exclusive
- output and inject_writer are mutually exclusive
- No decorator can be applied twice to the same handler

Valid combinations::

@app.schedule(...)
@db.trigger(...)        # Azure trigger outermost
@db.output("out", ...)  # db output innermost
def handler(events, out: DbOut) -> None:
    out.set([...])

@db.input("user", ...)
@db.output("out", ...)
def handler(user, out: DbOut) -> dict:
    out.set({...})
    return user

Note: This is a pseudo-trigger implementation. trigger requires an actual Azure Functions trigger (e.g. @app.schedule) to fire. It does not register a native Azure Functions binding.

trigger(*, arg_name, source, checkpoint_store, name=None, normalizer=None, batch_size=100, max_batches_per_tick=1, lease_ttl_seconds=120, retry_policy=None, metrics=None)

Decorator for database change detection (pseudo-trigger).

Wraps a handler function so that on each invocation it polls the database source for new/changed rows and passes them to the handler.

The decorated function's arg_name parameter will receive the list of :class:RowChange events. An optional parameter named context will receive the :class:PollContext.

Must be used together with an actual Azure Functions trigger (e.g. @app.schedule(...)).

Parameters

arg_name: Name of the handler parameter that receives the events list.
source: Database source adapter (e.g. SqlAlchemySource).
checkpoint_store: State store for checkpointing (e.g. BlobCheckpointStore).
name: Trigger name for logging/metrics. Defaults to the function name.

.. note:: Only synchronous handlers are supported. Async handlers will raise ConfigurationError at decoration time. This is because PollTrigger.run is synchronous.

Source code in src/azure_functions_db/decorator.py
def trigger(
    self,
    *,
    arg_name: str,
    source: SourceAdapter,
    checkpoint_store: StateStore,
    name: str | None = None,
    normalizer: EventNormalizer | None = None,
    batch_size: int = 100,
    max_batches_per_tick: int = 1,
    lease_ttl_seconds: int = 120,
    retry_policy: RetryPolicy | None = None,
    metrics: MetricsCollector | None = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Decorator for database change detection (pseudo-trigger).

    Wraps a handler function so that on each invocation it polls the
    database source for new/changed rows and passes them to the handler.

    The decorated function's ``arg_name`` parameter will receive the
    list of :class:`RowChange` events.  An optional parameter named
    ``context`` will receive the :class:`PollContext`.

    Must be used together with an actual Azure Functions trigger
    (e.g. ``@app.schedule(...)``).

    Parameters
    ----------
    arg_name:
        Name of the handler parameter that receives the events list.
    source:
        Database source adapter (e.g. ``SqlAlchemySource``).
    checkpoint_store:
        State store for checkpointing (e.g. ``BlobCheckpointStore``).
    name:
        Trigger name for logging/metrics.  Defaults to the function name.

    .. note::
        Only synchronous handlers are supported. Async handlers will raise
        ``ConfigurationError`` at decoration time. This is because
        ``PollTrigger.run`` is synchronous.
    """

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        _check_composition(fn, "trigger")
        _validate_arg_name(arg_name, fn, "trigger")

        # Reject async handlers: PollTrigger.run is synchronous and
        # calling an async function without await would silently return
        # an unawaited coroutine.
        if inspect.iscoroutinefunction(fn):
            msg = (
                "trigger does not support async handlers because PollTrigger.run() "
                "is synchronous. Use a sync handler instead."
            )
            raise ConfigurationError(msg)

        # One PollTrigger per decorated function, built at decoration
        # time so the same instance serves every invocation.
        trigger_name = name or fn.__name__
        trigger = PollTrigger(
            name=trigger_name,
            source=source,
            checkpoint_store=checkpoint_store,
            normalizer=normalizer,
            batch_size=batch_size,
            max_batches_per_tick=max_batches_per_tick,
            lease_ttl_seconds=lease_ttl_seconds,
            retry_policy=retry_policy,
            metrics=metrics,
        )

        # Partition the handler's parameters: `arg_name` (and optional
        # `context`) are injected by this decorator; everything else is
        # expected to come from the Azure host.
        fn_sig = inspect.signature(fn, follow_wrapped=False)
        has_context = "context" in fn_sig.parameters

        db_injected = {arg_name}
        if has_context:
            db_injected.add("context")
        host_params = [p_name for p_name in fn_sig.parameters if p_name not in db_injected]

        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> int:
            # Best-effort timer extraction: the first host-supplied
            # argument (positional, or looked up under the first host
            # parameter name) is forwarded to PollTrigger.run.
            timer: Any = None
            if args:
                timer = args[0]
            elif host_params:
                timer = kwargs.get(host_params[0])

            # Re-key positional host args by parameter name so the
            # handler can be invoked with keyword arguments only.
            bound_args = dict(kwargs)
            if args and host_params:
                for i, val in enumerate(args):
                    if i < len(host_params):
                        bound_args[host_params[i]] = val

            # Called back by PollTrigger with the polled events (and the
            # PollContext, when the handler declares a `context` param).
            def invoke_handler(events: Any, context: Any | None = None) -> Any:
                call_kwargs: dict[str, Any] = dict(bound_args)
                call_kwargs[arg_name] = events
                if has_context and context is not None:
                    call_kwargs["context"] = context
                return fn(**call_kwargs)

            return trigger.run(timer=timer, handler=invoke_handler)

        # Keep host trigger params visible in __signature__ so Azure
        # worker binding validation can find them.  Only hide the
        # db-injected params (events, context).
        setattr(wrapper, "__signature__", _build_host_signature(fn, db_injected))
        _mark_decorator(wrapper, "trigger")
        _merge_toolkit_metadata(
            wrapper,
            "db",
            {
                "version": 1,
                "bindings": [
                    {
                        "kind": "trigger",
                        "parameter": arg_name,
                    }
                ],
                "injections": [],
            },
        )

        return wrapper

    return decorator

input(arg_name, *, url, table=None, schema=None, pk=None, query=None, params=None, model=None, on_not_found='none', engine_provider=None)

Decorator that injects query results into the handler.

The handler parameter named arg_name will receive the actual data from the database, not a DbReader instance. Exactly one of pk or query must be provided.

PK mode (single row): The parameter receives dict[str, object] | None. Use a static dict for fixed lookups or a callable for dynamic resolution from other handler parameters::

    @db.input("user", url=..., table="users",
                 pk=lambda req: {"id": req.params["id"]})
    def handler(req, user): ...

Query mode (multiple rows): The parameter receives list[dict[str, object]]::

    @db.input("users", url=...,
                 query="SELECT * FROM users WHERE active = :active",
                 params={"active": True})
    def handler(users): ...
Parameters

arg_name: Name of the handler parameter that receives the data.
url: SQLAlchemy connection URL. Supports %VAR% env-var substitution.
table: Table name. Required when using pk.
schema: Optional schema qualifier.
pk: Primary key for single-row lookup. Either a static dict or a callable whose parameter names match other handler parameters.
query: SQL query string for multi-row results. Use :name placeholders for parameters.
params: Parameters for query. Either a static dict or a callable.
on_not_found: Behavior when pk lookup returns no row. "none" (default) injects None; "raise" raises NotFoundError.
engine_provider: Optional shared EngineProvider for connection pooling.

Supports both sync and async handlers. For async handlers, blocking database I/O is automatically offloaded via asyncio.to_thread().

Source code in src/azure_functions_db/decorator.py
def input(
    self,
    arg_name: str,
    *,
    url: str,
    table: str | None = None,
    schema: str | None = None,
    pk: dict[str, object] | Callable[..., dict[str, object]] | None = None,
    query: str | None = None,
    params: dict[str, object] | Callable[..., dict[str, object]] | None = None,
    model: type[BaseModel] | None = None,
    on_not_found: Literal["none", "raise"] = "none",
    engine_provider: EngineProvider | None = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Decorator that injects query results into the handler.

    The handler parameter named ``arg_name`` will receive the actual
    data from the database, not a ``DbReader`` instance.  Exactly one
    of ``pk`` or ``query`` must be provided.

    **PK mode** (single row):
        The parameter receives ``dict[str, object] | None``.  Use a
        static dict for fixed lookups or a callable for dynamic
        resolution from other handler parameters::

            @db.input("user", url=..., table="users",
                         pk=lambda req: {"id": req.params["id"]})
            def handler(req, user): ...

    **Query mode** (multiple rows):
        The parameter receives ``list[dict[str, object]]``::

            @db.input("users", url=...,
                         query="SELECT * FROM users WHERE active = :active",
                         params={"active": True})
            def handler(users): ...

    Parameters
    ----------
    arg_name:
        Name of the handler parameter that receives the data.
    url:
        SQLAlchemy connection URL.  Supports ``%VAR%`` env-var substitution.
    table:
        Table name.  Required when using ``pk``.
    schema:
        Optional schema qualifier.
    pk:
        Primary key for single-row lookup.  Either a static dict or a
        callable whose parameter names match other handler parameters.
    query:
        SQL query string for multi-row results.  Use ``:name``
        placeholders for parameters.
    params:
        Parameters for ``query``.  Either a static dict or a callable.
    on_not_found:
        Behavior when ``pk`` lookup returns no row.  ``"none"`` (default)
        injects ``None``; ``"raise"`` raises ``NotFoundError``.
    engine_provider:
        Optional shared ``EngineProvider`` for connection pooling.

    Supports both sync and async handlers. For async handlers, blocking
    database I/O is automatically offloaded via ``asyncio.to_thread()``.
    """
    if on_not_found not in ("none", "raise"):
        msg = f"input on_not_found must be 'none' or 'raise', got '{on_not_found}'"
        raise ConfigurationError(msg)
    if pk is not None and query is not None:
        msg = "input requires exactly one of 'pk' or 'query', not both"
        raise ConfigurationError(msg)
    if pk is None and query is None:
        msg = "input requires exactly one of 'pk' or 'query'"
        raise ConfigurationError(msg)
    if pk is not None and table is None:
        msg = "input with 'pk' requires 'table' to be set"
        raise ConfigurationError(msg)
    if params is not None and query is None:
        msg = "input 'params' is only valid with 'query'"
        raise ConfigurationError(msg)
    _validate_model_type(model)

    use_pk = pk is not None
    pk_callable: Callable[..., dict[str, object]] | None = pk if callable(pk) else None
    pk_static: dict[str, object] | None = None if callable(pk) else pk
    params_callable: Callable[..., dict[str, object]] | None = (
        params if callable(params) else None
    )
    params_static: dict[str, object] | None = None if callable(params) else params

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        _check_composition(fn, "input")
        _validate_arg_name(arg_name, fn, "input")

        pk_resolver_params: list[str] = []
        params_resolver_params: list[str] = []
        if pk_callable is not None:
            pk_resolver_params = _validate_resolver(pk_callable, fn, {arg_name}, "pk", "input")
        if params_callable is not None:
            params_resolver_params = _validate_resolver(
                params_callable, fn, {arg_name}, "params", "input"
            )

        is_async = inspect.iscoroutinefunction(fn)

        binding_info: dict[str, Any] = {
            "kind": "input",
            "parameter": arg_name,
            "connection_setting": url,
            "resource": {"table": table} if table else {},
            "query_kind": "pk" if use_pk else "text",
        }
        if model is not None:
            binding_info["model_ref"] = f"{model.__module__}:{model.__qualname__}"

        def _resolve_read_args(
            all_kwargs: dict[str, Any],
        ) -> tuple[dict[str, object] | None, dict[str, object] | None]:
            """Resolve pk/params from handler kwargs (safe to call on any thread)."""
            resolved_pk: dict[str, object] | None = None
            resolved_params: dict[str, object] | None = None
            if use_pk:
                if pk_callable is not None:
                    resolved_pk = _resolve_callable(pk_callable, pk_resolver_params, all_kwargs)
                elif pk_static is not None:
                    resolved_pk = pk_static
                else:
                    msg = "input: unreachable – neither pk callable nor pk static"
                    raise ConfigurationError(msg)
            else:
                if params_callable is not None:
                    resolved_params = _resolve_callable(
                        params_callable, params_resolver_params, all_kwargs
                    )
                elif params_static is not None:
                    resolved_params = params_static
            return resolved_pk, resolved_params

        def _execute_read(
            resolved_pk: dict[str, object] | None,
            resolved_params: dict[str, object] | None,
        ) -> Any:
            """Execute DB I/O (blocking — run in worker thread for async)."""
            reader = DbReader(
                url=url,
                table=table,
                schema=schema,
                engine_provider=engine_provider,
            )
            try:
                if use_pk:
                    if resolved_pk is None:
                        msg = "input: unreachable – pk mode but resolved_pk is None"
                        raise ConfigurationError(msg)
                    result = reader.get(pk=resolved_pk)
                    if result is None and on_not_found == "raise":
                        from .core.errors import NotFoundError

                        msg = f"input: no row found for pk={resolved_pk} in table '{table}'"
                        raise NotFoundError(msg)
                    return result
                else:
                    if query is None:
                        msg = "input: unreachable – query mode but query is None"
                        raise ConfigurationError(msg)
                    return reader.query(query, params=resolved_params)
            finally:
                reader.close()

        if is_async:

            @functools.wraps(fn)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                r_pk, r_params = _resolve_read_args(kwargs)
                data = await asyncio.to_thread(_execute_read, r_pk, r_params)
                kwargs[arg_name] = _apply_input_model(data, model)
                return await fn(*args, **kwargs)

            setattr(async_wrapper, "__signature__", _build_host_signature(fn, {arg_name}))
            _mark_decorator(async_wrapper, "input")
            _merge_toolkit_metadata(
                async_wrapper,
                "db",
                {
                    "version": 1,
                    "bindings": [binding_info],
                    "injections": [],
                },
            )
            return async_wrapper

        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            r_pk, r_params = _resolve_read_args(kwargs)
            data = _execute_read(r_pk, r_params)
            kwargs[arg_name] = _apply_input_model(data, model)
            return fn(*args, **kwargs)

        setattr(wrapper, "__signature__", _build_host_signature(fn, {arg_name}))
        _mark_decorator(wrapper, "input")
        _merge_toolkit_metadata(
            wrapper,
            "db",
            {
                "version": 1,
                "bindings": [binding_info],
                "injections": [],
            },
        )
        return wrapper

    return decorator

output(arg_name, *, url, table, schema=None, action='insert', conflict_columns=None, engine_provider=None)

Decorator that injects a `DbOut` instance into the handler.

Follows the native Azure Functions output binding pattern (func.Out[T] with .set()). The handler parameter named arg_name will receive a DbOut instance for sync handlers or an _AsyncDbOutProxy for async handlers.

The handler's return value is not intercepted — use out.set(data) to write explicitly.

Parameters

arg_name: Name of the handler parameter that receives the DbOut. url: SQLAlchemy connection URL. Supports %VAR% env-var substitution. table: Table name for write operations. schema: Optional schema qualifier. action: Write action: "insert" (default) or "upsert". conflict_columns: Columns for upsert conflict resolution. Required when action="upsert". engine_provider: Optional shared EngineProvider for connection pooling.

Supports both sync and async handlers. For async handlers, blocking database I/O is automatically offloaded via asyncio.to_thread().

Source code in src/azure_functions_db/decorator.py
def output(
    self,
    arg_name: str,
    *,
    url: str,
    table: str,
    schema: str | None = None,
    action: Literal["insert", "upsert"] = "insert",
    conflict_columns: list[str] | None = None,
    engine_provider: EngineProvider | None = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Decorator that injects a :class:`DbOut` instance into the handler.

    Mirrors the native Azure Functions output-binding pattern
    (``func.Out[T]`` with ``.set()``): the parameter named ``arg_name``
    receives a ``DbOut`` for sync handlers, or an ``_AsyncDbOutProxy``
    for async handlers.  The handler's return value is **not**
    intercepted — writes happen only through ``out.set(data)``.

    Parameters
    ----------
    arg_name:
        Name of the handler parameter that receives the ``DbOut``.
    url:
        SQLAlchemy connection URL.  Supports ``%VAR%`` env-var substitution.
    table:
        Table name for write operations.
    schema:
        Optional schema qualifier.
    action:
        Write action: ``"insert"`` (default) or ``"upsert"``.
    conflict_columns:
        Columns for upsert conflict resolution.  Required when
        ``action="upsert"``.
    engine_provider:
        Optional shared ``EngineProvider`` for connection pooling.

    Supports both sync and async handlers.  For async handlers, blocking
    database I/O is automatically offloaded via ``asyncio.to_thread()``.
    """
    if action not in ("insert", "upsert"):
        msg = f"output action must be 'insert' or 'upsert', got '{action}'"
        raise ConfigurationError(msg)
    if action == "upsert" and not conflict_columns:
        msg = "output with action='upsert' requires 'conflict_columns'"
        raise ConfigurationError(msg)

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        _check_composition(fn, "output")
        _validate_arg_name(arg_name, fn, "output")

        # One DbOut per decorated function; it holds only configuration,
        # so sharing it across invocations is safe.
        db_out = DbOut(
            url=url,
            table=table,
            schema=schema,
            action=action,
            conflict_columns=conflict_columns,
            engine_provider=engine_provider,
        )

        def _finalize(wrapped: Callable[..., Any]) -> Callable[..., Any]:
            # Common post-processing for both wrapper flavors: host-visible
            # signature, decorator marker, and toolkit metadata.
            setattr(wrapped, "__signature__", _build_host_signature(fn, {arg_name}))
            _mark_decorator(wrapped, "output")
            _merge_toolkit_metadata(
                wrapped,
                "db",
                {
                    "version": 1,
                    "bindings": [
                        {
                            "kind": "output",
                            "parameter": arg_name,
                            "connection_setting": url,
                            "resource": {"table": table},
                        }
                    ],
                    "injections": [],
                },
            )
            return wrapped

        if inspect.iscoroutinefunction(fn):
            # Async handlers get a proxy that offloads blocking writes.
            proxy = _AsyncDbOutProxy(db_out)

            @functools.wraps(fn)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                kwargs[arg_name] = proxy
                return await fn(*args, **kwargs)

            return _finalize(async_wrapper)

        @functools.wraps(fn)
        def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
            kwargs[arg_name] = db_out
            return fn(*args, **kwargs)

        return _finalize(sync_wrapper)

    return decorator

inject_reader(arg_name, *, url, table=None, schema=None, engine_provider=None)

Decorator that injects a `DbReader` instance into the handler.

Use this when you need imperative control over reads (multiple queries, dynamic SQL, etc.). For simple data injection, prefer `input`.

The handler parameter named arg_name will receive a pre-configured DbReader instance. The reader is created fresh per invocation and closed automatically after the handler returns.

Parameters

arg_name: Name of the handler parameter that receives the DbReader. url: SQLAlchemy connection URL. Supports %VAR% env-var substitution. table: Optional table name for get() operations. schema: Optional schema qualifier. engine_provider: Optional shared EngineProvider for connection pooling.

Supports both sync and async handlers. For async handlers, blocking database I/O is automatically offloaded via asyncio.to_thread().

Source code in src/azure_functions_db/decorator.py
def inject_reader(
    self,
    arg_name: str,
    *,
    url: str,
    table: str | None = None,
    schema: str | None = None,
    engine_provider: EngineProvider | None = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Decorator that injects a :class:`DbReader` instance into the handler.

    Prefer :meth:`input` for simple data injection; use this when the
    handler needs imperative control over reads (multiple queries,
    dynamic SQL, and so on).

    A fresh ``DbReader`` is built for every invocation, bound to the
    handler parameter named ``arg_name``, and closed once the handler
    returns.

    Parameters
    ----------
    arg_name:
        Name of the handler parameter that receives the ``DbReader``.
    url:
        SQLAlchemy connection URL.  Supports ``%VAR%`` env-var substitution.
    table:
        Optional table name for ``get()`` operations.
    schema:
        Optional schema qualifier.
    engine_provider:
        Optional shared ``EngineProvider`` for connection pooling.

    Supports both sync and async handlers.  For async handlers, blocking
    database I/O is automatically offloaded via ``asyncio.to_thread()``.
    """

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        _check_composition(fn, "inject_reader")
        _validate_arg_name(arg_name, fn, "inject_reader")

        def _new_reader() -> DbReader:
            # One reader per invocation — never shared between calls.
            return DbReader(
                url=url,
                table=table,
                schema=schema,
                engine_provider=engine_provider,
            )

        def _finalize(wrapped: Callable[..., Any]) -> Callable[..., Any]:
            # Shared tail: host signature, marker, and toolkit metadata.
            setattr(wrapped, "__signature__", _build_host_signature(fn, {arg_name}))
            _mark_decorator(wrapped, "inject_reader")
            _merge_toolkit_metadata(
                wrapped,
                "db",
                {
                    "version": 1,
                    "bindings": [],
                    "injections": [{"kind": "reader", "parameter": arg_name}],
                },
            )
            return wrapped

        if inspect.iscoroutinefunction(fn):

            @functools.wraps(fn)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                reader = _new_reader()
                # Proxy offloads each blocking call to a worker thread.
                proxy = _AsyncDbReaderProxy(reader)
                try:
                    kwargs[arg_name] = proxy
                    return await fn(*args, **kwargs)
                finally:
                    reader.close()

            return _finalize(async_wrapper)

        @functools.wraps(fn)
        def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
            reader = _new_reader()
            try:
                kwargs[arg_name] = reader
                return fn(*args, **kwargs)
            finally:
                reader.close()

        return _finalize(sync_wrapper)

    return decorator

inject_writer(arg_name, *, url, table, schema=None, engine_provider=None)

Decorator that injects a `DbWriter` instance into the handler.

Use this when you need imperative control over writes (multiple operations, transactions, update/delete, etc.). For simple auto-write, prefer `output`.

The handler parameter named arg_name will receive a pre-configured DbWriter instance. The writer is created fresh per invocation and closed automatically after the handler returns.

Parameters

arg_name: Name of the handler parameter that receives the DbWriter. url: SQLAlchemy connection URL. Supports %VAR% env-var substitution. table: Table name for write operations. schema: Optional schema qualifier. engine_provider: Optional shared EngineProvider for connection pooling.

Supports both sync and async handlers. For async handlers, blocking database I/O is automatically offloaded via asyncio.to_thread().

Source code in src/azure_functions_db/decorator.py
def inject_writer(
    self,
    arg_name: str,
    *,
    url: str,
    table: str,
    schema: str | None = None,
    engine_provider: EngineProvider | None = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Decorator that injects a :class:`DbWriter` instance into the handler.

    Prefer :meth:`output` for simple auto-write; use this when the
    handler needs imperative control over writes (multiple operations,
    transactions, update/delete, and so on).

    A fresh ``DbWriter`` is built for every invocation, bound to the
    handler parameter named ``arg_name``, and closed once the handler
    returns.

    Parameters
    ----------
    arg_name:
        Name of the handler parameter that receives the ``DbWriter``.
    url:
        SQLAlchemy connection URL.  Supports ``%VAR%`` env-var substitution.
    table:
        Table name for write operations.
    schema:
        Optional schema qualifier.
    engine_provider:
        Optional shared ``EngineProvider`` for connection pooling.

    Supports both sync and async handlers.  For async handlers, blocking
    database I/O is automatically offloaded via ``asyncio.to_thread()``.
    """

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        _check_composition(fn, "inject_writer")
        _validate_arg_name(arg_name, fn, "inject_writer")

        def _new_writer() -> DbWriter:
            # One writer per invocation — never shared between calls.
            return DbWriter(
                url=url,
                table=table,
                schema=schema,
                engine_provider=engine_provider,
            )

        def _finalize(wrapped: Callable[..., Any]) -> Callable[..., Any]:
            # Shared tail: host signature, marker, and toolkit metadata.
            setattr(wrapped, "__signature__", _build_host_signature(fn, {arg_name}))
            _mark_decorator(wrapped, "inject_writer")
            _merge_toolkit_metadata(
                wrapped,
                "db",
                {
                    "version": 1,
                    "bindings": [],
                    "injections": [{"kind": "writer", "parameter": arg_name}],
                },
            )
            return wrapped

        if inspect.iscoroutinefunction(fn):

            @functools.wraps(fn)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                writer = _new_writer()
                # Proxy offloads each blocking call to a worker thread.
                proxy = _AsyncDbWriterProxy(writer)
                try:
                    kwargs[arg_name] = proxy
                    return await fn(*args, **kwargs)
                finally:
                    writer.close()

            return _finalize(async_wrapper)

        @functools.wraps(fn)
        def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
            writer = _new_writer()
            try:
                kwargs[arg_name] = writer
                return fn(*args, **kwargs)
            finally:
                writer.close()

        return _finalize(sync_wrapper)

    return decorator

DbOut(*, url, table, schema, action, conflict_columns, engine_provider)

Output binding parameter injected by the output decorator.

Mirrors the native Azure Functions func.Out[T] pattern. The handler calls .set() to write data to the database explicitly, leaving the handler's return value free for other purposes (e.g. HttpResponse).

Example::

@db.output("order", url="%DB_URL%", table="orders")
def create_order(req, order: DbOut) -> func.HttpResponse:
    order.set({"id": 1, "status": "pending"})
    return func.HttpResponse("Created", status_code=201)

Accepted types for `.set()`:

- `dict` — single-row write
- `list[dict]` — batch write
- `BaseModel` / `list[BaseModel]` — auto-dumped to dict
- `None` — no-op (skip write)

Source code in src/azure_functions_db/decorator.py
def __init__(
    self,
    *,
    url: str,
    table: str,
    schema: str | None,
    action: Literal["insert", "upsert"],
    conflict_columns: list[str] | None,
    engine_provider: EngineProvider | None,
) -> None:
    """Store write configuration; no database connection is opened here.

    A ``DbWriter`` is constructed lazily on each ``set()`` call.
    """
    self._url = url  # SQLAlchemy connection URL (may contain %VAR% placeholders)
    self._table = table  # target table for writes
    self._schema = schema  # optional schema qualifier
    self._action = action  # "insert" or "upsert"
    self._conflict_columns = conflict_columns  # upsert conflict-resolution columns
    self._engine_provider = engine_provider  # optional shared engine/pool

set(data)

Write data to the configured table.

Parameters

data: dict for single row, list[dict] for batch, BaseModel / list[BaseModel] for Pydantic models, or None to skip.

Source code in src/azure_functions_db/decorator.py
def set(
    self,
    data: (
        dict[str, object] | Sequence[dict[str, object]] | BaseModel | Sequence[BaseModel] | None
    ),
) -> None:
    """Write *data* to the configured table.

    Parameters
    ----------
    data:
        ``dict`` for single row, a sequence (``list`` or ``tuple``) of
        ``dict`` for batch, ``BaseModel`` / sequence of ``BaseModel``
        for Pydantic models, or ``None`` to skip (no-op).

    Raises
    ------
    ConfigurationError
        If *data* is of an unsupported type or a batch contains a
        non-dict / non-model element.
    """
    if data is None:
        return

    # A fresh writer per call; always closed, even on error.
    writer = DbWriter(
        url=self._url,
        table=self._table,
        schema=self._schema,
        engine_provider=self._engine_provider,
    )
    try:
        if isinstance(data, (dict, BaseModel)):
            row = _normalize_output_row(data)
            if self._action == "upsert":
                if self._conflict_columns is None:
                    msg = "output: unreachable – upsert without conflict_columns"
                    raise ConfigurationError(msg)
                writer.upsert(data=row, conflict_columns=self._conflict_columns)
            else:
                writer.insert(data=row)
        # Accept tuples as well as lists: the annotation promises Sequence
        # support, so a tuple of rows must not fall through to the error.
        elif isinstance(data, (list, tuple)):
            bad = next(
                (i for i, row in enumerate(data) if not isinstance(row, (dict, BaseModel))),
                None,
            )
            if bad is not None:
                bad_type = type(data[bad]).__name__
                msg = (
                    f"output: DbOut.set() received list with non-dict element "
                    f"at index {bad} ({bad_type}); expected list[dict | BaseModel]"
                )
                raise ConfigurationError(msg)
            rows = [_normalize_output_row(row) for row in data]
            if self._action == "upsert":
                if self._conflict_columns is None:
                    msg = "output: unreachable – upsert without conflict_columns"
                    raise ConfigurationError(msg)
                writer.upsert_many(rows=rows, conflict_columns=self._conflict_columns)
            else:
                writer.insert_many(rows=rows)
        else:
            msg = (
                f"output: DbOut.set() received {type(data).__name__}, "
                f"expected dict, list[dict], BaseModel, list[BaseModel], or None"
            )
            raise ConfigurationError(msg)
    finally:
        writer.close()

BlobCheckpointStore(*, container_client, source_fingerprint)

StateStore implementation backed by Azure Blob Storage.

Uses a single JSON blob per poller with ETag-based CAS (compare-and-swap) for all state mutations. Lease ownership is verified via owner_id and monotonically-increasing fencing tokens.

Source code in src/azure_functions_db/state/blob.py
def __init__(
    self,
    *,
    container_client: ContainerClient,
    source_fingerprint: str,
) -> None:
    """Store the container handle and source fingerprint.

    No blob I/O happens here; the state blob is created lazily by
    ``acquire_lease`` when it does not yet exist.
    """
    self._container_client = container_client  # pre-configured Azure container client
    self._source_fingerprint = source_fingerprint  # identifies the polled source config

acquire_lease(poller_name, ttl_seconds)

Acquire a lease on the poller's state blob.

Creates the blob if it does not exist. If the blob exists and the lease has expired (past expires_at + grace), the lease is stolen with an incremented fencing token.

Returns a lease_id string in the format {owner_id}:{fencing_token}.

Raises LeaseConflictError when a lease is already active or another instance won the CAS race.

Source code in src/azure_functions_db/state/blob.py
def acquire_lease(self, poller_name: str, ttl_seconds: int) -> str:
    """Acquire a lease on the poller's state blob.

    Creates the blob if it does not exist.  If the blob exists and the
    lease has expired (past ``expires_at + grace``), the lease is stolen
    with an incremented fencing token.

    Returns a lease_id string in the format ``{owner_id}:{fencing_token}``.

    Raises ``LeaseConflictError`` when a lease is already active or
    another instance won the CAS race.
    """
    owner_id = uuid.uuid4().hex  # fresh identity for this acquisition attempt
    grace = _effective_grace(ttl_seconds)  # tolerance before an expired lease may be stolen
    result = self._read_state(poller_name)

    if result is None:
        # Blob doesn't exist — create with fencing_token=1
        state = self._make_initial_state(
            poller_name, self._source_fingerprint, owner_id, ttl_seconds
        )
        try:
            # If-None-Match create: loses cleanly when another instance
            # creates the blob first.
            self._write_state_create(poller_name, state)
        except ResourceExistsError:
            raise LeaseConflictError(
                f"Another instance created the state blob for poller '{poller_name}' first"
            ) from None
        lease_id = f"{owner_id}:1"
        logger.info(
            "Poller '%s': lease acquired (new blob), lease_id=%s",
            poller_name,
            lease_id,
        )
        return lease_id

    state, etag = result
    lease = state.get("lease", {})

    # An unexpired lease belongs to someone else — back off.
    if not self._is_lease_expired(lease, grace):
        raise LeaseConflictError(
            f"Active lease on poller '{poller_name}' held by '{lease.get('owner_id')}'"
        )

    # Lease expired — steal it with incremented fencing token
    old_token = lease.get("fencing_token", 0)
    new_token = old_token + 1
    now = _now_utc()
    state["lease"] = {
        "owner_id": owner_id,
        "fencing_token": new_token,
        "acquired_at": _iso(now),
        "heartbeat_at": _iso(now),
        "expires_at": _iso(now + timedelta(seconds=ttl_seconds)),
    }

    try:
        # CAS on the etag read above; HTTP 412 means another stealer won.
        self._write_state_conditional(poller_name, state, etag)
    except HttpResponseError as exc:
        if exc.status_code == 412:  # noqa: PLR2004
            raise LeaseConflictError(
                f"CAS conflict acquiring lease for poller '{poller_name}'"
            ) from exc
        raise StateStoreError(f"Failed to acquire lease for poller '{poller_name}'") from exc

    lease_id = f"{owner_id}:{new_token}"
    logger.info(
        "Poller '%s': lease acquired (expired steal), lease_id=%s, fencing_token=%d",
        poller_name,
        lease_id,
        new_token,
    )
    return lease_id

renew_lease(poller_name, lease_id, ttl_seconds)

Renew an existing lease by extending its expiry.

Raises LostLeaseError if the lease is not held by this caller or the CAS write fails.

Source code in src/azure_functions_db/state/blob.py
def renew_lease(self, poller_name: str, lease_id: str, ttl_seconds: int) -> None:
    """Renew an existing lease by extending its expiry.

    Raises ``LostLeaseError`` if the lease is not held by this caller
    or the CAS write fails.
    """
    owner_id, fencing_token = _parse_lease_id(lease_id)
    result = self._read_state(poller_name)

    if result is None:
        raise LostLeaseError(f"State blob not found for poller '{poller_name}'")

    state, etag = result
    self._verify_lease(state, owner_id, fencing_token)

    now = _now_utc()
    state["lease"]["heartbeat_at"] = _iso(now)
    state["lease"]["expires_at"] = _iso(now + timedelta(seconds=ttl_seconds))

    try:
        self._write_state_conditional(poller_name, state, etag)
    except HttpResponseError as exc:
        if exc.status_code == 412:  # noqa: PLR2004
            raise LostLeaseError(
                f"CAS conflict renewing lease for poller '{poller_name}'"
            ) from exc
        raise StateStoreError(f"Failed to renew lease for poller '{poller_name}'") from exc

release_lease(poller_name, lease_id)

Release the lease by setting expires_at to now.

Preserves the fencing token so the next acquisition increments it. Only owner_id and fencing_token are verified — expiry is intentionally skipped so that a holder can still release a lease that has nominally expired but has not yet been stolen.

Raises LostLeaseError if owner/token do not match or CAS fails.

Source code in src/azure_functions_db/state/blob.py
def release_lease(self, poller_name: str, lease_id: str) -> None:
    """Release the lease by expiring it immediately (``expires_at`` = now).

    The fencing token is left intact so the next acquisition increments
    it.  Ownership is verified on owner_id and fencing_token only —
    expiry is deliberately not checked, so a holder whose lease has
    nominally lapsed (but was not stolen) can still release cleanly.

    Raises ``LostLeaseError`` if owner/token do not match or CAS fails.
    """
    owner_id, fencing_token = _parse_lease_id(lease_id)
    snapshot = self._read_state(poller_name)
    if snapshot is None:
        raise LostLeaseError(f"State blob not found for poller '{poller_name}'")

    state, etag = snapshot
    lease = state.get("lease")

    if lease is None:
        raise LostLeaseError("No lease present in state")

    found_owner = lease.get("owner_id")
    if found_owner != owner_id:
        raise LostLeaseError(
            f"Lease owner mismatch: expected '{owner_id}', found '{found_owner}'"
        )

    found_token = lease.get("fencing_token")
    if found_token != fencing_token:
        raise LostLeaseError(
            f"Fencing token mismatch: expected {fencing_token}, "
            f"found {found_token}"
        )

    # Expire in place; the fencing token survives for the next acquirer.
    state["lease"]["expires_at"] = _iso(_now_utc())

    try:
        self._write_state_conditional(poller_name, state, etag)
    except HttpResponseError as exc:
        if exc.status_code == 412:  # noqa: PLR2004
            raise LostLeaseError(
                f"CAS conflict releasing lease for poller '{poller_name}'"
            ) from exc
        raise StateStoreError(f"Failed to release lease for poller '{poller_name}'") from exc

load_checkpoint(poller_name)

Load the checkpoint for the given poller.

Returns an empty dict if the state blob does not exist. This method is read-only and has no side effects.

Source code in src/azure_functions_db/state/blob.py
def load_checkpoint(self, poller_name: str) -> dict[str, object]:
    """Fetch the stored checkpoint for *poller_name*.

    Returns an empty dict when the state blob does not exist.  Purely a
    read — no state is created or mutated.
    """
    snapshot = self._read_state(poller_name)
    if snapshot is None:
        return {}
    state, _unused_etag = snapshot
    checkpoint: dict[str, object] = state.get("checkpoint", {})
    return checkpoint

commit_checkpoint(poller_name, checkpoint, lease_id)

Commit a new checkpoint under the protection of the current lease.

Raises LostLeaseError if the lease is not held by this caller or the CAS write fails.

Source code in src/azure_functions_db/state/blob.py
def commit_checkpoint(
    self,
    poller_name: str,
    checkpoint: dict[str, object],
    lease_id: str,
) -> None:
    """Persist *checkpoint* while holding the lease named by *lease_id*.

    Raises ``LostLeaseError`` if the caller no longer holds the lease
    or the conditional (CAS) write fails.
    """
    owner_id, fencing_token = _parse_lease_id(lease_id)
    snapshot = self._read_state(poller_name)
    if snapshot is None:
        raise LostLeaseError(f"State blob not found for poller '{poller_name}'")

    state, etag = snapshot
    self._verify_lease(state, owner_id, fencing_token)

    # Defensive copy so later caller-side mutation cannot leak into state.
    state["checkpoint"] = dict(checkpoint)

    try:
        self._write_state_conditional(poller_name, state, etag)
    except HttpResponseError as exc:
        if exc.status_code == 412:  # noqa: PLR2004
            raise LostLeaseError(
                f"CAS conflict committing checkpoint for poller '{poller_name}'"
            ) from exc
        raise StateStoreError(
            f"Failed to commit checkpoint for poller '{poller_name}'"
        ) from exc

get_db_metadata(func)

Return db metadata if the function was decorated with DbBindings decorators.

Returns None if the function has no db metadata attached.

Source code in src/azure_functions_db/decorator.py
def get_db_metadata(func: Any) -> dict[str, Any] | None:
    """Return db metadata if *func* was decorated with DbBindings decorators.

    Returns ``None`` when no db metadata is attached.
    """
    toolkit_meta = getattr(func, _TOOLKIT_META_ATTR, None)
    if not isinstance(toolkit_meta, dict):
        return None
    db_meta = toolkit_meta.get("db")
    return db_meta if isinstance(db_meta, dict) else None