Опирается на правила: GOTEST-19GOTEST-22 из Go Test Strategy → раздел 6. Kafka, Redis, async — по умолчанию НЕТ.

Важно знать

  • Kafka не поднимаем в интеграционных — GOTEST-X7; проверяем outbox-строки через DatabasePreparer.FindOutboxEvents.
  • Redis не поднимаем — build-tag integration или env-переменная подменяет cache.Client на NoopCache.
  • Idempotent consumer тестируем напрямую: handler.Handle(ctx, testMsg), без брокера.
  • Outbox-relay вызываем синхронно в тесте: relay.ProcessPending(ctx), не ждём фонового тика.
  • testify/assert.Eventually и time.Sleep запрещены — признак недетерминированного дизайна (GOTEST-X1).
  • Цель — детерминированные, быстрые тесты без async-рассинхронизации.
  • Если Kafka реально нужна — отдельный build-tag e2e, отдельный CI-этап.

Главный принцип UCP-тестов в Go: синхронность. Каждое async-добавление (Kafka producer/consumer, Redis cache, фоновый воркер) вносит неопределённость в момент завершения операции. В Go все они заменяются на прямую проверку строк в БД или на явный вызов объекта-обработчика.

Kafka — не поднимаем

GOTEST-19: Outbox-проверка вместо Kafka-брокера.

В production-коде handler публикует событие в outbox:

func (s *OrderService) Create(ctx context.Context, cmd CreateOrderCommand) (*Order, error) {
    order := newOrder(cmd.CustomerID, cmd.Amount, s.clock.Now(), s.ids.Next())

    if err := s.repo.Save(ctx, order); err != nil {
        return nil, err
    }

    event := outbox.Event{
        AggregateType: "Order",
        AggregateID:   order.ID,
        EventType:     "ORDER_CREATED",
        Payload:       map[string]any{"customerId": order.CustomerID, "amount": order.Amount},
    }
    if err := s.outboxRepo.Append(ctx, event); err != nil {
        return nil, err
    }

    return order, nil
}

В тесте Kafka не нужна и relay не запускается. Достаточно проверить, что строка появилась в outbox:

func TestCreateOrder_PublishesToOutbox(t *testing.T) {
    srv, prep := newTestServer(t)
    prep.Clear(t).CreateCustomer(t, "c1")

    // Act
    resp, err := http.Post(srv.URL+"/orders", "application/json",
        strings.NewReader(`{"customerId":"c1","amount":100}`))
    require.NoError(t, err)
    require.Equal(t, http.StatusCreated, resp.StatusCode)

    // Assert
    events := prep.FindOutboxEvents(t, "ORDER_CREATED")
    require.Len(t, events, 1)
    assert.Equal(t, "c1", events[0].Payload["customerId"])
}

FindOutboxEvents — метод OrderDatabasePreparer, читает outbox напрямую через pgx:

func (p *OrderDatabasePreparer) FindOutboxEvents(t *testing.T, eventType string) []OutboxRow {
    t.Helper()
    rows, err := p.pool.Query(context.Background(),
        "SELECT payload FROM outbox WHERE event_type = $1", eventType)
    require.NoError(t, err)
    defer rows.Close()

    var result []OutboxRow
    for rows.Next() {
        var row OutboxRow
        require.NoError(t, rows.Scan(&row.Payload))
        result = append(result, row)
    }
    return result
}

Что даёт этот подход:

  • Синхронность — outbox строка создана внутри той же транзакции, видна сразу.
  • Полная проверка — event_type и payload проверяются точно.
  • Скорость — нет ожидания Kafka producer flush или consumer lag.

Что не проверяется: что Kafka доставила сообщение consumer-у. Это задача E2E-уровня.

Redis — не поднимаем

GOTEST-20: build-tag или env-профиль подменяет cache.Client на NoopCache.

Определяем интерфейс кеша в домене:

type CacheClient interface {
    Get(ctx context.Context, key string) ([]byte, error)
    Set(ctx context.Context, key string, value []byte, ttl time.Duration) error
}

Реальная Redis-реализация используется в production. В тестах — NoopCache, который просто ничего не делает:

type NoopCache struct{}

func (n *NoopCache) Get(_ context.Context, _ string) ([]byte, error) {
    return nil, nil
}

