sqlalchemy async
on this page
overview
sqlalchemy - python sql toolkit and orm with async support
- mature, battle-tested orm (20+ years)
- async support since v1.4
- type hints with `Mapped[]` syntax
- supports postgresql, mysql, sqlite, and more
- integrates with pydantic for validation
when to use
use sqlalchemy when:
- need database persistence
- complex queries and relationships
- transactions and data integrity matter
- building production applications
- need database migrations (with alembic)
don’t use when:
- simple data containers suffice (use dataclasses)
- only need validation (use pydantic)
- working with nosql databases
- extreme performance requirements (use raw sql)
installation
# core with async
uv add 'sqlalchemy[asyncio]'
# database drivers
uv add asyncpg # postgresql
uv add aiosqlite # sqlite
uv add aiomysql # mysql
# optional: migration tool
uv add alembic
basic setup
async engine and base
from datetime import datetime, timezone
from typing import Optional, List

from sqlalchemy import Column, String, ForeignKey, Text, select
from sqlalchemy.orm import DeclarativeBase, Mapped, contains_eager, mapped_column, relationship
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession, AsyncAttrs
# critical: include AsyncAttrs for lazy loading support
class Base(AsyncAttrs, DeclarativeBase):
    """Declarative base for every model on this page, with AsyncAttrs mixed in."""
# postgresql (asyncpg driver)
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=5,
    max_overflow=10,
    pool_pre_ping=True,  # verify connections
    echo=True,  # log sql statements
)

# sqlite (for development)
# engine = create_async_engine("sqlite+aiosqlite:///./app.db", echo=True)

# session factory bound to the engine above; sessions are created with
# expire_on_commit=False
async_session = async_sessionmaker(bind=engine, expire_on_commit=False)
basic model
class User(Base):
    """Row of the ``users`` table together with the posts the user authored."""

    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(100), unique=True, index=True)
    name: Mapped[str] = mapped_column(String(100))
    # datetime.utcnow is deprecated since Python 3.12. Build the same naive
    # UTC value from an aware "now" so the column type does not change.
    created_at: Mapped[datetime] = mapped_column(
        default=lambda: datetime.now(timezone.utc).replace(tzinfo=None)
    )
    # only an onupdate hook is set, so the value stays NULL until the first UPDATE
    updated_at: Mapped[Optional[datetime]] = mapped_column(
        onupdate=lambda: datetime.now(timezone.utc).replace(tzinfo=None)
    )
    # relationships - always specify loading strategy
    posts: Mapped[List["Post"]] = relationship(
        back_populates="author",
        lazy="selectin",  # prevent greenlet errors
    )
async patterns
context manager pattern
async def create_user(email: str, name: str) -> User:
    """Create a user inside one transaction and return the new instance."""
    async with async_session() as session, session.begin():
        # leaving the session.begin() block without an exception commits,
        # so no explicit commit is needed
        new_user = User(email=email, name=name)
        session.add(new_user)
    return new_user
# explicit transaction control
async def transfer_funds(from_id: int, to_id: int, amount: float):
    """Move ``amount`` from one account to another in a single transaction.

    Args:
        from_id: primary key of the account to debit.
        to_id: primary key of the account to credit.
        amount: value subtracted from the source balance and added to the
            target balance.

    Raises:
        ValueError: if either account does not exist.

    NOTE(review): ``Account`` is not defined in this file; it is assumed to
    be a mapped model with a numeric ``balance`` attribute.
    """
    async with async_session() as session:
        # session.begin() commits on normal exit and rolls back when the body
        # raises, so no surrounding try/except is required.
        async with session.begin():
            # Lock both rows (SELECT ... FOR UPDATE where the backend supports
            # it) so concurrent transfers cannot overwrite each other. Always
            # lock in ascending id order so opposite transfers cannot deadlock.
            locked = {}
            for account_id in sorted({from_id, to_id}):
                locked[account_id] = await session.get(
                    Account, account_id, with_for_update=True
                )

            from_account = locked[from_id]
            to_account = locked[to_id]
            if from_account is None:
                raise ValueError(f"account {from_id} not found")
            if to_account is None:
                raise ValueError(f"account {to_id} not found")

            from_account.balance -= amount
            to_account.balance += amount
