Опирается на правила: AUTH-10AUTH-12 из Auth Patterns → раздел 4. ABAC: владение ресурсом.

Важно знать

  • ABAC обязателен для каждой команды/запроса, работающего с агрегатом по id.
  • Два способа: AccessPolicy-класс (для простых ownership-проверок) или проверка внутри Handler (для составной логики).
  • ABAC-логика живёт в AccessPolicy или Handler — не размазывается по роутерам (APIRouter).
  • Роль admin обходит ABAC (полный доступ), но каждое действие фиксируется в audit log (AUTH-15).
  • RBAC-only endpoint без ABAC — классический IDOR (Insecure Direct Object Reference): любой holder JWT с нужной ролью читает чужие ресурсы.
  • GET /orders/{id} без ownership-check позволяет customer-42 читать заказы customer-99.
  • principal.sub — строка (UUID / числовой id из JWT), сравнение с order.customer_id — строка к строке или привести типы явно.
  • Один из двух способов, никогда оба одновременно — дублирование расходится при изменении бизнес-правил.

ABAC (Attribute-Based Access Control) — второй слой авторизации. RBAC говорит «роль customer может вызывать GET /orders/{id}», ABAC говорит «но этот customer может читать только свои orders». Без ABAC RBAC-endpoint открыт для горизонтального перебора.

Два способа

AUTH-10: AccessPolicy-класс или handler-check.

Способ 1: AccessPolicy для простых случаев

AccessPolicy — обычный класс-сервис, внедряется через Depends в Handler или router. Содержит методы can_view_order, can_edit_order и т.д. — единая точка ownership-логики по агрегату.

# adapters/in/http/security.py — уже описан в jwt-validation
from dataclasses import dataclass

@dataclass(frozen=True)
class Principal:
    sub: str
    roles: list[str]

    def is_admin(self) -> bool:
        return "admin" in self.roles
# application/access_policy.py
from fastapi import Depends, HTTPException
from adapters.in.http.security import Principal, principal as get_principal
from domain.order.order_repository import OrderRepository


class OrderAccessPolicy:
    def __init__(
        self,
        repo: OrderRepository = Depends(),
        p: Principal = Depends(get_principal),
    ):
        self._repo = repo
        self._principal = p

    def _load(self, order_id: str):
        order = self._repo.find_by_id(order_id)
        if order is None:
            raise HTTPException(status_code=404, detail="not found")
        return order

    def can_view(self, order_id: str):
        order = self._load(order_id)
        if not self._principal.is_admin() and order.customer_id != self._principal.sub:
            raise HTTPException(status_code=403, detail="forbidden")
        return order

    def can_edit(self, order_id: str):
        return self.can_view(order_id)
# adapters/in/http/order_router.py
from fastapi import APIRouter, Depends
from application.access_policy import OrderAccessPolicy
from adapters.in.http.security import require_roles

router = APIRouter(prefix="/orders")


@router.get("/{order_id}")
async def get_order(
    order_id: str,
    policy: OrderAccessPolicy = Depends(),
    _: None = Depends(require_roles("customer", "admin")),
):
    order = policy.can_view(order_id)
    return OrderResponse.from_domain(order)

admin ветка внутри can_view обходит ownership — handler-check не нужен.

Способ 2: проверка внутри Handler

Для write-команд, где нужен SELECT FOR UPDATE + проверка состояния, ABAC размещается в Handler — он же владеет транзакцией и загружает агрегат.

# application/cancel_order_handler.py
from dataclasses import dataclass
from adapters.in.http.security import Principal
from domain.order.order_repository import OrderRepository
from domain.order.errors import OrderNotFound, OrderCannotBeCancelled
from application.errors import ForbiddenError
from application.audit_log import AuditLog, AdminAction
from datetime import datetime, timezone


@dataclass(frozen=True)
class CancelOrderCommand:
    order_id: str
    principal: Principal


class CancelOrderHandler:
    def __init__(
        self,
        repo: OrderRepository,
        audit_log: AuditLog,
    ):
        self._repo = repo
        self._audit_log = audit_log

    def handle(self, cmd: CancelOrderCommand) -> Order:
        order = self._repo.find_by_id_for_update(cmd.order_id)
        if order is None:
            raise OrderNotFound(cmd.order_id)

        if not cmd.principal.is_admin() and order.customer_id != cmd.principal.sub:
            raise ForbiddenError("order does not belong to current user")

        if not order.can_cancel():
            raise OrderCannotBeCancelled(order.id, order.status)

        order.cancel()
        saved = self._repo.save(order)

        if cmd.principal.is_admin():
            self._audit_log.record(AdminAction(
                actor_id=cmd.principal.sub,
                action="cancel-order",
                resource_type="Order",
                resource_id=order.id,
                occurred_at=datetime.now(timezone.utc),
                metadata={"previous_status": order.previous_status},
            ))

        return saved
