SQLAlchemy FastAPI middleware

May 9, 2026 · View on GitHub

ci ci codecov License: MIT pip Downloads Updates

Description

Provides SQLAlchemy middleware for FastAPI using AsyncSession and async engine.

Install

      pip install fastapi-async-sqlalchemy

It also works with sqlmodel

Examples

Note that the session object provided by db.session is based on the Python3.7+ ContextVar. This means that each session is linked to the individual request context in which it was created.


from fastapi import FastAPI
from fastapi_async_sqlalchemy import SQLAlchemyMiddleware
from fastapi_async_sqlalchemy import db  # provide access to a database session
from sqlalchemy import column
from sqlalchemy import table

app = FastAPI()
app.add_middleware(
    SQLAlchemyMiddleware,
    db_url="postgresql+asyncpg://user:user@192.168.88.200:5432/primary_db",
    engine_args={              # engine arguments example
        "echo": True,          # print all SQL statements
        "pool_pre_ping": True, # feature will normally emit SQL equivalent to “SELECT 1” each time a connection is checked out from the pool
        "pool_size": 5,        # number of connections to keep open at a time
        "max_overflow": 10,    # number of connections to allow to be opened above pool_size
    },
)
# Engines created from ``db_url`` are owned by the middleware and are disposed
# during the application shutdown lifespan. Tests that need shutdown behavior
# should run the app lifespan, for example with ``with TestClient(app)``.
# once the middleware is applied, any route can then access the database session
# from the global ``db``

foo = table("ms_files", column("id"))

# Usage inside of a route
@app.get("/")
async def get_files():
    result = await db.session.execute(foo.select())
    return result.fetchall()

async def get_db_fetch():
    # It uses the same ``db`` object and use it as a context manager:
    async with db():
        result = await db.session.execute(foo.select())
        return result.fetchall()

# Usage inside of a route using a db context
@app.get("/db_context")
async def db_context():
    return await get_db_fetch()

# Usage outside of a route using a db context
@app.on_event("startup")
async def on_startup():
    # We are outside of a request context, therefore we cannot rely on ``SQLAlchemyMiddleware``
    # to create a database session for us.
    result = await get_db_fetch()


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8002)

Engine ownership

When the middleware receives db_url, it creates and owns the async engine. The engine is kept for the application lifetime and disposed when the ASGI lifespan shutdown completes. It is not disposed per request. Disposal also runs when the lifespan ends with a failure (lifespan.shutdown.failed or lifespan.startup.failed), so a raising user shutdown handler does not leak the connection pool.

Engine disposal happens before the lifespan acknowledgement is forwarded to the ASGI server, so a stuck pool drain will block the server's graceful shutdown ack. Configure your ASGI server's graceful shutdown timeout (for example uvicorn's --timeout-graceful-shutdown) so it accommodates the worst-case time required to close active connections.

When the middleware receives custom_engine, the caller owns that engine. The middleware will use it but will not dispose it during application shutdown:

from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine("postgresql+asyncpg://user:pass@host/db")
app.add_middleware(SQLAlchemyMiddleware, custom_engine=engine)

# Later, in caller-managed shutdown code or test cleanup:
await engine.dispose()

Manual disposal outside ASGI lifespan

When SQLAlchemyMiddleware(db_url=...) is constructed outside an ASGI application lifespan — for example in a script, an ad-hoc test harness, or when embedding the middleware in a non-ASGI runtime — there is no lifespan.shutdown event to trigger engine disposal. In that case call await middleware.dispose() explicitly so the middleware-owned engine is released:

middleware = SQLAlchemyMiddleware(app, db_url="postgresql+asyncpg://...")
try:
    ...  # use db.session
finally:
    await middleware.dispose()

dispose() is idempotent on success and is safe to retry if it raises: the proxy session bindings are cleared deterministically so a subsequent call actually re-attempts the underlying engine.dispose(). The same guidance applies to each pair created by create_middleware_and_session_proxy().

Request transactions and streaming responses

When SQLAlchemyMiddleware(..., commit_on_exit=True) manages a normal non-streaming HTTP request, the request session is committed before http.response.start is forwarded to the ASGI server. If commit, rollback, or close fails, the failure happens before a successful response is reported to the client.

Streaming response body generation has a different lifetime from a normal request transaction. Do not rely on the middleware-managed request session to stay open while a StreamingResponse/FileResponse yields chunks. Open an explicit session inside the generator so the body owns the database lifetime:

from fastapi.responses import StreamingResponse

@app.get("/export")
async def export():
    async def rows():
        async with db():
            result = await db.session.stream(foo.select())
            async for row in result:
                yield f"{row.id}\n".encode()
    return StreamingResponse(rows(), media_type="text/plain")

Implicit commit_on_exit=True is not a safe way to report streaming write success: the response may have already started before an unbounded body is finished. If a streaming route needs database writes, either complete and commit the write in a separate explicit async with db(commit_on_exit=True) block before creating the streaming response, or make the streaming generator use an explicit async with db(commit_on_exit=True) block and design the API so clients do not treat early chunks as write success.

For applications that previously used db.session directly inside streaming generators, move that code into an explicit generator-owned context as shown above. This keeps database access available for the whole body while making it clear that the session lifetime belongs to the stream, not the original request transaction.

Type hints for db

Use DBSessionMeta (or its alias DBSessionType) when you need to annotate a function or attribute that holds the db proxy:

from fastapi_async_sqlalchemy import DBSessionMeta, db

def get_db() -> DBSessionMeta:
    return db

At runtime DBSessionMeta is the metaclass of db, so isinstance(db, DBSessionMeta) and type(db) is DBSessionMeta both work. For static type checkers (mypy, pyright) and IDE autocompletion DBSessionMeta resolves to a structural Protocol describing the public API (session, connection, gather, and the db(...) call).

SQLAlchemy events (before_insert, after_insert, ...)

SQLAlchemy's event system is independent of the session/engine — register listeners on your mapped classes (or on Mapper/Session) with sqlalchemy.event.listens_for exactly as you would with a synchronous SQLAlchemy setup. The middleware does not change how events fire.

from datetime import datetime
from sqlalchemy import Column, DateTime, Integer, String, event
from sqlalchemy.orm import DeclarativeBase


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow)