preventing greenlet errors
from sqlalchemy.orm import selectinload, joinedload
# wrong - causes MissingGreenlet error
async def get_user_wrong(session: AsyncSession, user_id: int):
    """Anti-pattern: read a relationship attribute without loading it first."""
    fetched = await session.get(User, user_id)
    # error: lazy loading in async context
    posts = fetched.posts
# correct - eager load relationships
async def get_user_with_posts(session: AsyncSession, user_id: int):
    """Return the user with ``user_id`` and its posts eagerly loaded, or None."""
    eager_posts = selectinload(User.posts)  # eager load
    query = select(User).where(User.id == user_id).options(eager_posts)
    rows = await session.execute(query)
    return rows.scalar_one_or_none()
relationships
one-to-many
class Post(Base):
    """Row of the ``posts`` table; each post points at one user via ``author_id``."""

    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    content: Mapped[str] = mapped_column(Text)
    # foreign key to users.id; the "many" side of the one-to-many
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    # back reference paired with User.posts through back_populates
    author: Mapped["User"] = relationship(
        back_populates="posts",
        lazy="joined"  # always join
    )
many-to-many
from sqlalchemy import Table
# association table
# A Core Table is built from Column objects; mapped_column() is only valid
# inside a declarative class body.
user_role = Table(
    "user_roles",
    Base.metadata,
    Column("user_id", ForeignKey("users.id"), primary_key=True),
    Column("role_id", ForeignKey("roles.id"), primary_key=True),
)
class User(Base):
    """User side of the many-to-many link to Role through ``user_role``."""

    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(100), unique=True)
    # roles reached through the association table; paired with Role.users
    roles: Mapped[List["Role"]] = relationship(
        secondary=user_role,
        back_populates="users",
        lazy="selectin"
    )
class Role(Base):
    """Role side of the many-to-many link to User through ``user_role``."""

    __tablename__ = "roles"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50), unique=True)
    # users reached through the association table; paired with User.roles
    users: Mapped[List["User"]] = relationship(
        secondary=user_role,
        back_populates="roles",
        lazy="selectin"
    )
query patterns
basic queries
from sqlalchemy import select, and_, or_, func
async def query_examples(session: AsyncSession):
    """Show common ``select()`` shapes: filter, compound filter, join, aggregate.

    Only the first statement is executed. The others are built purely to
    illustrate the construct and are never sent to the database.

    NOTE(review): ``User.is_admin`` and ``Post.published`` are not defined on
    the models shown in this file; confirm they exist on the real models.
    """
    # select with where
    stmt = select(User).where(User.email == "test@example.com")
    result = await session.execute(stmt)
    user = result.scalar_one_or_none()

    # multiple conditions
    stmt = select(User).where(
        and_(
            User.created_at > datetime(2023, 1, 1),
            or_(
                User.email.like("%@company.com"),
                # .is_(True) instead of "== True": explicit boolean test that
                # linters do not flag as an accidental comparison
                User.is_admin.is_(True),
            ),
        )
    )

    # join with eager loading: contains_eager fills User.posts from the
    # rows of the explicit join instead of issuing a second query
    stmt = (
        select(User)
        .join(User.posts)
        .where(Post.published.is_(True))
        .options(contains_eager(User.posts))
    )

    # aggregation
    stmt = (
        select(User.id, func.count(Post.id).label("post_count"))
        .join(Post)
        .group_by(User.id)
        .having(func.count(Post.id) > 5)
    )
