Опирается на правила: R-CACHE-PATTERN-1R-CACHE-PATTERN-3 и R-CACHE-PATTERN-X1R-CACHE-PATTERN-X2 из Caching Style Guide → раздел 6. Паттерны.

Важно знать

  • Cache-aside (явный get + set + delete на write) — дефолтный паттерн UCP. Нет Spring-декларативности, зато поведение читается прямо в коде.
  • Write-through — явный cache.set на write вместо delete. Для high read+write одного значения, где следующий read нужен немедленно.
  • Refresh-ahead через APScheduler или asyncio-задачу — для критичных hot keys. Перезаливает кеш до истечения TTL, cache miss исключён по дизайну.
  • Write-behind (write в кеш, async в БД позже) — запрещён для money/critical. Crash сервиса до flush = потеря данных.
  • Один cache-namespace = один паттерн. Mix cache-aside и write-through на одном namespace = непонятная invalidation.
  • В Python нет @Cacheable/@CacheEvict — логика кеша явная в хендлере или вынесена в cache-порт. aiocache @cached — сокращает шаблон для простых read-методов.
  • Stampede в Python — asyncio.Lock для одного процесса, redis.lock для distributed. Refresh-ahead исключает stampede по дизайну.

Три паттерна — три точки на спектре «свежесть vs производительность». Cache-aside — простой и дешёвый в поддержке; write-through — кеш всегда свежий ценой двойной записи; refresh-ahead — максимальная защита от cache miss ценой фоновой нагрузки.

Cache-aside — дефолт

R-CACHE-PATTERN-1: read через явный get → miss → load → set; write через явный delete.

GET request
  │
  ├── cache.get(key)
  │     ├── HIT  → return cached value
  │     └── MISS → repo.find() → cache.set(key, value, ttl) → return value
  │
PATCH/DELETE request
  │
  └── repo.save() → cache.delete(key) → следующий GET загрузит свежее

Реализация через явный CachePort в хендлере:

# core/handlers/get_order_summary.py
import orjson
from core.models import OrderSummary
from core.ports.cache_port import CachePort
from core.ports.order_repository import OrderRepository


class GetOrderSummaryHandler:
    _NAMESPACE = "order-summaries"

    def __init__(self, cache: CachePort, repo: OrderRepository, ttl: int) -> None:
        self._cache = cache
        self._repo = repo
        self._ttl = ttl

    async def execute(self, order_id: str) -> OrderSummary:
        key = f"{self._NAMESPACE}:{order_id}"

        raw = await self._cache.get(key)
        if raw is not None:
            return OrderSummary.model_validate(orjson.loads(raw))

        summary = await self._repo.find_summary(order_id)
        await self._cache.set(key, orjson.dumps(summary.model_dump()).decode(), self._ttl)
        return summary
# core/handlers/confirm_order.py
from core.ports.cache_port import CachePort
from core.ports.order_repository import OrderRepository
from core.events import OrderConfirmedEvent


class ConfirmOrderHandler:
    _NAMESPACE = "order-summaries"

    def __init__(self, cache: CachePort, repo: OrderRepository) -> None:
        self._cache = cache
        self._repo = repo

    async def execute(self, order_id: str) -> OrderConfirmedEvent:
        order = await self._repo.find_by_id(order_id)
        order.confirm()
        await self._repo.save(order)
        await self._cache.delete(f"{self._NAMESPACE}:{order_id}")
        return OrderConfirmedEvent(order_id=order_id)

Свойства:

  • Прозрачный код. Логика кеша читается буквально — нет магии декораторов.
  • Кеш отстаёт от БД на TTL. Write делает delete, но между write и следующим read в distributed-среде возможна небольшая гонка. Для большинства сценариев — приемлемо.
  • Холодный старт затратный. После рестарта или массового evict все read-ы промахиваются.
  • Простой fallback. Если Redis недоступен — читаем из репозитория, сервис продолжает работать с повышенной latency.

Для простых read-методов без бизнес-логики aiocache @cached убирает шаблон:

# adapters/out/cache/product_summary_cache.py
from aiocache import cached
from aiocache.serializers import JsonSerializer
from app.config import settings


@cached(
    ttl=settings.cache.namespaces["product-catalog"].ttl_seconds,
    key_builder=lambda f, *args, **kwargs: f"product-catalog:{kwargs['product_id']}",
    serializer=JsonSerializer(),
)
async def get_cached_product_summary(product_id: str) -> dict:
    return await product_repo.find_summary(product_id)

key_builder обязателен — автоматический ключ из всех аргументов ломается при смене сигнатуры (R-CACHE-KEY-X1).

Write-through

R-CACHE-PATTERN-2: на write — cache.set вместо cache.delete, кеш обновляется значением, которое только что записали в БД.

# core/handlers/update_customer_profile.py
import orjson
from core.models import CustomerProfile
from core.ports.cache_port import CachePort
from core.ports.customer_repository import CustomerRepository