func (n *NoopCache) Set(_ context.Context, _ string, _ []byte, _ time.Duration) error {
    return nil
}

Подключение через env или конструктор в newTestServer:

func newTestServer(t *testing.T) (*httptest.Server, *OrderDatabasePreparer) {
    t.Helper()
    pool, _ := pgxpool.New(context.Background(), testDSN)
    t.Cleanup(pool.Close)

    q := db.New(pool)
    repo := order.NewRepository(q)
    cache := &NoopCache{}
    clock := &fixedClock{at: time.Date(2025, 3, 10, 12, 0, 0, 0, time.UTC)}
    ids := &seqIDGenerator{}
    svc := order.NewService(repo, cache, clock, ids)

    r := chi.NewRouter()
    orderhttp.Mount(r, svc)
    srv := httptest.NewServer(r)
    t.Cleanup(srv.Close)

    return srv, &OrderDatabasePreparer{pool: pool}
}

@Cacheable-аналогов в Go нет — кеш явно вызывается в Service или Repository. NoopCache делает эти вызовы прозрачными: тест проверяет бизнес-логику, не поведение Redis.

Если нужно специально тестировать cache-aside или eviction — отдельный тест с sync.Map-реализацией интерфейса, не Redis-контейнер.

Idempotent consumer — тест как объект

GOTEST-21: handler вызывается напрямую, без брокера.

В production consumer принимает сообщение из Kafka и делегирует handler-у:

func (c *PaymentEventConsumer) Run(ctx context.Context) {
    for msg := range c.reader.Messages(ctx) {
        if err := c.handler.Handle(ctx, msg); err != nil {
            c.logger.Error("handle payment event", "err", err)
        }
    }
}

PaymentEventHandler реализует идемпотентность через processed_event-таблицу:

func (h *PaymentEventHandler) Handle(ctx context.Context, msg kafka.Message) error {
    var event PaymentChargedEvent
    if err := json.Unmarshal(msg.Value, &event); err != nil {
        return err
    }

    marked, err := h.processedRepo.TryMark(ctx, event.EventID, "order-payment")
    if err != nil {
        return err
    }
    if !marked {
        return nil
    }

    order, err := h.orderRepo.FindBySagaID(ctx, event.SagaID)
    if err != nil {
        return err
    }
    order.ApplyPayment(event.Amount)
    return h.orderRepo.Save(ctx, order)
}

Тест вызывает handler.Handle напрямую — нет необходимости поднимать брокер:

func TestPaymentEventHandler_Handle_WhenNewEvent_UpdatesOrderToPaid(t *testing.T) {
    pool := testPool(t)
    prep := &OrderDatabasePreparer{pool: pool}
    prep.Clear(t).CreateCustomer(t, "c1").CreateOrder(t, "o1", "c1", "AWAITING_PAYMENT")

    q := db.New(pool)
    handler := payment.NewEventHandler(
        order.NewRepository(q),
        processed.NewRepository(q),
    )

    // Arrange
    msg := kafka.Message{
        Value: mustJSON(t, PaymentChargedEvent{
            EventID: "evt-1",
            SagaID:  "saga-1",
            Amount:  100,
        }),
    }

    // Act
    err := handler.Handle(context.Background(), msg)
    require.NoError(t, err)

    // Assert
    row := prep.FindOrder(t, "o1")
    assert.Equal(t, "PAID", row.Status)
}

Второй вызов с тем же EventID — проверяем идемпотентность:

func TestPaymentEventHandler_Handle_WhenDuplicateEvent_IsIdempotent(t *testing.T) {
    pool := testPool(t)
    prep := &OrderDatabasePreparer{pool: pool}
    prep.Clear(t).CreateCustomer(t, "c1").CreateOrder(t, "o1", "c1", "AWAITING_PAYMENT")

    q := db.New(pool)
    handler := payment.NewEventHandler(
        order.NewRepository(q),
        processed.NewRepository(q),
    )

    msg := kafka.Message{
        Value: mustJSON(t, PaymentChargedEvent{
            EventID: "evt-dup",
            SagaID:  "saga-1",
            Amount:  100,
        }),
    }

    // Act — первый вызов
    require.NoError(t, handler.Handle(context.Background(), msg))
    // Act — повторный вызов с тем же EventID
    require.NoError(t, handler.Handle(context.Background(), msg))

    // Assert — статус изменился ровно один раз
    row := prep.FindOrder(t, "o1")
    assert.Equal(t, "PAID", row.Status)
}