pagination
async def get_users_paginated(
    session: AsyncSession,
    page: int = 1,
    per_page: int = 20
) -> tuple[List[User], int]:
    """Return one page of users plus the total number of users.

    Args:
        session: session used for both queries.
        page: 1-based page number.
        per_page: maximum number of rows in the page.

    Returns:
        ``(users, total)`` where ``users`` is a list holding at most
        ``per_page`` rows and ``total`` counts all users.

    Raises:
        ValueError: if ``page`` or ``per_page`` is smaller than 1.
    """
    if page < 1:
        raise ValueError("page must be >= 1")
    if per_page < 1:
        raise ValueError("per_page must be >= 1")

    # count total
    count_stmt = select(func.count()).select_from(User)
    total = await session.scalar(count_stmt) or 0

    # get page; without ORDER BY the database may return rows in any order,
    # so consecutive pages could repeat or skip users
    stmt = (
        select(User)
        .order_by(User.id)
        .offset((page - 1) * per_page)
        .limit(per_page)
    )
    result = await session.execute(stmt)
    # .all() yields a Sequence; copy into a list to match the annotation
    users = list(result.scalars().all())
    return users, total
database types
json/jsonb (postgresql)
from sqlalchemy import JSON
from sqlalchemy.dialects.postgresql import JSONB
class Document(Base):
    """Row of the ``documents`` table carrying a JSON payload."""

    __tablename__ = "documents"

    id: Mapped[int] = mapped_column(primary_key=True)
    # "metadata" is a reserved attribute name on declarative classes (it holds
    # the MetaData collection), so the Python attribute is named doc_metadata
    # while the database column keeps the name "metadata".
    # portable json: JSONB on postgresql, generic JSON elsewhere
    doc_metadata: Mapped[dict] = mapped_column(
        "metadata",
        JSON().with_variant(JSONB, "postgresql"),
    )


# json queries (postgresql)
# await session.execute(
#     select(Document).where(
#         Document.doc_metadata["type"].as_string() == "report"
#     )
# )
uuid support
import uuid
from sqlalchemy.dialects.postgresql import UUID as PG_UUID
class User(Base):
    """User keyed by a UUID generated on the client with ``uuid.uuid4``."""

    __tablename__ = "users"

    # Mapped[uuid.UUID] resolves to SQLAlchemy's generic Uuid type: native
    # uuid on postgresql, a CHAR column with automatic conversion elsewhere.
    # The previous String(36) variant did no conversion, so the uuid4 default
    # could not be bound as a parameter on sqlite.
    id: Mapped[uuid.UUID] = mapped_column(
        primary_key=True,
        default=uuid.uuid4,
    )
integration with pydantic
from pydantic import BaseModel, ConfigDict
# pydantic model for validation
class UserCreate(BaseModel):
    """Input fields accepted when creating a user."""

    email: str
    name: str
class UserResponse(BaseModel):
    """Output schema for a user; can be populated from object attributes."""

    # from_attributes=True lets model_validate() read fields off an object
    model_config = ConfigDict(from_attributes=True)
    id: int
    email: str
    name: str
    created_at: datetime
# service layer
async def create_user(session: AsyncSession, user_data: UserCreate) -> UserResponse:
    """Persist a user built from ``user_data`` and return it as a response schema."""
    field_values = user_data.model_dump()
    db_user = User(**field_values)
    session.add(db_user)
    await session.commit()
    # re-read the row before converting it to the response schema
    await session.refresh(db_user)
    return UserResponse.model_validate(db_user)
performance optimization
bulk operations
async def bulk_insert_users(users_data: List[dict]):
    """Insert many users in one transaction using the ORM bulk INSERT form.

    Args:
        users_data: one dict of column values per user; an empty list is a
            no-op.
    """
    from sqlalchemy import insert

    if not users_data:
        return  # nothing to insert, so do not open a transaction

    async with async_session() as session:
        async with session.begin():
            # Passing the list as execution parameters performs a bulk INSERT.
            # This replaces the legacy Session.bulk_insert_mappings() and
            # needs no run_sync() bridge.
            await session.execute(insert(User), users_data)
# modern insert with returning
from sqlalchemy import insert
async def bulk_create_with_returning(users_data: List[dict]):
    """Bulk-insert users and return the created ``User`` objects.

    Args:
        users_data: one dict of column values per user.

    Returns:
        The inserted ``User`` instances; an empty list when ``users_data``
        is empty.
    """
    if not users_data:
        return []

    async with async_session() as session:
        # Rows go in as execution parameters (bulk INSERT ... RETURNING), not
        # through .values(list), which would embed every row in one statement.
        created = await session.scalars(insert(User).returning(User), users_data)
        # read the rows before committing so the result is fully consumed
        users = created.all()
        await session.commit()
        return users