class UpdateCustomerProfileHandler:
    _NAMESPACE = "customer-profiles"

    def __init__(self, cache: CachePort, repo: CustomerRepository, ttl: int) -> None:
        self._cache = cache
        self._repo = repo
        self._ttl = ttl

    async def execute(self, customer_id: str, name: str, email: str) -> CustomerProfile:
        profile = await self._repo.find_by_id(customer_id)
        profile.update(name=name, email=email)
        saved = await self._repo.save(profile)

        key = f"{self._NAMESPACE}:{customer_id}"
        await self._cache.set(key, orjson.dumps(saved.model_dump()).decode(), self._ttl)
        return saved

Отличие от cache-aside: cache.set(key, value) вместо cache.delete(key). Следующий GET после PATCH попадает в кеш сразу, без промаха.

Когда применять:

  • High read + write ratio одного значения с continuation. После update_customer_profile тот же клиентский flow сразу читает профиль — write-through даёт ответ из кеша без похода в БД.
  • Write-сторона уверена в финальном значении (не saga, не async event).

Когда не применять:

  • Write проходит через несколько сервисов (saga, outbox). Write-through на первом шаге создаёт stale кеш, пока остальные шаги не завершились.
  • Handler возвращает None или только id — нечего класть в кеш.

Refresh-ahead

R-CACHE-PATTERN-3: фоновая задача перезаливает hot keys до истечения TTL.

Реализация через APScheduler:

# adapters/jobs/top_products_refresh_job.py
from apscheduler.schedulers.asyncio import AsyncIOScheduler
import orjson
from core.ports.cache_port import CachePort
from core.ports.product_repository import ProductRepository
from app.config import settings


class TopProductsRefreshJob:
    _KEY = "product-catalog:top-100"

    def __init__(self, cache: CachePort, repo: ProductRepository) -> None:
        self._cache = cache
        self._repo = repo
        self._ttl = settings.cache.namespaces["product-catalog"].ttl_seconds

    async def refresh(self) -> None:
        top = await self._repo.find_top_100()
        payload = orjson.dumps([p.model_dump() for p in top]).decode()
        await self._cache.set(self._KEY, payload, self._ttl)
# main.py
from contextlib import asynccontextmanager
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI
from adapters.jobs.top_products_refresh_job import TopProductsRefreshJob


@asynccontextmanager
async def lifespan(app: FastAPI):
    job = TopProductsRefreshJob(cache=app.state.cache, repo=app.state.product_repo)
    scheduler = AsyncIOScheduler()
    scheduler.add_job(job.refresh, "interval", seconds=30)
    scheduler.start()
    await job.refresh()  # заполнить кеш на старте, не ждать первого запроса
    yield
    scheduler.shutdown()

app = FastAPI(lifespan=lifespan)

await job.refresh() до yield — при старте кеш уже заполнен, первый пользователь не платит DB-latency.

Альтернатива без APScheduler — asyncio-задача:

import asyncio


async def refresh_loop(job: TopProductsRefreshJob, interval: int) -> None:
    while True:
        await job.refresh()
        await asyncio.sleep(interval)


# в lifespan:
asyncio.create_task(refresh_loop(job, interval=30))

Свойства:

  • No cache miss for hot keys. Кеш всегда заполнен, пользователи не ждут DB.
  • Постоянная нагрузка на БД. Каждые 30 секунд вне зависимости от read-нагрузки.
  • Только для известных hot keys. product-catalog:top-100, главная страница, dashboard. Для unbounded keys (customer-profiles:{id}) refresh-ahead невозможен.

Combo: refresh-ahead для hot + cache-aside для остального — стандартная стратегия для высоконагруженных систем.

Что запрещено

АнтипаттернПравилоЧто взамен
Write-behind для money/critical (write в кеш, async в БД)R-CACHE-PATTERN-X1write-through или cache-aside
Mix cache-aside и write-through на одном namespaceR-CACHE-PATTERN-X2один namespace = один паттерн
Refresh-ahead для unbounded keysR-CACHE-PATTERN-3cache-aside для всех
Cache-aside без fallback при Redis-outageR-CACHE-PATTERN-1try/except → read из репозитория
aiocache @cached без явного key_builderR-CACHE-KEY-X1key_builder=lambda f, *a, **kw: f"ns:{kw['id']}"
cache.set без TTL (ex=None)R-CACHE-TTL-X1explicit TTL из pydantic-settings
asyncio.Lock как distributed-защита от stampedeR-CACHE-STAMP-X2redis.lock / refresh-ahead

Куда дальше

  • Invalidation — cache.delete на write, invalidation по доменным событиям.
  • Cache stampede — redis.lock для distributed-защиты, refresh-ahead как альтернатива.
  • TTL — типовые значения TTL по характеру данных.
  • Конфигурация — CachePort, redis.asyncio, pydantic-settings.
  • Ключи — namespace-префикс, explicit ключи, sensitive-данные.
  • Где кешируем — read-проекции vs агрегаты, money trade-off.
  • Observability — hit rate, prometheus-client, Redis Exporter.