Опирается на правила: AUTH-4AUTH-6 из Auth Patterns — раздел 2. JWT validation.

Важно знать

  • JWT валидируется единой зависимостью Depends(principal) на основе PyJWT + PyJWKClient — самописный jwt.decode без проверки подписи запрещён.
  • JWKS тянется из IdP по jwks_uri с кешем lifespan=300 (PyJWKClient). Вручную распаковывать ключи — запрещено.
  • Rotation ключей: если kid в токене не найден в кеше, PyJWKClient делает принудительный refresh — писать своё не нужно.
  • 401 — невалидная подпись или просроченный exp. Клиент не аутентифицирован.
  • 403 — токен валиден, но роль не подходит или ABAC отказал. Аутентифицирован, но не авторизован.
  • Путать 401/403 запрещено — для клиента это разные сценарии с разными действиями (refresh-flow vs «доступ закрыт»).
  • alg: none недопустим: в jwt.decode явно указывается algorithms=["RS256"] — без этого библиотека может принять токен без подписи.

JWT validation — точка входа, где сервис решает «пришёл реальный пользователь или подделка». Один пропуск в проверке — и атакующий подписывает токен своим ключом, объявляет себя администратором, получает полный доступ. AUTH-4 запрещает любой самописный код в этом месте — только проверенная библиотека.

Стандартная зависимость principal

AUTH-4: вся логика JWT живёт в одном месте — adapters/in/http/security.py. Остальной код получает готовый объект Principal через Depends.

# adapters/in/http/security.py
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from jwt import PyJWT, PyJWKClient
from pydantic import BaseModel

from app.config import settings

_jwks = PyJWKClient(settings.jwks_uri, cache_keys=True, lifespan=300)
_jwt = PyJWT()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=True)


class Principal(BaseModel):
    sub: str
    roles: list[str]


def _extract_roles(claims: dict) -> list[str]:
    realm = claims.get("realm_access", {})
    return realm.get("roles", [])


async def principal(token: str = Depends(oauth2_scheme)) -> Principal:
    try:
        signing_key = _jwks.get_signing_key_from_jwt(token).key
        claims = _jwt.decode(
            token,
            signing_key,
            algorithms=["RS256"],
            audience=settings.audience,
            issuer=settings.issuer,
        )
    except Exception as exc:
        raise HTTPException(status_code=401, detail="invalid token") from exc
    return Principal(sub=claims["sub"], roles=_extract_roles(claims))


def require_roles(*roles: str):
    async def dep(p: Principal = Depends(principal)) -> Principal:
        if not set(roles) & set(p.roles):
            raise HTTPException(status_code=403, detail="forbidden")
        return p
    return dep

Конфигурация через pydantic-settings (AUTH-17 — секреты не в git):

# config.py
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    jwks_uri: str
    audience: str
    issuer: str

    model_config = {"env_file": ".env"}


settings = Settings()

Пример .env для локального запуска (в git не коммитится):

JWKS_URI=https://idp.example.com/realms/main/protocol/openid-connect/certs
AUDIENCE=order-service
ISSUER=https://idp.example.com/realms/main

Применение в роутерах

AUTH-9: каждый endpoint обязан объявить роль. Endpoint без Depends(require_roles(...)) — критическое нарушение.

# adapters/in/http/orders_router.py
from fastapi import APIRouter, Depends

from adapters.in.http.security import Principal, require_roles
from application.use_cases.create_order import CreateOrderUseCase

router = APIRouter(prefix="/orders")


@router.post("/", status_code=201)
async def create_order(
    body: CreateOrderRequest,
    principal: Principal = Depends(require_roles("customer", "admin")),
    use_case: CreateOrderUseCase = Depends(),
) -> OrderResponse:
    return await use_case.execute(body, principal)


@router.get("/{order_id}")
async def get_order(
    order_id: str,
    principal: Principal = Depends(require_roles("customer", "seller", "admin")),
    use_case: GetOrderUseCase = Depends(),
) -> OrderResponse:
    return await use_case.execute(order_id, principal)

Principal прокидывается в UseCase или Handler — ABAC-проверка по владению (order.customer_id == principal.sub) происходит там, а не в роутере (AUTH-3, AUTH-10, AUTH-11).

