Представьте: у вас 20 запросов выполняются одновременно, и в логах перемешались строки от всех них. Какая строка относится к какому запросу? Без дополнительной информации — никак не понять.
Решение — context propagation: один раз установить идентификатор запроса (request_id) на входе, и он автоматически появится в каждой лог-записи внутри этого запроса — в сервисах, зависимостях, фоновых задачах. Не нужно тащить его параметром через все функции.
Как это работает в Python
В Java логи привязывают к потоку через MDC (thread-local хранилище). В Python с asyncio один поток обслуживает тысячи запросов через coroutine. Механизм другой — contextvars.ContextVar.
ContextVar хранит значение отдельно для каждой asyncio-задачи. Когда вы создаёте задачу через await или asyncio.create_task, дочерняя задача автоматически получает копию контекста родителя. То есть request_id, установленный в начале запроса, виден во всех await-вызовах внутри этого запроса без передачи аргументами.
Библиотека structlog использует именно contextvars под капотом — функции bind_contextvars / clear_contextvars работают с текущей asyncio-задачей.
Middleware: устанавливаем request_id один раз
Правильное место для установки request_id — middleware. Он перехватывает каждый входящий запрос до того, как он попадёт в обработчик.
# app/middleware/request_id.py
import uuid
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from structlog.contextvars import bind_contextvars, clear_contextvars
class RequestIdMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
request_id = request.headers.get("X-Request-Id") or str(uuid.uuid4())
bind_contextvars(request_id=request_id)
try:
response = await call_next(request)
response.headers["X-Request-Id"] = request_id
return response
finally:
clear_contextvars()
# app/main.py
from fastapi import FastAPI
from app.middleware.request_id import RequestIdMiddleware
app = FastAPI()
app.add_middleware(RequestIdMiddleware)
Что здесь происходит:
- Читаем
X-Request-Idиз заголовка запроса — если клиент уже прислал его (например, из API-шлюза), сохраняем. Если нет — генерируем UUID. bind_contextvars(request_id=request_id)записывает значение в contextvars текущей задачи.- Возвращаем тот же
request_idв заголовке ответа — клиент сможет передать его в поддержку при проблемах. clear_contextvars()в блокеfinally— обязательно. Без этого при переиспользовании event-loop-задачrequest_idот предыдущего запроса попадёт в логи следующего.
Важный порядок: middleware с request_id должен добавляться раньше auth-middleware. Тогда даже ошибки аутентификации будут нести request_id в логах.
trace_id и span_id: добавляет OpenTelemetry автоматически
request_id помогает найти все логи одного запроса внутри одного сервиса. trace_id — это идентификатор, который связывает логи и трейсы через несколько сервисов. Его ставить вручную не нужно — это делает structlog-процессор, который читает активный OpenTelemetry span.
# app/observability/logging.py
import structlog
from opentelemetry import trace
def otel_trace_context_processor(logger, method, event_dict):
span = trace.get_current_span()
ctx = span.get_span_context()
if ctx.is_valid:
event_dict["trace_id"] = format(ctx.trace_id, "032x")
event_dict["span_id"] = format(ctx.span_id, "016x")
return event_dict
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars, # добавляет request_id, user_id
otel_trace_context_processor, # добавляет trace_id, span_id
structlog.processors.JSONRenderer(),
]
)
merge_contextvars первым делом вливает всё, что лежит в contextvars (request_id, user_id), затем процессор добавляет trace_id и span_id из активного OTel-спана. В итоге каждая лог-запись автоматически содержит все четыре поля.
Не добавляйте trace_id через bind_contextvars вручную — span меняется в процессе обработки запроса, и вручную установленное значение устареет.
user_id: добавляем после проверки токена
user_id нельзя добавить в middleware для request_id — в тот момент токен ещё не проверен. Правильное место — зависимость (Depends), которая валидирует JWT.
# app/dependencies/auth.py
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from structlog.contextvars import bind_contextvars
security = HTTPBearer()
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
) -> str:
payload = decode_jwt(credentials.credentials)
user_id = payload.get("sub")
if not user_id:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
bind_contextvars(user_id=user_id)
return user_id
# app/routers/orders.py
from fastapi import APIRouter, Depends
import structlog
from app.dependencies.auth import get_current_user
router = APIRouter()
log = structlog.get_logger(__name__)
@router.post("/orders")
async def create_order(
payload: CreateOrderRequest,
user_id: str = Depends(get_current_user),
):
log.info("order_create_requested", customer_id=payload.customer_id)
# request_id, user_id, trace_id попадут в запись автоматически
bind_contextvars(user_id=...) выполняется внутри asyncio-задачи запроса, и значение доступно всем дочерним coroutine. clear_contextvars() в finally middleware снимет и user_id в конце запроса.
Нельзя вызывать bind_contextvars внутри сервиса или обработчика напрямую — там нет clear, и где его вызывать становится неочевидным.
asyncio.create_task: контекст переходит автоматически
Если внутри обработчика запускаете фоновую задачу через asyncio.create_task, контекст переходит в неё автоматически — никакого дополнительного кода не нужно.
# app/services/order_service.py
import asyncio
import structlog
log = structlog.get_logger(__name__)
async def confirm_order(order_id: str, customer_id: str) -> None:
log.info("confirm_order_started", order_id=order_id)
asyncio.create_task(
_send_confirmation_email(order_id, customer_id)
)
async def _send_confirmation_email(order_id: str, customer_id: str) -> None:
log.info("email_queued", order_id=order_id, customer_id=customer_id)
# request_id здесь уже есть — Task получил копию контекста автоматически
Это главное отличие от Java: не нужен никакой аналог TaskDecorator — asyncio копирует contextvars при создании задачи.
run_in_executor: контекст нужно передать явно
Вот единственное место, где автоматическая передача ломается. Если выносите блокирующий код в пул потоков через run_in_executor, потоки не получают contextvars автоматически. Нужно сделать снимок контекста заранее.
# app/services/product_service.py
import asyncio
import contextvars
import structlog
from opentelemetry import context as otel_context
log = structlog.get_logger(__name__)
async def export_product_catalog(product_ids: list[str]) -> bytes:
loop = asyncio.get_running_loop()
ctx = contextvars.copy_context() # снимок contextvars до offload
otel_ctx = otel_context.get_current() # снимок OTel-контекста
def _blocking_export():
token = otel_context.attach(otel_ctx)
try:
return ctx.run(_build_csv, product_ids)
finally:
otel_context.detach(token)
return await loop.run_in_executor(None, _blocking_export)
def _build_csv(product_ids: list[str]) -> bytes:
log.info("catalog_export_started", count=len(product_ids))
# request_id здесь доступен через ctx.run
...
ctx.run(fn, *args) выполняет функцию с восстановленным снимком contextvars. otel_context.attach/detach восстанавливает активный OTel-спан — без этого новые спаны внутри потока будут без родителя и не свяжутся с трейсом.
Kafka и фоновые очереди: передаём traceparent в заголовках
При отправке события в Kafka OTel-контекст не переходит сам — нужно сериализовать его в заголовки сообщения.
# app/adapters/kafka/producer.py
from opentelemetry import propagate
from structlog.contextvars import get_contextvars
import structlog
log = structlog.get_logger(__name__)
async def publish_order_placed(order_id: str, customer_id: str) -> None:
carrier: dict[str, str] = {}
propagate.inject(carrier) # traceparent + tracestate → carrier
ctx_vars = get_contextvars()
headers = {
"traceparent": carrier.get("traceparent", ""),
"request_id": ctx_vars.get("request_id", ""),
}
await kafka_producer.send(
"order.placed",
value={"order_id": order_id, "customer_id": customer_id},
headers=list(headers.items()),
)
log.info("order_placed_published", order_id=order_id)
В consumer восстанавливаем контекст до обработки:
# app/consumers/order_placed_consumer.py
from opentelemetry import propagate, context as otel_context
from structlog.contextvars import bind_contextvars, clear_contextvars
async def handle_order_placed(message) -> None:
headers = dict(message.headers)
carrier = {"traceparent": headers.get("traceparent", "")}
parent_ctx = propagate.extract(carrier)
bind_contextvars(
request_id=headers.get("request_id", ""),
order_id=message.value["order_id"],
)
token = otel_context.attach(parent_ctx)
try:
await _process_order_placed(message.value)
finally:
otel_context.detach(token)
clear_contextvars()
propagate.extract восстанавливает OTel-контекст из заголовка traceparent — спан consumer'а становится дочерним к спану producer'а. Без этого в Tempo будет два несвязанных дерева трейсов вместо одного сквозного.
Частые ошибки
Забытый clear_contextvars() — самая опасная ошибка. Если не чистить контекст в finally, при переиспользовании event-loop-задач user_id от одного запроса попадёт в логи другого пользователя. Это утечка данных.
bind_contextvars в сервисе — вызывать bind_contextvars глубоко в коде (не в middleware и не в Depends) значит потерять контроль над clear. Держите установку контекста в одном месте — на входе.
run_in_executor без copy_context() — в потоке будет пустой контекст, request_id в логах пропадёт, спаны потеряют родителя.
bind_contextvars(trace_id=...) вручную — ставить trace_id руками бессмысленно: span меняется в процессе обработки запроса, и вручную установленное значение окажется устаревшим. Доверьте это OTel-процессору.
Коротко
contextvars.ContextVar— изолированное хранилище на asyncio-задачу. Аналог MDC, но работает с asyncio безTaskDecorator.- Установите
request_idодин раз в middleware черезbind_contextvars— он автоматически попадёт во все логи запроса. clear_contextvars()в блокеfinallymiddleware — обязательно, иначе контекст утечёт в следующий запрос.user_idдобавляйте в auth-зависимости (Depends), после проверки JWT, — не в middleware дляrequest_id.trace_idиspan_idдобавляет OTel-structlog-процессор автоматически — не устанавливайте вручную.asyncio.create_taskкопирует contextvars в дочернюю задачу автоматически.run_in_executorконтекст не наследует — перед offload снимите снимок черезcopy_context()иotel_context.get_current().- Kafka и очереди: сериализуйте
traceparentв заголовки сообщения черезpropagate.inject, восстанавливайте в consumer черезpropagate.extract.
Что почитать дальше
- Логирование в Python с structlog —
merge_contextvars, форматы вывода, обработка ошибок. - Трейсинг в Python с OpenTelemetry — инструментация FastAPI, ручные спаны, семплирование.
- Health checks в FastAPI —
/health/liveи/health/readyс TTL-кешем. - Метрики в Python —
prometheus-client, RED-метрики, кардинальность меток.