Опирается на правила:
AUTH-7…AUTH-9из Auth Patterns → раздел 3. RBAC: маппинг ролей.
Важно знать
- Роли в JWT —
realm_access.roles(Keycloak) илиscope(стандартный OAuth2).- Маппинг в
Principal.rolesпроисходит в зависимостиprincipalчерез вспомогательную функцию_roles(claims).- Каталог ролей UCP:
customer,seller,admin,system. Любая новая роль — пересмотр Bounded Context.Depends(require_roles(...))обязателен на каждом endpoint. Endpoint без проверки роли — критическое нарушение.- RBAC отвечает endpoint-level, не resource-level. «Этот ли его order» — это ABAC.
require_rolesпринимает несколько ролей; достаточно одного совпадения (OR-семантика).- Невалидный JWT (просроченный, неверная подпись) → 401, недостаточная роль → 403. Путать запрещено.
adminобходит ABAC, но не RBAC: у admin-endpoint всё равно стоитrequire_roles("admin").
RBAC (Role-Based Access Control) — первый слой авторизации после JWT validation. Отвечает на простой вопрос: «может ли пользователь этой роли вообще обращаться к этому endpoint». UCP задаёт минимум ролей — слишком много ролей означает либо плохую модель, либо ABAC, замаскированный под роли.
Principal и маппинг ролей
AUTH-7: роли из claim маппятся в Principal.roles в зависимости.
# adapters/in/http/security.py
from dataclasses import dataclass
from typing import Any
from fastapi import Depends
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from fastapi import HTTPException
import jwt
from jwt import PyJWKClient
from config import settings
_bearer = HTTPBearer()
_jwks = PyJWKClient(settings.jwks_uri, cache_keys=True, lifespan=300)
@dataclass(frozen=True)
class Principal:
sub: str
roles: list[str]
def is_admin(self) -> bool:
return "admin" in self.roles
def _roles(claims: dict[str, Any]) -> list[str]:
# Keycloak: realm_access.roles
realm = claims.get("realm_access", {})
if isinstance(realm, dict) and "roles" in realm:
return realm["roles"]
# стандартный OAuth2: scope как строка через пробел
scope = claims.get("scope", "")
return scope.split() if scope else []
async def principal(
creds: HTTPAuthorizationCredentials = Depends(_bearer),
) -> Principal:
token = creds.credentials
try:
key = _jwks.get_signing_key_from_jwt(token).key
claims = jwt.decode(
token, key, algorithms=["RS256"],
audience=settings.audience, issuer=settings.issuer,
)
except jwt.PyJWTError as e:
raise HTTPException(status_code=401, detail="invalid token") from e
return Principal(sub=claims["sub"], roles=_roles(claims))
def require_roles(*roles: str):
async def dep(p: Principal = Depends(principal)) -> Principal:
if not set(roles) & set(p.roles):
raise HTTPException(status_code=403, detail="forbidden")
return p
return dep
Что делает _roles:
- Для Keycloak JWT: читает
realm_access.roles— массив строк. - Для стандартного OAuth2: читает
scope— строка с пробелами, разбивается в список. - Результат —
Principal.roles: list[str].
Пример Keycloak JWT:
{
"sub": "user-42",
"realm_access": {
"roles": ["customer", "loyalty-member"]
}
}
→ Principal(sub="user-42", roles=["customer", "loyalty-member"]).
Каталог ролей UCP
AUTH-8: четыре стандартных роли.
| Роль | Кто | Что делает |
|---|---|---|
customer | Конечный пользователь | Создаёт и читает свои заказы, оплачивает |
seller | Продавец на маркетплейсе | Управляет своими товарами, видит заказы своих товаров |
admin | Внутренний пользователь | Полный доступ, всегда + audit log |
system | Service-to-service | Cross-service вызовы (Client Credentials или mTLS) |
Любая новая роль — повод пересмотреть Bounded Context. Если появляется customer-vip, premium-seller, partner-admin — обычно это не новая роль, а атрибут на существующей роли. Это уже ABAC.
Сценарии «зачем нам новая роль»:
- «У нас есть premium-customers, им доступны другие endpoints» — это feature flag или подписка, проверяется в Handler по атрибуту
customer.is_premium. - «B2B-клиенты не могут видеть retail» — это разные Bounded Context, разные сервисы.
- «Junior-admin может только просматривать» — это permission system, не роль.
Минимизация ролей — дисциплина.
require_roles обязателен на каждом endpoint
AUTH-9: каждый endpoint имеет проверку.
# adapters/in/http/order_router.py
from fastapi import APIRouter, Depends
from adapters.in.http.security import Principal, principal as get_principal, require_roles
from application.create_order_handler import CreateOrderCommand, CreateOrderHandler
from application.get_order_handler import GetOrderByIdQuery, GetOrderHandler
from application.cancel_order_handler import CancelOrderCommand, CancelOrderHandler
router = APIRouter(prefix="/orders")
@router.post("")
async def create_order(
request: CreateOrderRequest,
p: Principal = Depends(require_roles("customer")),
handler: CreateOrderHandler = Depends(),
):
return handler.handle(CreateOrderCommand(customer_id=p.sub, **request.model_dump()))
@router.get("/{order_id}")
async def get_order(
order_id: str,
p: Principal = Depends(require_roles("customer", "admin")),
handler: GetOrderHandler = Depends(),
):
return handler.handle(GetOrderByIdQuery(order_id=order_id, principal=p))
@router.post("/{order_id}/cancel")
async def cancel_order(
order_id: str,
p: Principal = Depends(require_roles("customer", "admin")),
handler: CancelOrderHandler = Depends(),
):
return handler.handle(CancelOrderCommand(order_id=order_id, principal=p))
# adapters/in/http/admin_order_router.py
from fastapi import APIRouter, Depends
from adapters.in.http.security import Principal, require_roles
from application.refund_order_handler import RefundOrderCommand, RefundOrderHandler
admin_router = APIRouter(prefix="/admin/orders")
@admin_router.post("/{order_id}/refund")
async def refund_order(
order_id: str,
p: Principal = Depends(require_roles("admin")),
handler: RefundOrderHandler = Depends(),
):
return handler.handle(RefundOrderCommand(order_id=order_id, principal=p))
Endpoint без Depends(require_roles(...)) — даже если он внутри /admin/ namespace — открыт всем обладателям валидного JWT. require_roles задаёт OR-семантику: достаточно совпадения хотя бы с одной из перечисленных ролей.
Pytest-проверка: нет роли — нет доступа
# tests/test_order_router_auth.py
import pytest
from httpx import AsyncClient
@pytest.mark.anyio
async def test_create_order_requires_customer_role(client: AsyncClient, seller_token: str):
resp = await client.post(
"/orders",
json={"product_id": "prod-1", "quantity": 2},
headers={"Authorization": f"Bearer {seller_token}"},
)
assert resp.status_code == 403
@pytest.mark.anyio
async def test_create_order_without_token_returns_401(client: AsyncClient):
resp = await client.post("/orders", json={"product_id": "prod-1", "quantity": 2})
assert resp.status_code == 401
401 — токен отсутствует или невалиден. 403 — токен валиден, но роль не подходит. Нарушение этого разграничения — нарушение AUTH-6.
RBAC ≠ ABAC
RBAC отвечает endpoint-level: «может ли роль customer обращаться к GET /orders/{order_id}».
RBAC не отвечает: «может ли customer-42 читать order-12345». Это ABAC — нужно загрузить order, проверить customer_id, сравнить с principal.sub. Подробнее — ABAC: владение ресурсом.
# RBAC — endpoint-level: кто вообще может
@router.post("/{order_id}/cancel")
async def cancel_order(
order_id: str,
p: Principal = Depends(require_roles("customer", "admin")), # RBAC
handler: CancelOrderHandler = Depends(),
):
return handler.handle(CancelOrderCommand(order_id=order_id, principal=p))
# ABAC — resource-level: именно ли его этот Order
class CancelOrderHandler:
def handle(self, cmd: CancelOrderCommand) -> Order:
order = self._repo.find_by_id_for_update(cmd.order_id)
if not cmd.principal.is_admin() and order.customer_id != cmd.principal.sub:
raise ForbiddenError("order does not belong to current user") # ABAC
order.cancel()
return self._repo.save(order)
Два слоя — без одного из них дыра.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
Endpoint без Depends(require_roles(...)) | AUTH-9 | каждый endpoint имеет проверку |
| 10+ ролей в каталоге | AUTH-8 | customer/seller/admin/system + ABAC |
Новая роль для feature flag (customer-premium) | AUTH-8 | атрибут на customer, проверка в Handler |
| Hardcoded строки ролей россыпью по коду | AUTH-9 | константы или Enum |
Depends(principal) без require_roles для critical endpoint | AUTH-9 | явная роль через require_roles |
RBAC на resource-level (p.roles + сравнение id inline) | AUTH-3 | RBAC endpoint + ABAC в Handler |
Роли из произвольного claim без единой точки _roles() | AUTH-7 | единая функция маппинга |
| Самописный разбор claim без библиотеки | AUTH-4 | PyJWKClient + jwt.decode с проверкой подписи |
Куда дальше
- JWT validation —
principal-зависимость, JWKS-кеш, 401 vs 403. - ABAC: владение ресурсом — следующий слой после RBAC.
- Где какая проверка — Gateway vs BFF vs Domain Service.
- Audit admin-команд — роль
adminвсегда + audit log. - PII и секреты — что не должно попасть в логи и события.
- Service-to-service — роль
system, Client Credentials, mTLS. - Идемпотентность —
Idempotency-Keyдля money-команд. - Хранение токенов на клиенте — HttpOnly cookie, RT rotation.