Опирается на правила:
R-SQLA-QRY-1…R-SQLA-QRY-5иR-SQLA-QRY-X1…R-SQLA-QRY-X4из SQLAlchemy Style Guide → раздел 5. Запросы.
Важно знать
- Только SQLAlchemy 2.0-style:
select(Model)+await session.execute(...).session.query(...)— запрещён в новом коде.- В async ленивая загрузка не работает.
lazy="raise"на всехrelationshipпо умолчанию; там, где нужны связи — явныйselectinloadилиjoinedloadв запросе.session.execute(select(...).where(...))возвращаетResult; вытаскиваем через.scalar_one_or_none(),.scalars().all(),.mappings().all()— в зависимости от того, что запрашивали.COUNT(*)— отдельным запросом;len(result.scalars().all())послеfetchall()безLIMITна большой таблице — запрещено.- Bulk-вставка —
session.execute(insert(Model), rows)илиsession.add_all(models), не цикл сsession.add+await session.flush()на каждой итерации.- Read-проекции (CQRS-запрос) — отдельный
<X>ViewRepository, возвращает Pydantic read-DTO, не полный агрегат.- Конкатенация строк в
text(f"... {x}")— SQL-injection. Параметры только через bind:text("... :x").bindparams(x=val)или Core-выражения.
SQLAlchemy 2.0 сменила первичный API запросов: session.query(...) из 1.x уступил место select(...) из sqlalchemy.future. В async-контексте это не просто предпочтение стиля — устаревший API в ряде случаев не работает с AsyncSession вообще. Статья разбирает пять классов ситуаций, которые покрывают R-SQLA-QRY-*: базовый select, eager-load связей, пагинацию, bulk-вставку и read-проекции.
select(...) + await session.execute
R-SQLA-QRY-1: единственный разрешённый стиль в новом коде.
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from adapters.out.persistence.models import OrderModel
from adapters.out.persistence.order_mapper import to_domain
from core.order.domain import Order
from uuid import UUID
class SqlAlchemyOrderRepository:
async def find_by_id(self, session: AsyncSession, order_id: UUID) -> Order | None:
result = await session.execute(
select(OrderModel).where(OrderModel.id == order_id)
)
model = result.scalar_one_or_none()
return to_domain(model) if model else None
session.execute(...) возвращает CursorResult. Из него берём нужное:
result.scalar_one_or_none()— один объект илиNone; бросаетMultipleResultsFound, если строк больше одной.result.scalar_one()— то же, но бросаетNoResultFoundпри отсутствии строки.result.scalars().all()— список объектов приselect(Model).result.mappings().all()— списокdict-подобных объектов приselect(col1, col2, ...).
Для сложных проекций с несколькими колонками удобно result.mappings():
from sqlalchemy import select, func
from adapters.out.persistence.models import ProductModel
class SqlAlchemyProductRepository:
async def find_active_with_stock(
self, session: AsyncSession
) -> list[dict]:
result = await session.execute(
select(
ProductModel.id,
ProductModel.name,
ProductModel.price,
ProductModel.stock_quantity,
).where(ProductModel.is_active.is_(True))
.order_by(ProductModel.name)
)
return result.mappings().all()
Eager-load связей — selectinload и joinedload
R-SQLA-QRY-2: SQLAlchemy async не поддерживает ленивую загрузку. Любое обращение к незагруженному relationship внутри async with session: вызывает MissingGreenlet. Поэтому на моделях — lazy="raise", а загрузку подключаем в запросе явно.
from sqlalchemy.orm import selectinload, joinedload
class SqlAlchemyOrderRepository:
async def find_by_id(self, session: AsyncSession, order_id: UUID) -> Order | None:
result = await session.execute(
select(OrderModel)
.options(selectinload(OrderModel.items))
.where(OrderModel.id == order_id)
)
model = result.scalar_one_or_none()
return to_domain(model) if model else None
async def list_confirmed(
self, session: AsyncSession, customer_id: UUID, limit: int, offset: int
) -> list[Order]:
result = await session.execute(
select(OrderModel)
.options(selectinload(OrderModel.items))
.where(
OrderModel.customer_id == customer_id,
OrderModel.status == "CONFIRMED",
)
.order_by(OrderModel.created_at.desc())
.limit(limit)
.offset(offset)
)
return [to_domain(m) for m in result.scalars().all()]
selectinload против joinedload.
selectinload генерирует второй SELECT ... WHERE order_id IN (...) — один запрос на всю коллекцию. Подходит для коллекций (list[OrderItemModel]): плоский результат без дублирования строк родителя.
joinedload делает LEFT JOIN в одном запросе. Подходит для many-to-one / one-to-one (загрузить Customer вместе с Order). Для коллекций генерирует дублирование строк: каждый Order с тремя Items вернёт три строки — SQLAlchemy дедуплицирует, но объём данных растёт.
# joinedload — для загрузки одной связанной сущности
result = await session.execute(
select(OrderModel)
.options(joinedload(OrderModel.customer))
.where(OrderModel.id == order_id)
)
model = result.unique().scalar_one_or_none()
При joinedload для коллекций нужен .unique() перед .scalar_one_or_none() / .scalars().all() — иначе SQLAlchemy вернёт дублирующиеся объекты. Для selectinload .unique() не нужен.
Вложенная загрузка через selectinload(A).selectinload(B):
select(OrderModel).options(
selectinload(OrderModel.items)
.selectinload(OrderItemModel.product)
)
Это два дополнительных SELECT (по одному уровню), не N запросов.
Пагинация — limit/offset и keyset
R-SQLA-QRY-4: два подхода; выбор зависит от требований.
Offset-пагинация — проще, подходит для большинства API с умеренным объёмом данных:
from sqlalchemy import select, func
class SqlAlchemyProductRepository:
async def list_paginated(
self, session: AsyncSession, limit: int, offset: int
) -> tuple[list[Product], int]:
items_result = await session.execute(
select(ProductModel)
.where(ProductModel.is_active.is_(True))
.order_by(ProductModel.created_at.desc())
.limit(limit)
.offset(offset)
)
count_result = await session.execute(
select(func.count()).select_from(ProductModel)
.where(ProductModel.is_active.is_(True))
)
total = count_result.scalar_one()
items = [to_domain(m) for m in items_result.scalars().all()]
return items, total
COUNT(*) — отдельным session.execute. Не len(session.execute(select(Model)).scalars().all()) без LIMIT — это загрузит весь результат в память.
Keyset-пагинация (cursor-based) — для больших таблиц или бесконечной прокрутки. Не страдает от дрейфа строк при вставке/удалении, не деградирует на высоких offset'ах:
from sqlalchemy import and_
class SqlAlchemyOrderRepository:
async def list_after_cursor(
self,
session: AsyncSession,
customer_id: UUID,
after_created_at: datetime | None,
after_id: UUID | None,
limit: int,
) -> list[Order]:
q = (
select(OrderModel)
.where(OrderModel.customer_id == customer_id)
.order_by(OrderModel.created_at.desc(), OrderModel.id.desc())
.limit(limit)
)
if after_created_at is not None and after_id is not None:
q = q.where(
and_(
OrderModel.created_at <= after_created_at,
(OrderModel.created_at < after_created_at)
| (OrderModel.id < after_id),
)
)
result = await session.execute(q)
return [to_domain(m) for m in result.scalars().all()]
Курсор — пара (created_at, id) — передаётся в следующий запрос как after_created_at + after_id. Комбинированное условие обеспечивает строгий порядок без пропусков и дубликатов.
Bulk-вставка
R-SQLA-QRY-3: не цикл с session.add() + flush() на каждой итерации — один execute с Core insert.
from sqlalchemy.dialects.postgresql import insert
class SqlAlchemyProductRepository:
async def bulk_create(self, session: AsyncSession, products: list[Product]) -> None:
rows = [
{
"id": p.id,
"name": p.name,
"price": p.price,
"stock_quantity": p.stock_quantity,
"is_active": p.is_active,
"created_at": p.created_at,
}
for p in products
]
await session.execute(insert(ProductModel), rows)
Для более высокой производительности PostgreSQL ON CONFLICT DO NOTHING / ON CONFLICT DO UPDATE:
from sqlalchemy.dialects.postgresql import insert as pg_insert
async def upsert_products(self, session: AsyncSession, products: list[Product]) -> None:
rows = [{"id": p.id, "name": p.name, "price": p.price} for p in products]
stmt = (
pg_insert(ProductModel)
.values(rows)
.on_conflict_do_update(
index_elements=["id"],
set_={"name": pg_insert(ProductModel).excluded.name,
"price": pg_insert(ProductModel).excluded.price},
)
)
await session.execute(stmt)
session.add_all(models) — альтернатива для небольших батчей, когда нужен ORM-трекинг объектов (например, чтобы сработали @event.listens_for хуки):
async def save_all(self, session: AsyncSession, orders: list[Order]) -> None:
models = [to_model(o) for o in orders]
session.add_all(models)
add_all выполнит INSERT при следующем flush. Для batch-сценариев без ORM-хуков execute(insert(...), rows) быстрее: один roundtrip против N.
Read-проекции через ViewRepository
R-SQLA-QRY-5: CQRS-запросы (список заказов для UI, сводка по клиенту) не грузят полный агрегат — возвращают плоский read-DTO через отдельный <X>ViewRepository.
# core/order/port/order_view_repository.py
from typing import Protocol
from core.order.dto import OrderSummary, CustomerOrderStats
class OrderViewRepository(Protocol):
async def list_summaries(
self, session: "AsyncSession", customer_id: UUID, limit: int, offset: int
) -> list[OrderSummary]: ...
async def stats_by_customer(
self, session: "AsyncSession", customer_id: UUID
) -> CustomerOrderStats | None: ...
# core/order/dto.py
from pydantic import BaseModel
from uuid import UUID
from decimal import Decimal
from datetime import datetime
class OrderSummary(BaseModel):
id: UUID
status: str
total: Decimal
item_count: int
created_at: datetime
class CustomerOrderStats(BaseModel):
customer_id: UUID
total_orders: int
total_spent: Decimal
# adapters/out/persistence/order_view_repository.py
from sqlalchemy import select, func
from adapters.out.persistence.models import OrderModel, OrderItemModel
from core.order.dto import OrderSummary, CustomerOrderStats
class SqlAlchemyOrderViewRepository:
async def list_summaries(
self, session: AsyncSession, customer_id: UUID, limit: int, offset: int
) -> list[OrderSummary]:
result = await session.execute(
select(
OrderModel.id,
OrderModel.status,
OrderModel.total_amount.label("total"),
func.count(OrderItemModel.id).label("item_count"),
OrderModel.created_at,
)
.join(OrderItemModel, OrderItemModel.order_id == OrderModel.id, isouter=True)
.where(OrderModel.customer_id == customer_id)
.group_by(OrderModel.id)
.order_by(OrderModel.created_at.desc())
.limit(limit)
.offset(offset)
)
return [OrderSummary.model_validate(dict(row)) for row in result.mappings().all()]
async def stats_by_customer(
self, session: AsyncSession, customer_id: UUID
) -> CustomerOrderStats | None:
result = await session.execute(
select(
OrderModel.customer_id,
func.count(OrderModel.id).label("total_orders"),
func.coalesce(func.sum(OrderModel.total_amount), 0).label("total_spent"),
)
.where(OrderModel.customer_id == customer_id)
.group_by(OrderModel.customer_id)
)
row = result.mappings().one_or_none()
return CustomerOrderStats.model_validate(dict(row)) if row else None
OrderSummary.model_validate(dict(row)) — Pydantic v2-style. result.mappings() возвращает RowMapping-объекты, которые ведут себя как dict, поэтому dict(row) работает без дополнительной конвертации.
ViewRepository не обязан реализовывать тот же Protocol, что OrderRepository — это независимый порт для независимого read-сценария. Handler-запроса (GetOrderSummariesQueryHandler) инжектирует OrderViewRepository, не OrderRepository.
Параметризация запросов
R-SQLA-QRY-X4: SQL-строки с конкатенацией переменных — SQL-injection. Core-выражения SQLAlchemy параметризуют автоматически; если нужен text() — только через bindparams.
from sqlalchemy import text
# ПЛОХО — SQL-injection
await session.execute(text(f"SELECT * FROM orders WHERE status = '{status}'"))
# ХОРОШО — bindparams
await session.execute(
text("SELECT * FROM orders WHERE status = :status").bindparams(status=status)
)
# ЛУЧШЕ — Core-выражение, без text() вообще
await session.execute(
select(OrderModel).where(OrderModel.status == status)
)
Core-выражения (OrderModel.status == status) компилируются в параметризованный SQL автоматически — никакого ручного экранирования не нужно. text() используем только для нестандартных конструкций PostgreSQL, которые SQLAlchemy Core ещё не умеет выражать (например, AT TIME ZONE в старых версиях, TABLESAMPLE).
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
session.query(OrderModel).filter(...) (SQLAlchemy 1.x стиль) | R-SQLA-QRY-X1 | select(OrderModel).where(...) + await session.execute(...) |
lazy="select" (дефолт) на relationship в async | R-SQLA-QRY-X2 | lazy="raise" + явный selectinload/joinedload в запросе |
result.scalars().all() без .limit(...) на большой таблице | R-SQLA-QRY-X3 | Всегда .limit(n) или keyset; COUNT(*) отдельным запросом |
text(f"WHERE name = '{name}'") — конкатенация строк | R-SQLA-QRY-X4 | text("WHERE name = :name").bindparams(name=name) или Core-выражение |
len(result.scalars().all()) как подсчёт total | R-SQLA-QRY-4 | select(func.count()).select_from(Model).where(...) отдельным запросом |
Цикл session.add(model); await session.flush() для batch | R-SQLA-QRY-3 | session.execute(insert(Model), rows) или session.add_all(models) |
Полный агрегат (Order) в ответе query-handler'а | R-SQLA-QRY-5 | Отдельный ViewRepository, возвращает read-DTO через model_validate |
Куда дальше
- Repository pattern в SQLAlchemy — порт-Protocol, SqlAlchemy-реализация, маппер
to_domain/to_model, ORM-модели. - Транзакции в SQLAlchemy — Unit of Work на handler'е,
session.begin(), обработка ошибок TX,structlog-контекст по шагам. - PostgreSQL: ACID и уровни изоляции — когда нужен
REPEATABLE_READ, retry на40001, write skew.