Опирается на правила: R-CACHE-STAMP-1R-CACHE-STAMP-3 и R-CACHE-STAMP-X1R-CACHE-STAMP-X2 из Caching Style Guide → раздел 7. Cache stampede.

Важно знать

  • Cache stampede (thundering herd) — все параллельные coroutine-ы видят cache miss на истёкшем ключе и одновременно уходят в БД, которая получает N одинаковых тяжёлых запросов вместо одного.
  • Single-flight (asyncio.Lock + dict in-flight) — Python-идиома для одного процесса: повторные coroutine-ы одного ключа ждут первого await, не запускают второй.
  • asyncio.Lock не виден другим процессам (R-CACHE-STAMP-X2): в multi-instance деплое каждый worker Uvicorn запустит свой запрос. Нужен distributed lockredis.lock / aioredlock.
  • Hot keys (главная, top-100, курсы валют) — лучшая защита — refresh-ahead через APScheduler или asyncio background task: cache всегда заполнен, stampede невозможен по дизайну.
  • Probabilistic refresh — обновление до истечения TTL с вероятностью, растущей по мере приближения к expiry. Размазывает момент обновления, убирает единый пик.
  • Игнорировать stampede для hot endpoints (>100 RPS) — гарантированный DB-инцидент при рестарте Redis или массовом evict (R-CACHE-STAMP-X1).
  • R-CACHE-STAMP-X2: asyncio.Lock не защищает distributed cache — используй redis.lock поверх redis.asyncio.

Cache stampede — частая причина каскадных отказов после рестарта Redis, после FLUSHDB по ошибке или после деплоя с новыми именами ключей. UCP формулирует три уровня защиты под Python: single-flight для одного процесса, distributed lock для multi-instance и refresh-ahead для известных hot keys.

Что такое stampede

Сценарий: ключ top-products:global истёк в 14:00.

T=0    worker-1 GET → cache MISS → SELECT TOP 100 ... (тяжёлый запрос, 250ms)
T=15ms worker-2 GET → cache MISS → SELECT TOP 100 ... (не знает о worker-1)
T=30ms worker-3 GET → cache MISS → SELECT TOP 100 ...
...
T=200ms worker-12 GET → cache MISS → SELECT TOP 100 ...
T=250ms worker-1 → SET top-products:global ...
T=260ms worker-2 → SET top-products:global ... — поверх worker-1

БД получила 12 одинаковых тяжёлых запросов вместо одного. При 500 RPS в момент expiry это легко сотни параллельных coroutine-ов — БД начинает отказывать, latency растёт у всех downstream.

Один процесс — single-flight через asyncio.Lock

R-CACHE-STAMP-1: Python-идиома для одного процесса — dict[str, asyncio.Future] in-flight задач и asyncio.Lock на ключ. Параллельные coroutine-ы одного ключа подключаются к единственному уже запущенному await.

# adapters/out/cache/product_cache_adapter.py
import asyncio
import json
from typing import Optional

import redis.asyncio as aioredis

from core.ports.out.product_repository import ProductRepository
from core.dto.top_products_dto import TopProductsDto
from config.cache_config import CacheSettings


class ProductCacheAdapter:
    def __init__(
        self,
        redis: aioredis.Redis,
        product_repo: ProductRepository,
        settings: CacheSettings,
    ) -> None:
        self._redis = redis
        self._product_repo = product_repo
        self._settings = settings
        self._in_flight: dict[str, asyncio.Future] = {}
        self._lock = asyncio.Lock()

    async def get_top_products(self) -> list[TopProductsDto]:
        key = "top-products:global"

        raw = await self._redis.get(key)
        if raw:
            return [TopProductsDto(**item) for item in json.loads(raw)]

        async with self._lock:                         # R-CACHE-STAMP-1: single-flight
            if key in self._in_flight:
                return await self._in_flight[key]

            future: asyncio.Future = asyncio.get_event_loop().create_future()
            self._in_flight[key] = future
            try:
                result = await self._load_and_set(key)
                future.set_result(result)
                return result
            except Exception as exc:
                future.set_exception(exc)
                raise
            finally:
                self._in_flight.pop(key, None)

    async def _load_and_set(self, key: str) -> list[TopProductsDto]:
        products = await self._product_repo.find_top_100()
        await self._redis.set(
            key,
            json.dumps([p.model_dump() for p in products]),
            ex=self._settings.ttl.top_products,
        )
        return products

asyncio.Lock в сочетании с _in_flight dict гарантирует: только одна coroutine запускает _load_and_set, остальные awaiting той же Future. pop в finally обеспечивает очистку при любом исходе — следующий промах запустит новую загрузку.

Ограничение: asyncio.Lock живёт в памяти одного процесса Uvicorn. Для multi-worker или multi-instance деплоя каждый worker запустит свой _load_and_set — stampede между процессами сохраняется.

Multi-instance — distributed lock через redis.lock

