Опирается на правила: R-SQLA-QRY-1R-SQLA-QRY-5 и R-SQLA-QRY-X1R-SQLA-QRY-X4 из SQLAlchemy Style Guide → раздел 5. Запросы.

Важно знать

  • Только SQLAlchemy 2.0-style: select(Model) + await session.execute(...). session.query(...) — запрещён в новом коде.
  • В async ленивая загрузка не работает. lazy="raise" на всех relationship по умолчанию; там, где нужны связи — явный selectinload или joinedload в запросе.
  • session.execute(select(...).where(...)) возвращает Result; вытаскиваем через .scalar_one_or_none(), .scalars().all(), .mappings().all() — в зависимости от того, что запрашивали.
  • COUNT(*) — отдельным запросом; len(result.scalars().all()) после fetchall() без LIMIT на большой таблице — запрещено.
  • Bulk-вставка — session.execute(insert(Model), rows) или session.add_all(models), не цикл с session.add + await session.flush() на каждой итерации.
  • Read-проекции (CQRS-запрос) — отдельный <X>ViewRepository, возвращает Pydantic read-DTO, не полный агрегат.
  • Конкатенация строк в text(f"... {x}") — SQL-injection. Параметры только через bind: text("... :x").bindparams(x=val) или Core-выражения.

SQLAlchemy 2.0 сменила первичный API запросов: session.query(...) из 1.x уступил место select(...) из sqlalchemy.future. В async-контексте это не просто предпочтение стиля — устаревший API в ряде случаев не работает с AsyncSession вообще. Статья разбирает пять классов ситуаций, которые покрывают R-SQLA-QRY-*: базовый select, eager-load связей, пагинацию, bulk-вставку и read-проекции.

select(...) + await session.execute

R-SQLA-QRY-1: единственный разрешённый стиль в новом коде.

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from adapters.out.persistence.models import OrderModel
from adapters.out.persistence.order_mapper import to_domain
from core.order.domain import Order
from uuid import UUID

class SqlAlchemyOrderRepository:
    async def find_by_id(self, session: AsyncSession, order_id: UUID) -> Order | None:
        result = await session.execute(
            select(OrderModel).where(OrderModel.id == order_id)
        )
        model = result.scalar_one_or_none()
        return to_domain(model) if model else None

session.execute(...) возвращает CursorResult. Из него берём нужное:

  • result.scalar_one_or_none() — один объект или None; бросает MultipleResultsFound, если строк больше одной.
  • result.scalar_one() — то же, но бросает NoResultFound при отсутствии строки.
  • result.scalars().all() — список объектов при select(Model).
  • result.mappings().all() — список dict-подобных объектов при select(col1, col2, ...).

Для сложных проекций с несколькими колонками удобно result.mappings():

from sqlalchemy import select, func
from adapters.out.persistence.models import ProductModel

class SqlAlchemyProductRepository:
    async def find_active_with_stock(
        self, session: AsyncSession
    ) -> list[dict]:
        result = await session.execute(
            select(
                ProductModel.id,
                ProductModel.name,
                ProductModel.price,
                ProductModel.stock_quantity,
            ).where(ProductModel.is_active.is_(True))
            .order_by(ProductModel.name)
        )
        return result.mappings().all()

Eager-load связей — selectinload и joinedload

R-SQLA-QRY-2: SQLAlchemy async не поддерживает ленивую загрузку. Любое обращение к незагруженному relationship внутри async with session: вызывает MissingGreenlet. Поэтому на моделях — lazy="raise", а загрузку подключаем в запросе явно.

from sqlalchemy.orm import selectinload, joinedload

class SqlAlchemyOrderRepository:
    async def find_by_id(self, session: AsyncSession, order_id: UUID) -> Order | None:
        result = await session.execute(
            select(OrderModel)
            .options(selectinload(OrderModel.items))
            .where(OrderModel.id == order_id)
        )
        model = result.scalar_one_or_none()
        return to_domain(model) if model else None

    async def list_confirmed(
        self, session: AsyncSession, customer_id: UUID, limit: int, offset: int
    ) -> list[Order]:
        result = await session.execute(
            select(OrderModel)
            .options(selectinload(OrderModel.items))
            .where(
                OrderModel.customer_id == customer_id,
                OrderModel.status == "CONFIRMED",
            )
            .order_by(OrderModel.created_at.desc())
            .limit(limit)
            .offset(offset)
        )
        return [to_domain(m) for m in result.scalars().all()]

selectinload против joinedload.

selectinload генерирует второй SELECT ... WHERE order_id IN (...) — один запрос на всю коллекцию. Подходит для коллекций (list[OrderItemModel]): плоский результат без дублирования строк родителя.

joinedload делает LEFT JOIN в одном запросе. Подходит для many-to-one / one-to-one (загрузить Customer вместе с Order). Для коллекций генерирует дублирование строк: каждый Order с тремя Items вернёт три строки — SQLAlchemy дедуплицирует, но объём данных растёт.

