Когда в API появляются эндпоинты вида GET /orders/{id}, возникает вопрос: а может ли любой авторизованный пользователь читать чужие заказы? Ролевой проверки (RBAC) для этого недостаточно — нужен второй слой.
Проблема: RBAC не защищает от горизонтального перебора
Представьте: у вас есть роль customer, которая разрешает вызывать GET /orders/{id}. Пользователь customer-42 знает свой заказ order-100 и просто меняет число в URL на order-101 — это заказ customer-99. RBAC пропустит запрос, потому что роль есть. Ресурс окажется чужим — но никто этого не проверил.
Такая атака называется IDOR (Insecure Direct Object Reference) — прямое обращение к чужому объекту по идентификатору. Это одна из самых частых уязвимостей в API.
ABAC (Attribute-Based Access Control) — контроль доступа на основе атрибутов — решает именно эту проблему. RBAC говорит «роль customer может вызывать этот endpoint», ABAC добавляет «но только к своим ресурсам». Вместе они дают полноценную защиту.
Что такое Principal
В JWT-токене пользователя всегда есть поле sub — уникальный идентификатор пользователя. Когда FastAPI извлекает токен из запроса, удобно завернуть эти данные в датакласс:
# adapters/in/http/security.py
from dataclasses import dataclass
@dataclass(frozen=True)
class Principal:
sub: str # UUID или числовой id пользователя из JWT
roles: list[str]
def is_admin(self) -> bool:
return "admin" in self.roles
principal.sub — это строка. При сравнении с order.customer_id типы должны совпадать: если в базе хранится UUID, приводите явно, иначе сравнение не сработает.
Способ 1: AccessPolicy для простых проверок
Если нужна просто проверка «этот ресурс принадлежит текущему пользователю», хорошо подходит отдельный класс AccessPolicy. Это обычный сервис, который инжектируется через Depends в роутер.
# application/access_policy.py
from fastapi import Depends, HTTPException
from adapters.in.http.security import Principal, principal as get_principal
from domain.order.order_repository import OrderRepository
class OrderAccessPolicy:
def __init__(
self,
repo: OrderRepository = Depends(),
p: Principal = Depends(get_principal),
):
self._repo = repo
self._principal = p
def _load(self, order_id: str):
order = self._repo.find_by_id(order_id)
if order is None:
raise HTTPException(status_code=404, detail="not found")
return order
def can_view(self, order_id: str):
order = self._load(order_id)
if not self._principal.is_admin() and order.customer_id != self._principal.sub:
raise HTTPException(status_code=403, detail="forbidden")
return order
def can_edit(self, order_id: str):
return self.can_view(order_id)
Роутер вызывает политику и получает уже проверенный объект:
# adapters/in/http/order_router.py
from fastapi import APIRouter, Depends
from application.access_policy import OrderAccessPolicy
from adapters.in.http.security import require_roles
router = APIRouter(prefix="/orders")
@router.get("/{order_id}")
async def get_order(
order_id: str,
policy: OrderAccessPolicy = Depends(),
_: None = Depends(require_roles("customer", "admin")),
):
order = policy.can_view(order_id)
return OrderResponse.from_domain(order)
AccessPolicy — единая точка ownership-логики для всех эндпоинтов агрегата. Если завтра бизнес-правило изменится (например, добавится совладелец), меняете одно место, а не каждый роутер.
Способ 2: проверка внутри Handler
Для команд, изменяющих состояние (например, отмена заказа), ABAC удобнее разместить прямо в Handler. Handler владеет транзакцией, загружает агрегат с блокировкой и проверяет бизнес-правила — ABAC здесь органично вписывается в тот же поток:
# application/cancel_order_handler.py
from dataclasses import dataclass
from adapters.in.http.security import Principal
from domain.order.order_repository import OrderRepository
from domain.order.errors import OrderNotFound, OrderCannotBeCancelled
from application.errors import ForbiddenError
from application.audit_log import AuditLog, AdminAction
from datetime import datetime, timezone
@dataclass(frozen=True)
class CancelOrderCommand:
order_id: str
principal: Principal
class CancelOrderHandler:
def __init__(self, repo: OrderRepository, audit_log: AuditLog):
self._repo = repo
self._audit_log = audit_log
def handle(self, cmd: CancelOrderCommand) -> Order:
order = self._repo.find_by_id_for_update(cmd.order_id)
if order is None:
raise OrderNotFound(cmd.order_id)
if not cmd.principal.is_admin() and order.customer_id != cmd.principal.sub:
raise ForbiddenError("order does not belong to current user")
if not order.can_cancel():
raise OrderCannotBeCancelled(order.id, order.status)
order.cancel()
saved = self._repo.save(order)
if cmd.principal.is_admin():
self._audit_log.record(AdminAction(
actor_id=cmd.principal.sub,
action="cancel-order",
resource_type="Order",
resource_id=order.id,
occurred_at=datetime.now(timezone.utc),
metadata={"previous_status": order.previous_status},
))
return saved
Порядок: загрузка с блокировкой → ABAC → бизнес-правило → сохранение → audit. Handler остаётся тонким роутером — роутер передаёт ему команду и ничего не проверяет сам.
Когда какой способ выбрать
| Ситуация | Способ |
|---|---|
Простое сравнение: order.customer_id == principal.sub | AccessPolicy |
| Read-эндпоинты без изменения состояния | AccessPolicy |
Write-команды с SELECT FOR UPDATE | Handler-check |
| Ownership + проверка статуса + бизнес-правило вместе | Handler-check |
| Один агрегат, несколько возможных владельцев | Handler-check |
Важно: выбирайте один из двух способов для каждого агрегата, не оба одновременно. Дубликат расходится при изменении бизнес-правил.
Частая ошибка: ABAC в каждом роутере
Антипаттерн — писать проверку владения прямо в каждой функции роутера:
# ПЛОХО — проверка дублируется в каждом эндпоинте
@router.get("/{order_id}")
async def get_order(order_id: str, p: Principal = Depends(get_principal)):
order = repo.find_by_id(order_id)
if order.customer_id != p.sub:
raise HTTPException(status_code=403)
return order
@router.post("/{order_id}/cancel")
async def cancel_order(order_id: str, p: Principal = Depends(get_principal)):
order = repo.find_by_id(order_id)
if order.customer_id != p.sub: # снова то же самое
raise HTTPException(status_code=403)
...
При изменении правил (добавить совладельца, проверять статус ресурса до проверки владения) придётся обходить все эндпоинты. Пропустите один — уязвимость вернётся.
Admin обходит ABAC, но оставляет след
Администратор часто должен видеть любые ресурсы — для поддержки, проверки, расследования. Это нормально, но каждое такое действие должно быть зафиксировано в журнале:
if query.principal.is_admin():
self._audit_log.record(AdminAction(
actor_id=query.principal.sub,
action="view-product",
resource_type="Product",
resource_id=product.id,
occurred_at=datetime.now(timezone.utc),
metadata={"seller_id": product.seller_id},
))
В журнале появится запись: actor_id=admin-7, action=view-product, resource_id=prod-555. Если кто-то злоупотребляет admin-доступом, это видно при аудите.
Проверка is_admin() должна быть во всех ветках — не только для audit, но и чтобы пропустить ownership-проверку. Обычный паттерн: if not principal.is_admin() and resource.owner != principal.sub: raise ForbiddenError(...).
Коротко
- RBAC говорит «роль разрешает вызов», ABAC добавляет «только к своим ресурсам» — оба слоя нужны вместе.
- Без ABAC RBAC-эндпоинт открыт для IDOR: любой пользователь с нужной ролью перебирает чужие объекты по id.
- Два способа разместить ABAC: AccessPolicy (для read и простых ownership) и Handler-check (для write с транзакцией и бизнес-правилами).
- Ownership-логика — в одном месте, не дублируется по роутерам.
principal.sub— строка из JWT; при сравнении с id из базы следите за типами.- Admin обходит ABAC, но каждое его действие записывается в audit log.
Что почитать дальше
- RBAC: маппинг ролей — слой до ABAC.
- Audit admin-команд — обязательная пара к admin-override.
- JWT validation — как получить
Principalиз токена.