sqlalchemy async

overview

sqlalchemy - python sql toolkit and orm with async support

  • mature, battle-tested orm (20+ years)
  • async support since v1.4
  • type hints with Mapped[] syntax
  • supports postgresql, mysql, sqlite, and more
  • integrates with pydantic for validation

when to use

use sqlalchemy when:

  • need database persistence
  • complex queries and relationships
  • transactions and data integrity matter
  • building production applications
  • need database migrations (with alembic)

don’t use when:

  • simple data containers suffice (use dataclasses)
  • only need validation (use pydantic)
  • working with nosql databases
  • extreme performance requirements (use raw sql)

installation

# core with async
uv add 'sqlalchemy[asyncio]'

# database drivers
uv add asyncpg        # postgresql
uv add aiosqlite      # sqlite
uv add aiomysql       # mysql

# optional: migration tool
uv add alembic

basic setup

async engine and base

from typing import Optional, List
from datetime import datetime
from sqlalchemy import String, ForeignKey, select
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession, AsyncAttrs

# critical: include AsyncAttrs for lazy loading support
class Base(AsyncAttrs, DeclarativeBase):
    pass

# postgresql
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    echo=True,  # log sql statements
    pool_pre_ping=True,  # verify connections
    pool_size=5,
    max_overflow=10
)

# sqlite (for development)
# engine = create_async_engine("sqlite+aiosqlite:///./app.db", echo=True)

async_session = async_sessionmaker(engine, expire_on_commit=False)

basic model

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(100), unique=True, index=True)
    name: Mapped[str] = mapped_column(String(100))
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    updated_at: Mapped[Optional[datetime]] = mapped_column(onupdate=datetime.utcnow)

    # relationships - always specify loading strategy
    posts: Mapped[List["Post"]] = relationship(
        back_populates="author",
        lazy="selectin"  # prevent greenlet errors
    )

async patterns

context manager pattern

async def create_user(email: str, name: str) -> User:
    async with async_session() as session:
        async with session.begin():  # auto-commit on success
            user = User(email=email, name=name)
            session.add(user)
            # no explicit commit needed
        return user

# explicit transaction control
async def transfer_funds(from_id: int, to_id: int, amount: float):
    async with async_session() as session:
        try:
            async with session.begin():
                from_account = await session.get(Account, from_id)
                to_account = await session.get(Account, to_id)

                from_account.balance -= amount
                to_account.balance += amount
                # auto-commits if no exception
        except Exception:
            # auto-rollback on exception
            raise

preventing greenlet errors

from sqlalchemy.orm import selectinload, joinedload

# wrong - causes MissingGreenlet error
async def get_user_wrong(session: AsyncSession, user_id: int):
    user = await session.get(User, user_id)
    posts = user.posts  # error: lazy loading in async context

# correct - eager load relationships
async def get_user_with_posts(session: AsyncSession, user_id: int):
    stmt = (
        select(User)
        .where(User.id == user_id)
        .options(selectinload(User.posts))  # eager load
    )
    result = await session.execute(stmt)
    return result.scalar_one_or_none()

relationships

one-to-many

class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    content: Mapped[str] = mapped_column(Text)
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))

    # back reference
    author: Mapped["User"] = relationship(
        back_populates="posts",
        lazy="joined"  # always join
    )

many-to-many

from sqlalchemy import Table

# association table
user_role = Table(
    "user_roles",
    Base.metadata,
    mapped_column("user_id", ForeignKey("users.id"), primary_key=True),
    mapped_column("role_id", ForeignKey("roles.id"), primary_key=True),
)

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(100), unique=True)

    roles: Mapped[List["Role"]] = relationship(
        secondary=user_role,
        back_populates="users",
        lazy="selectin"
    )

class Role(Base):
    __tablename__ = "roles"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50), unique=True)

    users: Mapped[List["User"]] = relationship(
        secondary=user_role,
        back_populates="roles",
        lazy="selectin"
    )

query patterns

basic queries

from sqlalchemy import select, and_, or_, func