# joinedload — для загрузки одной связанной сущности
result = await session.execute(
    select(OrderModel)
    .options(joinedload(OrderModel.customer))
    .where(OrderModel.id == order_id)
)
model = result.unique().scalar_one_or_none()

При joinedload для коллекций нужен .unique() перед .scalar_one_or_none() / .scalars().all() — иначе SQLAlchemy вернёт дублирующиеся объекты. Для selectinload .unique() не нужен.

Вложенная загрузка через selectinload(A).selectinload(B):

select(OrderModel).options(
    selectinload(OrderModel.items)
    .selectinload(OrderItemModel.product)
)

Это два дополнительных SELECT (по одному уровню), не N запросов.

Пагинация — limit/offset и keyset

R-SQLA-QRY-4: два подхода; выбор зависит от требований.

Offset-пагинация — проще, подходит для большинства API с умеренным объёмом данных:

from sqlalchemy import select, func

class SqlAlchemyProductRepository:
    async def list_paginated(
        self, session: AsyncSession, limit: int, offset: int
    ) -> tuple[list[Product], int]:
        items_result = await session.execute(
            select(ProductModel)
            .where(ProductModel.is_active.is_(True))
            .order_by(ProductModel.created_at.desc())
            .limit(limit)
            .offset(offset)
        )
        count_result = await session.execute(
            select(func.count()).select_from(ProductModel)
            .where(ProductModel.is_active.is_(True))
        )
        total = count_result.scalar_one()
        items = [to_domain(m) for m in items_result.scalars().all()]
        return items, total

COUNT(*)отдельным session.execute. Не len(session.execute(select(Model)).scalars().all()) без LIMIT — это загрузит весь результат в память.

Keyset-пагинация (cursor-based) — для больших таблиц или бесконечной прокрутки. Не страдает от дрейфа строк при вставке/удалении, не деградирует на высоких offset'ах:

from sqlalchemy import and_

class SqlAlchemyOrderRepository:
    async def list_after_cursor(
        self,
        session: AsyncSession,
        customer_id: UUID,
        after_created_at: datetime | None,
        after_id: UUID | None,
        limit: int,
    ) -> list[Order]:
        q = (
            select(OrderModel)
            .where(OrderModel.customer_id == customer_id)
            .order_by(OrderModel.created_at.desc(), OrderModel.id.desc())
            .limit(limit)
        )
        if after_created_at is not None and after_id is not None:
            q = q.where(
                and_(
                    OrderModel.created_at <= after_created_at,
                    (OrderModel.created_at < after_created_at)
                    | (OrderModel.id < after_id),
                )
            )
        result = await session.execute(q)
        return [to_domain(m) for m in result.scalars().all()]

Курсор — пара (created_at, id) — передаётся в следующий запрос как after_created_at + after_id. Комбинированное условие обеспечивает строгий порядок без пропусков и дубликатов.

Bulk-вставка

R-SQLA-QRY-3: не цикл с session.add() + flush() на каждой итерации — один execute с Core insert.

from sqlalchemy.dialects.postgresql import insert

class SqlAlchemyProductRepository:
    async def bulk_create(self, session: AsyncSession, products: list[Product]) -> None:
        rows = [
            {
                "id": p.id,
                "name": p.name,
                "price": p.price,
                "stock_quantity": p.stock_quantity,
                "is_active": p.is_active,
                "created_at": p.created_at,
            }
            for p in products
        ]
        await session.execute(insert(ProductModel), rows)

Для более высокой производительности PostgreSQL ON CONFLICT DO NOTHING / ON CONFLICT DO UPDATE:

from sqlalchemy.dialects.postgresql import insert as pg_insert

async def upsert_products(self, session: AsyncSession, products: list[Product]) -> None:
    rows = [{"id": p.id, "name": p.name, "price": p.price} for p in products]
    stmt = (
        pg_insert(ProductModel)
        .values(rows)
        .on_conflict_do_update(
            index_elements=["id"],
            set_={"name": pg_insert(ProductModel).excluded.name,
                  "price": pg_insert(ProductModel).excluded.price},
        )
    )
    await session.execute(stmt)

session.add_all(models) — альтернатива для небольших батчей, когда нужен ORM-трекинг объектов (например, чтобы сработали @event.listens_for хуки):

async def save_all(self, session: AsyncSession, orders: list[Order]) -> None:
    models = [to_model(o) for o in orders]
    session.add_all(models)

add_all выполнит INSERT при следующем flush. Для batch-сценариев без ORM-хуков execute(insert(...), rows) быстрее: один roundtrip против N.

Read-проекции через ViewRepository

R-SQLA-QRY-5: CQRS-запросы (список заказов для UI, сводка по клиенту) не грузят полный агрегат — возвращают плоский read-DTO через отдельный <X>ViewRepository.

# core/order/port/order_view_repository.py
from typing import Protocol
from core.order.dto import OrderSummary, CustomerOrderStats

class OrderViewRepository(Protocol):
    async def list_summaries(
        self, session: "AsyncSession", customer_id: UUID, limit: int, offset: int
    ) -> list[OrderSummary]: ...

    async def stats_by_customer(
        self, session: "AsyncSession", customer_id: UUID
    ) -> CustomerOrderStats | None: ...
