Когда клиент отправляет запрос с токеном, сервис должен убедиться: токен настоящий, подпись верна, срок действия не истёк. Кажется, что это просто — «распакуй Base64 и прочитай». Но именно здесь чаще всего делают ошибки, которые открывают доступ злоумышленникам.
Разберём, как сделать это правильно в FastAPI.
Почему нельзя просто «распаковать» токен
JWT — это три части, разделённые точками: заголовок, полезная нагрузка и подпись. Подпись создаётся сервером аутентификации (IdP) с помощью приватного ключа.
Частая ошибка — читать только среднюю часть и не проверять подпись:
# Так делать нельзя
import base64, json
def bad_decode(token: str) -> dict:
payload_b64 = token.split(".")[1]
padding = "=" * (4 - len(payload_b64) % 4)
return json.loads(base64.urlsafe_b64decode(payload_b64 + padding))
Что идёт не так:
- Подпись не проверяется — злоумышленник вставляет любые данные, включая
admin: true. - Срок действия не проверяется — просроченный токен принимается как свежий.
- Издатель и аудитория не проверяются — токен от другого сервиса или чужого IdP тоже пройдёт.
- Алгоритм
noneможет пройти — классическая уязвимость: токен без подписи объявляет"alg": "none"и принимается без проверки.
Правильный способ — использовать PyJWT вместе с PyJWKClient. Библиотека сама делает все нужные проверки.
Как устроена правильная проверка
Сервер аутентификации публикует открытые ключи по известному адресу — это называется JWKS (JSON Web Key Set). Ваш сервис скачивает эти ключи и с их помощью проверяет подпись каждого токена.
Весь этот код живёт в одном месте: adapters/in/http/security.py. Остальные части приложения получают уже проверенный объект Principal через механизм зависимостей FastAPI — Depends.
# adapters/in/http/security.py
import jwt
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from jwt import PyJWKClient, PyJWKClientConnectionError
from pydantic import BaseModel
from app.config import settings
_jwks = PyJWKClient(settings.jwks_uri, cache_keys=True, lifespan=300)
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=True)
class Principal(BaseModel):
sub: str
roles: list[str]
def _extract_roles(claims: dict) -> list[str]:
realm = claims.get("realm_access", {})
return realm.get("roles", [])
async def principal(token: str = Depends(oauth2_scheme)) -> Principal:
try:
signing_key = _jwks.get_signing_key_from_jwt(token).key
except PyJWKClientConnectionError as exc:
raise HTTPException(status_code=503, detail="identity provider unavailable") from exc
try:
claims = jwt.decode(
token,
signing_key,
algorithms=["RS256"],
audience=settings.audience,
issuer=settings.issuer,
)
except jwt.PyJWTError as exc:
raise HTTPException(status_code=401, detail="invalid token") from exc
return Principal(sub=claims["sub"], roles=_extract_roles(claims))
def require_roles(*roles: str):
async def dep(p: Principal = Depends(principal)) -> Principal:
if not set(roles) & set(p.roles):
raise HTTPException(status_code=403, detail="forbidden")
return p
return dep
Что здесь происходит по шагам:
_jwks.get_signing_key_from_jwt(token)— находит нужный ключ по полюkidв заголовке токена.jwt.decode(...)— проверяет подпись, срок действия (exp), издателя (iss) и аудиторию (aud). Явно указан алгоритмRS256— это закрывает атаку черезalg: none.- Если что-то не так — автоматически бросается
HTTPException(status_code=401). - Если всё хорошо — возвращается
Principalсsubи списком ролей.
Настройки берутся из переменных окружения через pydantic-settings:
# config.py
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
jwks_uri: str
audience: str
issuer: str
model_config = {"env_file": ".env"}
settings = Settings()
Пример .env для локального запуска (в git не коммитится):
JWKS_URI=https://idp.example.com/realms/main/protocol/openid-connect/certs
AUDIENCE=order-service
ISSUER=https://idp.example.com/realms/main
Как использовать в роутерах
Каждый endpoint указывает, какие роли нужны для доступа. Делается это через Depends(require_roles(...)):
# adapters/in/http/orders_router.py
from fastapi import APIRouter, Depends
from adapters.in.http.security import Principal, require_roles
from application.use_cases.create_order import CreateOrderUseCase
router = APIRouter(prefix="/orders")
@router.post("/", status_code=201)
async def create_order(
body: CreateOrderRequest,
principal: Principal = Depends(require_roles("customer", "admin")),
use_case: CreateOrderUseCase = Depends(),
) -> OrderResponse:
return await use_case.execute(body, principal)
@router.get("/{order_id}")
async def get_order(
order_id: str,
principal: Principal = Depends(require_roles("customer", "seller", "admin")),
use_case: GetOrderUseCase = Depends(),
) -> OrderResponse:
return await use_case.execute(order_id, principal)
Объект Principal прокидывается в UseCase — проверки типа «этот заказ принадлежит текущему пользователю» (order.customer_id == principal.sub) делаются там, а не в роутере.
Кеш JWKS и смена ключей
Каждый раз тянуть ключи из IdP при каждом запросе — плохая идея: это медленно и создаёт лишнюю нагрузку. PyJWKClient с параметром lifespan=300 кеширует ключи на 5 минут:
_jwks = PyJWKClient(
settings.jwks_uri,
cache_keys=True,
lifespan=300,
)
Что происходит при смене ключей у IdP: если kid из нового токена не найден в кеше, PyJWKClient автоматически обновляет кеш и сразу находит новый ключ. Ничего дополнительно писать не нужно.
Распространённые ошибки:
- Хранить публичный ключ прямо в конфигурации — при смене ключей сервис перестанет проверять токены.
- Вручную вызывать
_jwks.fetch_data()по таймеру —PyJWKClientделает это сам. - Парсить JWK вручную через
cryptographyбезPyJWKClient.
Если IdP недоступен при старте сервиса — сервис не пройдёт проверку готовности (readiness). Это правильное поведение: принимать запросы без возможности проверить токен нельзя.
Разница между 401 и 403
Это разные ситуации с разными последствиями для клиента:
| Код | Когда | Что делает клиент |
|---|---|---|
| 401 Unauthorized | Токен невалиден: плохая подпись, истёк срок, нет заголовка | Запустить обновление токена или перенаправить на логин |
| 403 Forbidden | Токен валиден, но роль не подходит или доступ к ресурсу закрыт | Показать «доступ запрещён», обновление токена не поможет |
Если вернуть 403 на просроченный токен — клиент не попытается его обновить и «зависнет». Если вернуть 401 при нехватке прав — клиент будет бесконечно пытаться войти заново.
В коде выше это разделено по зависимостям: principal бросает 401, require_roles бросает 403.
Пример обработки в клиентском коде, когда один сервис вызывает другой:
# adapters/out/http/order_client.py
async def get_order(order_id: str, token: str) -> OrderDTO:
resp = await client.get(f"/orders/{order_id}", headers={"Authorization": f"Bearer {token}"})
if resp.status_code == 401:
raise TokenExpiredError()
if resp.status_code == 403:
raise ForbiddenError()
resp.raise_for_status()
return OrderDTO.model_validate(resp.json())
Коротко
- JWT содержит подпись — её обязательно проверять; просто «распаковать Base64» не достаточно.
- Используй
PyJWT+PyJWKClient: они проверяют подпись, срок действия, издателя и аудиторию. - Явно указывай
algorithms=["RS256"]— это закрывает атаку через токен без подписи (alg: none). - Вся логика проверки токена живёт в одном месте (
security.py) и отдаётся в endpoint черезDepends(principal). PyJWKClientкеширует ключи и сам обновляет кеш при смене ключей у IdP.- 401 — токен невалиден (клиент обновляет токен), 403 — токен валиден, но доступ закрыт (обновление не поможет).
- Конфигурацию (
jwks_uri,audience,issuer) читай из переменных окружения, не хардкоди.
Что почитать дальше
- Разграничение ролей (RBAC) — claim
realm_access.rolesиrequire_roles. - Проверка владения ресурсом (ABAC) —
order.customer_id == principal.subв Handler. - Взаимодействие между сервисами — Client Credentials Flow и mTLS для внутренних запросов.
- Где какая проверка — Gateway vs BFF vs Domain Service.