← назад к разделу

Представьте: у вас 20 запросов выполняются одновременно, и в логах перемешались строки от всех них. Какая строка относится к какому запросу? Без дополнительной информации — никак не понять.

Решение — context propagation: один раз установить идентификатор запроса (request_id) на входе, и он автоматически появится в каждой лог-записи внутри этого запроса — в сервисах, зависимостях, фоновых задачах. Не нужно тащить его параметром через все функции.

Как это работает в Python

В Java логи привязывают к потоку через MDC (thread-local хранилище). В Python с asyncio один поток обслуживает тысячи запросов через coroutine. Механизм другой — contextvars.ContextVar.

ContextVar хранит значение отдельно для каждой asyncio-задачи. Когда вы создаёте задачу через await или asyncio.create_task, дочерняя задача автоматически получает копию контекста родителя. То есть request_id, установленный в начале запроса, виден во всех await-вызовах внутри этого запроса без передачи аргументами.

Библиотека structlog использует именно contextvars под капотом — функции bind_contextvars / clear_contextvars работают с текущей asyncio-задачей.

Middleware: устанавливаем request_id один раз

Правильное место для установки request_id — middleware. Он перехватывает каждый входящий запрос до того, как он попадёт в обработчик.

# app/middleware/request_id.py
import uuid
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from structlog.contextvars import bind_contextvars, clear_contextvars

class RequestIdMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        request_id = request.headers.get("X-Request-Id") or str(uuid.uuid4())
        bind_contextvars(request_id=request_id)
        try:
            response = await call_next(request)
            response.headers["X-Request-Id"] = request_id
            return response
        finally:
            clear_contextvars()
# app/main.py
from fastapi import FastAPI
from app.middleware.request_id import RequestIdMiddleware

app = FastAPI()
app.add_middleware(RequestIdMiddleware)

Что здесь происходит:

  • Читаем X-Request-Id из заголовка запроса — если клиент уже прислал его (например, из API-шлюза), сохраняем. Если нет — генерируем UUID.
  • bind_contextvars(request_id=request_id) записывает значение в contextvars текущей задачи.
  • Возвращаем тот же request_id в заголовке ответа — клиент сможет передать его в поддержку при проблемах.
  • clear_contextvars() в блоке finally — обязательно. Без этого при переиспользовании event-loop-задач request_id от предыдущего запроса попадёт в логи следующего.

Важный порядок: middleware с request_id должен добавляться раньше auth-middleware. Тогда даже ошибки аутентификации будут нести request_id в логах.

trace_id и span_id: добавляет OpenTelemetry автоматически

request_id помогает найти все логи одного запроса внутри одного сервиса. trace_id — это идентификатор, который связывает логи и трейсы через несколько сервисов. Его ставить вручную не нужно — это делает structlog-процессор, который читает активный OpenTelemetry span.

# app/observability/logging.py
import structlog
from opentelemetry import trace

def otel_trace_context_processor(logger, method, event_dict):
    span = trace.get_current_span()
    ctx = span.get_span_context()
    if ctx.is_valid:
        event_dict["trace_id"] = format(ctx.trace_id, "032x")
        event_dict["span_id"] = format(ctx.span_id, "016x")
    return event_dict

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,   # добавляет request_id, user_id
        otel_trace_context_processor,               # добавляет trace_id, span_id
        structlog.processors.JSONRenderer(),
    ]
)

merge_contextvars первым делом вливает всё, что лежит в contextvars (request_id, user_id), затем процессор добавляет trace_id и span_id из активного OTel-спана. В итоге каждая лог-запись автоматически содержит все четыре поля.

Не добавляйте trace_id через bind_contextvars вручную — span меняется в процессе обработки запроса, и вручную установленное значение устареет.

user_id: добавляем после проверки токена

user_id нельзя добавить в middleware для request_id — в тот момент токен ещё не проверен. Правильное место — зависимость (Depends), которая валидирует JWT.

# app/dependencies/auth.py
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from structlog.contextvars import bind_contextvars

security = HTTPBearer()

async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> str:
    payload = decode_jwt(credentials.credentials)
    user_id = payload.get("sub")
    if not user_id:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
    bind_contextvars(user_id=user_id)
    return user_id
# app/routers/orders.py
from fastapi import APIRouter, Depends
import structlog
from app.dependencies.auth import get_current_user

router = APIRouter()
log = structlog.get_logger(__name__)

@router.post("/orders")
async def create_order(
    payload: CreateOrderRequest,
    user_id: str = Depends(get_current_user),
):
    log.info("order_create_requested", customer_id=payload.customer_id)
    # request_id, user_id, trace_id попадут в запись автоматически

bind_contextvars(user_id=...) выполняется внутри asyncio-задачи запроса, и значение доступно всем дочерним coroutine. clear_contextvars() в finally middleware снимет и user_id в конце запроса.

Нельзя вызывать bind_contextvars внутри сервиса или обработчика напрямую — там нет clear, и где его вызывать становится неочевидным.

asyncio.create_task: контекст переходит автоматически

Если внутри обработчика запускаете фоновую задачу через asyncio.create_task, контекст переходит в неё автоматически — никакого дополнительного кода не нужно.

# app/services/order_service.py
import asyncio
import structlog

log = structlog.get_logger(__name__)

async def confirm_order(order_id: str, customer_id: str) -> None:
    log.info("confirm_order_started", order_id=order_id)

    asyncio.create_task(
        _send_confirmation_email(order_id, customer_id)
    )

