Опирается на правила: R-OBS-CTX-1R-OBS-CTX-4 и R-OBS-CTX-X1R-OBS-CTX-X3 из Observability Style Guide → раздел 6. Context propagation.

Важно знать

  • contextvars нативно проходят через await и все coroutine-потомки — аналог MDC без TaskDecorator.
  • bind_contextvars(request_id=...) в middleware populates request_id из X-Request-Id header или UUID.
  • clear_contextvars() в finally middleware — обязательно. Иначе контекст утекает соседнему запросу при переиспользовании event-loop-задач.
  • trace_id/span_id — автоматически через OTel-structlog processor, не руками.
  • user_id populates после JWT-валидации в Depends, не в request-id middleware.
  • run_in_executor и thread-offload разрывают contextvars — передавай через contextvars.copy_context().run(...).
  • bind_contextvarsтолько в middleware, не в handler или service. Иначе clear-логика неочевидна.
  • Утечка user_id соседнего запроса в логи — compliance-инцидент.

В Java MDC — thread-local: его нужно явно очищать и копировать на каждый новый thread. В Python с asyncio картина другая: contextvars.ContextVar нативно наследуется каждым Task, порождённым через asyncio.create_task или await. Это значит request_id и user_id, заложенные в middleware, видны всем await-ям внутри request-цикла без явного передавания аргументами. Цена — дисциплина в lifecycle: clear_contextvars() в finally middleware и явный copy_context() при offload в thread.

RequestIdMiddleware

R-OBS-CTX-1: один middleware на entry-point.

# app/middleware/request_id.py
import uuid
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from structlog.contextvars import bind_contextvars, clear_contextvars

class RequestIdMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        request_id = request.headers.get("X-Request-Id") or str(uuid.uuid4())
        bind_contextvars(request_id=request_id)
        try:
            response = await call_next(request)
            response.headers["X-Request-Id"] = request_id
            return response
        finally:
            clear_contextvars()
# app/main.py
from fastapi import FastAPI
from app.middleware.request_id import RequestIdMiddleware

app = FastAPI()
app.add_middleware(RequestIdMiddleware)

Что делает middleware:

  • Читает X-Request-Id из входящего header; если нет — генерирует UUID.
  • Возвращает тот же X-Request-Id в response — клиент передаёт его при инциденте.
  • bind_contextvars кладёт request_id в structlog-контекст текущей async-задачи.
  • В finallyclear_contextvars(). Без этого при переиспользовании event-loop-worker значение останется в contextvars следующего request.

add_middleware(RequestIdMiddleware) должен идти до auth-middleware, чтобы ошибки аутентификации уже несли request_id в логах.

trace_id/span_id автоматически

R-OBS-CTX-2: OTel-structlog processor добавляет trace_id/span_id из активного span в каждую log-запись.

# app/observability/logging.py
import structlog
from opentelemetry import trace

def otel_trace_context_processor(logger, method, event_dict):
    span = trace.get_current_span()
    ctx = span.get_span_context()
    if ctx.is_valid:
        event_dict["trace_id"] = format(ctx.trace_id, "032x")
        event_dict["span_id"] = format(ctx.span_id, "016x")
    return event_dict

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        otel_trace_context_processor,
        structlog.processors.JSONRenderer(),
    ]
)

merge_contextvars сначала вливает request_id/user_id из contextvars, затем otel_trace_context_processor добавляет trace_id/span_id из активного OTel-span. Руками bind_contextvars(trace_id=...) не делать — конфликт с авто, span меняется в процессе.

user_id после JWT-валидации

R-OBS-CTX-4: user_id populates в auth-зависимости, после JWT-декодирования.

# app/dependencies/auth.py
import structlog
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from structlog.contextvars import bind_contextvars

security = HTTPBearer()

async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> str:
    payload = decode_jwt(credentials.credentials)
    user_id = payload.get("sub")
    if not user_id:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
    bind_contextvars(user_id=user_id)
    return user_id
# app/routers/orders.py
from fastapi import APIRouter, Depends
from app.dependencies.auth import get_current_user

router = APIRouter()
log = structlog.get_logger(__name__)

@router.post("/orders")
async def create_order(
    payload: CreateOrderRequest,
    user_id: str = Depends(get_current_user),
):
    log.info("order_create_requested", customer_id=payload.customer_id)
    # request_id, user_id, trace_id — уже в contextvars, попадут в каждую запись

bind_contextvars(user_id=...) выполняется внутри async-задачи request — значение видно всем дочерним coroutine. clear_contextvars() в RequestIdMiddleware.finally снимет и user_id тоже в конце request-цикла.

bind_contextvars в service или handler напрямую — нарушение R-OBS-CTX-X2: где тогда clear?

Propagation через asyncio.create_task

R-OBS-CTX-3: asyncio.create_task создаёт Task и автоматически копирует текущий contextvars.Contextrequest_id/user_id переходят без лишнего кода.

# app/services/order_service.py
import asyncio
import structlog

log = structlog.get_logger(__name__)

