← назад к разделу

Когда клиент отправляет запрос с токеном, сервис должен убедиться: токен настоящий, подпись верна, срок действия не истёк. Кажется, что это просто — «распакуй Base64 и прочитай». Но именно здесь чаще всего делают ошибки, которые открывают доступ злоумышленникам.

Разберём, как сделать это правильно в FastAPI.

Почему нельзя просто «распаковать» токен

JWT — это три части, разделённые точками: заголовок, полезная нагрузка и подпись. Подпись создаётся сервером аутентификации (IdP) с помощью приватного ключа.

Частая ошибка — читать только среднюю часть и не проверять подпись:

# Так делать нельзя
import base64, json

def bad_decode(token: str) -> dict:
    payload_b64 = token.split(".")[1]
    padding = "=" * (4 - len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(payload_b64 + padding))

Что идёт не так:

  • Подпись не проверяется — злоумышленник вставляет любые данные, включая admin: true.
  • Срок действия не проверяется — просроченный токен принимается как свежий.
  • Издатель и аудитория не проверяются — токен от другого сервиса или чужого IdP тоже пройдёт.
  • Алгоритм none может пройти — классическая уязвимость: токен без подписи объявляет "alg": "none" и принимается без проверки.

Правильный способ — использовать PyJWT вместе с PyJWKClient. Библиотека сама делает все нужные проверки.

Как устроена правильная проверка

Сервер аутентификации публикует открытые ключи по известному адресу — это называется JWKS (JSON Web Key Set). Ваш сервис скачивает эти ключи и с их помощью проверяет подпись каждого токена.

Весь этот код живёт в одном месте: adapters/in/http/security.py. Остальные части приложения получают уже проверенный объект Principal через механизм зависимостей FastAPI — Depends.

# adapters/in/http/security.py
import jwt
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from jwt import PyJWKClient, PyJWKClientConnectionError
from pydantic import BaseModel

from app.config import settings

_jwks = PyJWKClient(settings.jwks_uri, cache_keys=True, lifespan=300)

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=True)


class Principal(BaseModel):
    sub: str
    roles: list[str]


def _extract_roles(claims: dict) -> list[str]:
    realm = claims.get("realm_access", {})
    return realm.get("roles", [])


async def principal(token: str = Depends(oauth2_scheme)) -> Principal:
    try:
        signing_key = _jwks.get_signing_key_from_jwt(token).key
    except PyJWKClientConnectionError as exc:
        raise HTTPException(status_code=503, detail="identity provider unavailable") from exc
    try:
        claims = jwt.decode(
            token,
            signing_key,
            algorithms=["RS256"],
            audience=settings.audience,
            issuer=settings.issuer,
        )
    except jwt.PyJWTError as exc:
        raise HTTPException(status_code=401, detail="invalid token") from exc
    return Principal(sub=claims["sub"], roles=_extract_roles(claims))


def require_roles(*roles: str):
    async def dep(p: Principal = Depends(principal)) -> Principal:
        if not set(roles) & set(p.roles):
            raise HTTPException(status_code=403, detail="forbidden")
        return p
    return dep

Что здесь происходит по шагам:

  1. _jwks.get_signing_key_from_jwt(token) — находит нужный ключ по полю kid в заголовке токена.
  2. jwt.decode(...) — проверяет подпись, срок действия (exp), издателя (iss) и аудиторию (aud). Явно указан алгоритм RS256 — это закрывает атаку через alg: none.
  3. Если что-то не так — автоматически бросается HTTPException(status_code=401).
  4. Если всё хорошо — возвращается Principal с sub и списком ролей.

Настройки берутся из переменных окружения через pydantic-settings:

# config.py
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    jwks_uri: str
    audience: str
    issuer: str

    model_config = {"env_file": ".env"}


settings = Settings()

Пример .env для локального запуска (в git не коммитится):

JWKS_URI=https://idp.example.com/realms/main/protocol/openid-connect/certs
AUDIENCE=order-service
ISSUER=https://idp.example.com/realms/main

Как использовать в роутерах

