← назад к разделу

Когда в API появляются эндпоинты вида GET /orders/{id}, возникает вопрос: а может ли любой авторизованный пользователь читать чужие заказы? Ролевой проверки (RBAC) для этого недостаточно — нужен второй слой.

Проблема: RBAC не защищает от горизонтального перебора

Представьте: у вас есть роль customer, которая разрешает вызывать GET /orders/{id}. Пользователь customer-42 знает свой заказ order-100 и просто меняет число в URL на order-101 — это заказ customer-99. RBAC пропустит запрос, потому что роль есть. Ресурс окажется чужим — но никто этого не проверил.

Такая атака называется IDOR (Insecure Direct Object Reference) — прямое обращение к чужому объекту по идентификатору. Это одна из самых частых уязвимостей в API.

ABAC (Attribute-Based Access Control) — контроль доступа на основе атрибутов — решает именно эту проблему. RBAC говорит «роль customer может вызывать этот endpoint», ABAC добавляет «но только к своим ресурсам». Вместе они дают полноценную защиту.

Что такое Principal

В JWT-токене пользователя всегда есть поле sub — уникальный идентификатор пользователя. Когда FastAPI извлекает токен из запроса, удобно завернуть эти данные в датакласс:

# adapters/in/http/security.py
from dataclasses import dataclass

@dataclass(frozen=True)
class Principal:
    sub: str        # UUID или числовой id пользователя из JWT
    roles: list[str]

    def is_admin(self) -> bool:
        return "admin" in self.roles

principal.sub — это строка. При сравнении с order.customer_id типы должны совпадать: если в базе хранится UUID, приводите явно, иначе сравнение не сработает.

Способ 1: AccessPolicy для простых проверок

Если нужна просто проверка «этот ресурс принадлежит текущему пользователю», хорошо подходит отдельный класс AccessPolicy. Это обычный сервис, который инжектируется через Depends в роутер.

# application/access_policy.py
from fastapi import Depends, HTTPException
from adapters.in.http.security import Principal, principal as get_principal
from domain.order.order_repository import OrderRepository


class OrderAccessPolicy:
    def __init__(
        self,
        repo: OrderRepository = Depends(),
        p: Principal = Depends(get_principal),
    ):
        self._repo = repo
        self._principal = p

    def _load(self, order_id: str):
        order = self._repo.find_by_id(order_id)
        if order is None:
            raise HTTPException(status_code=404, detail="not found")
        return order

    def can_view(self, order_id: str):
        order = self._load(order_id)
        if not self._principal.is_admin() and order.customer_id != self._principal.sub:
            raise HTTPException(status_code=403, detail="forbidden")
        return order

    def can_edit(self, order_id: str):
        return self.can_view(order_id)

Роутер вызывает политику и получает уже проверенный объект:

# adapters/in/http/order_router.py
from fastapi import APIRouter, Depends
from application.access_policy import OrderAccessPolicy
from adapters.in.http.security import require_roles

router = APIRouter(prefix="/orders")

@router.get("/{order_id}")
async def get_order(
    order_id: str,
    policy: OrderAccessPolicy = Depends(),
    _: None = Depends(require_roles("customer", "admin")),
):
    order = policy.can_view(order_id)
    return OrderResponse.from_domain(order)

AccessPolicy — единая точка ownership-логики для всех эндпоинтов агрегата. Если завтра бизнес-правило изменится (например, добавится совладелец), меняете одно место, а не каждый роутер.

Способ 2: проверка внутри Handler

Для команд, изменяющих состояние (например, отмена заказа), ABAC удобнее разместить прямо в Handler. Handler владеет транзакцией, загружает агрегат с блокировкой и проверяет бизнес-правила — ABAC здесь органично вписывается в тот же поток:

# application/cancel_order_handler.py
from dataclasses import dataclass
from adapters.in.http.security import Principal
from domain.order.order_repository import OrderRepository
from domain.order.errors import OrderNotFound, OrderCannotBeCancelled
from application.errors import ForbiddenError
from application.audit_log import AuditLog, AdminAction
from datetime import datetime, timezone


@dataclass(frozen=True)
class CancelOrderCommand:
    order_id: str
    principal: Principal