R-CACHE-STAMP-2: для Redis-backed cache с несколькими воркерами нужен distributed lock. redis.asyncio предоставляет redis.lock.Lock с поддержкой async with.

# adapters/out/cache/order_summary_cache_adapter.py
import asyncio
import json

import redis.asyncio as aioredis
from redis.asyncio.lock import Lock as RedisLock

from core.ports.out.order_repository import OrderRepository
from core.dto.order_summary_dto import OrderSummaryDto
from config.cache_config import CacheSettings


class OrderSummaryCacheAdapter:
    def __init__(
        self,
        redis: aioredis.Redis,
        order_repo: OrderRepository,
        settings: CacheSettings,
    ) -> None:
        self._redis = redis
        self._order_repo = order_repo
        self._settings = settings

    async def get_order_summary(self, order_id: str) -> OrderSummaryDto:
        key = f"order-summary:{order_id}"

        raw = await self._redis.get(key)
        if raw:
            return OrderSummaryDto(**json.loads(raw))

        lock_key = f"lock:{key}"
        # R-CACHE-STAMP-2: distributed lock — виден всем инстансам
        lock: RedisLock = self._redis.lock(
            lock_key,
            timeout=self._settings.lock.ttl_seconds,
            blocking_timeout=0.15,
        )
        acquired = await lock.acquire(blocking=True)
        if not acquired:
            # не дождались lock — возвращаем прямой запрос как fallback
            return await self._order_repo.find_summary(order_id)

        try:
            # double-check: пока ждали lock, другой worker уже заполнил cache
            raw = await self._redis.get(key)
            if raw:
                return OrderSummaryDto(**json.loads(raw))

            summary = await self._order_repo.find_summary(order_id)
            await self._redis.set(
                key,
                summary.model_dump_json(),
                ex=self._settings.ttl.order_summary,
            )
            return summary
        finally:
            await lock.release()

blocking_timeout=0.15 — worker ждёт lock максимум 150ms. Если не получил — fallback на прямой запрос в БД. Double-check после acquire критичен: пока этот worker ждал, другой уже заполнил cache.

Комбинация: single-flight + distributed lock

Для максимальной защиты — оба слоя вместе: asyncio.Lock сокращает внутрипроцессные запросы до одного, redis.lock защищает от cross-process stampede.

# adapters/out/cache/customer_profile_cache_adapter.py
import asyncio
import json

import redis.asyncio as aioredis

from core.ports.out.customer_repository import CustomerRepository
from core.dto.customer_profile_dto import CustomerProfileDto
from config.cache_config import CacheSettings


class CustomerProfileCacheAdapter:
    def __init__(
        self,
        redis: aioredis.Redis,
        customer_repo: CustomerRepository,
        settings: CacheSettings,
    ) -> None:
        self._redis = redis
        self._customer_repo = customer_repo
        self._settings = settings
        self._in_flight: dict[str, asyncio.Future] = {}
        self._local_lock = asyncio.Lock()

    async def get_customer_profile(self, customer_id: str) -> CustomerProfileDto:
        key = f"customer-profiles:{customer_id}"

        raw = await self._redis.get(key)
        if raw:
            return CustomerProfileDto(**json.loads(raw))

        async with self._local_lock:
            if key in self._in_flight:
                return await self._in_flight[key]

            future: asyncio.Future = asyncio.get_event_loop().create_future()
            self._in_flight[key] = future
            try:
                result = await self._load_with_distributed_lock(key, customer_id)
                future.set_result(result)
                return result
            except Exception as exc:
                future.set_exception(exc)
                raise
            finally:
                self._in_flight.pop(key, None)

    async def _load_with_distributed_lock(
        self, key: str, customer_id: str
    ) -> CustomerProfileDto:
        lock = self._redis.lock(
            f"lock:{key}",
            timeout=self._settings.lock.ttl_seconds,
            blocking_timeout=0.15,
        )
        acquired = await lock.acquire(blocking=True)
        if not acquired:
            return await self._customer_repo.find_profile(customer_id)

        try:
            raw = await self._redis.get(key)
            if raw:
                return CustomerProfileDto(**json.loads(raw))

            profile = await self._customer_repo.find_profile(customer_id)
            await self._redis.set(
                key,
                profile.model_dump_json(),
                ex=self._settings.ttl.customer_profile,
            )
            return profile
        finally:
            await lock.release()

Hot keys — refresh-ahead

R-CACHE-STAMP-3: для известных горячих ключей (главная, каталог, курсы Сбера) лучший подход — держать cache всегда заполненным. APScheduler или FastAPI lifespan background task обновляет ключ до истечения TTL.

# adapters/out/cache/product_refresh_job.py
import asyncio
import json
import logging

import redis.asyncio as aioredis

from core.ports.out.product_repository import ProductRepository
from config.cache_config import CacheSettings

logger = logging.getLogger(__name__)


