Опирается на правила:
GOTEST-19…GOTEST-22из Go Test Strategy → раздел 6. Kafka, Redis, async — по умолчанию НЕТ.
Важно знать
- Kafka не поднимаем в интеграционных —
GOTEST-X7; проверяемoutbox-строки черезDatabasePreparer.FindOutboxEvents.- Redis не поднимаем — build-tag
integrationили env-переменная подменяетcache.ClientнаNoopCache.- Idempotent consumer тестируем напрямую:
handler.Handle(ctx, testMsg), без брокера.- Outbox-relay вызываем синхронно в тесте:
relay.ProcessPending(ctx), не ждём фонового тика.testify/assert.Eventuallyиtime.Sleepзапрещены — признак недетерминированного дизайна (GOTEST-X1).- Цель — детерминированные, быстрые тесты без async-рассинхронизации.
- Если Kafka реально нужна — отдельный build-tag
e2e, отдельный CI-этап.
Главный принцип UCP-тестов в Go: синхронность. Каждое async-добавление (Kafka producer/consumer, Redis cache, фоновый воркер) вносит неопределённость в момент завершения операции. В Go все они заменяются на прямую проверку строк в БД или на явный вызов объекта-обработчика.
Kafka — не поднимаем
GOTEST-19: Outbox-проверка вместо Kafka-брокера.
В production-коде handler публикует событие в outbox:
func (s *OrderService) Create(ctx context.Context, cmd CreateOrderCommand) (*Order, error) {
order := newOrder(cmd.CustomerID, cmd.Amount, s.clock.Now(), s.ids.Next())
if err := s.repo.Save(ctx, order); err != nil {
return nil, err
}
event := outbox.Event{
AggregateType: "Order",
AggregateID: order.ID,
EventType: "ORDER_CREATED",
Payload: map[string]any{"customerId": order.CustomerID, "amount": order.Amount},
}
if err := s.outboxRepo.Append(ctx, event); err != nil {
return nil, err
}
return order, nil
}
В тесте Kafka не нужна и relay не запускается. Достаточно проверить, что строка появилась в outbox:
func TestCreateOrder_PublishesToOutbox(t *testing.T) {
srv, prep := newTestServer(t)
prep.Clear(t).CreateCustomer(t, "c1")
// Act
resp, err := http.Post(srv.URL+"/orders", "application/json",
strings.NewReader(`{"customerId":"c1","amount":100}`))
require.NoError(t, err)
require.Equal(t, http.StatusCreated, resp.StatusCode)
// Assert
events := prep.FindOutboxEvents(t, "ORDER_CREATED")
require.Len(t, events, 1)
assert.Equal(t, "c1", events[0].Payload["customerId"])
}
FindOutboxEvents — метод OrderDatabasePreparer, читает outbox напрямую через pgx:
func (p *OrderDatabasePreparer) FindOutboxEvents(t *testing.T, eventType string) []OutboxRow {
t.Helper()
rows, err := p.pool.Query(context.Background(),
"SELECT payload FROM outbox WHERE event_type = $1", eventType)
require.NoError(t, err)
defer rows.Close()
var result []OutboxRow
for rows.Next() {
var row OutboxRow
require.NoError(t, rows.Scan(&row.Payload))
result = append(result, row)
}
return result
}
Что даёт этот подход:
- Синхронность — outbox строка создана внутри той же транзакции, видна сразу.
- Полная проверка — event_type и payload проверяются точно.
- Скорость — нет ожидания Kafka producer flush или consumer lag.
Что не проверяется: что Kafka доставила сообщение consumer-у. Это задача E2E-уровня.
Redis — не поднимаем
GOTEST-20: build-tag или env-профиль подменяет cache.Client на NoopCache.
Определяем интерфейс кеша в домене:
type CacheClient interface {
Get(ctx context.Context, key string) ([]byte, error)
Set(ctx context.Context, key string, value []byte, ttl time.Duration) error
}
Реальная Redis-реализация используется в production. В тестах — NoopCache, который просто ничего не делает:
type NoopCache struct{}
func (n *NoopCache) Get(_ context.Context, _ string) ([]byte, error) {
return nil, nil
}
func (n *NoopCache) Set(_ context.Context, _ string, _ []byte, _ time.Duration) error {
return nil
}
Подключение через env или конструктор в newTestServer:
func newTestServer(t *testing.T) (*httptest.Server, *OrderDatabasePreparer) {
t.Helper()
pool, _ := pgxpool.New(context.Background(), testDSN)
t.Cleanup(pool.Close)
q := db.New(pool)
repo := order.NewRepository(q)
cache := &NoopCache{}
clock := &fixedClock{at: time.Date(2025, 3, 10, 12, 0, 0, 0, time.UTC)}
ids := &seqIDGenerator{}
svc := order.NewService(repo, cache, clock, ids)
r := chi.NewRouter()
orderhttp.Mount(r, svc)
srv := httptest.NewServer(r)
t.Cleanup(srv.Close)
return srv, &OrderDatabasePreparer{pool: pool}
}
@Cacheable-аналогов в Go нет — кеш явно вызывается в Service или Repository. NoopCache делает эти вызовы прозрачными: тест проверяет бизнес-логику, не поведение Redis.
Если нужно специально тестировать cache-aside или eviction — отдельный тест с sync.Map-реализацией интерфейса, не Redis-контейнер.
Idempotent consumer — тест как объект
GOTEST-21: handler вызывается напрямую, без брокера.
В production consumer принимает сообщение из Kafka и делегирует handler-у:
func (c *PaymentEventConsumer) Run(ctx context.Context) {
for msg := range c.reader.Messages(ctx) {
if err := c.handler.Handle(ctx, msg); err != nil {
c.logger.Error("handle payment event", "err", err)
}
}
}
PaymentEventHandler реализует идемпотентность через processed_event-таблицу:
func (h *PaymentEventHandler) Handle(ctx context.Context, msg kafka.Message) error {
var event PaymentChargedEvent
if err := json.Unmarshal(msg.Value, &event); err != nil {
return err
}
marked, err := h.processedRepo.TryMark(ctx, event.EventID, "order-payment")
if err != nil {
return err
}
if !marked {
return nil
}
order, err := h.orderRepo.FindBySagaID(ctx, event.SagaID)
if err != nil {
return err
}
order.ApplyPayment(event.Amount)
return h.orderRepo.Save(ctx, order)
}
Тест вызывает handler.Handle напрямую — нет необходимости поднимать брокер:
func TestPaymentEventHandler_Handle_WhenNewEvent_UpdatesOrderToPaid(t *testing.T) {
pool := testPool(t)
prep := &OrderDatabasePreparer{pool: pool}
prep.Clear(t).CreateCustomer(t, "c1").CreateOrder(t, "o1", "c1", "AWAITING_PAYMENT")
q := db.New(pool)
handler := payment.NewEventHandler(
order.NewRepository(q),
processed.NewRepository(q),
)
// Arrange
msg := kafka.Message{
Value: mustJSON(t, PaymentChargedEvent{
EventID: "evt-1",
SagaID: "saga-1",
Amount: 100,
}),
}
// Act
err := handler.Handle(context.Background(), msg)
require.NoError(t, err)
// Assert
row := prep.FindOrder(t, "o1")
assert.Equal(t, "PAID", row.Status)
}
Второй вызов с тем же EventID — проверяем идемпотентность:
func TestPaymentEventHandler_Handle_WhenDuplicateEvent_IsIdempotent(t *testing.T) {
pool := testPool(t)
prep := &OrderDatabasePreparer{pool: pool}
prep.Clear(t).CreateCustomer(t, "c1").CreateOrder(t, "o1", "c1", "AWAITING_PAYMENT")
q := db.New(pool)
handler := payment.NewEventHandler(
order.NewRepository(q),
processed.NewRepository(q),
)
msg := kafka.Message{
Value: mustJSON(t, PaymentChargedEvent{
EventID: "evt-dup",
SagaID: "saga-1",
Amount: 100,
}),
}
// Act — первый вызов
require.NoError(t, handler.Handle(context.Background(), msg))
// Act — повторный вызов с тем же EventID
require.NoError(t, handler.Handle(context.Background(), msg))
// Assert — статус изменился ровно один раз
row := prep.FindOrder(t, "o1")
assert.Equal(t, "PAID", row.Status)
}
Outbox-relay — вызываем синхронно
GOTEST-22: relay.ProcessPending(ctx) вместо ожидания фонового тика.
Outbox-relay в production работает как фоновый воркер с тикером:
func (r *OutboxRelay) Run(ctx context.Context) {
ticker := time.NewTicker(r.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
r.ProcessPending(ctx)
}
}
}
ProcessPending выбирает необработанные строки из outbox и публикует в Kafka:
func (r *OutboxRelay) ProcessPending(ctx context.Context) {
events, err := r.outboxRepo.FindPending(ctx, r.batchSize)
if err != nil {
r.logger.Error("find pending outbox events", "err", err)
return
}
for _, evt := range events {
if err := r.producer.Publish(ctx, evt); err != nil {
r.logger.Error("publish outbox event", "err", err, "id", evt.ID)
continue
}
_ = r.outboxRepo.MarkPublished(ctx, evt.ID)
}
}
В тесте не запускаем Run. Вызываем ProcessPending напрямую после того, как HTTP-запрос создал события:
func TestOutboxRelay_ProcessPending_PublishesOrderCreatedEvent(t *testing.T) {
pool := testPool(t)
prep := &OrderDatabasePreparer{pool: pool}
prep.Clear(t).CreateCustomer(t, "c1")
kafkaStub := &inMemoryKafkaProducer{}
relay := outbox.NewRelay(
outbox.NewRepository(db.New(pool)),
kafkaStub,
10,
)
srv, _ := newTestServerWithPool(t, pool)
// Arrange — создаём заказ через HTTP
resp, err := http.Post(srv.URL+"/orders", "application/json",
strings.NewReader(`{"customerId":"c1","amount":100}`))
require.NoError(t, err)
require.Equal(t, http.StatusCreated, resp.StatusCode)
// Act — вызываем relay синхронно
relay.ProcessPending(context.Background())
// Assert — Kafka producer получил событие
require.Len(t, kafkaStub.Messages, 1)
assert.Equal(t, "ORDER_CREATED", kafkaStub.Messages[0].EventType)
}
inMemoryKafkaProducer — простая in-memory реализация интерфейса producer-а:
type inMemoryKafkaProducer struct {
Messages []outbox.Event
}
func (p *inMemoryKafkaProducer) Publish(_ context.Context, evt outbox.Event) error {
p.Messages = append(p.Messages, evt)
return nil
}
Обратите внимание: в этом тесте мы проверяем именно relay-логику (что он читает pending и вызывает producer). Сам Kafka-брокер по-прежнему не нужен.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
testcontainers-go для Kafka/Redis в интеграционном тесте | GOTEST-X7 | Outbox-проверка, NoopCache |
testify/assert.Eventually для ожидания consumer-а | GOTEST-X1 | handler.Handle(ctx, msg) напрямую |
time.Sleep для ожидания relay-тика | GOTEST-X1 | relay.ProcessPending(ctx) явно |
Мок KafkaProducer интерфейса через testify/mock в relay-тесте | GOTEST-X7 | inMemoryKafkaProducer — простая реализация |
| Проверка Kafka-доставки через polling | GOTEST-X1 | outbox-таблица + in-memory producer в relay-тесте |
| Redis-контейнер для проверки cache-aside | GOTEST-X7 | отдельный unit с sync.Map-реализацией |
os.Setenv("KAFKA_BROKERS", ...) в тесте без сброса | GOTEST-X7 | NoopCache/in-memory через конструктор newTestServer |
Куда дальше
- Go: базовая инфраструктура — TestMain и Testcontainers — как устроен
TestMainиnewTestServer. - Go: DatabasePreparer — fluent-методы
FindOutboxEvents,Clear,CreateOrder. - Go: детерминизм — Clock и IDGenerator —
fixedClock,seqIDGenerator. - Пирамида тестов Go — где Kafka E2E и build-tag
e2e.