class CancelOrderHandler:
    def __init__(self, repo: OrderRepository, audit_log: AuditLog):
        self._repo = repo
        self._audit_log = audit_log

    def handle(self, cmd: CancelOrderCommand) -> Order:
        order = self._repo.find_by_id_for_update(cmd.order_id)
        if order is None:
            raise OrderNotFound(cmd.order_id)

        if not cmd.principal.is_admin() and order.customer_id != cmd.principal.sub:
            raise ForbiddenError("order does not belong to current user")

        if not order.can_cancel():
            raise OrderCannotBeCancelled(order.id, order.status)

        order.cancel()
        saved = self._repo.save(order)

        if cmd.principal.is_admin():
            self._audit_log.record(AdminAction(
                actor_id=cmd.principal.sub,
                action="cancel-order",
                resource_type="Order",
                resource_id=order.id,
                occurred_at=datetime.now(timezone.utc),
                metadata={"previous_status": order.previous_status},
            ))

        return saved

Порядок: загрузка с блокировкой → ABAC → бизнес-правило → сохранение → audit. Handler остаётся тонким роутером — роутер передаёт ему команду и ничего не проверяет сам.

Когда какой способ выбрать

СитуацияСпособ
Простое сравнение: order.customer_id == principal.subAccessPolicy
Read-эндпоинты без изменения состоянияAccessPolicy
Write-команды с SELECT FOR UPDATEHandler-check
Ownership + проверка статуса + бизнес-правило вместеHandler-check
Один агрегат, несколько возможных владельцевHandler-check

Важно: выбирайте один из двух способов для каждого агрегата, не оба одновременно. Дубликат расходится при изменении бизнес-правил.

Частая ошибка: ABAC в каждом роутере

Антипаттерн — писать проверку владения прямо в каждой функции роутера:

# ПЛОХО — проверка дублируется в каждом эндпоинте
@router.get("/{order_id}")
async def get_order(order_id: str, p: Principal = Depends(get_principal)):
    order = repo.find_by_id(order_id)
    if order.customer_id != p.sub:
        raise HTTPException(status_code=403)
    return order

@router.post("/{order_id}/cancel")
async def cancel_order(order_id: str, p: Principal = Depends(get_principal)):
    order = repo.find_by_id(order_id)
    if order.customer_id != p.sub:   # снова то же самое
        raise HTTPException(status_code=403)
    ...

При изменении правил (добавить совладельца, проверять статус ресурса до проверки владения) придётся обходить все эндпоинты. Пропустите один — уязвимость вернётся.

Admin обходит ABAC, но оставляет след

Администратор часто должен видеть любые ресурсы — для поддержки, проверки, расследования. Это нормально, но каждое такое действие должно быть зафиксировано в журнале:

if query.principal.is_admin():
    self._audit_log.record(AdminAction(
        actor_id=query.principal.sub,
        action="view-product",
        resource_type="Product",
        resource_id=product.id,
        occurred_at=datetime.now(timezone.utc),
        metadata={"seller_id": product.seller_id},
    ))

В журнале появится запись: actor_id=admin-7, action=view-product, resource_id=prod-555. Если кто-то злоупотребляет admin-доступом, это видно при аудите.

Проверка is_admin() должна быть во всех ветках — не только для audit, но и чтобы пропустить ownership-проверку. Обычный паттерн: if not principal.is_admin() and resource.owner != principal.sub: raise ForbiddenError(...).

Коротко

  • RBAC говорит «роль разрешает вызов», ABAC добавляет «только к своим ресурсам» — оба слоя нужны вместе.
  • Без ABAC RBAC-эндпоинт открыт для IDOR: любой пользователь с нужной ролью перебирает чужие объекты по id.
  • Два способа разместить ABAC: AccessPolicy (для read и простых ownership) и Handler-check (для write с транзакцией и бизнес-правилами).
  • Ownership-логика — в одном месте, не дублируется по роутерам.
  • principal.sub — строка из JWT; при сравнении с id из базы следите за типами.
  • Admin обходит ABAC, но каждое его действие записывается в audit log.

Что почитать дальше

  • RBAC: маппинг ролей — слой до ABAC.
  • Audit admin-команд — обязательная пара к admin-override.
  • JWT validation — как получить Principal из токена.