class ProductRefreshJob:
    def __init__(
        self,
        redis: aioredis.Redis,
        product_repo: ProductRepository,
        settings: CacheSettings,
    ) -> None:
        self._redis = redis
        self._product_repo = product_repo
        self._settings = settings
        self._task: asyncio.Task | None = None

    def start(self) -> None:
        self._task = asyncio.create_task(self._refresh_loop())

    def stop(self) -> None:
        if self._task:
            self._task.cancel()

    async def _refresh_loop(self) -> None:
        while True:
            try:
                products = await self._product_repo.find_top_100()
                await self._redis.set(
                    "top-products:global",
                    json.dumps([p.model_dump() for p in products]),
                    ex=self._settings.ttl.top_products,
                )
                logger.debug("top-products:global refreshed")
            except Exception:
                logger.exception("top-products refresh failed")
            await asyncio.sleep(self._settings.refresh_interval.top_products)

Подключение через FastAPI lifespan:

# main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI

from adapters.out.cache.product_refresh_job import ProductRefreshJob


@asynccontextmanager
async def lifespan(app: FastAPI):
    job: ProductRefreshJob = app.state.product_refresh_job
    job.start()
    yield
    job.stop()


app = FastAPI(lifespan=lifespan)

Cache всегда заполнен → stampede невозможен по дизайну. Интервал обновления (refresh_interval.top_products) должен быть вдвое меньше TTL, чтобы не возникло окна между expiry и следующим refresh.

Refresh-ahead применим только для ограниченного набора известных ключей — если ключей миллионы (по одному на заказ, на покупателя), фоновое обновление всех невозможно. Для таких случаев — distributed lock или probabilistic refresh.

Probabilistic refresh

Для ключей менее предсказуемых, чем top-products, но всё равно горячих: обновлять до истечения TTL с вероятностью, растущей по мере приближения к expiry.

# adapters/out/cache/sber_rates_cache_adapter.py
import json
import random
import time
from dataclasses import dataclass
from typing import Any

import redis.asyncio as aioredis

from core.ports.out.sber_rates_port import SberRatesPort
from core.dto.rates_dto import RatesDto
from config.cache_config import CacheSettings


@dataclass
class CachedEntry:
    value: dict[str, Any]
    cached_at: float


class SberRatesCacheAdapter:
    def __init__(
        self,
        redis: aioredis.Redis,
        rates_api: SberRatesPort,
        settings: CacheSettings,
    ) -> None:
        self._redis = redis
        self._rates_api = rates_api
        self._settings = settings

    async def get_rates(self) -> RatesDto:
        key = "sber-rates:current"
        raw = await self._redis.get(key)

        entry: CachedEntry | None = None
        if raw:
            data = json.loads(raw)
            entry = CachedEntry(value=data["value"], cached_at=data["cached_at"])

        if entry is None or self._should_refresh(entry):
            rates = await self._rates_api.fetch_current()
            payload = json.dumps({
                "value": rates.model_dump(),
                "cached_at": time.time(),
            })
            await self._redis.set(key, payload, ex=self._settings.ttl.sber_rates)
            return rates

        return RatesDto(**entry.value)

    def _should_refresh(self, entry: CachedEntry) -> bool:
        age = time.time() - entry.cached_at
        ratio = age / self._settings.ttl.sber_rates
        return random.random() < ratio ** 3

При ratio = 0.5 (половина TTL прошла) вероятность refresh = 12.5%; при ratio = 0.9 — 73%. Нагрузка размазана по времени, пик при expiry устранён.

Что запрещено

АнтипаттернПравилоЧто взамен
Нет защиты от stampede на hot endpointsR-CACHE-STAMP-X1Single-flight + distributed lock или refresh-ahead
asyncio.Lock как защита multi-instance кешаR-CACHE-STAMP-X2redis.lock поверх redis.asyncio
lock.acquire без double-check после полученияR-CACHE-STAMP-2GET key сразу после acquire
Refresh-ahead для миллионов ключейR-CACHE-STAMP-3Distributed lock или probabilistic refresh
Lock без timeout (риск deadlock)R-CACHE-STAMP-2timeout= обязателен в redis.lock
FLUSHDB при холодном старте как «сброс перед заполнением»R-CACHE-INV-X1Точечный DELETE по конкретному ключу

Куда дальше

  • Паттерны — cache-aside, write-through и refresh-ahead для hot keys.
  • TTL — короткий TTL → больше miss-ов → больше stampede; как выбрать порог.
  • Observability — hit rate через prometheus-client, поиск проблемных ключей.
  • Invalidation — DELETE на write, invalidation через domain event handler.
  • Конфигурация — redis.asyncio, JSON-сериализация, per-cache TTL, fail-fast без Redis.
  • Ключи — namespace-префиксы, explicit-ключи, хеширование sensitive.
  • Где кешируем — какие данные кешировать, какие нет.