streaming results
async def process_large_dataset():
    """Process every user while fetching rows from the database in batches.

    NOTE(review): ``process_user`` is not defined in this file; it is assumed
    to be an async callable that accepts one ``User``.
    """
    async with async_session() as session:
        stmt = select(User).execution_options(yield_per=100)
        # stream results instead of loading all into memory.
        # AsyncSession.stream_scalars() is a coroutine, so it must be awaited;
        # it cannot be entered with "async with".
        users = await session.stream_scalars(stmt)
        async for user in users:
            await process_user(user)
hybrid properties
from sqlalchemy.ext.hybrid import hybrid_property
class Order(Base):
    """Row of the ``orders`` table with a total derived from quantity and unit price."""

    __tablename__ = "orders"

    id: Mapped[int] = mapped_column(primary_key=True)
    quantity: Mapped[int]
    unit_price: Mapped[float]

    @hybrid_property
    def total_price(self) -> float:
        """Instance-level value: quantity multiplied by unit price."""
        return self.quantity * self.unit_price

    @total_price.expression
    def total_price(cls):
        """Class-level form built from the mapped columns, for use in queries."""
        return cls.quantity * cls.unit_price


# use in queries
# select(Order).where(Order.total_price > 100)
migrations with alembic
# initialize alembic
uv run alembic init migrations
# create migration
uv run alembic revision --autogenerate -m "add users table"
# apply migrations
uv run alembic upgrade head
# rollback
uv run alembic downgrade -1
testing
import pytest
from sqlalchemy.pool import NullPool
@pytest.fixture
async def test_session():
    """Yield a session bound to a fresh in-memory sqlite database.

    NOTE(review): an async generator fixture needs an asyncio-aware pytest
    plugin (for example pytest-asyncio in auto mode); confirm the project's
    pytest configuration.
    """
    from sqlalchemy.pool import StaticPool

    # use in-memory sqlite for tests.
    # Every new connection to ":memory:" opens a separate empty database.
    # StaticPool keeps one connection for the engine's lifetime, so the
    # tables created below are still there when the session connects.
    engine = create_async_engine(
        "sqlite+aiosqlite:///:memory:",
        poolclass=StaticPool,
    )
    try:
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

        session_factory = async_sessionmaker(engine, expire_on_commit=False)
        async with session_factory() as session:
            yield session
    finally:
        # dispose even when setup or the test body fails
        await engine.dispose()
async def test_create_user(test_session: AsyncSession):
    """A committed user can be fetched again by primary key."""
    created = User(email="test@example.com", name="test user")
    test_session.add(created)
    await test_session.commit()

    fetched = await test_session.get(User, created.id)
    assert fetched.email == "test@example.com"
comparison with alternatives
| feature | dataclasses | pydantic | sqlalchemy |
|---|---|---|---|
| persistence | ✗ | ✗ | ✓ |
| relationships | ✗ | manual | automatic |
| queries | ✗ | ✗ | full sql |
| transactions | ✗ | ✗ | ✓ |
| migrations | ✗ | ✗ | ✓ (alembic) |
| complexity | minimal | moderate | high |
best practices
do
- use `AsyncAttrs` in your base class
- set `expire_on_commit=False` for async sessions
- always specify relationship loading strategy
- use `selectinload()` for collections
- handle database operations in try/except blocks
- use connection pooling appropriately
- test with both sqlite and postgresql
don’t
- access unloaded relationships outside async context
- mix sync and async operations without `run_sync()`
- forget to dispose engines in tests
- use implicit transactions for complex operations
- ignore database-specific features when needed
common pitfalls
greenlet errors
# wrong
async def bad_pattern(session: AsyncSession):
    """Anti-pattern: touch ``posts`` on users fetched without an eager-load option."""
    query = select(User)
    rows = await session.execute(query)
    for user in rows.scalars():
        print(user.posts)  # MissingGreenlet error!