async def _send_confirmation_email(order_id: str, customer_id: str) -> None:
    log.info("email_queued", order_id=order_id, customer_id=customer_id)
    # request_id здесь уже есть — Task получил копию контекста автоматически

Это главное отличие от Java: не нужен никакой аналог TaskDecorator — asyncio копирует contextvars при создании задачи.

run_in_executor: контекст нужно передать явно

Вот единственное место, где автоматическая передача ломается. Если выносите блокирующий код в пул потоков через run_in_executor, потоки не получают contextvars автоматически. Нужно сделать снимок контекста заранее.

# app/services/product_service.py
import asyncio
import contextvars
import structlog
from opentelemetry import context as otel_context

log = structlog.get_logger(__name__)

async def export_product_catalog(product_ids: list[str]) -> bytes:
    loop = asyncio.get_running_loop()
    ctx = contextvars.copy_context()       # снимок contextvars до offload
    otel_ctx = otel_context.get_current()  # снимок OTel-контекста

    def _blocking_export():
        token = otel_context.attach(otel_ctx)
        try:
            return ctx.run(_build_csv, product_ids)
        finally:
            otel_context.detach(token)

    return await loop.run_in_executor(None, _blocking_export)

def _build_csv(product_ids: list[str]) -> bytes:
    log.info("catalog_export_started", count=len(product_ids))
    # request_id здесь доступен через ctx.run
    ...

ctx.run(fn, *args) выполняет функцию с восстановленным снимком contextvars. otel_context.attach/detach восстанавливает активный OTel-спан — без этого новые спаны внутри потока будут без родителя и не свяжутся с трейсом.

Kafka и фоновые очереди: передаём traceparent в заголовках

При отправке события в Kafka OTel-контекст не переходит сам — нужно сериализовать его в заголовки сообщения.

# app/adapters/kafka/producer.py
from opentelemetry import propagate
from structlog.contextvars import get_contextvars
import structlog

log = structlog.get_logger(__name__)

async def publish_order_placed(order_id: str, customer_id: str) -> None:
    carrier: dict[str, str] = {}
    propagate.inject(carrier)  # traceparent + tracestate → carrier

    ctx_vars = get_contextvars()
    headers = {
        "traceparent": carrier.get("traceparent", ""),
        "request_id": ctx_vars.get("request_id", ""),
    }

    await kafka_producer.send(
        "order.placed",
        value={"order_id": order_id, "customer_id": customer_id},
        headers=list(headers.items()),
    )
    log.info("order_placed_published", order_id=order_id)

В consumer восстанавливаем контекст до обработки:

# app/consumers/order_placed_consumer.py
from opentelemetry import propagate, context as otel_context
from structlog.contextvars import bind_contextvars, clear_contextvars

async def handle_order_placed(message) -> None:
    headers = dict(message.headers)
    carrier = {"traceparent": headers.get("traceparent", "")}
    parent_ctx = propagate.extract(carrier)

    bind_contextvars(
        request_id=headers.get("request_id", ""),
        order_id=message.value["order_id"],
    )
    token = otel_context.attach(parent_ctx)
    try:
        await _process_order_placed(message.value)
    finally:
        otel_context.detach(token)
        clear_contextvars()

propagate.extract восстанавливает OTel-контекст из заголовка traceparent — спан consumer'а становится дочерним к спану producer'а. Без этого в Tempo будет два несвязанных дерева трейсов вместо одного сквозного.

Частые ошибки

Забытый clear_contextvars() — самая опасная ошибка. Если не чистить контекст в finally, при переиспользовании event-loop-задач user_id от одного запроса попадёт в логи другого пользователя. Это утечка данных.

bind_contextvars в сервисе — вызывать bind_contextvars глубоко в коде (не в middleware и не в Depends) значит потерять контроль над clear. Держите установку контекста в одном месте — на входе.

run_in_executor без copy_context() — в потоке будет пустой контекст, request_id в логах пропадёт, спаны потеряют родителя.

bind_contextvars(trace_id=...) вручную — ставить trace_id руками бессмысленно: span меняется в процессе обработки запроса, и вручную установленное значение окажется устаревшим. Доверьте это OTel-процессору.

Коротко

  • contextvars.ContextVar — изолированное хранилище на asyncio-задачу. Аналог MDC, но работает с asyncio без TaskDecorator.
  • Установите request_id один раз в middleware через bind_contextvars — он автоматически попадёт во все логи запроса.
  • clear_contextvars() в блоке finally middleware — обязательно, иначе контекст утечёт в следующий запрос.
  • user_id добавляйте в auth-зависимости (Depends), после проверки JWT, — не в middleware для request_id.
  • trace_id и span_id добавляет OTel-structlog-процессор автоматически — не устанавливайте вручную.
  • asyncio.create_task копирует contextvars в дочернюю задачу автоматически.
  • run_in_executor контекст не наследует — перед offload снимите снимок через copy_context() и otel_context.get_current().
  • Kafka и очереди: сериализуйте traceparent в заголовки сообщения через propagate.inject, восстанавливайте в consumer через propagate.extract.

Что почитать дальше

  • Логирование в Python с structlog — merge_contextvars, форматы вывода, обработка ошибок.
  • Трейсинг в Python с OpenTelemetry — инструментация FastAPI, ручные спаны, семплирование.
  • Health checks в FastAPI — /health/live и /health/ready с TTL-кешем.
  • Метрики в Python — prometheus-client, RED-метрики, кардинальность меток.