Outbox-relay — вызываем синхронно

GOTEST-22: relay.ProcessPending(ctx) вместо ожидания фонового тика.

Outbox-relay в production работает как фоновый воркер с тикером:

func (r *OutboxRelay) Run(ctx context.Context) {
    ticker := time.NewTicker(r.interval)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            r.ProcessPending(ctx)
        }
    }
}

ProcessPending выбирает необработанные строки из outbox и публикует в Kafka:

func (r *OutboxRelay) ProcessPending(ctx context.Context) {
    events, err := r.outboxRepo.FindPending(ctx, r.batchSize)
    if err != nil {
        r.logger.Error("find pending outbox events", "err", err)
        return
    }
    for _, evt := range events {
        if err := r.producer.Publish(ctx, evt); err != nil {
            r.logger.Error("publish outbox event", "err", err, "id", evt.ID)
            continue
        }
        _ = r.outboxRepo.MarkPublished(ctx, evt.ID)
    }
}

В тесте не запускаем Run. Вызываем ProcessPending напрямую после того, как HTTP-запрос создал события:

func TestOutboxRelay_ProcessPending_PublishesOrderCreatedEvent(t *testing.T) {
    pool := testPool(t)
    prep := &OrderDatabasePreparer{pool: pool}
    prep.Clear(t).CreateCustomer(t, "c1")

    kafkaStub := &inMemoryKafkaProducer{}
    relay := outbox.NewRelay(
        outbox.NewRepository(db.New(pool)),
        kafkaStub,
        10,
    )

    srv, _ := newTestServerWithPool(t, pool)

    // Arrange — создаём заказ через HTTP
    resp, err := http.Post(srv.URL+"/orders", "application/json",
        strings.NewReader(`{"customerId":"c1","amount":100}`))
    require.NoError(t, err)
    require.Equal(t, http.StatusCreated, resp.StatusCode)

    // Act — вызываем relay синхронно
    relay.ProcessPending(context.Background())

    // Assert — Kafka producer получил событие
    require.Len(t, kafkaStub.Messages, 1)
    assert.Equal(t, "ORDER_CREATED", kafkaStub.Messages[0].EventType)
}

inMemoryKafkaProducer — простая in-memory реализация интерфейса producer-а:

type inMemoryKafkaProducer struct {
    Messages []outbox.Event
}

func (p *inMemoryKafkaProducer) Publish(_ context.Context, evt outbox.Event) error {
    p.Messages = append(p.Messages, evt)
    return nil
}

Обратите внимание: в этом тесте мы проверяем именно relay-логику (что он читает pending и вызывает producer). Сам Kafka-брокер по-прежнему не нужен.

Что запрещено

АнтипаттернПравилоЧто взамен
testcontainers-go для Kafka/Redis в интеграционном тестеGOTEST-X7Outbox-проверка, NoopCache
testify/assert.Eventually для ожидания consumer-аGOTEST-X1handler.Handle(ctx, msg) напрямую
time.Sleep для ожидания relay-тикаGOTEST-X1relay.ProcessPending(ctx) явно
Мок KafkaProducer интерфейса через testify/mock в relay-тестеGOTEST-X7inMemoryKafkaProducer — простая реализация
Проверка Kafka-доставки через pollingGOTEST-X1outbox-таблица + in-memory producer в relay-тесте
Redis-контейнер для проверки cache-asideGOTEST-X7отдельный unit с sync.Map-реализацией
os.Setenv("KAFKA_BROKERS", ...) в тесте без сбросаGOTEST-X7NoopCache/in-memory через конструктор newTestServer

Куда дальше

  • Go: базовая инфраструктура — TestMain и Testcontainers — как устроен TestMain и newTestServer.
  • Go: DatabasePreparer — fluent-методы FindOutboxEvents, Clear, CreateOrder.
  • Go: детерминизм — Clock и IDGenerator — fixedClock, seqIDGenerator.
  • Пирамида тестов Go — где Kafka E2E и build-tag e2e.