# core/order/dto.py
from pydantic import BaseModel
from uuid import UUID
from decimal import Decimal
from datetime import datetime

class OrderSummary(BaseModel):
    id: UUID
    status: str
    total: Decimal
    item_count: int
    created_at: datetime

class CustomerOrderStats(BaseModel):
    customer_id: UUID
    total_orders: int
    total_spent: Decimal
# adapters/out/persistence/order_view_repository.py
from sqlalchemy import select, func
from adapters.out.persistence.models import OrderModel, OrderItemModel
from core.order.dto import OrderSummary, CustomerOrderStats

class SqlAlchemyOrderViewRepository:
    async def list_summaries(
        self, session: AsyncSession, customer_id: UUID, limit: int, offset: int
    ) -> list[OrderSummary]:
        result = await session.execute(
            select(
                OrderModel.id,
                OrderModel.status,
                OrderModel.total_amount.label("total"),
                func.count(OrderItemModel.id).label("item_count"),
                OrderModel.created_at,
            )
            .join(OrderItemModel, OrderItemModel.order_id == OrderModel.id, isouter=True)
            .where(OrderModel.customer_id == customer_id)
            .group_by(OrderModel.id)
            .order_by(OrderModel.created_at.desc())
            .limit(limit)
            .offset(offset)
        )
        return [OrderSummary.model_validate(dict(row)) for row in result.mappings().all()]

    async def stats_by_customer(
        self, session: AsyncSession, customer_id: UUID
    ) -> CustomerOrderStats | None:
        result = await session.execute(
            select(
                OrderModel.customer_id,
                func.count(OrderModel.id).label("total_orders"),
                func.coalesce(func.sum(OrderModel.total_amount), 0).label("total_spent"),
            )
            .where(OrderModel.customer_id == customer_id)
            .group_by(OrderModel.customer_id)
        )
        row = result.mappings().one_or_none()
        return CustomerOrderStats.model_validate(dict(row)) if row else None

OrderSummary.model_validate(dict(row)) — Pydantic v2-style. result.mappings() возвращает RowMapping-объекты, которые ведут себя как dict, поэтому dict(row) работает без дополнительной конвертации.

ViewRepository не обязан реализовывать тот же Protocol, что OrderRepository — это независимый порт для независимого read-сценария. Handler-запроса (GetOrderSummariesQueryHandler) инжектирует OrderViewRepository, не OrderRepository.

Параметризация запросов

R-SQLA-QRY-X4: SQL-строки с конкатенацией переменных — SQL-injection. Core-выражения SQLAlchemy параметризуют автоматически; если нужен text() — только через bindparams.

from sqlalchemy import text

# ПЛОХО — SQL-injection
await session.execute(text(f"SELECT * FROM orders WHERE status = '{status}'"))

# ХОРОШО — bindparams
await session.execute(
    text("SELECT * FROM orders WHERE status = :status").bindparams(status=status)
)

# ЛУЧШЕ — Core-выражение, без text() вообще
await session.execute(
    select(OrderModel).where(OrderModel.status == status)
)

Core-выражения (OrderModel.status == status) компилируются в параметризованный SQL автоматически — никакого ручного экранирования не нужно. text() используем только для нестандартных конструкций PostgreSQL, которые SQLAlchemy Core ещё не умеет выражать (например, AT TIME ZONE в старых версиях, TABLESAMPLE).

Что запрещено

АнтипаттернПравилоЧто взамен
session.query(OrderModel).filter(...) (SQLAlchemy 1.x стиль)R-SQLA-QRY-X1select(OrderModel).where(...) + await session.execute(...)
lazy="select" (дефолт) на relationship в asyncR-SQLA-QRY-X2lazy="raise" + явный selectinload/joinedload в запросе
result.scalars().all() без .limit(...) на большой таблицеR-SQLA-QRY-X3Всегда .limit(n) или keyset; COUNT(*) отдельным запросом
text(f"WHERE name = '{name}'") — конкатенация строкR-SQLA-QRY-X4text("WHERE name = :name").bindparams(name=name) или Core-выражение
len(result.scalars().all()) как подсчёт totalR-SQLA-QRY-4select(func.count()).select_from(Model).where(...) отдельным запросом
Цикл session.add(model); await session.flush() для batchR-SQLA-QRY-3session.execute(insert(Model), rows) или session.add_all(models)
Полный агрегат (Order) в ответе query-handler'аR-SQLA-QRY-5Отдельный ViewRepository, возвращает read-DTO через model_validate

Куда дальше

  • Repository pattern в SQLAlchemy — порт-Protocol, SqlAlchemy-реализация, маппер to_domain/to_model, ORM-модели.
  • Транзакции в SQLAlchemy — Unit of Work на handler'е, session.begin(), обработка ошибок TX, structlog-контекст по шагам.
  • PostgreSQL: ACID и уровни изоляции — когда нужен REPEATABLE_READ, retry на 40001, write skew.