async def confirm_order(order_id: str, customer_id: str) -> None:
    log.info("confirm_order_started", order_id=order_id)

    # contextvars наследуются в Task автоматически
    asyncio.create_task(
        _send_confirmation_email(order_id, customer_id)
    )

async def _send_confirmation_email(order_id: str, customer_id: str) -> None:
    log.info("email_queued", order_id=order_id, customer_id=customer_id)
    # request_id из middleware уже здесь — Task получил копию контекста

Trace в Tempo покажет span от incoming request → background-task без разрыва, если OTel-автоинструментация активна (opentelemetry-instrumentation-fastapi).

run_in_executor: явная передача контекста

R-OBS-CTX-3 + R-OBS-TRC-X4: блокирующий код в thread-pool через loop.run_in_executor разрывает contextvars и OTel-контекст — copy_context() обязателен.

# app/services/product_service.py
import asyncio
import contextvars
import structlog
from opentelemetry import context as otel_context

log = structlog.get_logger(__name__)

async def export_product_catalog(product_ids: list[str]) -> bytes:
    loop = asyncio.get_running_loop()
    ctx = contextvars.copy_context()          # снимок contextvars до offload
    otel_ctx = otel_context.get_current()    # снимок OTel-контекста

    def _blocking_export():
        # восстанавливаем оба контекста в thread
        token = otel_context.attach(otel_ctx)
        try:
            return ctx.run(_build_csv, product_ids)
        finally:
            otel_context.detach(token)

    return await loop.run_in_executor(None, _blocking_export)

def _build_csv(product_ids: list[str]) -> bytes:
    log.info("catalog_export_started", count=len(product_ids))
    # request_id здесь через ctx.run — contextvars восстановлены
    ...

ctx.run(fn, *args) выполняет функцию в изолированной копии contextvars — thread видит request_id/user_id из оригинального request. otel_context.attach/detach восстанавливает активный span для thread — без этого manual spans внутри _blocking_export будут orphan-spans без parent.

Kafka/background tasks: явная передача traceparent

При отправке события в Kafka или фоновую очередь OTel-контекст не переходит автоматически — передаём traceparent в headers сообщения.

# app/adapters/kafka/producer.py
from opentelemetry import context as otel_context, propagate
from structlog.contextvars import get_contextvars
import structlog

log = structlog.get_logger(__name__)

async def publish_order_placed(order_id: str, customer_id: str) -> None:
    carrier: dict[str, str] = {}
    propagate.inject(carrier)  # traceparent + tracestate → carrier

    ctx_vars = get_contextvars()
    headers = {
        "traceparent": carrier.get("traceparent", ""),
        "request_id": ctx_vars.get("request_id", ""),
    }

    await kafka_producer.send(
        "order.placed",
        value={"order_id": order_id, "customer_id": customer_id},
        headers=list(headers.items()),
    )
    log.info("order_placed_published", order_id=order_id)

В consumer восстанавливаем контекст до обработки:

# app/consumers/order_placed_consumer.py
from opentelemetry import propagate, context as otel_context
from structlog.contextvars import bind_contextvars, clear_contextvars

async def handle_order_placed(message) -> None:
    headers = dict(message.headers)
    carrier = {"traceparent": headers.get("traceparent", "")}
    parent_ctx = propagate.extract(carrier)

    bind_contextvars(
        request_id=headers.get("request_id", ""),
        order_id=message.value["order_id"],
    )
    token = otel_context.attach(parent_ctx)
    try:
        await _process_order_placed(message.value)
    finally:
        otel_context.detach(token)
        clear_contextvars()

propagate.extract восстанавливает OTel-контекст — consumer-span становится child originating trace. Без этого Tempo покажет два несвязанных дерева.

Что запрещено

АнтипаттернПравилоЧто взамен
bind_contextvars без clear_contextvars() в finallyR-OBS-CTX-X1try/finally: clear_contextvars() в middleware
bind_contextvars в handler или serviceR-OBS-CTX-X2только в middleware / auth-зависимости
run_in_executor без copy_context()R-OBS-CTX-X3ctx = copy_context(); ctx.run(fn) + otel_context.attach
bind_contextvars(trace_id=...) вручнуюR-OBS-CTX-2OTel-structlog processor автоматически
user_id в RequestIdMiddleware до JWTR-OBS-CTX-4bind_contextvars(user_id=...) в auth Depends
Kafka-сообщение без traceparent в headersR-OBS-TRC-X4propagate.inject(carrier) при отправке
print() / logging.info() вместо structlogR-OBS-LOG-X2structlog.get_logger(__name__)

Куда дальше

  • Конфигурация — management-порт, APP_ENV-aware JSON/text конфиг structlog.
  • Health checks — раздельные /health/live и /health/ready с TTL-кешем.
  • Logging — merge_contextvars, kwargs вместо f-string, exc_info=True для ошибок.
  • Metrics — prometheus-client, RED-метрики, низкая cardinality labels.
  • SLO и алерты — SLO recording rules и multi-window burn-rate alerts.
  • Tracing — opentelemetry-instrumentation-fastapi, start_as_current_span, sampling.