async def query_examples(session: AsyncSession):
    # select with where
    stmt = select(User).where(User.email == "test@example.com")
    result = await session.execute(stmt)
    user = result.scalar_one_or_none()

    # multiple conditions
    stmt = select(User).where(
        and_(
            User.created_at > datetime(2023, 1, 1),
            or_(
                User.email.like("%@company.com"),
                User.is_admin == True
            )
        )
    )

    # join with eager loading
    stmt = (
        select(User)
        .join(User.posts)
        .where(Post.published == True)
        .options(contains_eager(User.posts))
    )

    # aggregation
    stmt = (
        select(User.id, func.count(Post.id).label("post_count"))
        .join(Post)
        .group_by(User.id)
        .having(func.count(Post.id) > 5)
    )

pagination

async def get_users_paginated(
    session: AsyncSession,
    page: int = 1,
    per_page: int = 20
) -> tuple[List[User], int]:
    # count total
    count_stmt = select(func.count()).select_from(User)
    total = await session.scalar(count_stmt)

    # get page
    stmt = (
        select(User)
        .offset((page - 1) * per_page)
        .limit(per_page)
    )
    result = await session.execute(stmt)
    users = result.scalars().all()

    return users, total

database types

json/jsonb (postgresql)

from sqlalchemy import JSON
from sqlalchemy.dialects.postgresql import JSONB

class Document(Base):
    __tablename__ = "documents"

    id: Mapped[int] = mapped_column(primary_key=True)

    # portable json
    metadata: Mapped[dict] = mapped_column(
        JSON().with_variant(JSONB, "postgresql")
    )

    # json queries (postgresql)
    # await session.execute(
    #     select(Document).where(
    #         Document.metadata["type"] == "report"
    #     )
    # )

uuid support

import uuid
from sqlalchemy.dialects.postgresql import UUID as PG_UUID

class User(Base):
    __tablename__ = "users"

    # postgresql: native uuid, sqlite: char(36)
    id: Mapped[uuid.UUID] = mapped_column(
        PG_UUID(as_uuid=True).with_variant(String(36), "sqlite"),
        primary_key=True,
        default=uuid.uuid4
    )

integration with pydantic

from pydantic import BaseModel, ConfigDict

# pydantic model for validation
class UserCreate(BaseModel):
    email: str
    name: str