@event.listens_for(User, "before_insert")
def normalize(mapper, connection, target):
    target.username = target.username.lower().strip()


@event.listens_for(User, "before_update")
def touch_updated_at(mapper, connection, target):
    target.updated_at = datetime.utcnow()


@event.listens_for(User, "after_insert")
def log_insert(mapper, connection, target):
    print(f"user created: id={target.id}")

Mapper-level events (before_insert, after_insert, before_update, after_update, before_delete, after_delete) receive a synchronous connection argument — do not await inside them and do not call async ORM APIs there. If you need async work after a write, do it after await db.session.commit() returns, or use Session-level events such as after_flush / after_commit and schedule async work from there.

A complete runnable example with validation, timestamps, logging, and soft-delete hooks lives at examples/events_example.py.

Usage of multiple databases

databases.py

from fastapi import FastAPI
from fastapi_async_sqlalchemy import create_middleware_and_session_proxy

FirstSQLAlchemyMiddleware, first_db = create_middleware_and_session_proxy()
SecondSQLAlchemyMiddleware, second_db = create_middleware_and_session_proxy()

Use a separate middleware/session proxy pair for each independent app or database. Reusing the same proxy with a different live engine is rejected so requests cannot silently switch to another database binding.

main.py

from fastapi import FastAPI

from databases import FirstSQLAlchemyMiddleware, SecondSQLAlchemyMiddleware
from routes import router

app = FastAPI()

app.include_router(router)

app.add_middleware(
    FirstSQLAlchemyMiddleware,
    db_url="postgresql+asyncpg://user:user@192.168.88.200:5432/primary_db",
    engine_args={
        "pool_size": 5,
        "max_overflow": 10,
    },
)
app.add_middleware(
    SecondSQLAlchemyMiddleware,
    db_url="mysql+aiomysql://user:user@192.168.88.200:5432/primary_db",
    engine_args={
        "pool_size": 5,
        "max_overflow": 10,
    },
)

routes.py

import asyncio

from fastapi import APIRouter
from sqlalchemy import column, table, text

from databases import first_db, second_db

router = APIRouter()

foo = table("ms_files", column("id"))

@router.get("/first-db-files")
async def get_files_from_first_db():
    result = await first_db.session.execute(foo.select())
    return result.fetchall()


@router.get("/second-db-files")
async def get_files_from_second_db():
    result = await second_db.session.execute(foo.select())
    return result.fetchall()


@router.get("/concurrent-queries")
async def parallel_select():
    async with first_db(multi_sessions=True, max_concurrent=10):
        async def execute_query(query):
            async with first_db.connection() as session:
                return await session.execute(text(query))

        tasks = [
            asyncio.create_task(execute_query("SELECT 1")),
            asyncio.create_task(execute_query("SELECT 2")),
            asyncio.create_task(execute_query("SELECT 3")),
            asyncio.create_task(execute_query("SELECT 4")),
            asyncio.create_task(execute_query("SELECT 5")),
            asyncio.create_task(execute_query("SELECT 6")),
        ]

        await asyncio.gather(*tasks)

Child tasks that use database sessions must finish before the owning async with db(multi_sessions=True) block exits. When max_concurrent is set, child tasks should use db.connection() or pass coroutine objects to db.gather() so the middleware can own both the session lifetime and the semaphore slot. Already-created Task or Future objects are rejected by throttled db.gather() because they may have started outside the semaphore.