# adapters/in/http/order_router.py (фрагмент)
from adapters.in.http.security import principal as get_principal, require_roles

@router.post("/{order_id}/cancel")
async def cancel_order(
    order_id: str,
    p: Principal = Depends(get_principal),
    _: None = Depends(require_roles("customer", "admin")),
    handler: CancelOrderHandler = Depends(),
):
    return handler.handle(CancelOrderCommand(order_id=order_id, principal=p))

Handler держит всю логику: загрузка с блокировкой → ABAC → бизнес-правило → сохранение → audit.

Когда какой способ

СлучайСпособ
Простая ownership: order.customer_id == principal.subAccessPolicy
Read endpoints без модификацииAccessPolicy
Write с FOR UPDATE + проверка состоянияHandler-check
Составная проверка: ownership + статус + бизнес-правилоHandler-check
Multi-aggregate: admin or seller связанного продуктаHandler-check

ABAC централизована, не размазана

AUTH-11: ownership-логика в одном месте.

Корректно:

# OrderAccessPolicy.can_view — единая точка для всех эндпоинтов Order

Неверно:

# ПЛОХО — ABAC размазан по роутерам
@router.get("/{order_id}")
async def get_order(order_id: str, p: Principal = Depends(get_principal)):
    order = repo.find_by_id(order_id)
    if order.customer_id != p.sub:          # дублируется в каждом эндпоинте
        raise HTTPException(status_code=403)
    return order

@router.post("/{order_id}/cancel")
async def cancel_order(order_id: str, p: Principal = Depends(get_principal)):
    order = repo.find_by_id(order_id)
    if order.customer_id != p.sub:          # ещё раз
        raise HTTPException(status_code=403)
    ...

Проблемы: логика дублируется в каждом endpoint; при смене модели (co-ownership, совладелец) надо обновлять N мест; роутер делает бизнес-проверку — нарушение thin-router.

Admin обходит ABAC + audit

AUTH-12: admin может всё, но след остаётся.

Пример для запроса Product из каталога Sber-маркета:

# application/get_product_handler.py
@dataclass(frozen=True)
class GetProductQuery:
    product_id: str
    principal: Principal


class GetProductHandler:
    def __init__(self, repo: ProductRepository, audit_log: AuditLog):
        self._repo = repo
        self._audit_log = audit_log

    def handle(self, query: GetProductQuery) -> Product:
        product = self._repo.find_by_id(query.product_id)
        if product is None:
            raise ProductNotFound(query.product_id)

        if not query.principal.is_admin() and product.seller_id != query.principal.sub:
            raise ForbiddenError("product does not belong to current seller")

        if query.principal.is_admin():
            self._audit_log.record(AdminAction(
                actor_id=query.principal.sub,
                action="view-product",
                resource_type="Product",
                resource_id=product.id,
                occurred_at=datetime.now(timezone.utc),
                metadata={"seller_id": product.seller_id},
            ))

        return product

Admin может посмотреть любой продукт продавца (compliance, support), но в product_audit_log остаётся запись: actor_id=admin-7, action=view-product, resource_id=prod-555.

Подробнее — Audit admin-команд.

Что запрещено

АнтипаттернПравилоЧто взамен
RBAC-endpoint без ABAC для own-resourceAUTH-10оба слоя обязательны
ABAC-проверка inline в роутереAUTH-11AccessPolicy или Handler
Дубликат ABAC в AccessPolicy + Handler одновременноAUTH-11один из способов
Admin без записи в audit log при обходе ABACAUTH-12audit_log.record(...) обязателен
Проверка только customer_id == sub, без ветки is_admin()AUTH-12is_admin() or ownership
HTTPException(403) вместо typed ForbiddenErrorAUTH-6доменная ошибка → маппер → 403
ABAC игнорирует deleted/inactive ресурсыAUTH-10проверять статус до ownership
principal.sub сравнивается с int без приведенияAUTH-10явное приведение типов

Куда дальше

  • RBAC: маппинг ролей — слой до ABAC.
  • Где какая проверка — ABAC в Domain Service, не в Gateway.
  • Audit admin-команд — обязательная пара к admin-override.
  • JWT validation — Principal и require_roles из JWKS.
  • PII и секреты — что не должно попасть в логи и события.
  • Service-to-service — mTLS и Client Credentials для inter-service ABAC.
  • Идемпотентность — Idempotency-Key для money-команд.
  • Хранение токенов на клиенте — HttpOnly cookie, RT rotation.