# correct
async def good_pattern(session: AsyncSession):
    """Print each user's posts after loading them with ``selectinload``."""
    eager_posts = selectinload(User.posts)
    rows = await session.execute(select(User).options(eager_posts))
    for user in rows.scalars():
        print(user.posts)  # works!
session scope
# wrong - session used outside context
async def get_user(user_id: int):
    """Anti-pattern: hand the ORM object to the caller after its session closed."""
    async with async_session() as session:
        fetched = await session.get(User, user_id)
        return fetched
    # user is detached here!
# correct - process within session
async def get_user_dict(user_id: int) -> dict:
    """Read the needed fields while the session is open and return plain data."""
    async with async_session() as session:
        fetched = await session.get(User, user_id)
        return dict(id=fetched.id, email=fetched.email)
complete example
from datetime import datetime
from typing import List, Optional
from sqlalchemy import String, select, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession, AsyncAttrs
from pydantic import BaseModel, ConfigDict
# models
class Base(AsyncAttrs, DeclarativeBase):
    """Declarative base for the example models, with AsyncAttrs mixed in."""
class User(Base):
    """Row of the ``users`` table that owns its posts."""

    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(100), unique=True, index=True)
    name: Mapped[str]
    # datetime.utcnow is deprecated since Python 3.12. Build the same naive
    # UTC value from an aware "now" so the column type does not change.
    created_at: Mapped[datetime] = mapped_column(
        default=lambda: datetime.now(timezone.utc).replace(tzinfo=None)
    )
    # cascade="all, delete-orphan": posts are saved and deleted with the user,
    # and a post removed from this list is deleted as well
    posts: Mapped[List["Post"]] = relationship(
        back_populates="author",
        lazy="selectin",
        cascade="all, delete-orphan",
    )
class Post(Base):
    """Row of the ``posts`` table; each post points at one user via ``author_id``."""

    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    # datetime.utcnow is deprecated since Python 3.12. Build the same naive
    # UTC value from an aware "now" so the column type does not change.
    created_at: Mapped[datetime] = mapped_column(
        default=lambda: datetime.now(timezone.utc).replace(tzinfo=None)
    )
    # Explicit loading strategy: the default lazy load would run implicit IO
    # on attribute access, which is not allowed in an async context.
    author: Mapped["User"] = relationship(
        back_populates="posts",
        lazy="joined",
    )
# pydantic schemas
class UserCreate(BaseModel):
    """Input fields accepted when creating a user."""

    email: str
    name: str
class UserResponse(BaseModel):
    """Output schema for a user, including how many posts they have."""

    # from_attributes=True lets model_validate() read fields off an object
    model_config = ConfigDict(from_attributes=True)
    id: int
    email: str
    name: str
    # defaults to 0 when the caller supplies no count
    post_count: int = 0
# service
class UserService:
    """User operations that run on a session supplied by the caller."""

    def __init__(self, session: AsyncSession):
        self.session = session

    async def create_user(self, user_data: UserCreate) -> User:
        """Persist a user built from ``user_data`` and return the refreshed row."""
        new_user = User(**user_data.model_dump())
        self.session.add(new_user)
        await self.session.commit()
        await self.session.refresh(new_user)
        return new_user

    async def get_users_with_post_count(self) -> List[UserResponse]:
        """Return every user paired with the number of posts they authored."""
        post_total = func.count(Post.id).label("post_count")
        # outer join keeps users that have no posts at all
        query = select(User, post_total).outerjoin(Post).group_by(User.id)
        rows = await self.session.execute(query)
        return [
            UserResponse(
                id=row_user.id,
                email=row_user.email,
                name=row_user.name,
                post_count=count or 0,
            )
            for row_user, count in rows
        ]
# usage
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
)
# session factory bound to the engine above
async_session = async_sessionmaker(bind=engine, expire_on_commit=False)
async def main():
    """Create one user, then list all users with their post counts."""
    async with async_session() as session:
        service = UserService(session)

        # create user
        payload = UserCreate(email="alice@example.com", name="alice")
        user = await service.create_user(payload)

        # get users with counts
        users = await service.get_users_with_post_count()
references
══════════════════════════════════════════════════════════════════