Каждый endpoint указывает, какие роли нужны для доступа. Делается это через Depends(require_roles(...)):

# adapters/in/http/orders_router.py
from fastapi import APIRouter, Depends

from adapters.in.http.security import Principal, require_roles
from application.use_cases.create_order import CreateOrderUseCase

router = APIRouter(prefix="/orders")


@router.post("/", status_code=201)
async def create_order(
    body: CreateOrderRequest,
    principal: Principal = Depends(require_roles("customer", "admin")),
    use_case: CreateOrderUseCase = Depends(),
) -> OrderResponse:
    return await use_case.execute(body, principal)


@router.get("/{order_id}")
async def get_order(
    order_id: str,
    principal: Principal = Depends(require_roles("customer", "seller", "admin")),
    use_case: GetOrderUseCase = Depends(),
) -> OrderResponse:
    return await use_case.execute(order_id, principal)

Объект Principal прокидывается в UseCase — проверки типа «этот заказ принадлежит текущему пользователю» (order.customer_id == principal.sub) делаются там, а не в роутере.

Кеш JWKS и смена ключей

Каждый раз тянуть ключи из IdP при каждом запросе — плохая идея: это медленно и создаёт лишнюю нагрузку. PyJWKClient с параметром lifespan=300 кеширует ключи на 5 минут:

_jwks = PyJWKClient(
    settings.jwks_uri,
    cache_keys=True,
    lifespan=300,
)

Что происходит при смене ключей у IdP: если kid из нового токена не найден в кеше, PyJWKClient автоматически обновляет кеш и сразу находит новый ключ. Ничего дополнительно писать не нужно.

Распространённые ошибки:

  • Хранить публичный ключ прямо в конфигурации — при смене ключей сервис перестанет проверять токены.
  • Вручную вызывать _jwks.fetch_data() по таймеру — PyJWKClient делает это сам.
  • Парсить JWK вручную через cryptography без PyJWKClient.

Если IdP недоступен при старте сервиса — сервис не пройдёт проверку готовности (readiness). Это правильное поведение: принимать запросы без возможности проверить токен нельзя.

Разница между 401 и 403

Это разные ситуации с разными последствиями для клиента:

КодКогдаЧто делает клиент
401 UnauthorizedТокен невалиден: плохая подпись, истёк срок, нет заголовкаЗапустить обновление токена или перенаправить на логин
403 ForbiddenТокен валиден, но роль не подходит или доступ к ресурсу закрытПоказать «доступ запрещён», обновление токена не поможет

Если вернуть 403 на просроченный токен — клиент не попытается его обновить и «зависнет». Если вернуть 401 при нехватке прав — клиент будет бесконечно пытаться войти заново.

В коде выше это разделено по зависимостям: principal бросает 401, require_roles бросает 403.

Пример обработки в клиентском коде, когда один сервис вызывает другой:

# adapters/out/http/order_client.py
async def get_order(order_id: str, token: str) -> OrderDTO:
    resp = await client.get(f"/orders/{order_id}", headers={"Authorization": f"Bearer {token}"})
    if resp.status_code == 401:
        raise TokenExpiredError()
    if resp.status_code == 403:
        raise ForbiddenError()
    resp.raise_for_status()
    return OrderDTO.model_validate(resp.json())

Коротко

  • JWT содержит подпись — её обязательно проверять; просто «распаковать Base64» не достаточно.
  • Используй PyJWT + PyJWKClient: они проверяют подпись, срок действия, издателя и аудиторию.
  • Явно указывай algorithms=["RS256"] — это закрывает атаку через токен без подписи (alg: none).
  • Вся логика проверки токена живёт в одном месте (security.py) и отдаётся в endpoint через Depends(principal).
  • PyJWKClient кеширует ключи и сам обновляет кеш при смене ключей у IdP.
  • 401 — токен невалиден (клиент обновляет токен), 403 — токен валиден, но доступ закрыт (обновление не поможет).
  • Конфигурацию (jwks_uri, audience, issuer) читай из переменных окружения, не хардкоди.

Что почитать дальше