Skip to content

API Reference

azure_functions_db

SqlAlchemySource(*, url, table=None, schema=None, query=None, cursor_column, pk_columns, where=None, parameters=None, strategy='cursor', operation_mode='upsert_only', engine_provider=None)

SQLAlchemy-based source adapter for cursor-based change polling.

Implements the SourceAdapter protocol defined in azure_functions_db.trigger.runner.

Parameters

- url: SQLAlchemy connection URL. Supports %VAR% env-var substitution.
- table: Table name to poll. Mutually exclusive with query.
- schema: Optional schema qualifier for table.
- query: Raw SQL query to poll. Mutually exclusive with table.
- cursor_column: Column used for cursor-based ordering.
- pk_columns: Primary-key column(s) for tie-breaking within the same cursor value.
- where: Optional extra SQL WHERE clause fragment (appended with AND).
- parameters: Optional bind parameters for where or query.
- strategy: Polling strategy. Only "cursor" is supported.
- operation_mode: Operation mode. Only "upsert_only" is supported.

Source code in src/azure_functions_db/adapter/sqlalchemy.py
def __init__(
    self,
    *,
    url: str,
    table: str | None = None,
    schema: str | None = None,
    query: str | None = None,
    cursor_column: str,
    pk_columns: list[str],
    where: str | None = None,
    parameters: dict[str, object] | None = None,
    strategy: str = "cursor",
    operation_mode: str = "upsert_only",
    engine_provider: EngineProvider | None = None,
) -> None:
    """Validate the polling configuration and store it on the adapter.

    No engine or table handle is created here; both start as ``None``
    and the adapter is flagged as not initialized.

    Raises
    ------
    SourceConfigurationError
        If *url*, *cursor_column* or *pk_columns* is empty, if *table*
        and *query* are both given or both missing, or if *strategy* /
        *operation_mode* is not the single supported value.
    """
    # The checks run in a fixed order; the first violated rule decides
    # which message is reported.
    problem: str | None = None
    if not url:
        problem = "url must not be empty"
    elif table and query:
        problem = "Exactly one of 'table' or 'query' must be provided, not both"
    elif not table and not query:
        problem = "Exactly one of 'table' or 'query' must be provided"
    elif not cursor_column:
        problem = "cursor_column must not be empty"
    elif not pk_columns:
        problem = "pk_columns must not be empty"
    elif strategy != "cursor":
        problem = f"Unsupported strategy: '{strategy}'. Only 'cursor' is supported."
    elif operation_mode != "upsert_only":
        problem = (
            f"Unsupported operation_mode: '{operation_mode}'. Only 'upsert_only' is supported."
        )
    if problem is not None:
        raise SourceConfigurationError(problem)

    # Connection settings; the URL is stored after env-var resolution.
    self._url = _resolve_env_vars(url)
    self._db_config = DbConfig(connection_url=self._url)
    self._engine_provider = engine_provider
    self._owns_engine = False

    # What to poll and how.  Caller-supplied containers are copied so
    # later mutation by the caller does not affect the adapter.
    self._table_name = table
    self._schema = schema
    self._query = query
    self._cursor_column = cursor_column
    self._pk_columns = list(pk_columns)
    self._where = where
    self._parameters = {} if not parameters else dict(parameters)
    self._strategy = strategy
    self._operation_mode = operation_mode

    # Runtime handles, populated outside the constructor.
    self._engine: Engine | None = None
    self._table: Table | None = None
    self._initialized = False

    # Computed last so the helper methods see the fully stored
    # configuration.
    self._fingerprint = self._compute_fingerprint()
    self._descriptor = SourceDescriptor(
        name=self._compute_name(),
        kind="sqlalchemy",
        fingerprint=self._fingerprint,
    )

fetch(cursor, batch_size)

Fetch a batch of records newer than cursor.

Returns an empty sequence when no new records are available.

Source code in src/azure_functions_db/adapter/sqlalchemy.py
def fetch(self, cursor: CursorValue | None, batch_size: int) -> Sequence[RawRecord]:
    """Return the next batch of records that follow *cursor*.

    The statement comes from ``self._build_query(cursor, batch_size)``
    and is executed with the adapter's stored bind parameters.  Each
    result row is converted to a plain ``dict``; the list is empty when
    the query yields no rows.

    Raises
    ------
    FetchError
        If building or executing the statement fails for any reason
        other than ``SourceConfigurationError`` / ``FetchError``, which
        propagate unchanged.
    """
    self._ensure_initialized()
    assert self._engine is not None  # noqa: S101  # nosec B101

    try:
        statement = self._build_query(cursor, batch_size)
        with self._engine.connect() as connection:
            fetched = connection.execute(statement, self._parameters)
            # Materialise while the connection is still open.
            records = [dict(row._mapping) for row in fetched]
    except (SourceConfigurationError, FetchError):
        raise
    except Exception as exc:
        msg = "Failed to fetch records"
        raise FetchError(msg) from exc
    return records

dispose()

Dispose the underlying engine and release connection pool.

Source code in src/azure_functions_db/adapter/sqlalchemy.py
def dispose(self) -> None:
    """Drop the engine reference and reset the adapter's runtime state.

    The engine itself is disposed only when this adapter owns it.
    Calling this with no engine attached does nothing.
    """
    engine = self._engine
    if engine is None:
        return

    if self._owns_engine:
        engine.dispose()

    # Return to the pre-initialisation state.
    self._engine = None
    self._table = None
    self._initialized = False
    self._owns_engine = False

DbReader(*, url, table=None, schema=None, engine_provider=None)

Imperative input binding for reading database rows.

Provides get() for single-row lookup by primary key and query() for arbitrary SQL queries. Uses SQLAlchemy Core under the hood and integrates with EngineProvider for shared connection pooling.

Thread Safety

Instances are not safe to share across concurrent threads or async invocations. Create a separate DbReader per function invocation, or use a with block to scope the lifecycle.

Parameters

- url: SQLAlchemy connection URL. Supports %VAR% env-var substitution.
- table: Table name for get() operations. Optional if only query() is used.
- schema: Optional schema qualifier for table.
- engine_provider: Optional shared EngineProvider. When provided, the reader uses a pooled engine instead of creating its own.

Source code in src/azure_functions_db/binding/reader.py
def __init__(
    self,
    *,
    url: str,
    table: str | None = None,
    schema: str | None = None,
    engine_provider: EngineProvider | None = None,
) -> None:
    """Store the reader configuration without opening a connection.

    Parameters
    ----------
    url:
        SQLAlchemy connection URL; passed through ``resolve_env_vars``
        before being stored.
    table:
        Table name, or ``None``.
    schema:
        Optional schema qualifier for *table*.
    engine_provider:
        Optional shared :class:`EngineProvider`.

    Raises
    ------
    ConfigurationError
        If *url* is empty.  Any ``ConfigurationError`` raised by
        ``resolve_env_vars`` propagates unchanged.
    """
    if not url:
        msg = "url must not be empty"
        raise ConfigurationError(msg)

    # No try/except here: a handler that only re-raises is a no-op, so
    # errors from resolve_env_vars propagate exactly as before.
    self._url = resolve_env_vars(url)

    self._table_name = table
    self._schema = schema
    self._engine_provider = engine_provider
    self._db_config = DbConfig(connection_url=self._url)

    # Runtime handles, populated outside the constructor.
    self._engine: Engine | None = None
    self._table: Table | None = None
    self._owns_engine = False
    self._initialized = False

get(*, pk)

Look up a single row by primary key.

Requires table to have been set in the constructor.

Parameters

pk: Mapping of primary-key column name to value. All keys must be actual primary key columns of the table.

Returns

dict or None The matching row as a dict, or None if no row matches.

Raises

- ConfigurationError: If table was not set, or pk contains unknown columns.
- QueryError: If more than one row matches the provided key.

Source code in src/azure_functions_db/binding/reader.py
def get(self, *, pk: dict[str, object]) -> dict[str, object] | None:
    """Fetch the single row identified by *pk*.

    The reader must have been constructed with a *table*.

    Parameters
    ----------
    pk:
        Column-name to value mapping.  It is checked by
        ``_validate_pk_columns`` before any SQL is issued.

    Returns
    -------
    dict or None
        The row as a dict, or ``None`` when nothing matches.

    Raises
    ------
    ConfigurationError
        If no *table* was configured, or if pk validation rejects *pk*.
    QueryError
        If the lookup fails to execute, or more than one row matches.
    """
    if self._table_name is None:
        msg = "get() requires 'table' to be set in DbReader constructor"
        raise ConfigurationError(msg)

    self._ensure_initialized()
    assert self._engine is not None  # noqa: S101  # nosec B101
    assert self._table is not None  # noqa: S101  # nosec B101

    self._validate_pk_columns(pk)

    try:
        table = self._table
        key_filter = and_(*[table.c[name] == value for name, value in pk.items()])
        # LIMIT 2 is enough to tell "exactly one" from "more than one".
        lookup: Any = select(table).where(key_filter).limit(2)

        with self._engine.connect() as connection:
            matches = [dict(row._mapping) for row in connection.execute(lookup)]
    except (ConfigurationError, QueryError):
        raise
    except Exception as exc:
        msg = "Failed to execute get() query"
        raise QueryError(msg) from exc

    if not matches:
        return None

    if len(matches) > 1:
        msg = f"get() expected at most 1 row but found multiple matches for pk={pk}"
        raise QueryError(msg)

    return matches[0]

query(sql, *, params=None)

Execute a raw SQL query and return all matching rows.

Always use :name parameter placeholders and params instead of string formatting to prevent SQL injection. True read-only enforcement should be done at the database role/permission level.

Parameters

sql: SQL query string. Use :name placeholders for parameters. params: Optional mapping of parameter names to values.

Returns

list[dict] List of rows, each as a dict. Empty list if no rows match.

Raises

QueryError If the query execution fails.

Source code in src/azure_functions_db/binding/reader.py
def query(
    self,
    sql: str,
    *,
    params: dict[str, object] | None = None,
) -> list[dict[str, object]]:
    """Run a raw SQL statement and return every resulting row.

    Bind values through ``:name`` placeholders and *params* rather than
    formatting them into *sql*.  This method does not enforce read-only
    access; do that with database roles/permissions.

    Parameters
    ----------
    sql:
        SQL text, optionally containing ``:name`` placeholders.
    params:
        Optional placeholder values.  Passing ``None`` emits a
        ``UserWarning`` as a reminder about SQL injection.

    Returns
    -------
    list[dict]
        One dict per row; empty when the query matches nothing.

    Raises
    ------
    QueryError
        If executing the statement fails.
    """
    if params is None:
        # stacklevel=2 attributes the warning to the caller of query().
        warnings.warn(
            "query() called without params: verify that sql does not"
            " concatenate user input. Use :name placeholders and pass"
            " params=\"{'name': value}\" to prevent SQL injection.",
            UserWarning,
            stacklevel=2,
        )
    self._ensure_initialized()
    assert self._engine is not None  # noqa: S101  # nosec B101

    try:
        statement = text(sql)
        with self._engine.connect() as connection:
            # An empty mapping is treated the same as "no parameters".
            outcome = (
                connection.execute(statement, params)
                if params
                else connection.execute(statement)
            )
            found = [dict(row._mapping) for row in outcome]
    except (ConfigurationError, QueryError):
        raise
    except Exception as exc:
        msg = "Failed to execute query"
        raise QueryError(msg) from exc
    return found

scalar(sql, *, params=None)

Execute a SQL query and return a single scalar value.

Convenience wrapper around :meth:query for queries such as SELECT COUNT(*) FROM ... or SELECT MAX(updated_at) FROM ....

Returns the first column of the first row, or None if no rows match. Raises :class:QueryError if the query returns more than one row.

Parameters

sql: SQL query string. Use :name placeholders for parameters. params: Optional mapping of parameter names to values.

Returns

object or None The single scalar value, or None if no rows match.

Raises

QueryError If the query execution fails or returns multiple rows.

Source code in src/azure_functions_db/binding/reader.py
def scalar(
    self,
    sql: str,
    *,
    params: dict[str, object] | None = None,
) -> object | None:
    """Run a SQL statement and return one scalar value.

    Intended for statements such as ``SELECT COUNT(*) FROM ...`` or
    ``SELECT MAX(updated_at) FROM ...``.  The value is the first column
    of the only row.

    Parameters
    ----------
    sql:
        SQL text, optionally containing ``:name`` placeholders.
    params:
        Optional placeholder values.  Passing ``None`` emits a
        ``UserWarning`` as a reminder about SQL injection.

    Returns
    -------
    object or None
        The scalar value, or ``None`` when the query yields no rows (or
        a row without columns).

    Raises
    ------
    QueryError
        If execution fails or the query yields more than one row.
    """
    if params is None:
        # stacklevel=2 attributes the warning to the caller of scalar().
        warnings.warn(
            "scalar() called without params: verify that sql does not"
            " concatenate user input. Use :name placeholders and pass"
            " params=\"{'name': value}\" to prevent SQL injection.",
            UserWarning,
            stacklevel=2,
        )
    self._ensure_initialized()
    assert self._engine is not None  # noqa: S101  # nosec B101

    try:
        statement = text(sql)
        with self._engine.connect() as connection:
            outcome = (
                connection.execute(statement, params)
                if params
                else connection.execute(statement)
            )
            # Two rows are enough to detect a non-unique result.
            rows = outcome.fetchmany(2)
    except (ConfigurationError, QueryError):
        raise
    except Exception as exc:
        msg = "Failed to execute scalar query"
        raise QueryError(msg) from exc

    if not rows:
        return None
    if len(rows) > 1:
        msg = "scalar() expected at most 1 row but the query returned multiple rows"
        raise QueryError(msg)

    only_row = rows[0]
    if hasattr(only_row, "_mapping"):
        columns: dict[str, object] = dict(only_row._mapping)
        # Yield the first column's value; a column-less row gives None.
        for first_value in columns.values():
            return first_value
        return None

    # Defensive fallback: the row is a plain tuple-like object.
    return only_row[0] if len(only_row) else None

one(sql, *, params=None)

Execute a SQL query and return exactly one row.

Convenience wrapper around :meth:query for queries that must return a single row.

Parameters

sql: SQL query string. Use :name placeholders for parameters. params: Optional mapping of parameter names to values.

Returns

dict The single matching row as a dict.

Raises

QueryError If the query execution fails, returns zero rows, or returns more than one row.

Source code in src/azure_functions_db/binding/reader.py
def one(
    self,
    sql: str,
    *,
    params: dict[str, object] | None = None,
) -> dict[str, object]:
    """Run a SQL statement that must yield exactly one row.

    Parameters
    ----------
    sql:
        SQL text, optionally containing ``:name`` placeholders.
    params:
        Optional placeholder values.

    Returns
    -------
    dict
        The only row produced by the query.

    Raises
    ------
    QueryError
        If the query yields no rows or more than one row.  Errors
        raised by ``_fetch_at_most_two`` propagate unchanged.
    """
    candidates = self._fetch_at_most_two(sql, params)

    if candidates and len(candidates) == 1:
        return candidates[0]

    # Anything other than a single row is an error; pick the message.
    if not candidates:
        msg = "one() expected exactly 1 row but the query returned no rows"
    else:
        msg = "one() expected exactly 1 row but the query returned multiple rows"
    raise QueryError(msg)

one_or_none(sql, *, params=None)

Execute a SQL query and return one row or None.

Convenience wrapper around :meth:query for queries that must return zero or one row.

Parameters

sql: SQL query string. Use :name placeholders for parameters. params: Optional mapping of parameter names to values.

Returns

dict or None The single matching row as a dict, or None if no rows match.

Raises

QueryError If the query execution fails or returns more than one row.

Source code in src/azure_functions_db/binding/reader.py
def one_or_none(
    self,
    sql: str,
    *,
    params: dict[str, object] | None = None,
) -> dict[str, object] | None:
    """Run a SQL statement that may yield at most one row.

    Parameters
    ----------
    sql:
        SQL text, optionally containing ``:name`` placeholders.
    params:
        Optional placeholder values.

    Returns
    -------
    dict or None
        The only row, or ``None`` when the query yields nothing.

    Raises
    ------
    QueryError
        If the query yields more than one row.  Errors raised by
        ``_fetch_at_most_two`` propagate unchanged.
    """
    candidates = self._fetch_at_most_two(sql, params)

    if candidates and len(candidates) > 1:
        msg = "one_or_none() expected at most 1 row but the query returned multiple rows"
        raise QueryError(msg)

    return candidates[0] if candidates else None

close()

Release resources held by this reader.

If the reader owns its engine (no engine_provider was given), the engine is disposed. If using a shared engine via engine_provider, only the reader's internal state is reset.

Source code in src/azure_functions_db/binding/reader.py
def close(self) -> None:
    """Detach the reader from its engine and reset its runtime state.

    The engine is disposed only when the reader owns it; otherwise just
    the reader's own references and flags are cleared.  Calling this
    with no engine attached does nothing.
    """
    engine = self._engine
    if engine is None:
        return

    if self._owns_engine:
        engine.dispose()

    # Return to the pre-initialisation state.
    self._engine = None
    self._table = None
    self._initialized = False
    self._owns_engine = False

DbWriter(*, url, table, schema=None, engine_provider=None)

Imperative output binding for writing database rows.

Provides insert(), upsert(), update(), delete() for single-row operations and insert_many() / upsert_many() for batch operations. Uses SQLAlchemy Core under the hood.

Thread Safety

Instances are not safe to share across concurrent threads or async invocations. Create a separate DbWriter per function invocation, or use a with block to scope the lifecycle.

Source code in src/azure_functions_db/binding/writer.py
def __init__(
    self,
    *,
    url: str,
    table: str,
    schema: str | None = None,
    engine_provider: EngineProvider | None = None,
) -> None:
    """Store the writer configuration without opening a connection.

    Parameters
    ----------
    url:
        SQLAlchemy connection URL; passed through ``resolve_env_vars``
        before being stored.
    table:
        Name of the target table.  Required.
    schema:
        Optional schema qualifier for *table*.
    engine_provider:
        Optional shared :class:`EngineProvider`.

    Raises
    ------
    ConfigurationError
        If *url* or *table* is empty.  Any ``ConfigurationError`` raised
        by ``resolve_env_vars`` propagates unchanged.
    """
    if not url:
        msg = "url must not be empty"
        raise ConfigurationError(msg)

    if not table:
        msg = "table must not be empty"
        raise ConfigurationError(msg)

    # No try/except here: a handler that only re-raises is a no-op, so
    # errors from resolve_env_vars propagate exactly as before.
    self._url = resolve_env_vars(url)

    self._table_name = table
    self._schema = schema
    self._engine_provider = engine_provider
    self._db_config = DbConfig(connection_url=self._url)

    # Runtime handles, populated outside the constructor.
    self._engine: Engine | None = None
    self._table: Table | None = None
    self._owns_engine = False
    self._initialized = False

    # Transaction bookkeeping used by transaction() and close().
    self._tx_conn: Connection | None = None
    self._closed_in_active_tx = False
    self._tx: Transaction | None = None

transaction()

Group multiple write operations into a single SQL transaction.

Inside the with block, every call to insert, insert_many, upsert, upsert_many, update, and delete executes on the same connection and shares one transaction. The transaction is committed on normal exit and rolled back if any exception leaves the block.

Nested transaction() calls on the same writer are not supported and raise :class:WriteError.

Example

with writer.transaction(): ... writer.insert(data={"id": 1, "status": "pending"}) ... writer.update(data={"status": "shipped"}, pk={"id": 1})

Raises

WriteError If a transaction is already active on this writer, if the connection cannot be acquired, or if commit fails. The original exception is preserved as the cause when applicable.

Source code in src/azure_functions_db/binding/writer.py
@contextmanager
def transaction(self) -> Iterator[DbWriter]:
    """Group multiple write operations into a single SQL transaction.

    Inside the ``with`` block, every call to ``insert``, ``insert_many``,
    ``upsert``, ``upsert_many``, ``update``, and ``delete`` executes on
    the same connection and shares one transaction.  The transaction is
    committed on normal exit and rolled back if any exception leaves
    the block.

    Nested ``transaction()`` calls on the same writer are not supported
    and raise :class:`WriteError`.

    If ``close()`` is called on the writer inside the block, the
    transaction has already been torn down and this context manager
    skips its own commit/rollback on exit.

    Example
    -------
    >>> with writer.transaction():
    ...     writer.insert(data={"id": 1, "status": "pending"})
    ...     writer.update(data={"status": "shipped"}, pk={"id": 1})

    Yields
    ------
    DbWriter
        This writer itself.

    Raises
    ------
    WriteError
        If a transaction is already active on this writer, if the
        connection cannot be acquired, if the transaction cannot be
        started, or if commit fails.  The original exception is
        preserved as the cause when applicable.
    """
    if self._tx_conn is not None:
        msg = (
            "transaction() is already active on this writer; "
            "nested transactions are not supported"
        )
        raise WriteError(msg)

    self._ensure_initialized()
    assert self._engine is not None  # noqa: S101  # nosec B101

    try:
        conn = self._engine.connect()
    except Exception as exc:
        msg = "Failed to acquire connection for transaction"
        raise WriteError(msg) from exc

    try:
        tx = conn.begin()
    except Exception as exc:
        # The connection is already checked out; hand it back so a
        # failed begin() does not leak it.
        try:
            conn.close()
        except Exception:
            logger.warning(
                "Failed to close connection after transaction begin failure",
                exc_info=True,
            )
        msg = "Failed to begin transaction"
        raise WriteError(msg) from exc

    self._tx_conn = conn
    self._tx = tx
    self._closed_in_active_tx = False
    try:
        yield self
    except BaseException:
        # close() inside the block already rolled back and released the
        # connection; just let the exception continue.
        if self._tx is None:
            raise
        try:
            tx.rollback()
        except Exception:
            logger.warning(
                "Failed to roll back transaction on exception;"
                " original exception will still be raised",
                exc_info=True,
            )
        finally:
            self._tx = None
            self._tx_conn = None
            conn.close()
        raise
    else:
        # close() inside the block already tore everything down.
        if self._tx is None:
            return
        try:
            tx.commit()
        except Exception as exc:
            msg = "Failed to commit transaction"
            raise WriteError(msg) from exc
        finally:
            # Same cleanup whether the commit succeeded or failed.
            self._tx = None
            self._tx_conn = None
            conn.close()
insert(*, data)

Insert a single row.

Raises :class:WriteError on constraint violations or other database errors.

Source code in src/azure_functions_db/binding/writer.py
def insert(self, *, data: dict[str, object]) -> None:
    """Write *data* as one new row of the writer's table.

    *data* is checked by ``_validate_data_columns`` before the INSERT
    is executed inside the writer's execution scope.

    Raises
    ------
    WriteError
        If the insert fails for any reason other than
        ``ConfigurationError`` / ``WriteError``, which propagate
        unchanged.
    """
    self._ensure_initialized()
    assert self._engine is not None  # noqa: S101  # nosec B101
    assert self._table is not None  # noqa: S101  # nosec B101

    self._validate_data_columns(data)

    target = self._table
    try:
        with self._execution_scope() as connection:
            statement = target.insert().values(**data)
            connection.execute(statement)
    except (ConfigurationError, WriteError):
        raise
    except Exception as exc:
        msg = "Failed to insert row"
        raise WriteError(msg) from exc

insert_many(*, rows)

Insert multiple rows in a single transaction (all-or-nothing).

Source code in src/azure_functions_db/binding/writer.py
def insert_many(self, *, rows: list[dict[str, object]]) -> None:
    """Write all *rows* with a single executemany-style INSERT.

    An empty *rows* list returns immediately without touching the
    database.  Every row is checked by ``_validate_data_columns``
    before anything is executed.

    Raises
    ------
    WriteError
        If the insert fails for any reason other than
        ``ConfigurationError`` / ``WriteError``, which propagate
        unchanged.
    """
    if not rows:
        return

    self._ensure_initialized()
    assert self._engine is not None  # noqa: S101  # nosec B101
    assert self._table is not None  # noqa: S101  # nosec B101

    # Validate everything up front so a bad row is rejected before any
    # SQL is issued.
    for candidate in rows:
        self._validate_data_columns(candidate)

    target = self._table
    try:
        with self._execution_scope() as connection:
            connection.execute(target.insert(), rows)
    except (ConfigurationError, WriteError):
        raise
    except Exception as exc:
        msg = "Failed to insert rows"
        raise WriteError(msg) from exc

upsert(*, data, conflict_columns)

Insert or update a single row using dialect-specific upsert.

Supported dialects: PostgreSQL, SQLite, MySQL. Other dialects raise :class:ConfigurationError.

Source code in src/azure_functions_db/binding/writer.py
def upsert(
    self,
    *,
    data: dict[str, object],
    conflict_columns: list[str],
) -> None:
    """Insert *data*, or update the existing row on a key conflict.

    The statement is produced by ``_build_upsert_stmt`` and is
    dialect-specific.  Supported dialects: PostgreSQL, SQLite, MySQL.
    Other dialects raise :class:`ConfigurationError`.

    Raises
    ------
    WriteError
        If the upsert fails for any reason other than
        ``ConfigurationError`` / ``WriteError``, which propagate
        unchanged.
    """
    self._ensure_initialized()
    assert self._engine is not None  # noqa: S101  # nosec B101
    assert self._table is not None  # noqa: S101  # nosec B101

    # Row data is validated first, then the conflict target.
    self._validate_data_columns(data)
    self._validate_conflict_columns(conflict_columns)

    try:
        statement = self._build_upsert_stmt(data, conflict_columns)
        with self._execution_scope() as connection:
            connection.execute(statement)
    except (ConfigurationError, WriteError):
        raise
    except Exception as exc:
        msg = "Failed to upsert row"
        raise WriteError(msg) from exc

upsert_many(*, rows, conflict_columns)

Upsert multiple rows in a single transaction (all-or-nothing).

Source code in src/azure_functions_db/binding/writer.py
def upsert_many(
    self,
    *,
    rows: list[dict[str, object]],
    conflict_columns: list[str],
) -> None:
    """Upsert every row of *rows* inside one execution scope.

    An empty *rows* list returns immediately.  Otherwise all rows and
    then *conflict_columns* are validated before the first statement
    runs; one upsert statement is then built and executed per row.

    Raises
    ------
    WriteError
        If any upsert fails for a reason other than
        ``ConfigurationError`` / ``WriteError``, which propagate
        unchanged.
    """
    if not rows:
        return

    self._ensure_initialized()
    assert self._engine is not None  # noqa: S101  # nosec B101
    assert self._table is not None  # noqa: S101  # nosec B101

    for candidate in rows:
        self._validate_data_columns(candidate)
    self._validate_conflict_columns(conflict_columns)

    try:
        with self._execution_scope() as connection:
            # Build and run one statement at a time, in input order.
            for payload in rows:
                connection.execute(self._build_upsert_stmt(payload, conflict_columns))
    except (ConfigurationError, WriteError):
        raise
    except Exception as exc:
        msg = "Failed to upsert rows"
        raise WriteError(msg) from exc

update(*, data, pk)

Update a single row identified by primary key.

This is a no-op if no row matches the given pk (idempotent).

Source code in src/azure_functions_db/binding/writer.py
def update(self, *, data: dict[str, object], pk: dict[str, object]) -> None:
    """Apply *data* to the row whose key columns equal *pk*.

    Nothing happens when no row matches *pk*, so repeating the call is
    harmless.  *data* and *pk* are validated before the UPDATE is built.

    Raises
    ------
    WriteError
        If the update fails for any reason other than
        ``ConfigurationError`` / ``WriteError``, which propagate
        unchanged.
    """
    self._ensure_initialized()
    assert self._engine is not None  # noqa: S101  # nosec B101
    assert self._table is not None  # noqa: S101  # nosec B101

    self._validate_data_columns(data)
    self._validate_pk_columns(pk)

    try:
        table = self._table
        # One equality test per key column, AND-ed together.
        key_filter = and_(*[table.c[name] == value for name, value in pk.items()])
        statement: Any = update(table).where(key_filter).values(**data)
        with self._execution_scope() as connection:
            connection.execute(statement)
    except (ConfigurationError, WriteError):
        raise
    except Exception as exc:
        msg = "Failed to update row"
        raise WriteError(msg) from exc

delete(*, pk)

Delete a single row identified by primary key.

This is a no-op if no row matches the given pk (idempotent).

Source code in src/azure_functions_db/binding/writer.py
def delete(self, *, pk: dict[str, object]) -> None:
    """Remove the row whose key columns equal *pk*.

    Nothing happens when no row matches *pk*, so repeating the call is
    harmless.  *pk* is validated before the DELETE is built.

    Raises
    ------
    WriteError
        If the delete fails for any reason other than
        ``ConfigurationError`` / ``WriteError``, which propagate
        unchanged.
    """
    self._ensure_initialized()
    assert self._engine is not None  # noqa: S101  # nosec B101
    assert self._table is not None  # noqa: S101  # nosec B101

    self._validate_pk_columns(pk)

    try:
        table = self._table
        # One equality test per key column, AND-ed together.
        key_filter = and_(*[table.c[name] == value for name, value in pk.items()])
        statement: Any = delete(table).where(key_filter)
        with self._execution_scope() as connection:
            connection.execute(statement)
    except (ConfigurationError, WriteError):
        raise
    except Exception as exc:
        msg = "Failed to delete row"
        raise WriteError(msg) from exc

close()

Tear down resources held by this writer.

If called while a transaction() with block is still active, the transaction is rolled back, the connection is released, and a sentinel is set so that any subsequent write call inside the same with block raises :class:WriteError. The surrounding context manager's __exit__ observes the teardown and skips its own commit/rollback. Rollback failures are logged at WARNING and do not prevent connection close or engine disposal.

Warning: Do not continue using the same writer inside that transaction() block after calling close(). The writer has been torn down and any further insert / upsert / update / delete call will raise WriteError. If you need to keep writing, exit the with block first and start a new transaction() on a fresh writer instance.

Source code in src/azure_functions_db/binding/writer.py
def close(self) -> None:
    """Release everything this writer holds, including an open transaction.

    When called inside an active ``transaction()`` block, the pending
    transaction is rolled back, its connection is closed, and the
    writer is marked as closed-in-transaction so that later write calls
    in the same block raise :class:`WriteError`.  The surrounding
    context manager sees the teardown on exit and skips its own
    commit/rollback.  Failures while rolling back or closing the
    transaction connection are logged at WARNING and do not stop the
    rest of the teardown.

    .. warning::
        Do not keep using this writer inside that ``transaction()``
        block after ``close()``.  Leave the ``with`` block and start a
        new ``transaction()`` on a fresh writer instance instead.
    """
    # Detach the transaction before rolling back, so the writer never
    # points at a transaction that is being torn down.
    pending_tx, self._tx = self._tx, None
    interrupted = pending_tx is not None
    if interrupted:
        try:
            pending_tx.rollback()
        except Exception:
            logger.warning(
                "Failed to roll back active transaction on writer.close()",
                exc_info=True,
            )

    pending_conn, self._tx_conn = self._tx_conn, None
    if pending_conn is not None:
        try:
            pending_conn.close()
        except Exception:
            logger.warning(
                "Failed to close active transaction connection on writer.close()",
                exc_info=True,
            )

    engine = self._engine
    if engine is not None:
        # Only an engine this writer owns is disposed.
        if self._owns_engine:
            engine.dispose()
        self._engine = None
        self._table = None
        self._initialized = False
        self._owns_engine = False

    # Set last, once teardown is done, and only if a transaction was cut short.
    if interrupted:
        self._closed_in_active_tx = True

DbBindings

Azure Functions-style decorator API for database integration.

Provides trigger, input, output, inject_reader, and inject_writer decorator methods that wrap the imperative API (PollTrigger, DbReader, DbWriter) in an Azure Functions-native decorator experience.

Data injection (input / output): input injects query results into handler parameters. output injects a :class:DbOut instance; call .set() to write data explicitly.

Client injection (inject_reader / inject_writer): Handlers receive DbReader / DbWriter instances for imperative control.

Decorator order contract

Decorator composition rules:

- Azure decorators outermost, db decorators closest to the function
- trigger + output can be combined (process events and write results)
- trigger + inject_writer can be combined (imperative write in trigger handler)
- input + output can be combined (read data, write results)
- input and inject_reader are mutually exclusive
- output and inject_writer are mutually exclusive
- No decorator can be applied twice to the same handler

Valid combinations::

@app.schedule(...)
@db.trigger(...)        # Azure trigger outermost
@db.output("out", ...)  # db output innermost
def handler(events, out: DbOut) -> None:
    out.set([...])

@db.input("user", ...)
@db.output("out", ...)
def handler(user, out: DbOut) -> dict:
    out.set({...})
    return user

Note: This is a pseudo-trigger implementation. trigger requires an actual Azure Functions trigger (e.g. @app.schedule) to fire. It does not register a native Azure Functions binding.

trigger(*, arg_name, source, checkpoint_store, name=None, normalizer=None, batch_size=100, max_batches_per_tick=1, lease_ttl_seconds=120, retry_policy=None, metrics=None)

Decorator for database change detection (pseudo-trigger).

Wraps a handler function so that on each invocation it polls the database source for new/changed rows and passes them to the handler.

The decorated function's arg_name parameter will receive the list of :class:RowChange events. An optional parameter named context will receive the :class:PollContext.

Must be used together with an actual Azure Functions trigger (e.g. @app.schedule(...)).

Parameters

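- arg_name: Name of the handler parameter that receives the events list.
- source: Database source adapter (e.g. SqlAlchemySource).
- checkpoint_store: State store for checkpointing (e.g. BlobCheckpointStore).
- name: Trigger name for logging/metrics. Defaults to the function name.
- normalizer, batch_size, max_batches_per_tick, lease_ttl_seconds, retry_policy, metrics: Forwarded unchanged to the underlying PollTrigger.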

.. note:: Only synchronous handlers are supported. Async handlers will raise ConfigurationError at decoration time. This is because PollTrigger.run is synchronous.

Source code in src/azure_functions_db/decorator.py
def trigger(
    self,
    *,
    arg_name: str,
    source: SourceAdapter,
    checkpoint_store: StateStore,
    name: str | None = None,
    normalizer: EventNormalizer | None = None,
    batch_size: int = 100,
    max_batches_per_tick: int = 1,
    lease_ttl_seconds: int = 120,
    retry_policy: RetryPolicy | None = None,
    metrics: MetricsCollector | None = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Build a pseudo-trigger decorator that polls a database for changes.

    Every call of the decorated function runs a :class:`PollTrigger`, which
    is handed a callback that invokes the original handler.  The handler
    receives the polled events through its ``arg_name`` parameter and,
    if it declares a parameter literally named ``context``, the poll
    context as well (only when the poller supplies a non-``None`` one).

    This does not register a native Azure Functions binding; stack it
    under a real trigger such as ``@app.schedule(...)``.

    Parameters
    ----------
    arg_name:
        Handler parameter that receives the events list.
    source:
        Database source adapter (e.g. ``SqlAlchemySource``).
    checkpoint_store:
        State store used for checkpointing (e.g. ``BlobCheckpointStore``).
    name:
        Trigger name for logging/metrics.  Falls back to ``fn.__name__``.
    normalizer, batch_size, max_batches_per_tick, lease_ttl_seconds, \
retry_policy, metrics:
        Forwarded unchanged to :class:`PollTrigger`.

    Returns
    -------
    A decorator.  The wrapped function returns whatever
    ``PollTrigger.run`` returns.

    Raises
    ------
    ConfigurationError
        At decoration time, when the handler is a coroutine function
        (``PollTrigger.run`` is synchronous).
    """

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        _check_composition(fn, "trigger")
        _validate_arg_name(arg_name, fn, "trigger")

        # An async handler called without ``await`` would just hand back
        # an un-awaited coroutine, so refuse it up front.
        if inspect.iscoroutinefunction(fn):
            msg = (
                "trigger does not support async handlers because PollTrigger.run() "
                "is synchronous. Use a sync handler instead."
            )
            raise ConfigurationError(msg)

        poller = PollTrigger(
            name=name or fn.__name__,
            source=source,
            checkpoint_store=checkpoint_store,
            normalizer=normalizer,
            batch_size=batch_size,
            max_batches_per_tick=max_batches_per_tick,
            lease_ttl_seconds=lease_ttl_seconds,
            retry_policy=retry_policy,
            metrics=metrics,
        )

        handler_params = inspect.signature(fn, follow_wrapped=False).parameters
        wants_context = "context" in handler_params

        # Parameters filled in by this decorator; everything else belongs
        # to the host (Azure) side.
        db_injected = {arg_name, "context"} if wants_context else {arg_name}
        host_params = [p for p in handler_params if p not in db_injected]

        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> int:
            # The first host-supplied value is treated as the timer.
            if args:
                timer: Any = args[0]
            elif host_params:
                timer = kwargs.get(host_params[0])
            else:
                timer = None

            # Re-key positional host arguments by parameter name; surplus
            # positionals (beyond the known host params) are ignored.
            host_kwargs = dict(kwargs)
            host_kwargs.update(zip(host_params, args))

            def invoke_handler(events: Any, context: Any | None = None) -> Any:
                call_kwargs: dict[str, Any] = {**host_kwargs, arg_name: events}
                if wants_context and context is not None:
                    call_kwargs["context"] = context
                return fn(**call_kwargs)

            return poller.run(timer=timer, handler=invoke_handler)

        # Expose only the host parameters through __signature__ so the
        # Azure worker's binding validation still sees them, while the
        # db-injected ones stay hidden.
        setattr(wrapper, "__signature__", _build_host_signature(fn, db_injected))
        _mark_decorator(wrapper, "trigger")
        metadata = {
            "version": 1,
            "bindings": [{"kind": "trigger", "parameter": arg_name}],
            "injections": [],
        }
        _merge_toolkit_metadata(wrapper, "db", metadata)

        return wrapper

    return decorator

input(arg_name, *, url, table=None, schema=None, pk=None, query=None, params=None, model=None, on_not_found='none', engine_provider=None)

Decorator that injects query results into the handler.

The handler parameter named arg_name will receive the actual data from the database, not a DbReader instance. Exactly one of pk or query must be provided.

PK mode (single row): The parameter receives dict[str, object] | None. Use a static dict for fixed lookups or a callable for dynamic resolution from other handler parameters::

    @db.input("user", url=..., table="users",
                 pk=lambda req: {"id": req.params["id"]})
    def handler(req, user): ...

Query mode (multiple rows): The parameter receives list[dict[str, object]]::

    @db.input("users", url=...,
                 query="SELECT * FROM users WHERE active = :active",
                 params={"active": True})
    def handler(users): ...
Parameters

- arg_name: Name of the handler parameter that receives the data.
- url: SQLAlchemy connection URL. Supports %VAR% env-var substitution.
- table: Table name. Required when using pk.
- schema: Optional schema qualifier.
- pk: Primary key for single-row lookup. Either a static dict or a callable whose parameter names match other handler parameters.
- query: SQL query string for multi-row results. Use :name placeholders for parameters.
- params: Parameters for query. Either a static dict or a callable.
- model: Optional BaseModel subclass; the fetched data is passed through it before injection.
- on_not_found: Behavior when pk lookup returns no row. "none" (default) injects None; "raise" raises NotFoundError.
- engine_provider: Optional shared EngineProvider for connection pooling.

Supports both sync and async handlers. For async handlers, blocking database I/O is automatically offloaded via asyncio.to_thread().

Source code in src/azure_functions_db/decorator.py
def input(
    self,
    arg_name: str,
    *,
    url: str,
    table: str | None = None,
    schema: str | None = None,
    pk: dict[str, object] | Callable[..., dict[str, object]] | None = None,
    query: str | None = None,
    params: dict[str, object] | Callable[..., dict[str, object]] | None = None,
    model: type[BaseModel] | None = None,
    on_not_found: Literal["none", "raise"] = "none",
    engine_provider: EngineProvider | None = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Build a decorator that feeds database rows into a handler parameter.

    The handler's ``arg_name`` parameter receives the fetched data itself
    (not a ``DbReader``).  Exactly one of ``pk`` or ``query`` selects the
    mode:

    **PK mode** (single row)
        The value is ``dict[str, object] | None``.  ``pk`` may be a fixed
        dict or a callable resolved from the handler's other arguments::

            @db.input("user", url=..., table="users",
                         pk=lambda req: {"id": req.params["id"]})
            def handler(req, user): ...

    **Query mode** (multiple rows)
        The value is ``list[dict[str, object]]``::

            @db.input("users", url=...,
                         query="SELECT * FROM users WHERE active = :active",
                         params={"active": True})
            def handler(users): ...

    A new ``DbReader`` is created for every invocation and closed once
    the read finishes.  Callable ``pk`` / ``params`` resolvers are
    evaluated from the keyword arguments the wrapper was called with.

    Parameters
    ----------
    arg_name:
        Handler parameter that receives the data.
    url:
        SQLAlchemy connection URL.  Supports ``%VAR%`` env-var substitution.
    table:
        Table name.  Mandatory in PK mode.
    schema:
        Optional schema qualifier.
    pk:
        Primary key for a single-row lookup: a static dict, or a callable
        whose parameter names match other handler parameters.
    query:
        SQL text for multi-row results, using ``:name`` placeholders.
    params:
        Bind parameters for ``query``: a static dict or a callable.
    model:
        Optional ``BaseModel`` subclass; the fetched data is passed
        through ``_apply_input_model`` with it before injection.
    on_not_found:
        What to do when a ``pk`` lookup finds nothing: ``"none"``
        (default) injects ``None``; ``"raise"`` raises ``NotFoundError``.
    engine_provider:
        Optional shared ``EngineProvider`` for connection pooling.

    Raises
    ------
    ConfigurationError
        For an unknown ``on_not_found``, when ``pk`` and ``query`` are
        both given or both missing, when ``pk`` lacks ``table``, or when
        ``params`` is given without ``query``.

    Sync and async handlers are both supported; for async handlers the
    blocking read runs in a worker thread via ``asyncio.to_thread()``.
    """
    if on_not_found not in ("none", "raise"):
        msg = f"input on_not_found must be 'none' or 'raise', got '{on_not_found}'"
        raise ConfigurationError(msg)

    use_pk = pk is not None
    has_query = query is not None
    if use_pk and has_query:
        msg = "input requires exactly one of 'pk' or 'query', not both"
        raise ConfigurationError(msg)
    if not (use_pk or has_query):
        msg = "input requires exactly one of 'pk' or 'query'"
        raise ConfigurationError(msg)
    if use_pk and table is None:
        msg = "input with 'pk' requires 'table' to be set"
        raise ConfigurationError(msg)
    if params is not None and not has_query:
        msg = "input 'params' is only valid with 'query'"
        raise ConfigurationError(msg)
    _validate_model_type(model)

    # Split each of pk / params into its "callable" and "static" flavour
    # once, so the per-call path only has to test for None.
    pk_callable: Callable[..., dict[str, object]] | None = None
    pk_static: dict[str, object] | None = None
    if callable(pk):
        pk_callable = pk
    else:
        pk_static = pk

    params_callable: Callable[..., dict[str, object]] | None = None
    params_static: dict[str, object] | None = None
    if callable(params):
        params_callable = params
    else:
        params_static = params

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        _check_composition(fn, "input")
        _validate_arg_name(arg_name, fn, "input")

        pk_resolver_params: list[str] = (
            _validate_resolver(pk_callable, fn, {arg_name}, "pk", "input")
            if pk_callable is not None
            else []
        )
        params_resolver_params: list[str] = (
            _validate_resolver(params_callable, fn, {arg_name}, "params", "input")
            if params_callable is not None
            else []
        )

        is_async = inspect.iscoroutinefunction(fn)

        binding_info: dict[str, Any] = {
            "kind": "input",
            "parameter": arg_name,
            "connection_setting": url,
            "resource": {"table": table} if table else {},
            "query_kind": "pk" if use_pk else "text",
        }
        if model is not None:
            binding_info["model_ref"] = f"{model.__module__}:{model.__qualname__}"

        def _resolve_read_args(
            all_kwargs: dict[str, Any],
        ) -> tuple[dict[str, object] | None, dict[str, object] | None]:
            """Return ``(pk, params)`` for this call; the unused one is None."""
            if not use_pk:
                if params_callable is not None:
                    resolved = _resolve_callable(
                        params_callable, params_resolver_params, all_kwargs
                    )
                    return None, resolved
                # Either the static dict or None when no params were given.
                return None, params_static

            if pk_callable is not None:
                return _resolve_callable(pk_callable, pk_resolver_params, all_kwargs), None
            if pk_static is not None:
                return pk_static, None
            msg = "input: unreachable – neither pk callable nor pk static"
            raise ConfigurationError(msg)

        def _execute_read(
            resolved_pk: dict[str, object] | None,
            resolved_params: dict[str, object] | None,
        ) -> Any:
            """Perform the blocking read with a short-lived ``DbReader``."""
            reader = DbReader(
                url=url,
                table=table,
                schema=schema,
                engine_provider=engine_provider,
            )
            try:
                if not use_pk:
                    if query is None:
                        msg = "input: unreachable – query mode but query is None"
                        raise ConfigurationError(msg)
                    return reader.query(query, params=resolved_params)

                if resolved_pk is None:
                    msg = "input: unreachable – pk mode but resolved_pk is None"
                    raise ConfigurationError(msg)
                row = reader.get(pk=resolved_pk)
                if row is None and on_not_found == "raise":
                    from .core.errors import NotFoundError

                    msg = f"input: no row found for pk={resolved_pk} in table '{table}'"
                    raise NotFoundError(msg)
                return row
            finally:
                reader.close()

        wrapped: Callable[..., Any]
        if is_async:

            @functools.wraps(fn)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                r_pk, r_params = _resolve_read_args(kwargs)
                # Keep the event loop free while the blocking read runs.
                data = await asyncio.to_thread(_execute_read, r_pk, r_params)
                kwargs[arg_name] = _apply_input_model(data, model)
                return await fn(*args, **kwargs)

            wrapped = async_wrapper
        else:

            @functools.wraps(fn)
            def wrapper(*args: Any, **kwargs: Any) -> Any:
                r_pk, r_params = _resolve_read_args(kwargs)
                data = _execute_read(r_pk, r_params)
                kwargs[arg_name] = _apply_input_model(data, model)
                return fn(*args, **kwargs)

            wrapped = wrapper

        # Registration is the same for the sync and async variants.
        setattr(wrapped, "__signature__", _build_host_signature(fn, {arg_name}))
        _mark_decorator(wrapped, "input")
        _merge_toolkit_metadata(
            wrapped,
            "db",
            {
                "version": 1,
                "bindings": [binding_info],
                "injections": [],
            },
        )
        return wrapped

    return decorator

output(arg_name, *, url, table, schema=None, action='insert', conflict_columns=None, engine_provider=None)

Decorator that injects a :class:DbOut instance into the handler.

Follows the native Azure Functions output binding pattern (func.Out[T] with .set()). The handler parameter named arg_name will receive a DbOut instance for sync handlers or an _AsyncDbOutProxy for async handlers.

The handler's return value is not intercepted — use out.set(data) to write explicitly.

Parameters

- arg_name: Name of the handler parameter that receives the DbOut.
- url: SQLAlchemy connection URL. Supports %VAR% env-var substitution.
- table: Table name for write operations.
- schema: Optional schema qualifier.
- action: Write action: "insert" (default) or "upsert".
- conflict_columns: Columns for upsert conflict resolution. Required when action="upsert".
- engine_provider: Optional shared EngineProvider for connection pooling.

Supports both sync and async handlers. For async handlers, blocking database I/O is automatically offloaded via asyncio.to_thread().

Source code in src/azure_functions_db/decorator.py
def output(
    self,
    arg_name: str,
    *,
    url: str,
    table: str,
    schema: str | None = None,
    action: Literal["insert", "upsert"] = "insert",
    conflict_columns: list[str] | None = None,
    engine_provider: EngineProvider | None = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Build a decorator that hands the handler a :class:`DbOut`.

    Modeled on the native Azure Functions output binding
    (``func.Out[T]`` with ``.set()``).  One ``DbOut`` is created when the
    handler is decorated; sync handlers get it directly through their
    ``arg_name`` parameter, async handlers get it wrapped in an
    ``_AsyncDbOutProxy``.

    The handler's return value is passed through untouched — nothing is
    written unless the handler calls ``out.set(data)``.

    Parameters
    ----------
    arg_name:
        Handler parameter that receives the ``DbOut``.
    url:
        SQLAlchemy connection URL.  Supports ``%VAR%`` env-var substitution.
    table:
        Table name for write operations.
    schema:
        Optional schema qualifier.
    action:
        ``"insert"`` (default) or ``"upsert"``.
    conflict_columns:
        Columns for upsert conflict resolution.  Must be non-empty when
        ``action="upsert"``.
    engine_provider:
        Optional shared ``EngineProvider`` for connection pooling.

    Raises
    ------
    ConfigurationError
        When ``action`` is not one of the two supported values, or when
        ``action="upsert"`` is used without ``conflict_columns``.

    Sync and async handlers are both supported. For async handlers,
    blocking database I/O is offloaded via ``asyncio.to_thread()``.
    """
    if action not in ("insert", "upsert"):
        msg = f"output action must be 'insert' or 'upsert', got '{action}'"
        raise ConfigurationError(msg)
    if action == "upsert" and not conflict_columns:
        msg = "output with action='upsert' requires 'conflict_columns'"
        raise ConfigurationError(msg)

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        _check_composition(fn, "output")
        _validate_arg_name(arg_name, fn, "output")
        is_async = inspect.iscoroutinefunction(fn)

        out = DbOut(
            url=url,
            table=table,
            schema=schema,
            action=action,
            conflict_columns=conflict_columns,
            engine_provider=engine_provider,
        )

        wrapped: Callable[..., Any]
        if is_async:
            proxy = _AsyncDbOutProxy(out)

            @functools.wraps(fn)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                kwargs[arg_name] = proxy
                return await fn(*args, **kwargs)

            wrapped = async_wrapper
        else:

            @functools.wraps(fn)
            def wrapper(*args: Any, **kwargs: Any) -> Any:
                kwargs[arg_name] = out
                return fn(*args, **kwargs)

            wrapped = wrapper

        # Same registration for both variants: hide the injected parameter
        # from the host-facing signature and record the binding metadata.
        setattr(wrapped, "__signature__", _build_host_signature(fn, {arg_name}))
        _mark_decorator(wrapped, "output")
        binding = {
            "kind": "output",
            "parameter": arg_name,
            "connection_setting": url,
            "resource": {"table": table},
        }
        _merge_toolkit_metadata(
            wrapped,
            "db",
            {"version": 1, "bindings": [binding], "injections": []},
        )
        return wrapped

    return decorator

inject_reader(arg_name, *, url, table=None, schema=None, engine_provider=None)

Decorator that injects a :class:DbReader instance into the handler.

Use this when you need imperative control over reads (multiple queries, dynamic SQL, etc.). For simple data injection, prefer :meth:input.

The handler parameter named arg_name will receive a pre-configured DbReader instance. The reader is created fresh per invocation and closed automatically after the handler returns.

Parameters

- arg_name: Name of the handler parameter that receives the DbReader.
- url: SQLAlchemy connection URL. Supports %VAR% env-var substitution.
- table: Optional table name for get() operations.
- schema: Optional schema qualifier.
- engine_provider: Optional shared EngineProvider for connection pooling.

Supports both sync and async handlers. For async handlers, blocking database I/O is automatically offloaded via asyncio.to_thread().

Source code in src/azure_functions_db/decorator.py
def inject_reader(
    self,
    arg_name: str,
    *,
    url: str,
    table: str | None = None,
    schema: str | None = None,
    engine_provider: EngineProvider | None = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Build a decorator that hands the handler a :class:`DbReader`.

    Intended for imperative reads (several queries, dynamic SQL, ...).
    When the handler only needs the data itself, :meth:`input` is the
    simpler choice.

    Each invocation constructs a new ``DbReader`` and closes it in a
    ``finally`` once the handler has finished, whether it returned or
    raised.  Sync handlers receive the reader directly through
    ``arg_name``; async handlers receive it wrapped in an
    ``_AsyncDbReaderProxy``.

    Parameters
    ----------
    arg_name:
        Handler parameter that receives the ``DbReader``.
    url:
        SQLAlchemy connection URL.  Supports ``%VAR%`` env-var substitution.
    table:
        Optional table name for ``get()`` operations.
    schema:
        Optional schema qualifier.
    engine_provider:
        Optional shared ``EngineProvider`` for connection pooling.

    Sync and async handlers are both supported. For async handlers,
    blocking database I/O is offloaded via ``asyncio.to_thread()``.
    """

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        _check_composition(fn, "inject_reader")
        _validate_arg_name(arg_name, fn, "inject_reader")
        is_async = inspect.iscoroutinefunction(fn)

        def new_reader() -> Any:
            # One reader per invocation; the wrappers below close it.
            return DbReader(
                url=url,
                table=table,
                schema=schema,
                engine_provider=engine_provider,
            )

        wrapped: Callable[..., Any]
        if is_async:

            @functools.wraps(fn)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                reader = new_reader()
                proxy = _AsyncDbReaderProxy(reader)
                try:
                    kwargs[arg_name] = proxy
                    return await fn(*args, **kwargs)
                finally:
                    reader.close()

            wrapped = async_wrapper
        else:

            @functools.wraps(fn)
            def wrapper(*args: Any, **kwargs: Any) -> Any:
                reader = new_reader()
                try:
                    kwargs[arg_name] = reader
                    return fn(*args, **kwargs)
                finally:
                    reader.close()

            wrapped = wrapper

        # Identical registration for the sync and async variants.
        setattr(wrapped, "__signature__", _build_host_signature(fn, {arg_name}))
        _mark_decorator(wrapped, "inject_reader")
        _merge_toolkit_metadata(
            wrapped,
            "db",
            {
                "version": 1,
                "bindings": [],
                "injections": [{"kind": "reader", "parameter": arg_name}],
            },
        )
        return wrapped

    return decorator

inject_writer(arg_name, *, url, table, schema=None, engine_provider=None)

Decorator that injects a :class:DbWriter instance into the handler.

Use this when you need imperative control over writes (multiple operations, transactions, update/delete, etc.). For simple writes through DbOut.set(), prefer :meth:output.

The handler parameter named arg_name will receive a pre-configured DbWriter instance. The writer is created fresh per invocation and closed automatically after the handler returns.

Parameters

- arg_name: Name of the handler parameter that receives the DbWriter.
- url: SQLAlchemy connection URL. Supports %VAR% env-var substitution.
- table: Table name for write operations.
- schema: Optional schema qualifier.
- engine_provider: Optional shared EngineProvider for connection pooling.

Supports both sync and async handlers. For async handlers, blocking database I/O is automatically offloaded via asyncio.to_thread().

Source code in src/azure_functions_db/decorator.py
def inject_writer(
    self,
    arg_name: str,
    *,
    url: str,
    table: str,
    schema: str | None = None,
    engine_provider: EngineProvider | None = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Build a decorator that hands the handler a :class:`DbWriter`.

    Intended for imperative writes (several operations, transactions,
    update/delete, ...).  For straightforward writes through
    ``DbOut.set()``, :meth:`output` is the simpler choice.

    Each invocation constructs a new ``DbWriter`` and closes it in a
    ``finally`` once the handler has finished, whether it returned or
    raised.  Sync handlers receive the writer directly through
    ``arg_name``; async handlers receive it wrapped in an
    ``_AsyncDbWriterProxy``.

    Parameters
    ----------
    arg_name:
        Handler parameter that receives the ``DbWriter``.
    url:
        SQLAlchemy connection URL.  Supports ``%VAR%`` env-var substitution.
    table:
        Table name for write operations.
    schema:
        Optional schema qualifier.
    engine_provider:
        Optional shared ``EngineProvider`` for connection pooling.

    Sync and async handlers are both supported. For async handlers,
    blocking database I/O is offloaded via ``asyncio.to_thread()``.
    """

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        _check_composition(fn, "inject_writer")
        _validate_arg_name(arg_name, fn, "inject_writer")
        is_async = inspect.iscoroutinefunction(fn)

        def new_writer() -> Any:
            # One writer per invocation; the wrappers below close it.
            return DbWriter(
                url=url,
                table=table,
                schema=schema,
                engine_provider=engine_provider,
            )

        wrapped: Callable[..., Any]
        if is_async:

            @functools.wraps(fn)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                writer = new_writer()
                proxy = _AsyncDbWriterProxy(writer)
                try:
                    kwargs[arg_name] = proxy
                    return await fn(*args, **kwargs)
                finally:
                    writer.close()

            wrapped = async_wrapper
        else:

            @functools.wraps(fn)
            def wrapper(*args: Any, **kwargs: Any) -> Any:
                writer = new_writer()
                try:
                    kwargs[arg_name] = writer
                    return fn(*args, **kwargs)
                finally:
                    writer.close()

            wrapped = wrapper

        # Identical registration for the sync and async variants.
        setattr(wrapped, "__signature__", _build_host_signature(fn, {arg_name}))
        _mark_decorator(wrapped, "inject_writer")
        _merge_toolkit_metadata(
            wrapped,
            "db",
            {
                "version": 1,
                "bindings": [],
                "injections": [{"kind": "writer", "parameter": arg_name}],
            },
        )
        return wrapped

    return decorator

DbOut(*, url, table, schema, action, conflict_columns, engine_provider)

Output binding parameter injected by the output decorator.

Mirrors the native Azure Functions func.Out[T] pattern. The handler calls .set() to write data to the database explicitly, leaving the handler's return value free for other purposes (e.g. HttpResponse).

Example::

@db.output("order", url="%DB_URL%", table="orders")
def create_order(req, order: DbOut) -> func.HttpResponse:
    order.set({"id": 1, "status": "pending"})
    return func.HttpResponse("Created", status_code=201)

Accepted types for .set():

- dict — single-row write
- list[dict] — batch write
- BaseModel / list[BaseModel] — auto-dumped to dict
- None — no-op (skip write)

Source code in src/azure_functions_db/decorator.py
def __init__(
    self,
    *,
    url: str,
    table: str,
    schema: str | None,
    action: Literal["insert", "upsert"],
    conflict_columns: list[str] | None,
    engine_provider: EngineProvider | None,
) -> None:
    """Record the write configuration for later ``set()`` calls.

    Only stores the arguments on the instance; nothing is validated or
    connected here.

    Parameters
    ----------
    url:
        SQLAlchemy connection URL.
    table:
        Target table name.
    schema:
        Optional schema qualifier.
    action:
        ``"insert"`` or ``"upsert"``.
    conflict_columns:
        Columns used for upsert conflict resolution (stored by reference).
    engine_provider:
        Optional shared ``EngineProvider``.
    """
    # Where to write.
    self._url, self._table, self._schema = url, table, schema
    # How to write.
    self._action, self._conflict_columns = action, conflict_columns
    # Optional shared engine source.
    self._engine_provider = engine_provider

set(data)

Write data to the configured table.

Parameters

data: dict for single row, list[dict] for batch, BaseModel / list[BaseModel] for Pydantic models, or None to skip. Tuples and other non-list sequences are rejected with :class:ConfigurationError.

Source code in src/azure_functions_db/decorator.py
def set(
    self,
    data: (
        dict[str, object]
        | Sequence[dict[str, object]]
        | BaseModel
        | Sequence[BaseModel]
        | None
    ),
) -> None:
    """Write *data* to the configured table using the configured action.

    Parameters
    ----------
    data:
        ``dict`` or ``BaseModel`` for a single row, ``list`` of those for
        a batch, or ``None`` to skip the write entirely. Any other type
        (including tuples and other non-``list`` sequences) is rejected.

    Raises
    ------
    ConfigurationError
        If *data* is of an unsupported type, if a list element is neither
        a ``dict`` nor a ``BaseModel``, or if the action is ``"upsert"``
        while no conflict columns are configured.
    """
    if data is None:
        return

    # The writer is created before any validation and is always closed,
    # whichever branch below runs or raises.
    sink = DbWriter(
        url=self._url,
        table=self._table,
        schema=self._schema,
        engine_provider=self._engine_provider,
    )
    try:
        single = isinstance(data, (dict, BaseModel))

        if not single and not isinstance(data, list):
            msg = (
                f"output: DbOut.set() received {type(data).__name__}, "
                f"expected dict, list[dict], BaseModel, list[BaseModel], or None"
            )
            raise ConfigurationError(msg)

        if single:
            payload = _normalize_output_row(data)
        else:
            # Reject the batch on the first element of the wrong type,
            # before any row is normalised.
            for idx, item in enumerate(data):
                if not isinstance(item, (dict, BaseModel)):
                    bad_type = type(item).__name__
                    msg = (
                        f"output: DbOut.set() received list with non-dict element "
                        f"at index {idx} ({bad_type}); expected list[dict | BaseModel]"
                    )
                    raise ConfigurationError(msg)
            payload = [_normalize_output_row(item) for item in data]

        if self._action != "upsert":
            if single:
                sink.insert(data=payload)
            else:
                sink.insert_many(rows=payload)
            return

        keys = self._conflict_columns
        if keys is None:
            msg = "output: unreachable – upsert without conflict_columns"
            raise ConfigurationError(msg)
        if single:
            sink.upsert(data=payload, conflict_columns=keys)
        else:
            sink.upsert_many(rows=payload, conflict_columns=keys)
    finally:
        sink.close()

BlobCheckpointStore(*, container_client, source_fingerprint)

StateStore implementation backed by Azure Blob Storage.

Uses a single JSON blob per poller with ETag-based CAS (compare-and-swap) for all state mutations. Lease ownership is verified via owner_id and monotonically-increasing fencing tokens.

Source code in src/azure_functions_db/state/blob.py
def __init__(
    self,
    *,
    container_client: ContainerClient,
    source_fingerprint: str,
) -> None:
    """Remember the blob container client and the source fingerprint.

    Parameters
    ----------
    container_client:
        Container client stored as ``_container_client``.
    source_fingerprint:
        Fingerprint string stored as ``_source_fingerprint``.
    """
    self._container_client, self._source_fingerprint = (
        container_client,
        source_fingerprint,
    )

acquire_lease(poller_name, ttl_seconds)

Acquire a lease on the poller's state blob.

Creates the blob if it does not exist. If the blob exists and the lease has expired (past expires_at + grace), the lease is stolen with an incremented fencing token.

Returns a lease_id string in the format {owner_id}:{fencing_token}.

Raises LeaseConflictError when a lease is already active or another instance won the CAS race.

Source code in src/azure_functions_db/state/blob.py
def acquire_lease(self, poller_name: str, ttl_seconds: int) -> str:
    """Take the lease for *poller_name*, creating or stealing as needed.

    When no state exists yet, an initial state is written with a
    create-only write. When state exists and its lease is judged expired
    (using the grace derived from *ttl_seconds*), the lease record is
    replaced with a fresh owner and the fencing token incremented by one,
    written conditionally on the ETag that was read.

    Returns
    -------
    str
        Lease id in the form ``{owner_id}:{fencing_token}``.

    Raises
    ------
    LeaseConflictError
        If the existing lease is still active, if another instance
        created the state first, or if the conditional write fails with
        HTTP 412.
    StateStoreError
        If the conditional write fails with any other HTTP status.
    """
    owner_id = uuid.uuid4().hex
    grace = _effective_grace(ttl_seconds)
    snapshot = self._read_state(poller_name)

    if snapshot is not None:
        state, etag = snapshot
        held = state.get("lease", {})

        if not self._is_lease_expired(held, grace):
            holder = held.get("owner_id")
            raise LeaseConflictError(
                f"Active lease on poller '{poller_name}' held by '{holder}'"
            )

        # Expired lease: take it over under the next fencing token.
        next_token = held.get("fencing_token", 0) + 1
        taken_at = _now_utc()
        state["lease"] = {
            "owner_id": owner_id,
            "fencing_token": next_token,
            "acquired_at": _iso(taken_at),
            "heartbeat_at": _iso(taken_at),
            "expires_at": _iso(taken_at + timedelta(seconds=ttl_seconds)),
        }

        try:
            self._write_state_conditional(poller_name, state, etag)
        except HttpResponseError as exc:
            if exc.status_code != 412:  # noqa: PLR2004
                raise StateStoreError(
                    f"Failed to acquire lease for poller '{poller_name}'"
                ) from exc
            # 412 means the ETag no longer matches: someone else wrote first.
            raise LeaseConflictError(
                f"CAS conflict acquiring lease for poller '{poller_name}'"
            ) from exc

        lease_id = f"{owner_id}:{next_token}"
        logger.info(
            "Poller '%s': lease acquired (expired steal), lease_id=%s, fencing_token=%d",
            poller_name,
            lease_id,
            next_token,
        )
        return lease_id

    # No state yet: create it. The returned lease id uses token 1.
    initial = self._make_initial_state(
        poller_name, self._source_fingerprint, owner_id, ttl_seconds
    )
    try:
        self._write_state_create(poller_name, initial)
    except ResourceExistsError:
        raise LeaseConflictError(
            f"Another instance created the state blob for poller '{poller_name}' first"
        ) from None

    lease_id = f"{owner_id}:1"
    logger.info(
        "Poller '%s': lease acquired (new blob), lease_id=%s",
        poller_name,
        lease_id,
    )
    return lease_id

renew_lease(poller_name, lease_id, ttl_seconds)

Renew an existing lease by extending its expiry.

Raises LostLeaseError if the lease is not held by this caller or the CAS write fails.

Source code in src/azure_functions_db/state/blob.py
def renew_lease(self, poller_name: str, lease_id: str, ttl_seconds: int) -> None:
    """Extend the lease identified by *lease_id* by *ttl_seconds* from now.

    The stored lease is verified against the owner id and fencing token
    parsed from *lease_id*, then its ``heartbeat_at`` and ``expires_at``
    fields are refreshed and written back conditionally on the ETag read.

    Raises
    ------
    LostLeaseError
        If no state is found for the poller or the conditional write
        fails with HTTP 412. Lease verification is delegated to
        ``_verify_lease`` and may raise as well.
    StateStoreError
        If the conditional write fails with any other HTTP status.
    """
    owner_id, fencing_token = _parse_lease_id(lease_id)

    snapshot = self._read_state(poller_name)
    if snapshot is None:
        raise LostLeaseError(f"State blob not found for poller '{poller_name}'")

    state, etag = snapshot
    self._verify_lease(state, owner_id, fencing_token)

    renewed_at = _now_utc()
    heartbeat = _iso(renewed_at)
    expiry = _iso(renewed_at + timedelta(seconds=ttl_seconds))
    state["lease"].update(heartbeat_at=heartbeat, expires_at=expiry)

    try:
        self._write_state_conditional(poller_name, state, etag)
    except HttpResponseError as exc:
        if exc.status_code != 412:  # noqa: PLR2004
            raise StateStoreError(
                f"Failed to renew lease for poller '{poller_name}'"
            ) from exc
        # 412: the state changed since it was read.
        raise LostLeaseError(
            f"CAS conflict renewing lease for poller '{poller_name}'"
        ) from exc

release_lease(poller_name, lease_id)

Release the lease by setting expires_at to now.

Preserves the fencing token so the next acquisition increments it. Only owner_id and fencing_token are verified — expiry is intentionally skipped so that a holder can still release a lease that has nominally expired but has not yet been stolen.

Raises LostLeaseError if owner/token do not match or CAS fails.

Source code in src/azure_functions_db/state/blob.py
def release_lease(self, poller_name: str, lease_id: str) -> None:
    """Give up the lease by stamping ``expires_at`` with the current time.

    The fencing token is left untouched. Only the owner id and fencing
    token are compared against the stored lease; expiry is deliberately
    not checked, so a holder whose lease has nominally expired but has
    not been taken over can still release it.

    Raises
    ------
    LostLeaseError
        If no state or no lease is found, if the owner id or fencing
        token differs from the stored one, or if the conditional write
        fails with HTTP 412.
    StateStoreError
        If the conditional write fails with any other HTTP status.
    """
    owner_id, fencing_token = _parse_lease_id(lease_id)

    snapshot = self._read_state(poller_name)
    if snapshot is None:
        raise LostLeaseError(f"State blob not found for poller '{poller_name}'")
    state, etag = snapshot

    lease = state.get("lease")
    if lease is None:
        raise LostLeaseError("No lease present in state")

    found_owner = lease.get("owner_id")
    if found_owner != owner_id:
        raise LostLeaseError(
            f"Lease owner mismatch: expected '{owner_id}', found '{found_owner}'"
        )

    found_token = lease.get("fencing_token")
    if found_token != fencing_token:
        raise LostLeaseError(
            f"Fencing token mismatch: expected {fencing_token}, "
            f"found {found_token}"
        )

    # Mark the lease as expired right now; everything else is kept.
    lease["expires_at"] = _iso(_now_utc())

    try:
        self._write_state_conditional(poller_name, state, etag)
    except HttpResponseError as exc:
        if exc.status_code != 412:  # noqa: PLR2004
            raise StateStoreError(
                f"Failed to release lease for poller '{poller_name}'"
            ) from exc
        # 412: the state changed since it was read.
        raise LostLeaseError(
            f"CAS conflict releasing lease for poller '{poller_name}'"
        ) from exc

load_checkpoint(poller_name)

Load the checkpoint for the given poller.

Returns an empty dict if the state blob does not exist. This method is read-only and has no side effects.

Source code in src/azure_functions_db/state/blob.py
def load_checkpoint(self, poller_name: str) -> dict[str, object]:
    """Return the stored checkpoint for *poller_name*.

    An empty dict is returned when no state is found for the poller or
    when the state has no ``"checkpoint"`` entry. Nothing is written.
    """
    snapshot = self._read_state(poller_name)
    if snapshot is not None:
        state, _ = snapshot
        stored: dict[str, object] = state.get("checkpoint", {})
        return stored
    return {}

commit_checkpoint(poller_name, checkpoint, lease_id)

Commit a new checkpoint under the protection of the current lease.

Raises LostLeaseError if the lease is not held by this caller or the CAS write fails.

Source code in src/azure_functions_db/state/blob.py
def commit_checkpoint(
    self,
    poller_name: str,
    checkpoint: dict[str, object],
    lease_id: str,
) -> None:
    """Persist *checkpoint* while holding the lease given by *lease_id*.

    The stored lease is verified against the owner id and fencing token
    parsed from *lease_id*; a shallow copy of *checkpoint* then replaces
    the stored one and the state is written back conditionally on the
    ETag that was read.

    Raises
    ------
    LostLeaseError
        If no state is found for the poller or the conditional write
        fails with HTTP 412. Lease verification is delegated to
        ``_verify_lease`` and may raise as well.
    StateStoreError
        If the conditional write fails with any other HTTP status.
    """
    owner_id, fencing_token = _parse_lease_id(lease_id)

    snapshot = self._read_state(poller_name)
    if snapshot is None:
        raise LostLeaseError(f"State blob not found for poller '{poller_name}'")

    state, etag = snapshot
    self._verify_lease(state, owner_id, fencing_token)

    # Copy so later mutation of the caller's dict does not affect `state`.
    state["checkpoint"] = dict(checkpoint)

    try:
        self._write_state_conditional(poller_name, state, etag)
    except HttpResponseError as exc:
        if exc.status_code != 412:  # noqa: PLR2004
            raise StateStoreError(
                f"Failed to commit checkpoint for poller '{poller_name}'"
            ) from exc
        # 412: the state changed since it was read.
        raise LostLeaseError(
            f"CAS conflict committing checkpoint for poller '{poller_name}'"
        ) from exc

get_db_metadata(func)

Return db metadata if the function was decorated with DbBindings decorators.

Returns None if the function has no db metadata attached.

Source code in src/azure_functions_db/decorator.py
def get_db_metadata(func: Any) -> dict[str, Any] | None:
    """Return the ``"db"`` metadata dict attached to *func*, if any.

    The toolkit metadata is read from the attribute named by
    ``_TOOLKIT_META_ATTR``. ``None`` is returned when that attribute is
    missing or not a dict, or when its ``"db"`` entry is missing or not
    a dict.
    """
    toolkit_meta = getattr(func, _TOOLKIT_META_ATTR, None)
    if not isinstance(toolkit_meta, dict):
        return None

    db_meta = toolkit_meta.get("db")
    return db_meta if isinstance(db_meta, dict) else None