Separating writes from reads is easy. The hard part is making sure that changes on the write side reliably reach the read model. If that link is unreliable, the read model can fall behind for good: the data is stale, and the client has no way of knowing.
Here we'll work through the standard scheme for the Python stack: outbox + Kafka + idempotent consumer.
Why you can't just "publish the event after the commit"
It seems logical: save the aggregate, commit, then send the event to Kafka. But the database and Kafka cannot be updated atomically, so whichever order you choose, there is a failure window:
- Publish after the commit: the commit succeeds, but Kafka is unavailable at that moment (or the process crashes before sending). The event is lost, and the read model diverges for good.
- Publish before the commit: Kafka accepts the event, but the commit fails. The read model now holds data that doesn't exist on the write side.
Both cases break the system in different ways, and both are hard to diagnose.
The outbox pattern solves the problem differently: the event is written to the same database, in the same transaction, as the change to the aggregate. A separate process (the relay) reads this table and publishes to Kafka, and it keeps retrying for as long as the record sits in the outbox. If the transaction fails, neither the aggregate nor the event is saved. If Kafka goes down, the event isn't lost: it stays in the database until the relay delivers it.
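To make the atomicity concrete, here is a minimal sketch that uses the standard-library sqlite3 module instead of PostgreSQL and SQLAlchemy. The table layout and the confirm_order function are hypothetical. The only point is that the state change and the outbox row commit or roll back together.

```python
import json
import sqlite3
import uuid


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (id TEXT PRIMARY KEY, status TEXT NOT NULL)")
    conn.execute(
        "CREATE TABLE outbox (id TEXT PRIMARY KEY, aggregate_id TEXT NOT NULL, "
        "event_type TEXT NOT NULL, payload TEXT NOT NULL, "
        "published INTEGER NOT NULL DEFAULT 0)"
    )
    return conn


def confirm_order(conn: sqlite3.Connection, order_id: str, fail: bool = False) -> None:
    # `with conn` commits on success and rolls back on any exception,
    # so the state change and the outbox row share one transaction.
    with conn:
        conn.execute("UPDATE orders SET status = 'confirmed' WHERE id = ?", (order_id,))
        conn.execute(
            "INSERT INTO outbox (id, aggregate_id, event_type, payload) VALUES (?, ?, ?, ?)",
            (str(uuid.uuid4()), order_id, "OrderConfirmed",
             json.dumps({"order_id": order_id, "status": "confirmed"})),
        )
        if fail:
            raise RuntimeError("simulated failure before commit")
```

If confirm_order raises before the end of the `with` block, sqlite3 rolls back the whole transaction. Neither the status change nor the outbox row survives.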
Outbox: saving the event together with the aggregate
When the command handler saves the aggregate, the repository takes all of the aggregate's "uncommitted" events and writes them into the outbox table in the same AsyncSession:
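```python
# adapter/out/persistence/order_repository.py
from uuid import uuid4

from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncSession

# Order, OutboxRow, to_orm, to_json and utcnow come from the project's own modules.


class SqlAlchemyOrderRepository:
    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def save(self, order: Order) -> None:
        # Upsert the aggregate's ORM row.
        orm_row = to_orm(order)
        await self._session.merge(orm_row)
        # Same session, same transaction: the events commit together with the aggregate.
        for event in order.pull_events():
            await self._session.execute(
                insert(OutboxRow).values(
                    id=uuid4(),
                    event_type=type(event).__name__,
                    payload=to_json(event),
                    aggregate_id=str(order.id.value),
                    created_at=utcnow(),
                    published=False,
                )
            )
```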
The command handler simply calls save and commit — it knows nothing about the write to the outbox:
```python
# core/order/usecase/confirm_order.py
class ConfirmOrderHandler:
    def __init__(self, orders: OrderRepository, uow: UnitOfWork, clock: Clock) -> None:
        self._orders = orders
        self._uow = uow
        self._clock = clock

    async def handle(self, cmd: ConfirmOrder) -> OrderId:
        async with self._uow:
            order = await self._orders.by_id(cmd.order_id)
            order.confirm(self._clock)      # the aggregate registers its events
            await self._orders.save(order)  # writes the aggregate and its outbox rows
            await self._uow.commit()        # one commit covers both
        return order.id
```
The event itself is a plain dataclass, not an ORM model:
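```python
from dataclasses import dataclass
from datetime import datetime
from uuid import UUID

# OrderId is the domain value object used by the aggregate.


@dataclass(frozen=True)
class OrderConfirmed:
    event_id: UUID
    order_id: OrderId
    status: str
    confirmed_at: datetime
    aggregate_version: int  # lets consumers discard stale or duplicate events
```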
Important: the event must not depend on the ORM classes of the write schema. When the structure of the orders table changes, the event format stays the same. Otherwise every migration would break the consumers.
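The snippets in this article call to_json and from_json without showing them. Here is one hypothetical way to write them by hand for this single event type. It includes an explicit schema_version field so the wire format can evolve independently of both the dataclass and the ORM. In this simplified sketch, from_json takes only the raw payload, not the target class, and OrderId is redefined as a minimal value object to keep the example self-contained.

```python
from __future__ import annotations

import json
from dataclasses import dataclass
from datetime import datetime
from uuid import UUID

SCHEMA_VERSION = 1  # hypothetical: bump on every breaking change to the wire format


@dataclass(frozen=True)
class OrderId:  # minimal stand-in for the domain value object
    value: UUID


@dataclass(frozen=True)
class OrderConfirmed:
    event_id: UUID
    order_id: OrderId
    status: str
    confirmed_at: datetime
    aggregate_version: int


def to_json(event: OrderConfirmed) -> str:
    # Explicit field mapping: the wire format is spelled out here,
    # independent of any ORM column names.
    return json.dumps({
        "schema_version": SCHEMA_VERSION,
        "event_id": str(event.event_id),
        "order_id": str(event.order_id.value),
        "status": event.status,
        "confirmed_at": event.confirmed_at.isoformat(),
        "aggregate_version": event.aggregate_version,
    })


def from_json(raw: bytes | str) -> OrderConfirmed:
    data = json.loads(raw)
    if data.get("schema_version") != SCHEMA_VERSION:
        raise ValueError(f"unsupported schema_version: {data.get('schema_version')!r}")
    return OrderConfirmed(
        event_id=UUID(data["event_id"]),
        order_id=OrderId(UUID(data["order_id"])),
        status=data["status"],
        confirmed_at=datetime.fromisoformat(data["confirmed_at"]),
        aggregate_version=int(data["aggregate_version"]),
    )
```

Rejecting an unknown schema_version loudly is a design choice. A consumer could instead route such messages to a dead-letter topic.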
Relay: delivering from the outbox to Kafka
The relay is an asyncio task (or a separate worker) that periodically takes unpublished rows from the outbox and sends them to Kafka:
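```python
import asyncio
import logging

from sqlalchemy import select

logger = logging.getLogger(__name__)


async def relay_loop(session_factory, producer) -> None:
    # producer: presumably an aiokafka AIOKafkaProducer (send_and_wait is its API)
    while True:
        try:
            async with session_factory() as session:
                async with session.begin():
                    rows = (await session.scalars(
                        select(OutboxRow)
                        .where(OutboxRow.published == False)  # noqa: E712
                        .order_by(OutboxRow.created_at)
                        .limit(100)
                        # parallel relays lock disjoint batches instead of blocking
                        .with_for_update(skip_locked=True)
                    )).all()
                    for row in rows:
                        # key=aggregate_id: all events of one order go to one partition
                        await producer.send_and_wait(
                            topic="order-events",
                            key=row.aggregate_id.encode(),
                            value=row.payload.encode(),
                        )
                        row.published = True
                # commit happens on leaving session.begin(); a crash before it
                # means the batch is sent again on the next pass
        except Exception:
            # Kafka or the DB is unavailable: rows stay unpublished and are retried
            logger.exception("outbox relay pass failed")
        await asyncio.sleep(0.1)
```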
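SKIP LOCKED lets several relay workers run in parallel without blocking each other: each worker locks its own batch and skips rows already locked by another. The trade-off is that, with more than one worker, events of the same aggregate can be published out of order, and the consumer has to tolerate that. The Kafka producer is configured with enable_idempotence=True, which prevents duplicates caused by the producer's own retries. It does not help if the relay crashes after send_and_wait but before the commit: those rows stay unpublished and are sent again on the next pass.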
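The following sketch simulates why the relay delivers at least once. It uses sqlite3 and a hypothetical FakeProducer that only records what was "sent". SQLite has no SKIP LOCKED, so this models a single relay running one pass of relay_loop at a time. A crash after sending but before the commit rolls back `published = 1`, and the next pass sends the same row again.

```python
from __future__ import annotations

import sqlite3


class FakeProducer:
    """Hypothetical stand-in for a Kafka producer: it only records what was sent."""

    def __init__(self) -> None:
        self.sent: list[tuple[str, str]] = []

    def send(self, key: str, value: str) -> None:
        self.sent.append((key, value))


def make_outbox() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE outbox (id INTEGER PRIMARY KEY, aggregate_id TEXT NOT NULL, "
        "payload TEXT NOT NULL, created_at INTEGER NOT NULL, "
        "published INTEGER NOT NULL DEFAULT 0)"
    )
    return conn


def relay_once(conn: sqlite3.Connection, producer: FakeProducer,
               crash_before_commit: bool = False, batch: int = 100) -> int:
    """One relay pass; returns the number of rows sent."""
    with conn:  # one transaction per batch, like session.begin() in relay_loop
        rows = conn.execute(
            "SELECT id, aggregate_id, payload FROM outbox "
            "WHERE published = 0 ORDER BY created_at LIMIT ?",
            (batch,),
        ).fetchall()
        for row_id, aggregate_id, payload in rows:
            producer.send(aggregate_id, payload)
            conn.execute("UPDATE outbox SET published = 1 WHERE id = ?", (row_id,))
        if crash_before_commit:
            # The message already left, but published = 1 is rolled back.
            raise RuntimeError("relay crashed after sending, before commit")
    return len(rows)
```

After a simulated crash, the same row is delivered twice. Only the consumer can make that second delivery harmless.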
Idempotent consumer: what to do when an event arrives twice
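With this setup, delivery is "at-least-once". The relay can resend rows after a crash, and a consumer that fails before committing its offset re-reads messages, for example after a network failure or a partition rebalance. If the consumer is not protected against duplicates, it applies the same change twice. For an absolute assignment such as setting a status, that is harmless. Increments, appends, or a late duplicate of an older event overwriting newer state leave the read model incorrect.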
Two ways to protect against it:
A table of processed events
We store every (event_id, consumer) pair that has already been processed and check for it before updating the read model, in the same transaction as the update:
```python
# OrderConfirmed, OrderSummaryRow, ProcessedEventRow and from_json are project code.
from sqlalchemy import select, update


# method of the projector class
async def on_order_confirmed(self, raw: bytes) -> None:
    event: OrderConfirmed = from_json(raw, OrderConfirmed)
    async with self._session_factory() as session:
        async with session.begin():
            already = await session.scalar(
                select(ProcessedEventRow).where(
                    ProcessedEventRow.event_id == event.event_id,
                    ProcessedEventRow.consumer == "order-summary-projector",
                )
            )
            if already:
                return  # duplicate: this consumer has already applied the event
            # The marker and the projection update commit in one transaction.
            session.add(ProcessedEventRow(
                event_id=event.event_id,
                consumer="order-summary-projector",
            ))
            await session.execute(
                update(OrderSummaryRow)
                .where(OrderSummaryRow.order_id == event.order_id.value)
                .values(status=event.status, confirmed_at=event.confirmed_at)
            )
```
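This gives an exact guarantee. If (event_id, consumer) is the primary key, two concurrent deliveries of the same event cannot both get through: the second insert fails, and its transaction rolls back. The cost is one extra row per event, so the table grows without bound unless old entries are pruned.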
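Here is a runnable variation of the same idea with sqlite3. Instead of SELECT-then-INSERT, it inserts the marker first and lets the primary key reject a duplicate. The schema and the event_count column are hypothetical. The column is a deliberately non-idempotent counter that makes double application visible.

```python
import sqlite3

CONSUMER = "order-summary-projector"


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE processed_event (event_id TEXT NOT NULL, consumer TEXT NOT NULL, "
        "PRIMARY KEY (event_id, consumer))"
    )
    conn.execute(
        "CREATE TABLE order_summary (order_id TEXT PRIMARY KEY, "
        "event_count INTEGER NOT NULL DEFAULT 0)"
    )
    return conn


def handle_event(conn: sqlite3.Connection, event_id: str, order_id: str) -> bool:
    """Applies the event once; returns False for a duplicate."""
    try:
        with conn:
            # The primary key rejects a second insert of the same pair.
            conn.execute(
                "INSERT INTO processed_event (event_id, consumer) VALUES (?, ?)",
                (event_id, CONSUMER),
            )
            # Non-idempotent on purpose: applying it twice would double-count.
            conn.execute(
                "UPDATE order_summary SET event_count = event_count + 1 WHERE order_id = ?",
                (order_id,),
            )
    except sqlite3.IntegrityError:
        return False  # already processed; the transaction was rolled back
    return True
```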
UPDATE by aggregate version
If the event carries an aggregate_version, you can apply the UPDATE only when the event is newer than the current state:
```python
import logging

from sqlalchemy import update

logger = logging.getLogger(__name__)


# method of the projector class
async def on_order_confirmed(self, raw: bytes) -> None:
    event: OrderConfirmed = from_json(raw, OrderConfirmed)
    async with self._session_factory() as session:
        async with session.begin():
            # Applies only if the event is newer than what the projection holds.
            result = await session.execute(
                update(OrderSummaryRow)
                .where(
                    OrderSummaryRow.order_id == event.order_id.value,
                    OrderSummaryRow.version < event.aggregate_version,
                )
                .values(
                    status=event.status,
                    confirmed_at=event.confirmed_at,
                    version=event.aggregate_version,
                )
            )
            if result.rowcount == 0:
                # duplicate, stale, or the summary row does not exist yet
                logger.debug("skip stale or duplicate: event_id=%s", event.event_id)
```
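No extra table is needed. This works when the aggregate's version grows with every change and events of one aggregate go to the same Kafka partition (with aggregate_id as the key), so they are normally consumed in order. A duplicate, or an older event that arrives late, simply matches no row and is skipped.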
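Here is the same version guard in a minimal sqlite3 sketch. The order_summary schema and the apply_status function are illustrative assumptions.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE order_summary (order_id TEXT PRIMARY KEY, "
        "status TEXT NOT NULL, version INTEGER NOT NULL)"
    )
    return conn


def apply_status(conn: sqlite3.Connection, order_id: str, status: str, version: int) -> bool:
    """Returns True if applied, False if the event was a duplicate or stale."""
    with conn:
        # The WHERE clause does the deduplication: only strictly newer versions match.
        cur = conn.execute(
            "UPDATE order_summary SET status = ?, version = ? "
            "WHERE order_id = ? AND version < ?",
            (status, version, order_id, version),
        )
    return cur.rowcount == 1
```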
Rebuilding the read model after a failure
When you add a new read model, or lose its data after a failure, you don't have to replay every event from Kafka. The history may be large, and old events may already be gone under the topic's retention policy. It's simpler to build the projection straight from the write side:
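```python
from sqlalchemy import func, select


class OrderSummaryBootstrap:
    def __init__(self, read_session_factory, write_session_factory) -> None:
        self._read = read_session_factory
        self._write = write_session_factory

    async def run_if_empty(self) -> None:
        async with self._read() as session:
            count = await session.scalar(select(func.count()).select_from(OrderSummaryRow))
        if count:
            # only checks emptiness, so an interrupted rebuild is not resumed
            return
        await self._rebuild_all()

    async def _rebuild_all(self) -> None:
        last_id = None  # keyset cursor; works for integer and UUID keys alike
        while True:
            query = select(OrderRow).order_by(OrderRow.id).limit(500)
            if last_id is not None:
                query = query.where(OrderRow.id > last_id)
            async with self._write() as session:
                rows = (await session.scalars(query)).all()
            if not rows:
                break
            # Each batch is upserted into the read store in its own transaction.
            async with self._read() as session:
                async with session.begin():
                    for row in rows:
                        await session.merge(to_summary(row))
            last_id = rows[-1].id
```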
Run it in the FastAPI lifespan, before the application starts accepting traffic:
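```python
from contextlib import asynccontextmanager

from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Fill the projection before the app starts serving requests.
    await order_summary_bootstrap.run_if_empty()
    # Keep the outbox relay running for the lifetime of the app.
    async with kafka_relay_task():
        yield


app = FastAPI(lifespan=lifespan)
```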
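kafka_relay_task() isn't shown here. Assuming it simply runs relay_loop in the background for the lifetime of the app, a generic helper could look like this (background_task is a hypothetical name):

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Callable, Coroutine


@asynccontextmanager
async def background_task(
    run: Callable[[], Coroutine[Any, Any, None]],
) -> AsyncIterator[asyncio.Task]:
    """Runs `run()` as a task for the duration of the block, then cancels it."""
    task = asyncio.create_task(run())
    try:
        yield task
    finally:
        task.cancel()
        try:
            # Wait for the cancellation to finish; if the task had already
            # failed with another exception, it is re-raised here.
            await task
        except asyncio.CancelledError:
            pass
```

With it, kafka_relay_task could be defined as `lambda: background_task(lambda: relay_loop(session_factory, producer))`, where session_factory and producer are whatever the app already configures.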
How to explain to the client that data may lag
The read projection is not updated instantly. A client that has just created an order and immediately requests it from the read model may get stale data or even a 404. That is expected, but it must be stated explicitly in the API documentation.
In FastAPI this is done right in the endpoint's docstring:
```python
@router.get(
    "/orders/{order_id}/summary",
    response_model=OrderSummaryResponse,
)
async def get_order_summary(order_id: UUID, ...) -> OrderSummaryResponse:
    """
    Returns the read projection of the order from the order_summary table.

    **Eventual consistency**: a delay of up to 1 second is possible between the write
    and the appearance of the update in this projection.
    For immediate consistency, use GET /orders/{order_id}, which returns
    the full aggregate from the write store.
    """
    ...
```
FastAPI uses the endpoint's docstring as the operation description in the OpenAPI schema, so Swagger UI shows it automatically. There's no need to explain it in separate documentation.
Read-your-writes: when the client needs to see its own data right away
Sometimes the requirement is clear-cut: after a write, the client must immediately get up-to-date data. Three options, in increasing order of reliability:
Sticky session in the gateway. Requests from one client are always routed to the same pod. This works only if the read model is a local cache inside that service. It is fragile and doesn't scale across services.
Polling in the command handler. After the commit, the handler waits until the read projection reflects the change:
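```python
import asyncio
from time import monotonic


# methods of ConfirmOrderHandler
async def handle(self, cmd: ConfirmOrder) -> OrderId:
    async with self._uow:
        order = await self._orders.by_id(cmd.order_id)
        order.confirm(self._clock)
        await self._orders.save(order)
        await self._uow.commit()
    await self._poll_until_visible(order.id, order.version, timeout=2.0)
    return order.id


async def _poll_until_visible(self, order_id: OrderId, version: int, timeout: float) -> None:
    # For an update such as confirm, the summary row already exists, so checking
    # existence is not enough: wait until the projection reaches the new version.
    # order.version and self._summary_view.version_of() are assumed names.
    deadline = monotonic() + timeout
    while monotonic() < deadline:
        projected = await self._summary_view.version_of(order_id)
        if projected is not None and projected >= version:
            return
        await asyncio.sleep(0.05)
    # On timeout the method simply returns; the projection may still be behind.
```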
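Downside: the command now also waits for the projection, so its latency includes the projection lag. Whenever the projection falls more than 2 seconds behind, the request waits out the full timeout, so p99 can reach 2 seconds. After a timeout, the handler returns anyway, so the client still isn't guaranteed to see its own write. Not suitable for frequent operations.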
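The _poll_until_visible above gives the caller no way to tell success from a timeout. A hypothetical poll_until helper that returns a bool lets the handler or the endpoint decide what to report. The check callable would wrap whatever query the read side exposes.

```python
import asyncio
from time import monotonic
from typing import Awaitable, Callable


async def poll_until(
    check: Callable[[], Awaitable[bool]],
    timeout: float,
    interval: float = 0.05,
) -> bool:
    """Calls `check` until it returns True (-> True) or `timeout` elapses (-> False)."""
    deadline = monotonic() + timeout
    while True:
        if await check():
            return True
        remaining = deadline - monotonic()
        if remaining <= 0:
            return False
        # Never sleep past the deadline.
        await asyncio.sleep(min(interval, remaining))
```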
A separate endpoint backed by the write store. This is the simplest and most honest solution: for scenarios that require immediate consistency, provide a separate endpoint that reads directly from the write side.
```python
@router.get("/orders/{order_id}/summary")
async def get_order_summary(...):
    """Eventual consistency, from the read projection."""
    ...


@router.get("/orders/{order_id}")
async def get_order(...):
    """Immediate consistency, from the write store."""
    ...
```
Two endpoints explicitly show the trade-off to the client. In most cases this is the right choice.
Common mistakes
A synchronous write to the read table inside the command transaction. It seems convenient to update the aggregate and the projection in one go. In reality, it defeats the point of the separation. The command becomes coupled to the read schema. If the read model lives in a separate store, the two writes are no longer atomic, and a failure between them leaves the projection diverged. As load grows, locks on the read tables become a bottleneck.
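A PostgreSQL trigger from the write table onto the read table. The logic is invisible in the application code and easy to break with bulk operations (TRUNCATE, for instance, does not fire row-level triggers), and it doesn't carry over to another database. An explicit consumer in Python is almost always the better choice.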
The event payload is an ORM model. If the event carries an OrderRow, then any migration of the table schema will break the consumer. The event must be an independent @dataclass with its own versioning.
A consumer without duplicate protection. At-least-once means duplicates will happen. Without idempotency protection, the very first redelivery can corrupt the read model.
In short
- Write and read in CQRS are linked by outbox + Kafka + consumer, not a direct write to the read table.
- The outbox write goes in the same transaction as the change to the aggregate — so there's never "an event exists but no data" or the reverse.
- The consumer must be idempotent: use a processed_event table or an UPDATE by aggregate version.
- A new read model is rebuilt with a batch query from the write side, not from Kafka history.
- The delay (eventual consistency) is declared in the endpoint's docstring — the client shouldn't have to guess.
- For "see my own data right away" scenarios, a separate endpoint from the write store is simpler than polling.
What to read next
- Command side — how an event is registered in the aggregate and the handler.
- Query side — how the read-only repository is arranged.
- Read-model — where and in what form the projection is stored.
- When CQRS is justified — when it's worth applying and when it isn't.