Самописный парсинг — запрещён

# ЗАПРЕЩЕНО — AUTH-4
import base64, json

def bad_decode(token: str) -> dict:
    payload_b64 = token.split(".")[1]
    padding = "=" * (4 - len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(payload_b64 + padding))

Что ломается:

  • Подпись не проверяется — attacker подсовывает любые claims.
  • exp не проверяется — просроченный токен принимается.
  • iss и aud не проверяются — токен от чужого IdP или другого сервиса принимается.
  • alg: none может пройти — классическая уязвимость JWT.
  • JWKS не кешируется — каждый запрос дёргает IdP напрямую.

JWKS-кеш и rotation ключей

AUTH-5: PyJWKClient с lifespan=300 кеширует JWK Set на 5 минут. Если kid из токена отсутствует в кеше (IdP сменил ключ) — клиент делает принудительный refresh и сразу подбирает новый ключ. Ничего дополнительно писать не нужно.

_jwks = PyJWKClient(
    settings.jwks_uri,
    cache_keys=True,
    lifespan=300,
)

Запрещено:

  • Хранить публичный ключ в settings.py или файле — при rotation сервис перестанет проверять токены.
  • Вызывать _jwks.fetch_data() вручную по таймеру — PyJWKClient делает это сам.
  • Парсить JWK вручную через cryptography без PyJWKClient.

При недоступности IdP в момент старта сервис не пройдёт проверку health/readiness — это ожидаемое поведение: принимать запросы без возможности проверить JWT нельзя.

401 vs 403

AUTH-6: семантика кодов определяет поведение клиента.

КодСитуацияДействие клиента
401 UnauthorizedНевалидный JWT: плохая подпись, просроченный exp, отсутствующий заголовокЗапустить refresh-token flow или редирект на логин
403 ForbiddenJWT валиден, но require_roles отказал или ABAC отказалПоказать «доступ запрещён», refresh не поможет

Корректная схема — FastAPI возвращает 401 через HTTPException(status_code=401) из зависимости principal, и 403 через HTTPException(status_code=403) из require_roles. Путать нельзя: клиент, получив 403 на просроченный токен, не сделает refresh и зависнет.

Пример обработки в клиентском коде (Product-сервис вызывает Order-сервис s2s):

# adapters/out/http/order_client.py
async def get_order(order_id: str, token: str) -> OrderDTO:
    resp = await client.get(f"/orders/{order_id}", headers={"Authorization": f"Bearer {token}"})
    if resp.status_code == 401:
        raise TokenExpiredError()
    if resp.status_code == 403:
        raise ForbiddenError()
    resp.raise_for_status()
    return OrderDTO.model_validate(resp.json())

Что запрещено

АнтипаттернПравилоЧто взамен
Самописный decode без проверки подписиAUTH-4PyJWT + PyJWKClient
algorithms=[] или algorithms=["none"]AUTH-4algorithms=["RS256"] явно
Хранение публичного ключа в конфигеAUTH-5PyJWKClient с jwks_uri
Ручной fetch JWKS по таймеруAUTH-5lifespan=300 в PyJWKClient
403 вместо 401 на невалидный JWTAUTH-6401 из principal, 403 из require_roles
401 вместо 403 на нехватку правAUTH-6разделить зависимости principal и require_roles
Валидация JWT внутри обработчика бизнес-логикиAUTH-4только через Depends(principal) на уровне роутера
jwt.decode без audience и issuerAUTH-4передавать audience=settings.audience, issuer=settings.issuer

Куда дальше

  • Где какая проверка — Gateway vs BFF vs Domain Service.
  • RBAC: маппинг ролей — claim realm_access.roles и require_roles.
  • ABAC: владение ресурсом — order.customer_id == principal.sub в Handler.
  • Service-to-service — Client Credentials Flow и mTLS для внутренних клиентов.
  • PII и секреты — jwks_uri через env, секреты не в git.
  • Аудит admin-команд — декоратор аудита при admin-обходе ABAC.
  • Хранение токенов на клиенте — HttpOnly cookie, refresh-token rotation.
  • Идемпотентность — Idempotency-Key для денежных команд.