Опирается на правила:
AUTH-4…AUTH-6из Auth Patterns — раздел 2. JWT validation.
Важно знать
- JWT валидируется единой зависимостью
Depends(principal)на основеPyJWT+PyJWKClient— самописныйjwt.decodeбез проверки подписи запрещён.- JWKS тянется из IdP по
jwks_uriс кешемlifespan=300(PyJWKClient). Вручную распаковывать ключи — запрещено.- Rotation ключей: если
kidв токене не найден в кеше,PyJWKClientделает принудительный refresh — писать своё не нужно.- 401 — невалидная подпись или просроченный
exp. Клиент не аутентифицирован.- 403 — токен валиден, но роль не подходит или ABAC отказал. Аутентифицирован, но не авторизован.
- Путать 401/403 запрещено — для клиента это разные сценарии с разными действиями (refresh-flow vs «доступ закрыт»).
alg: noneнедопустим: вjwt.decodeявно указываетсяalgorithms=["RS256"]— без этого библиотека может принять токен без подписи.
JWT validation — точка входа, где сервис решает «пришёл реальный пользователь или подделка». Один пропуск в проверке — и атакующий подписывает токен своим ключом, объявляет себя администратором, получает полный доступ. AUTH-4 запрещает любой самописный код в этом месте — только проверенная библиотека.
Стандартная зависимость principal
AUTH-4: вся логика JWT живёт в одном месте — adapters/in/http/security.py. Остальной код получает готовый объект Principal через Depends.
# adapters/in/http/security.py
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from jwt import PyJWT, PyJWKClient
from pydantic import BaseModel
from app.config import settings
_jwks = PyJWKClient(settings.jwks_uri, cache_keys=True, lifespan=300)
_jwt = PyJWT()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=True)
class Principal(BaseModel):
sub: str
roles: list[str]
def _extract_roles(claims: dict) -> list[str]:
realm = claims.get("realm_access", {})
return realm.get("roles", [])
async def principal(token: str = Depends(oauth2_scheme)) -> Principal:
try:
signing_key = _jwks.get_signing_key_from_jwt(token).key
claims = _jwt.decode(
token,
signing_key,
algorithms=["RS256"],
audience=settings.audience,
issuer=settings.issuer,
)
except Exception as exc:
raise HTTPException(status_code=401, detail="invalid token") from exc
return Principal(sub=claims["sub"], roles=_extract_roles(claims))
def require_roles(*roles: str):
async def dep(p: Principal = Depends(principal)) -> Principal:
if not set(roles) & set(p.roles):
raise HTTPException(status_code=403, detail="forbidden")
return p
return dep
Конфигурация через pydantic-settings (AUTH-17 — секреты не в git):
# config.py
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
jwks_uri: str
audience: str
issuer: str
model_config = {"env_file": ".env"}
settings = Settings()
Пример .env для локального запуска (в git не коммитится):
JWKS_URI=https://idp.example.com/realms/main/protocol/openid-connect/certs
AUDIENCE=order-service
ISSUER=https://idp.example.com/realms/main
Применение в роутерах
AUTH-9: каждый endpoint обязан объявить роль. Endpoint без Depends(require_roles(...)) — критическое нарушение.
# adapters/in/http/orders_router.py
from fastapi import APIRouter, Depends
from adapters.in.http.security import Principal, require_roles
from application.use_cases.create_order import CreateOrderUseCase
router = APIRouter(prefix="/orders")
@router.post("/", status_code=201)
async def create_order(
body: CreateOrderRequest,
principal: Principal = Depends(require_roles("customer", "admin")),
use_case: CreateOrderUseCase = Depends(),
) -> OrderResponse:
return await use_case.execute(body, principal)
@router.get("/{order_id}")
async def get_order(
order_id: str,
principal: Principal = Depends(require_roles("customer", "seller", "admin")),
use_case: GetOrderUseCase = Depends(),
) -> OrderResponse:
return await use_case.execute(order_id, principal)
Principal прокидывается в UseCase или Handler — ABAC-проверка по владению (order.customer_id == principal.sub) происходит там, а не в роутере (AUTH-3, AUTH-10, AUTH-11).
Самописный парсинг — запрещён
# ЗАПРЕЩЕНО — AUTH-4
import base64, json
def bad_decode(token: str) -> dict:
payload_b64 = token.split(".")[1]
padding = "=" * (4 - len(payload_b64) % 4)
return json.loads(base64.urlsafe_b64decode(payload_b64 + padding))
Что ломается:
- Подпись не проверяется — attacker подсовывает любые claims.
expне проверяется — просроченный токен принимается.issиaudне проверяются — токен от чужого IdP или другого сервиса принимается.alg: noneможет пройти — классическая уязвимость JWT.- JWKS не кешируется — каждый запрос дёргает IdP напрямую.
JWKS-кеш и rotation ключей
AUTH-5: PyJWKClient с lifespan=300 кеширует JWK Set на 5 минут. Если kid из токена отсутствует в кеше (IdP сменил ключ) — клиент делает принудительный refresh и сразу подбирает новый ключ. Ничего дополнительно писать не нужно.
_jwks = PyJWKClient(
settings.jwks_uri,
cache_keys=True,
lifespan=300,
)
Запрещено:
- Хранить публичный ключ в
settings.pyили файле — при rotation сервис перестанет проверять токены. - Вызывать
_jwks.fetch_data()вручную по таймеру —PyJWKClientделает это сам. - Парсить JWK вручную через
cryptographyбезPyJWKClient.
При недоступности IdP в момент старта сервис не пройдёт проверку health/readiness — это ожидаемое поведение: принимать запросы без возможности проверить JWT нельзя.
401 vs 403
AUTH-6: семантика кодов определяет поведение клиента.
| Код | Ситуация | Действие клиента |
|---|---|---|
| 401 Unauthorized | Невалидный JWT: плохая подпись, просроченный exp, отсутствующий заголовок | Запустить refresh-token flow или редирект на логин |
| 403 Forbidden | JWT валиден, но require_roles отказал или ABAC отказал | Показать «доступ запрещён», refresh не поможет |
Корректная схема — FastAPI возвращает 401 через HTTPException(status_code=401) из зависимости principal, и 403 через HTTPException(status_code=403) из require_roles. Путать нельзя: клиент, получив 403 на просроченный токен, не сделает refresh и зависнет.
Пример обработки в клиентском коде (Product-сервис вызывает Order-сервис s2s):
# adapters/out/http/order_client.py
async def get_order(order_id: str, token: str) -> OrderDTO:
resp = await client.get(f"/orders/{order_id}", headers={"Authorization": f"Bearer {token}"})
if resp.status_code == 401:
raise TokenExpiredError()
if resp.status_code == 403:
raise ForbiddenError()
resp.raise_for_status()
return OrderDTO.model_validate(resp.json())
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
| Самописный decode без проверки подписи | AUTH-4 | PyJWT + PyJWKClient |
algorithms=[] или algorithms=["none"] | AUTH-4 | algorithms=["RS256"] явно |
| Хранение публичного ключа в конфиге | AUTH-5 | PyJWKClient с jwks_uri |
| Ручной fetch JWKS по таймеру | AUTH-5 | lifespan=300 в PyJWKClient |
| 403 вместо 401 на невалидный JWT | AUTH-6 | 401 из principal, 403 из require_roles |
| 401 вместо 403 на нехватку прав | AUTH-6 | разделить зависимости principal и require_roles |
| Валидация JWT внутри обработчика бизнес-логики | AUTH-4 | только через Depends(principal) на уровне роутера |
jwt.decode без audience и issuer | AUTH-4 | передавать audience=settings.audience, issuer=settings.issuer |
Куда дальше
- Где какая проверка — Gateway vs BFF vs Domain Service.
- RBAC: маппинг ролей — claim
realm_access.rolesиrequire_roles. - ABAC: владение ресурсом —
order.customer_id == principal.subв Handler. - Service-to-service — Client Credentials Flow и mTLS для внутренних клиентов.
- PII и секреты —
jwks_uriчерез env, секреты не в git. - Аудит admin-команд — декоратор аудита при
admin-обходе ABAC. - Хранение токенов на клиенте — HttpOnly cookie, refresh-token rotation.
- Идемпотентность —
Idempotency-Keyдля денежных команд.