class UserResponse(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    id: int
    email: str
    name: str
    created_at: datetime

# service layer
async def create_user(session: AsyncSession, user_data: UserCreate) -> UserResponse:
    user = User(**user_data.model_dump())
    session.add(user)
    await session.commit()
    await session.refresh(user)
    return UserResponse.model_validate(user)

performance optimization

bulk operations

async def bulk_insert_users(users_data: List[dict]):
    async with async_session() as session:
        async with session.begin():
            # use bulk_insert_mappings for performance
            await session.run_sync(
                lambda sync_session: sync_session.bulk_insert_mappings(
                    User, users_data
                )
            )

# modern insert with returning
from sqlalchemy import insert

async def bulk_create_with_returning(users_data: List[dict]):
    async with async_session() as session:
        stmt = insert(User).values(users_data).returning(User)
        result = await session.execute(stmt)
        await session.commit()
        return result.scalars().all()

streaming results

async def process_large_dataset():
    async with async_session() as session:
        stmt = select(User).execution_options(yield_per=100)

        # stream results instead of loading all into memory
        async with session.stream(stmt) as result:
            async for user in result.scalars():
                await process_user(user)

hybrid properties

from sqlalchemy.ext.hybrid import hybrid_property

class Order(Base):
    __tablename__ = "orders"

    id: Mapped[int] = mapped_column(primary_key=True)
    quantity: Mapped[int]
    unit_price: Mapped[float]

    @hybrid_property
    def total_price(self) -> float:
        return self.quantity * self.unit_price

    @total_price.expression
    def total_price(cls):
        return cls.quantity * cls.unit_price

# use in queries
# select(Order).where(Order.total_price > 100)

migrations with alembic

# initialize alembic
uv run alembic init migrations

# create migration
uv run alembic revision --autogenerate -m "add users table"

# apply migrations
uv run alembic upgrade head

# rollback
uv run alembic downgrade -1

testing

import pytest
from sqlalchemy.pool import NullPool

@pytest.fixture
async def test_session():
    # use in-memory sqlite for tests
    engine = create_async_engine(
        "sqlite+aiosqlite:///:memory:",
        poolclass=NullPool,
    )

    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    async_session = async_sessionmaker(engine, expire_on_commit=False)

    async with async_session() as session:
        yield session

    await engine.dispose()

async def test_create_user(test_session: AsyncSession):
    user = User(email="test@example.com", name="test user")
    test_session.add(user)
    await test_session.commit()

    result = await test_session.get(User, user.id)
    assert result.email == "test@example.com"

comparison with alternatives

featuredataclassespydanticsqlalchemy
persistence
relationshipsmanualautomatic
queriesfull sql
transactions
migrations✓ (alembic)
complexityminimalmoderatehigh

best practices

do

  • use AsyncAttrs in your base class
  • set expire_on_commit=False for async sessions
  • always specify relationship loading strategy
  • use selectinload() for collections
  • handle database operations in try/except blocks
  • use connection pooling appropriately
  • test with both sqlite and postgresql

don’t

  • access unloaded relationships outside async context
  • mix sync and async operations without run_sync()
  • forget to dispose engines in tests
  • use implicit transactions for complex operations
  • ignore database-specific features when needed

common pitfalls

greenlet errors

# wrong
async def bad_pattern(session: AsyncSession):
    users = await session.execute(select(User))
    for user in users.scalars():
        print(user.posts)  # MissingGreenlet error!

# correct
async def good_pattern(session: AsyncSession):
    stmt = select(User).options(selectinload(User.posts))
    users = await session.execute(stmt)
    for user in users.scalars():
        print(user.posts)  # works!

session scope

# wrong - session used outside context
async def get_user(user_id: int):
    async with async_session() as session:
        return await session.get(User, user_id)
    # user is detached here!

# correct - process within session
async def get_user_dict(user_id: int) -> dict:
    async with async_session() as session:
        user = await session.get(User, user_id)
        return {"id": user.id, "email": user.email}

complete example

from datetime import datetime
from typing import List, Optional
from sqlalchemy import String, select, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession, AsyncAttrs
from pydantic import BaseModel, ConfigDict

# models
class Base(AsyncAttrs, DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(100), unique=True, index=True)
    name: Mapped[str]
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)

    posts: Mapped[List["Post"]] = relationship(
        back_populates="author",
        lazy="selectin",
        cascade="all, delete-orphan"
    )

class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)

    author: Mapped["User"] = relationship(back_populates="posts")

# pydantic schemas
class UserCreate(BaseModel):
    email: str
    name: str

class UserResponse(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    id: int
    email: str
    name: str
    post_count: int = 0

# service
class UserService:
    def __init__(self, session: AsyncSession):
        self.session = session

    async def create_user(self, user_data: UserCreate) -> User:
        user = User(**user_data.model_dump())
        self.session.add(user)
        await self.session.commit()
        await self.session.refresh(user)
        return user

    async def get_users_with_post_count(self) -> List[UserResponse]:
        stmt = (
            select(
                User,
                func.count(Post.id).label("post_count")
            )
            .outerjoin(Post)
            .group_by(User.id)
        )

        result = await self.session.execute(stmt)

        users = []
        for user, post_count in result:
            user_dict = {
                "id": user.id,
                "email": user.email,
                "name": user.name,
                "post_count": post_count or 0
            }
            users.append(UserResponse(**user_dict))

        return users

# usage
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
async_session = async_sessionmaker(engine, expire_on_commit=False)

async def main():
    async with async_session() as session:
        service = UserService(session)

        # create user
        user_data = UserCreate(email="alice@example.com", name="alice")
        user = await service.create_user(user_data)

        # get users with counts
        users = await service.get_users_with_post_count()

references

══════════════════════════════════════════════════════════════════
on this page