Опирается на правила:
AUTH-10…AUTH-12из Auth Patterns → раздел 4. ABAC: владение ресурсом.
Важно знать
- ABAC обязателен для каждой команды/запроса, работающего с агрегатом по id.
- Два способа:
AccessPolicy-класс (для простых ownership-проверок) или проверка внутри Handler (для составной логики).- ABAC-логика живёт в
AccessPolicyили Handler — не размазывается по роутерам (APIRouter).- Роль
adminобходит ABAC (полный доступ), но каждое действие фиксируется в audit log (AUTH-15).- RBAC-only endpoint без ABAC — классический IDOR (Insecure Direct Object Reference): любой holder JWT с нужной ролью читает чужие ресурсы.
GET /orders/{id}без ownership-check позволяет customer-42 читать заказы customer-99.principal.sub— строка (UUID / числовой id из JWT), сравнение сorder.customer_id— строка к строке или привести типы явно.- Один из двух способов, никогда оба одновременно — дублирование расходится при изменении бизнес-правил.
ABAC (Attribute-Based Access Control) — второй слой авторизации. RBAC говорит «роль customer может вызывать GET /orders/{id}», ABAC говорит «но этот customer может читать только свои orders». Без ABAC RBAC-endpoint открыт для горизонтального перебора.
Два способа
AUTH-10: AccessPolicy-класс или handler-check.
Способ 1: AccessPolicy для простых случаев
AccessPolicy — обычный класс-сервис, внедряется через Depends в Handler или router. Содержит методы can_view_order, can_edit_order и т.д. — единая точка ownership-логики по агрегату.
# adapters/in/http/security.py — уже описан в jwt-validation
from dataclasses import dataclass
@dataclass(frozen=True)
class Principal:
sub: str
roles: list[str]
def is_admin(self) -> bool:
return "admin" in self.roles
# application/access_policy.py
from fastapi import Depends, HTTPException
from adapters.in.http.security import Principal, principal as get_principal
from domain.order.order_repository import OrderRepository
class OrderAccessPolicy:
def __init__(
self,
repo: OrderRepository = Depends(),
p: Principal = Depends(get_principal),
):
self._repo = repo
self._principal = p
def _load(self, order_id: str):
order = self._repo.find_by_id(order_id)
if order is None:
raise HTTPException(status_code=404, detail="not found")
return order
def can_view(self, order_id: str):
order = self._load(order_id)
if not self._principal.is_admin() and order.customer_id != self._principal.sub:
raise HTTPException(status_code=403, detail="forbidden")
return order
def can_edit(self, order_id: str):
return self.can_view(order_id)
# adapters/in/http/order_router.py
from fastapi import APIRouter, Depends
from application.access_policy import OrderAccessPolicy
from adapters.in.http.security import require_roles
router = APIRouter(prefix="/orders")
@router.get("/{order_id}")
async def get_order(
order_id: str,
policy: OrderAccessPolicy = Depends(),
_: None = Depends(require_roles("customer", "admin")),
):
order = policy.can_view(order_id)
return OrderResponse.from_domain(order)
admin ветка внутри can_view обходит ownership — handler-check не нужен.
Способ 2: проверка внутри Handler
Для write-команд, где нужен SELECT FOR UPDATE + проверка состояния, ABAC размещается в Handler — он же владеет транзакцией и загружает агрегат.
# application/cancel_order_handler.py
from dataclasses import dataclass
from adapters.in.http.security import Principal
from domain.order.order_repository import OrderRepository
from domain.order.errors import OrderNotFound, OrderCannotBeCancelled
from application.errors import ForbiddenError
from application.audit_log import AuditLog, AdminAction
from datetime import datetime, timezone
@dataclass(frozen=True)
class CancelOrderCommand:
order_id: str
principal: Principal
class CancelOrderHandler:
def __init__(
self,
repo: OrderRepository,
audit_log: AuditLog,
):
self._repo = repo
self._audit_log = audit_log
def handle(self, cmd: CancelOrderCommand) -> Order:
order = self._repo.find_by_id_for_update(cmd.order_id)
if order is None:
raise OrderNotFound(cmd.order_id)
if not cmd.principal.is_admin() and order.customer_id != cmd.principal.sub:
raise ForbiddenError("order does not belong to current user")
if not order.can_cancel():
raise OrderCannotBeCancelled(order.id, order.status)
order.cancel()
saved = self._repo.save(order)
if cmd.principal.is_admin():
self._audit_log.record(AdminAction(
actor_id=cmd.principal.sub,
action="cancel-order",
resource_type="Order",
resource_id=order.id,
occurred_at=datetime.now(timezone.utc),
metadata={"previous_status": order.previous_status},
))
return saved
# adapters/in/http/order_router.py (фрагмент)
from adapters.in.http.security import principal as get_principal, require_roles
@router.post("/{order_id}/cancel")
async def cancel_order(
order_id: str,
p: Principal = Depends(get_principal),
_: None = Depends(require_roles("customer", "admin")),
handler: CancelOrderHandler = Depends(),
):
return handler.handle(CancelOrderCommand(order_id=order_id, principal=p))
Handler держит всю логику: загрузка с блокировкой → ABAC → бизнес-правило → сохранение → audit.
Когда какой способ
| Случай | Способ |
|---|---|
Простая ownership: order.customer_id == principal.sub | AccessPolicy |
| Read endpoints без модификации | AccessPolicy |
Write с FOR UPDATE + проверка состояния | Handler-check |
| Составная проверка: ownership + статус + бизнес-правило | Handler-check |
| Multi-aggregate: admin or seller связанного продукта | Handler-check |
ABAC централизована, не размазана
AUTH-11: ownership-логика в одном месте.
Корректно:
# OrderAccessPolicy.can_view — единая точка для всех эндпоинтов Order
Неверно:
# ПЛОХО — ABAC размазан по роутерам
@router.get("/{order_id}")
async def get_order(order_id: str, p: Principal = Depends(get_principal)):
order = repo.find_by_id(order_id)
if order.customer_id != p.sub: # дублируется в каждом эндпоинте
raise HTTPException(status_code=403)
return order
@router.post("/{order_id}/cancel")
async def cancel_order(order_id: str, p: Principal = Depends(get_principal)):
order = repo.find_by_id(order_id)
if order.customer_id != p.sub: # ещё раз
raise HTTPException(status_code=403)
...
Проблемы: логика дублируется в каждом endpoint; при смене модели (co-ownership, совладелец) надо обновлять N мест; роутер делает бизнес-проверку — нарушение thin-router.
Admin обходит ABAC + audit
AUTH-12: admin может всё, но след остаётся.
Пример для запроса Product из каталога Sber-маркета:
# application/get_product_handler.py
@dataclass(frozen=True)
class GetProductQuery:
product_id: str
principal: Principal
class GetProductHandler:
def __init__(self, repo: ProductRepository, audit_log: AuditLog):
self._repo = repo
self._audit_log = audit_log
def handle(self, query: GetProductQuery) -> Product:
product = self._repo.find_by_id(query.product_id)
if product is None:
raise ProductNotFound(query.product_id)
if not query.principal.is_admin() and product.seller_id != query.principal.sub:
raise ForbiddenError("product does not belong to current seller")
if query.principal.is_admin():
self._audit_log.record(AdminAction(
actor_id=query.principal.sub,
action="view-product",
resource_type="Product",
resource_id=product.id,
occurred_at=datetime.now(timezone.utc),
metadata={"seller_id": product.seller_id},
))
return product
Admin может посмотреть любой продукт продавца (compliance, support), но в product_audit_log остаётся запись: actor_id=admin-7, action=view-product, resource_id=prod-555.
Подробнее — Audit admin-команд.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
| RBAC-endpoint без ABAC для own-resource | AUTH-10 | оба слоя обязательны |
| ABAC-проверка inline в роутере | AUTH-11 | AccessPolicy или Handler |
Дубликат ABAC в AccessPolicy + Handler одновременно | AUTH-11 | один из способов |
| Admin без записи в audit log при обходе ABAC | AUTH-12 | audit_log.record(...) обязателен |
Проверка только customer_id == sub, без ветки is_admin() | AUTH-12 | is_admin() or ownership |
HTTPException(403) вместо typed ForbiddenError | AUTH-6 | доменная ошибка → маппер → 403 |
ABAC игнорирует deleted/inactive ресурсы | AUTH-10 | проверять статус до ownership |
principal.sub сравнивается с int без приведения | AUTH-10 | явное приведение типов |
Куда дальше
- RBAC: маппинг ролей — слой до ABAC.
- Где какая проверка — ABAC в Domain Service, не в Gateway.
- Audit admin-команд — обязательная пара к admin-override.
- JWT validation —
Principalиrequire_rolesиз JWKS. - PII и секреты — что не должно попасть в логи и события.
- Service-to-service — mTLS и Client Credentials для inter-service ABAC.
- Идемпотентность —
Idempotency-Keyдля money-команд. - Хранение токенов на клиенте — HttpOnly cookie, RT rotation.