Опирается на правила:
R-OBS-CTX-1…R-OBS-CTX-4иR-OBS-CTX-X1…R-OBS-CTX-X3из Observability Style Guide → раздел 6. Context propagation.
Важно знать
contextvarsнативно проходят черезawaitи все coroutine-потомки — аналог MDC безTaskDecorator.bind_contextvars(request_id=...)в middleware populatesrequest_idизX-Request-Idheader или UUID.clear_contextvars()вfinallymiddleware — обязательно. Иначе контекст утекает соседнему запросу при переиспользовании event-loop-задач.trace_id/span_id— автоматически через OTel-structlog processor, не руками.user_idpopulates после JWT-валидации вDepends, не в request-id middleware.run_in_executorи thread-offload разрывают contextvars — передавай черезcontextvars.copy_context().run(...).bind_contextvars— только в middleware, не в handler или service. Иначе clear-логика неочевидна.- Утечка
user_idсоседнего запроса в логи — compliance-инцидент.
В Java MDC — thread-local: его нужно явно очищать и копировать на каждый новый thread. В Python с asyncio картина другая: contextvars.ContextVar нативно наследуется каждым Task, порождённым через asyncio.create_task или await. Это значит request_id и user_id, заложенные в middleware, видны всем await-ям внутри request-цикла без явного передавания аргументами. Цена — дисциплина в lifecycle: clear_contextvars() в finally middleware и явный copy_context() при offload в thread.
RequestIdMiddleware
R-OBS-CTX-1: один middleware на entry-point.
# app/middleware/request_id.py
import uuid
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from structlog.contextvars import bind_contextvars, clear_contextvars
class RequestIdMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
request_id = request.headers.get("X-Request-Id") or str(uuid.uuid4())
bind_contextvars(request_id=request_id)
try:
response = await call_next(request)
response.headers["X-Request-Id"] = request_id
return response
finally:
clear_contextvars()
# app/main.py
from fastapi import FastAPI
from app.middleware.request_id import RequestIdMiddleware
app = FastAPI()
app.add_middleware(RequestIdMiddleware)
Что делает middleware:
- Читает
X-Request-Idиз входящего header; если нет — генерирует UUID. - Возвращает тот же
X-Request-Idв response — клиент передаёт его при инциденте. bind_contextvarsкладётrequest_idв structlog-контекст текущей async-задачи.- В
finally—clear_contextvars(). Без этого при переиспользовании event-loop-worker значение останется в contextvars следующего request.
add_middleware(RequestIdMiddleware) должен идти до auth-middleware, чтобы ошибки аутентификации уже несли request_id в логах.
trace_id/span_id автоматически
R-OBS-CTX-2: OTel-structlog processor добавляет trace_id/span_id из активного span в каждую log-запись.
# app/observability/logging.py
import structlog
from opentelemetry import trace
def otel_trace_context_processor(logger, method, event_dict):
span = trace.get_current_span()
ctx = span.get_span_context()
if ctx.is_valid:
event_dict["trace_id"] = format(ctx.trace_id, "032x")
event_dict["span_id"] = format(ctx.span_id, "016x")
return event_dict
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
otel_trace_context_processor,
structlog.processors.JSONRenderer(),
]
)
merge_contextvars сначала вливает request_id/user_id из contextvars, затем otel_trace_context_processor добавляет trace_id/span_id из активного OTel-span. Руками bind_contextvars(trace_id=...) не делать — конфликт с авто, span меняется в процессе.
user_id после JWT-валидации
R-OBS-CTX-4: user_id populates в auth-зависимости, после JWT-декодирования.
# app/dependencies/auth.py
import structlog
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from structlog.contextvars import bind_contextvars
security = HTTPBearer()
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
) -> str:
payload = decode_jwt(credentials.credentials)
user_id = payload.get("sub")
if not user_id:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
bind_contextvars(user_id=user_id)
return user_id
# app/routers/orders.py
from fastapi import APIRouter, Depends
from app.dependencies.auth import get_current_user
router = APIRouter()
log = structlog.get_logger(__name__)
@router.post("/orders")
async def create_order(
payload: CreateOrderRequest,
user_id: str = Depends(get_current_user),
):
log.info("order_create_requested", customer_id=payload.customer_id)
# request_id, user_id, trace_id — уже в contextvars, попадут в каждую запись
bind_contextvars(user_id=...) выполняется внутри async-задачи request — значение видно всем дочерним coroutine. clear_contextvars() в RequestIdMiddleware.finally снимет и user_id тоже в конце request-цикла.
bind_contextvars в service или handler напрямую — нарушение R-OBS-CTX-X2: где тогда clear?
Propagation через asyncio.create_task
R-OBS-CTX-3: asyncio.create_task создаёт Task и автоматически копирует текущий contextvars.Context — request_id/user_id переходят без лишнего кода.
# app/services/order_service.py
import asyncio
import structlog
log = structlog.get_logger(__name__)
async def confirm_order(order_id: str, customer_id: str) -> None:
log.info("confirm_order_started", order_id=order_id)
# contextvars наследуются в Task автоматически
asyncio.create_task(
_send_confirmation_email(order_id, customer_id)
)
async def _send_confirmation_email(order_id: str, customer_id: str) -> None:
log.info("email_queued", order_id=order_id, customer_id=customer_id)
# request_id из middleware уже здесь — Task получил копию контекста
Trace в Tempo покажет span от incoming request → background-task без разрыва, если OTel-автоинструментация активна (opentelemetry-instrumentation-fastapi).
run_in_executor: явная передача контекста
R-OBS-CTX-3 + R-OBS-TRC-X4: блокирующий код в thread-pool через loop.run_in_executor разрывает contextvars и OTel-контекст — copy_context() обязателен.
# app/services/product_service.py
import asyncio
import contextvars
import structlog
from opentelemetry import context as otel_context
log = structlog.get_logger(__name__)
async def export_product_catalog(product_ids: list[str]) -> bytes:
loop = asyncio.get_running_loop()
ctx = contextvars.copy_context() # снимок contextvars до offload
otel_ctx = otel_context.get_current() # снимок OTel-контекста
def _blocking_export():
# восстанавливаем оба контекста в thread
token = otel_context.attach(otel_ctx)
try:
return ctx.run(_build_csv, product_ids)
finally:
otel_context.detach(token)
return await loop.run_in_executor(None, _blocking_export)
def _build_csv(product_ids: list[str]) -> bytes:
log.info("catalog_export_started", count=len(product_ids))
# request_id здесь через ctx.run — contextvars восстановлены
...
ctx.run(fn, *args) выполняет функцию в изолированной копии contextvars — thread видит request_id/user_id из оригинального request. otel_context.attach/detach восстанавливает активный span для thread — без этого manual spans внутри _blocking_export будут orphan-spans без parent.
Kafka/background tasks: явная передача traceparent
При отправке события в Kafka или фоновую очередь OTel-контекст не переходит автоматически — передаём traceparent в headers сообщения.
# app/adapters/kafka/producer.py
from opentelemetry import context as otel_context, propagate
from structlog.contextvars import get_contextvars
import structlog
log = structlog.get_logger(__name__)
async def publish_order_placed(order_id: str, customer_id: str) -> None:
carrier: dict[str, str] = {}
propagate.inject(carrier) # traceparent + tracestate → carrier
ctx_vars = get_contextvars()
headers = {
"traceparent": carrier.get("traceparent", ""),
"request_id": ctx_vars.get("request_id", ""),
}
await kafka_producer.send(
"order.placed",
value={"order_id": order_id, "customer_id": customer_id},
headers=list(headers.items()),
)
log.info("order_placed_published", order_id=order_id)
В consumer восстанавливаем контекст до обработки:
# app/consumers/order_placed_consumer.py
from opentelemetry import propagate, context as otel_context
from structlog.contextvars import bind_contextvars, clear_contextvars
async def handle_order_placed(message) -> None:
headers = dict(message.headers)
carrier = {"traceparent": headers.get("traceparent", "")}
parent_ctx = propagate.extract(carrier)
bind_contextvars(
request_id=headers.get("request_id", ""),
order_id=message.value["order_id"],
)
token = otel_context.attach(parent_ctx)
try:
await _process_order_placed(message.value)
finally:
otel_context.detach(token)
clear_contextvars()
propagate.extract восстанавливает OTel-контекст — consumer-span становится child originating trace. Без этого Tempo покажет два несвязанных дерева.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
bind_contextvars без clear_contextvars() в finally | R-OBS-CTX-X1 | try/finally: clear_contextvars() в middleware |
bind_contextvars в handler или service | R-OBS-CTX-X2 | только в middleware / auth-зависимости |
run_in_executor без copy_context() | R-OBS-CTX-X3 | ctx = copy_context(); ctx.run(fn) + otel_context.attach |
bind_contextvars(trace_id=...) вручную | R-OBS-CTX-2 | OTel-structlog processor автоматически |
user_id в RequestIdMiddleware до JWT | R-OBS-CTX-4 | bind_contextvars(user_id=...) в auth Depends |
Kafka-сообщение без traceparent в headers | R-OBS-TRC-X4 | propagate.inject(carrier) при отправке |
print() / logging.info() вместо structlog | R-OBS-LOG-X2 | structlog.get_logger(__name__) |
Куда дальше
- Конфигурация — management-порт,
APP_ENV-aware JSON/text конфиг structlog. - Health checks — раздельные
/health/liveи/health/readyс TTL-кешем. - Logging —
merge_contextvars, kwargs вместо f-string,exc_info=Trueдля ошибок. - Metrics —
prometheus-client, RED-метрики, низкая cardinality labels. - SLO и алерты — SLO recording rules и multi-window burn-rate alerts.
- Tracing —
opentelemetry-instrumentation-fastapi,start_as_current_span, sampling.