Опирается на правила:
R-CACHE-STAMP-1…R-CACHE-STAMP-3иR-CACHE-STAMP-X1…R-CACHE-STAMP-X2из Caching Style Guide → раздел 7. Cache stampede.
Важно знать
- Cache stampede (thundering herd) — все параллельные coroutine-ы видят cache miss на истёкшем ключе и одновременно уходят в БД, которая получает N одинаковых тяжёлых запросов вместо одного.
- Single-flight (
asyncio.Lock+dictin-flight) — Python-идиома для одного процесса: повторные coroutine-ы одного ключа ждут первогоawait, не запускают второй.asyncio.Lockне виден другим процессам (R-CACHE-STAMP-X2): в multi-instance деплое каждый worker Uvicorn запустит свой запрос. Нужен distributed lock —redis.lock/aioredlock.- Hot keys (главная, top-100, курсы валют) — лучшая защита — refresh-ahead через APScheduler или
asynciobackground task: cache всегда заполнен, stampede невозможен по дизайну.- Probabilistic refresh — обновление до истечения TTL с вероятностью, растущей по мере приближения к expiry. Размазывает момент обновления, убирает единый пик.
- Игнорировать stampede для hot endpoints (>100 RPS) — гарантированный DB-инцидент при рестарте Redis или массовом evict (
R-CACHE-STAMP-X1).R-CACHE-STAMP-X2:asyncio.Lockне защищает distributed cache — используйredis.lockповерхredis.asyncio.
Cache stampede — частая причина каскадных отказов после рестарта Redis, после FLUSHDB по ошибке или после деплоя с новыми именами ключей. UCP формулирует три уровня защиты под Python: single-flight для одного процесса, distributed lock для multi-instance и refresh-ahead для известных hot keys.
Что такое stampede
Сценарий: ключ top-products:global истёк в 14:00.
T=0 worker-1 GET → cache MISS → SELECT TOP 100 ... (тяжёлый запрос, 250ms)
T=15ms worker-2 GET → cache MISS → SELECT TOP 100 ... (не знает о worker-1)
T=30ms worker-3 GET → cache MISS → SELECT TOP 100 ...
...
T=200ms worker-12 GET → cache MISS → SELECT TOP 100 ...
T=250ms worker-1 → SET top-products:global ...
T=260ms worker-2 → SET top-products:global ... — поверх worker-1
БД получила 12 одинаковых тяжёлых запросов вместо одного. При 500 RPS в момент expiry это легко сотни параллельных coroutine-ов — БД начинает отказывать, latency растёт у всех downstream.
Один процесс — single-flight через asyncio.Lock
R-CACHE-STAMP-1: Python-идиома для одного процесса — dict[str, asyncio.Future] in-flight задач и asyncio.Lock на ключ. Параллельные coroutine-ы одного ключа подключаются к единственному уже запущенному await.
# adapters/out/cache/product_cache_adapter.py
import asyncio
import json
from typing import Optional
import redis.asyncio as aioredis
from core.ports.out.product_repository import ProductRepository
from core.dto.top_products_dto import TopProductsDto
from config.cache_config import CacheSettings
class ProductCacheAdapter:
def __init__(
self,
redis: aioredis.Redis,
product_repo: ProductRepository,
settings: CacheSettings,
) -> None:
self._redis = redis
self._product_repo = product_repo
self._settings = settings
self._in_flight: dict[str, asyncio.Future] = {}
self._lock = asyncio.Lock()
async def get_top_products(self) -> list[TopProductsDto]:
key = "top-products:global"
raw = await self._redis.get(key)
if raw:
return [TopProductsDto(**item) for item in json.loads(raw)]
async with self._lock: # R-CACHE-STAMP-1: single-flight
if key in self._in_flight:
return await self._in_flight[key]
future: asyncio.Future = asyncio.get_event_loop().create_future()
self._in_flight[key] = future
try:
result = await self._load_and_set(key)
future.set_result(result)
return result
except Exception as exc:
future.set_exception(exc)
raise
finally:
self._in_flight.pop(key, None)
async def _load_and_set(self, key: str) -> list[TopProductsDto]:
products = await self._product_repo.find_top_100()
await self._redis.set(
key,
json.dumps([p.model_dump() for p in products]),
ex=self._settings.ttl.top_products,
)
return products
asyncio.Lock в сочетании с _in_flight dict гарантирует: только одна coroutine запускает _load_and_set, остальные awaiting той же Future. pop в finally обеспечивает очистку при любом исходе — следующий промах запустит новую загрузку.
Ограничение: asyncio.Lock живёт в памяти одного процесса Uvicorn. Для multi-worker или multi-instance деплоя каждый worker запустит свой _load_and_set — stampede между процессами сохраняется.
Multi-instance — distributed lock через redis.lock
R-CACHE-STAMP-2: для Redis-backed cache с несколькими воркерами нужен distributed lock. redis.asyncio предоставляет redis.lock.Lock с поддержкой async with.
# adapters/out/cache/order_summary_cache_adapter.py
import asyncio
import json
import redis.asyncio as aioredis
from redis.asyncio.lock import Lock as RedisLock
from core.ports.out.order_repository import OrderRepository
from core.dto.order_summary_dto import OrderSummaryDto
from config.cache_config import CacheSettings
class OrderSummaryCacheAdapter:
def __init__(
self,
redis: aioredis.Redis,
order_repo: OrderRepository,
settings: CacheSettings,
) -> None:
self._redis = redis
self._order_repo = order_repo
self._settings = settings
async def get_order_summary(self, order_id: str) -> OrderSummaryDto:
key = f"order-summary:{order_id}"
raw = await self._redis.get(key)
if raw:
return OrderSummaryDto(**json.loads(raw))
lock_key = f"lock:{key}"
# R-CACHE-STAMP-2: distributed lock — виден всем инстансам
lock: RedisLock = self._redis.lock(
lock_key,
timeout=self._settings.lock.ttl_seconds,
blocking_timeout=0.15,
)
acquired = await lock.acquire(blocking=True)
if not acquired:
# не дождались lock — возвращаем прямой запрос как fallback
return await self._order_repo.find_summary(order_id)
try:
# double-check: пока ждали lock, другой worker уже заполнил cache
raw = await self._redis.get(key)
if raw:
return OrderSummaryDto(**json.loads(raw))
summary = await self._order_repo.find_summary(order_id)
await self._redis.set(
key,
summary.model_dump_json(),
ex=self._settings.ttl.order_summary,
)
return summary
finally:
await lock.release()
blocking_timeout=0.15 — worker ждёт lock максимум 150ms. Если не получил — fallback на прямой запрос в БД. Double-check после acquire критичен: пока этот worker ждал, другой уже заполнил cache.
Комбинация: single-flight + distributed lock
Для максимальной защиты — оба слоя вместе: asyncio.Lock сокращает внутрипроцессные запросы до одного, redis.lock защищает от cross-process stampede.
# adapters/out/cache/customer_profile_cache_adapter.py
import asyncio
import json
import redis.asyncio as aioredis
from core.ports.out.customer_repository import CustomerRepository
from core.dto.customer_profile_dto import CustomerProfileDto
from config.cache_config import CacheSettings
class CustomerProfileCacheAdapter:
def __init__(
self,
redis: aioredis.Redis,
customer_repo: CustomerRepository,
settings: CacheSettings,
) -> None:
self._redis = redis
self._customer_repo = customer_repo
self._settings = settings
self._in_flight: dict[str, asyncio.Future] = {}
self._local_lock = asyncio.Lock()
async def get_customer_profile(self, customer_id: str) -> CustomerProfileDto:
key = f"customer-profiles:{customer_id}"
raw = await self._redis.get(key)
if raw:
return CustomerProfileDto(**json.loads(raw))
async with self._local_lock:
if key in self._in_flight:
return await self._in_flight[key]
future: asyncio.Future = asyncio.get_event_loop().create_future()
self._in_flight[key] = future
try:
result = await self._load_with_distributed_lock(key, customer_id)
future.set_result(result)
return result
except Exception as exc:
future.set_exception(exc)
raise
finally:
self._in_flight.pop(key, None)
async def _load_with_distributed_lock(
self, key: str, customer_id: str
) -> CustomerProfileDto:
lock = self._redis.lock(
f"lock:{key}",
timeout=self._settings.lock.ttl_seconds,
blocking_timeout=0.15,
)
acquired = await lock.acquire(blocking=True)
if not acquired:
return await self._customer_repo.find_profile(customer_id)
try:
raw = await self._redis.get(key)
if raw:
return CustomerProfileDto(**json.loads(raw))
profile = await self._customer_repo.find_profile(customer_id)
await self._redis.set(
key,
profile.model_dump_json(),
ex=self._settings.ttl.customer_profile,
)
return profile
finally:
await lock.release()
Hot keys — refresh-ahead
R-CACHE-STAMP-3: для известных горячих ключей (главная, каталог, курсы Сбера) лучший подход — держать cache всегда заполненным. APScheduler или FastAPI lifespan background task обновляет ключ до истечения TTL.
# adapters/out/cache/product_refresh_job.py
import asyncio
import json
import logging
import redis.asyncio as aioredis
from core.ports.out.product_repository import ProductRepository
from config.cache_config import CacheSettings
logger = logging.getLogger(__name__)
class ProductRefreshJob:
def __init__(
self,
redis: aioredis.Redis,
product_repo: ProductRepository,
settings: CacheSettings,
) -> None:
self._redis = redis
self._product_repo = product_repo
self._settings = settings
self._task: asyncio.Task | None = None
def start(self) -> None:
self._task = asyncio.create_task(self._refresh_loop())
def stop(self) -> None:
if self._task:
self._task.cancel()
async def _refresh_loop(self) -> None:
while True:
try:
products = await self._product_repo.find_top_100()
await self._redis.set(
"top-products:global",
json.dumps([p.model_dump() for p in products]),
ex=self._settings.ttl.top_products,
)
logger.debug("top-products:global refreshed")
except Exception:
logger.exception("top-products refresh failed")
await asyncio.sleep(self._settings.refresh_interval.top_products)
Подключение через FastAPI lifespan:
# main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from adapters.out.cache.product_refresh_job import ProductRefreshJob
@asynccontextmanager
async def lifespan(app: FastAPI):
job: ProductRefreshJob = app.state.product_refresh_job
job.start()
yield
job.stop()
app = FastAPI(lifespan=lifespan)
Cache всегда заполнен → stampede невозможен по дизайну. Интервал обновления (refresh_interval.top_products) должен быть вдвое меньше TTL, чтобы не возникло окна между expiry и следующим refresh.
Refresh-ahead применим только для ограниченного набора известных ключей — если ключей миллионы (по одному на заказ, на покупателя), фоновое обновление всех невозможно. Для таких случаев — distributed lock или probabilistic refresh.
Probabilistic refresh
Для ключей менее предсказуемых, чем top-products, но всё равно горячих: обновлять до истечения TTL с вероятностью, растущей по мере приближения к expiry.
# adapters/out/cache/sber_rates_cache_adapter.py
import json
import random
import time
from dataclasses import dataclass
from typing import Any
import redis.asyncio as aioredis
from core.ports.out.sber_rates_port import SberRatesPort
from core.dto.rates_dto import RatesDto
from config.cache_config import CacheSettings
@dataclass
class CachedEntry:
value: dict[str, Any]
cached_at: float
class SberRatesCacheAdapter:
def __init__(
self,
redis: aioredis.Redis,
rates_api: SberRatesPort,
settings: CacheSettings,
) -> None:
self._redis = redis
self._rates_api = rates_api
self._settings = settings
async def get_rates(self) -> RatesDto:
key = "sber-rates:current"
raw = await self._redis.get(key)
entry: CachedEntry | None = None
if raw:
data = json.loads(raw)
entry = CachedEntry(value=data["value"], cached_at=data["cached_at"])
if entry is None or self._should_refresh(entry):
rates = await self._rates_api.fetch_current()
payload = json.dumps({
"value": rates.model_dump(),
"cached_at": time.time(),
})
await self._redis.set(key, payload, ex=self._settings.ttl.sber_rates)
return rates
return RatesDto(**entry.value)
def _should_refresh(self, entry: CachedEntry) -> bool:
age = time.time() - entry.cached_at
ratio = age / self._settings.ttl.sber_rates
return random.random() < ratio ** 3
При ratio = 0.5 (половина TTL прошла) вероятность refresh = 12.5%; при ratio = 0.9 — 73%. Нагрузка размазана по времени, пик при expiry устранён.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
| Нет защиты от stampede на hot endpoints | R-CACHE-STAMP-X1 | Single-flight + distributed lock или refresh-ahead |
asyncio.Lock как защита multi-instance кеша | R-CACHE-STAMP-X2 | redis.lock поверх redis.asyncio |
lock.acquire без double-check после получения | R-CACHE-STAMP-2 | GET key сразу после acquire |
| Refresh-ahead для миллионов ключей | R-CACHE-STAMP-3 | Distributed lock или probabilistic refresh |
Lock без timeout (риск deadlock) | R-CACHE-STAMP-2 | timeout= обязателен в redis.lock |
FLUSHDB при холодном старте как «сброс перед заполнением» | R-CACHE-INV-X1 | Точечный DELETE по конкретному ключу |
Куда дальше
- Паттерны — cache-aside, write-through и refresh-ahead для hot keys.
- TTL — короткий TTL → больше miss-ов → больше stampede; как выбрать порог.
- Observability — hit rate через
prometheus-client, поиск проблемных ключей. - Invalidation —
DELETEна write, invalidation через domain event handler. - Конфигурация —
redis.asyncio, JSON-сериализация, per-cache TTL, fail-fast без Redis. - Ключи — namespace-префиксы, explicit-ключи, хеширование sensitive.
- Где кешируем — какие данные кешировать, какие нет.