sqlalchemy async
overview
sqlalchemy - python sql toolkit and orm with async support
- mature, battle-tested orm (in development since 2006)
- async support since v1.4
- type hints with Mapped[] syntax
- supports postgresql, mysql, sqlite, and more
- integrates with pydantic for validation
when to use
use sqlalchemy when:
- need database persistence
- complex queries and relationships
- transactions and data integrity matter
- building production applications
- need database migrations (with alembic)
don’t use when:
- simple data containers suffice (use dataclasses)
- only need validation (use pydantic)
- working with nosql databases
- extreme performance requirements (use raw sql)
installation
# core with async
uv add 'sqlalchemy[asyncio]'
# database drivers
uv add asyncpg        # postgresql
uv add aiosqlite      # sqlite
uv add aiomysql       # mysql
# optional: migration tool
uv add alembic
basic setup
async engine and base
from typing import Optional, List
from datetime import datetime
from sqlalchemy import String, Text, ForeignKey, select
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession, AsyncAttrs
# critical: include AsyncAttrs for lazy loading support
class Base(AsyncAttrs, DeclarativeBase):
    pass
# postgresql
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    echo=True,  # log sql statements
    pool_pre_ping=True,  # verify connections
    pool_size=5,
    max_overflow=10
)
# sqlite (for development)
# engine = create_async_engine("sqlite+aiosqlite:///./app.db", echo=True)
async_session = async_sessionmaker(engine, expire_on_commit=False)
basic model
class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(100), unique=True, index=True)
    name: Mapped[str] = mapped_column(String(100))
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    updated_at: Mapped[Optional[datetime]] = mapped_column(onupdate=datetime.utcnow)
    # relationships - always specify loading strategy
    posts: Mapped[List["Post"]] = relationship(
        back_populates="author",
        lazy="selectin"  # prevent greenlet errors
    )
async patterns
context manager pattern
async def create_user(email: str, name: str) -> User:
    async with async_session() as session:
        async with session.begin():  # auto-commit on success
            user = User(email=email, name=name)
            session.add(user)
            # no explicit commit needed
        return user
# explicit transaction control
async def transfer_funds(from_id: int, to_id: int, amount: float):
    async with async_session() as session:
        try:
            async with session.begin():
                from_account = await session.get(Account, from_id)
                to_account = await session.get(Account, to_id)
                from_account.balance -= amount
                to_account.balance += amount
                # auto-commits if no exception
        except Exception:
            # auto-rollback on exception
            raise
preventing greenlet errors
from sqlalchemy.orm import selectinload, joinedload
# wrong - causes MissingGreenlet error
async def get_user_wrong(session: AsyncSession, user_id: int):
    user = await session.get(User, user_id)
    posts = user.posts  # error: lazy loading in async context
# correct - eager load relationships
async def get_user_with_posts(session: AsyncSession, user_id: int):
    stmt = (
        select(User)
        .where(User.id == user_id)
        .options(selectinload(User.posts))  # eager load
    )
    result = await session.execute(stmt)
    return result.scalar_one_or_none()
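because the base class mixes in AsyncAttrs, an unloaded relationship can also be awaited explicitly instead of eager loading; a minimal sketch (the helper name is illustrative):
async def get_user_posts_lazy(session: AsyncSession, user_id: int):
    user = await session.get(User, user_id)
    if user is None:
        return []
    # awaitable_attrs loads the attribute under asyncio without a greenlet error
    return await user.awaitable_attrs.posts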
relationships
one-to-many
class Post(Base):
    __tablename__ = "posts"
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    content: Mapped[str] = mapped_column(Text)
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    # back reference
    author: Mapped["User"] = relationship(
        back_populates="posts",
        lazy="joined"  # always join
    )
many-to-many
from sqlalchemy import Table, Column
# association table - core Table objects use Column, not mapped_column
user_role = Table(
    "user_roles",
    Base.metadata,
    Column("user_id", ForeignKey("users.id"), primary_key=True),
    Column("role_id", ForeignKey("roles.id"), primary_key=True),
)
class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(100), unique=True)
    roles: Mapped[List["Role"]] = relationship(
        secondary=user_role,
        back_populates="users",
        lazy="selectin"
    )
class Role(Base):
    __tablename__ = "roles"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50), unique=True)
    users: Mapped[List["User"]] = relationship(
        secondary=user_role,
        back_populates="roles",
        lazy="selectin"
    )
query patterns
basic queries
from sqlalchemy import select, and_, or_, func
from sqlalchemy.orm import contains_eager
async def query_examples(session: AsyncSession):
    # select with where
    stmt = select(User).where(User.email == "test@example.com")
    result = await session.execute(stmt)
    user = result.scalar_one_or_none()
    # multiple conditions
    stmt = select(User).where(
        and_(
            User.created_at > datetime(2023, 1, 1),
            or_(
                User.email.like("%@company.com"),
                User.is_admin == True
            )
        )
    )
    # join with eager loading
    stmt = (
        select(User)
        .join(User.posts)
        .where(Post.published == True)
        .options(contains_eager(User.posts))
    )
    # aggregation
    stmt = (
        select(User.id, func.count(Post.id).label("post_count"))
        .join(Post)
        .group_by(User.id)
        .having(func.count(Post.id) > 5)
    )
pagination
async def get_users_paginated(
    session: AsyncSession,
    page: int = 1,
    per_page: int = 20
) -> tuple[List[User], int]:
    # count total
    count_stmt = select(func.count()).select_from(User)
    total = await session.scalar(count_stmt)
    # get page
    stmt = (
        select(User)
        .offset((page - 1) * per_page)
        .limit(per_page)
    )
    result = await session.execute(stmt)
    users = result.scalars().all()
    return users, total
database types
json/jsonb (postgresql)
from sqlalchemy import JSON
from sqlalchemy.dialects.postgresql import JSONB
class Document(Base):
    __tablename__ = "documents"
    id: Mapped[int] = mapped_column(primary_key=True)
    # portable json - note: "metadata" is a reserved attribute name on
    # declarative classes, so expose the column under a different python name
    doc_metadata: Mapped[dict] = mapped_column(
        "metadata",
        JSON().with_variant(JSONB, "postgresql")
    )
    # json queries (postgresql)
    # await session.execute(
    #     select(Document).where(
    #         Document.doc_metadata["type"].as_string() == "report"
    #     )
    # )
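for postgresql-only tables you can map JSONB directly and use its native operators; a minimal sketch (the Event model and its payload field are illustrative, not part of the example above):
class Event(Base):
    __tablename__ = "events"
    id: Mapped[int] = mapped_column(primary_key=True)
    payload: Mapped[dict] = mapped_column(JSONB)
# jsonb containment (@>) and key-existence (?) operators
stmt = select(Event).where(Event.payload.contains({"type": "report"}))
stmt = select(Event).where(Event.payload.has_key("tags"))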
uuid support
import uuid
from sqlalchemy.dialects.postgresql import UUID as PG_UUID
class User(Base):
    __tablename__ = "users"
    # postgresql: native uuid, sqlite: char(36)
    id: Mapped[uuid.UUID] = mapped_column(
        PG_UUID(as_uuid=True).with_variant(String(36), "sqlite"),
        primary_key=True,
        default=uuid.uuid4
    )
integration with pydantic
from pydantic import BaseModel, ConfigDict
# pydantic model for validation
class UserCreate(BaseModel):
    email: str
    name: str
class UserResponse(BaseModel):
    model_config = ConfigDict(from_attributes=True)
    id: int
    email: str
    name: str
    created_at: datetime
# service layer
async def create_user(session: AsyncSession, user_data: UserCreate) -> UserResponse:
    user = User(**user_data.model_dump())
    session.add(user)
    await session.commit()
    await session.refresh(user)
    return UserResponse.model_validate(user)
performance optimization
bulk operations
async def bulk_insert_users(users_data: List[dict]):
    async with async_session() as session:
        async with session.begin():
            # use bulk_insert_mappings for performance
            await session.run_sync(
                lambda sync_session: sync_session.bulk_insert_mappings(
                    User, users_data
                )
            )
# modern insert with returning
from sqlalchemy import insert
async def bulk_create_with_returning(users_data: List[dict]):
    async with async_session() as session:
        stmt = insert(User).values(users_data).returning(User)
        result = await session.execute(stmt)
        await session.commit()
        return result.scalars().all()
streaming results
async def process_large_dataset():
    async with async_session() as session:
        stmt = select(User).execution_options(yield_per=100)
        # stream results instead of loading all into memory
        result = await session.stream(stmt)
        async for user in result.scalars():
            await process_user(user)
hybrid properties
from sqlalchemy.ext.hybrid import hybrid_property
class Order(Base):
    __tablename__ = "orders"
    id: Mapped[int] = mapped_column(primary_key=True)
    quantity: Mapped[int]
    unit_price: Mapped[float]
    @hybrid_property
    def total_price(self) -> float:
        return self.quantity * self.unit_price
    @total_price.expression
    def total_price(cls):
        return cls.quantity * cls.unit_price
# use in queries
# select(Order).where(Order.total_price > 100)
migrations with alembic
# initialize alembic
uv run alembic init migrations
# create migration
uv run alembic revision --autogenerate -m "add users table"
# apply migrations
uv run alembic upgrade head
# rollback
uv run alembic downgrade -1
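with an async engine, recent alembic versions can initialize from the async template (uv run alembic init -t async migrations). either way, autogenerate needs your metadata wired into migrations/env.py; a minimal sketch, assuming your models live in an app.models module:
# migrations/env.py (excerpt)
from app.models import Base  # hypothetical import path - adjust to your project
target_metadata = Base.metadata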
testing
import pytest
from sqlalchemy.pool import NullPool
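# note: async fixtures/tests assume pytest-asyncio (e.g. asyncio_mode = "auto" in pytest config)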
@pytest.fixture
async def test_session():
    # use in-memory sqlite for tests
    engine = create_async_engine(
        "sqlite+aiosqlite:///:memory:",
        poolclass=NullPool,
    )
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    async_session = async_sessionmaker(engine, expire_on_commit=False)
    async with async_session() as session:
        yield session
    await engine.dispose()
async def test_create_user(test_session: AsyncSession):
    user = User(email="test@example.com", name="test user")
    test_session.add(user)
    await test_session.commit()
    result = await test_session.get(User, user.id)
    assert result.email == "test@example.com"
comparison with alternatives
| feature | dataclasses | pydantic | sqlalchemy | 
|---|---|---|---|
| persistence | ✗ | ✗ | ✓ | 
| relationships | ✗ | manual | automatic | 
| queries | ✗ | ✗ | full sql | 
| transactions | ✗ | ✗ | ✓ | 
| migrations | ✗ | ✗ | ✓ (alembic) | 
| complexity | minimal | moderate | high | 
best practices
do
- use AsyncAttrs in your base class
- set expire_on_commit=False for async sessions
- always specify relationship loading strategy
- use selectinload() for collections
- handle database operations in try/except blocks (see the sketch after this list)
- use connection pooling appropriately
- test with both sqlite and postgresql
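a small sketch pulling several of these together (rename_user and its error handling are illustrative, not a sqlalchemy api):
async def rename_user(user_id: int, new_name: str) -> None:
    async with async_session() as session:  # factory from the setup section
        try:
            user = await session.get(User, user_id)
            if user is None:
                raise ValueError(f"no user with id {user_id}")
            user.name = new_name
            await session.commit()
        except Exception:
            await session.rollback()  # release the failed transaction before re-raising
            raise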
don’t
- access unloaded relationships outside async context
- mix sync and async operations without run_sync()
- forget to dispose engines in tests
- use implicit transactions for complex operations
- ignore database-specific features when needed
common pitfalls
greenlet errors
# wrong
async def bad_pattern(session: AsyncSession):
    users = await session.execute(select(User))
    for user in users.scalars():
        print(user.posts)  # MissingGreenlet error!
# correct
async def good_pattern(session: AsyncSession):
    stmt = select(User).options(selectinload(User.posts))
    users = await session.execute(stmt)
    for user in users.scalars():
        print(user.posts)  # works!
session scope
# wrong - session used outside context
async def get_user(user_id: int):
    async with async_session() as session:
        return await session.get(User, user_id)
    # user is detached here!
# correct - process within session
async def get_user_dict(user_id: int) -> dict:
    async with async_session() as session:
        user = await session.get(User, user_id)
        return {"id": user.id, "email": user.email}complete example
import asyncio
from datetime import datetime
from typing import List, Optional
from sqlalchemy import String, ForeignKey, select, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession, AsyncAttrs
from pydantic import BaseModel, ConfigDict
# models
class Base(AsyncAttrs, DeclarativeBase):
    pass
class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(100), unique=True, index=True)
    name: Mapped[str]
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    posts: Mapped[List["Post"]] = relationship(
        back_populates="author",
        lazy="selectin",
        cascade="all, delete-orphan"
    )
class Post(Base):
    __tablename__ = "posts"
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    author: Mapped["User"] = relationship(back_populates="posts")
# pydantic schemas
class UserCreate(BaseModel):
    email: str
    name: str
class UserResponse(BaseModel):
    model_config = ConfigDict(from_attributes=True)
    id: int
    email: str
    name: str
    post_count: int = 0
# service
class UserService:
    def __init__(self, session: AsyncSession):
        self.session = session
    async def create_user(self, user_data: UserCreate) -> User:
        user = User(**user_data.model_dump())
        self.session.add(user)
        await self.session.commit()
        await self.session.refresh(user)
        return user
    async def get_users_with_post_count(self) -> List[UserResponse]:
        stmt = (
            select(
                User,
                func.count(Post.id).label("post_count")
            )
            .outerjoin(Post)
            .group_by(User.id)
        )
        result = await self.session.execute(stmt)
        users = []
        for user, post_count in result:
            user_dict = {
                "id": user.id,
                "email": user.email,
                "name": user.name,
                "post_count": post_count or 0
            }
            users.append(UserResponse(**user_dict))
        return users
# usage
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
async_session = async_sessionmaker(engine, expire_on_commit=False)
async def main():
    # create tables (use alembic migrations in production)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    async with async_session() as session:
        service = UserService(session)
        # create user
        user_data = UserCreate(email="alice@example.com", name="alice")
        user = await service.create_user(user_data)
        # get users with counts
        users = await service.get_users_with_post_count()
if __name__ == "__main__":
    asyncio.run(main())