← назад к разделу

Приложение растёт, и мастер-база начинает задыхаться от тяжёлых SELECT — отчётов, аналитики, поиска. Пока аналитический запрос сканирует миллионы строк, обычные OLTP-операции ждут блокировок и тормозят. Решение — поставить рядом реплику и направлять на неё чтение.

Как работает streaming replication

PostgreSQL записывает каждое изменение в журнал WAL (Write-Ahead Log). Реплика постоянно получает этот журнал с мастера и проигрывает его у себя — так данные на реплике повторяют данные мастера.

Несколько понятий, которые важно знать:

  • Master (primary) — единственный узел, который принимает запись: INSERT, UPDATE, DELETE.
  • Replica (standby, hot standby) — проигрывает WAL с мастера, отвечает только на чтение.
  • Replication lag — задержка между записью на мастере и появлением данных на реплике. В нормальных условиях — 50–500 миллисекунд, под нагрузкой или при больших транзакциях — до нескольких секунд.

По умолчанию репликация асинхронная: мастер не ждёт подтверждения от реплики перед тем, как ответить клиенту. Это быстро, но означает небольшое отставание реплики.

Зачем нужна read-replica

Три основных сценария:

Разгрузка мастера. Тяжёлые SELECT уходят на реплику и не мешают OLTP-операциям. Аналогия: открыть второй кассовый узел для медленных покупателей, чтобы быстрая очередь не стояла.

Высокая доступность (HA). Если мастер упал, реплику можно повысить до нового мастера (failover). Данные не теряются, приложение продолжает работать.

Геораспределение. Реплика поднимается в другом датацентре или регионе, рядом с пользователями — снижается задержка чтения.

Что не стоит делать с репликой: читать данные сразу после записи в расчёте на свежий результат — реплика отстаёт и может не знать о только что вставленной строке. Об этом подробнее ниже.

Маршрутизация запросов

Приложение держит два пула соединений — один к мастеру, второй к реплике. Запросы в контексте read-only транзакции уходят на реплику, остальные — на мастер.

Важный нюанс: выбор источника нужно откладывать до момента первого запроса, а не до открытия соединения. Иначе признак «только чтение» ещё не известен и роутинг не сработает правильно.

// HikariCP + AbstractRoutingDataSource + LazyConnectionDataSourceProxy
public enum DataSourceType { MASTER, REPLICA }

@Component
public class TransactionRoutingDataSource extends AbstractRoutingDataSource {
    @Override
    protected Object determineCurrentLookupKey() {
        return TransactionSynchronizationManager.isCurrentTransactionReadOnly()
            ? DataSourceType.REPLICA
            : DataSourceType.MASTER;
    }
}

@Configuration
public class DataSourceConfig {

    @Bean @Primary
    public DataSource routingDataSource(DataSource master, DataSource replica) {
        var routing = new TransactionRoutingDataSource();
        routing.setTargetDataSources(Map.of(
            DataSourceType.MASTER, master,
            DataSourceType.REPLICA, replica
        ));
        routing.setDefaultTargetDataSource(master);
        return new LazyConnectionDataSourceProxy(routing);
    }
}

// Использование:
@Transactional(readOnly = true)
public List<OrderView> findOrders(long customerId) {
    // уходит на реплику
}

@Transactional
public OrderId createOrder(CreateOrderCommand cmd) {
    // уходит на мастер
}
// pgxpool: два пула, выбор через контекст
type DB struct {
    Master  *pgxpool.Pool
    Replica *pgxpool.Pool
}

type ctxKey string
const readOnlyKey ctxKey = "readOnly"

func WithReadOnly(ctx context.Context) context.Context {
    return context.WithValue(ctx, readOnlyKey, true)
}

func (db *DB) Pool(ctx context.Context) *pgxpool.Pool {
    if v, ok := ctx.Value(readOnlyKey).(bool); ok && v {
        return db.Replica
    }
    return db.Master
}

// Использование:
func (r *OrderRepo) FindOrders(ctx context.Context, customerID int64) ([]Order, error) {
    rows, err := r.db.Pool(WithReadOnly(ctx)).Query(ctx,
        "SELECT id, status FROM orders WHERE customer_id = $1", customerID)
    // ...
}

func (r *OrderRepo) CreateOrder(ctx context.Context, cmd CreateOrderCmd) (int64, error) {
    var id int64
    err := r.db.Pool(ctx).QueryRow(ctx,
        "INSERT INTO orders (customer_id) VALUES ($1) RETURNING id", cmd.CustomerID,
    ).Scan(&id)
    return id, err
}
// node-postgres (pg): два Pool, выбор функцией
const pools = {
    master:  new Pool({ connectionString: process.env.DB_MASTER_URL }),
    replica: new Pool({ connectionString: process.env.DB_REPLICA_URL }),
};

function getPool(readOnly: boolean): Pool {
    return readOnly ? pools.replica : pools.master;
}

// Использование:
export async function findOrders(customerId: bigint): Promise<Order[]> {
    const { rows } = await getPool(true).query<Order>(
        'SELECT id, status FROM orders WHERE customer_id = $1',
        [customerId],
    );
    return rows;
}

export async function createOrder(cmd: CreateOrderCmd): Promise<bigint> {
    const { rows } = await getPool(false).query<{ id: bigint }>(
        'INSERT INTO orders (customer_id) VALUES ($1) RETURNING id',
        [cmd.customerId],
    );
    return rows[0].id;
}
# psycopg (v3): два пула через AsyncConnectionPool
master_pool  = AsyncConnectionPool(conninfo=MASTER_DSN, open=False)
replica_pool = AsyncConnectionPool(conninfo=REPLICA_DSN, open=False)

def get_pool(read_only: bool) -> AsyncConnectionPool:
    return replica_pool if read_only else master_pool

# Использование:
async def find_orders(customer_id: int) -> list[dict]:
    async with get_pool(read_only=True).connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute(
                "SELECT id, status FROM orders WHERE customer_id = %s",
                (customer_id,),
            )
            return await cur.fetchall()

async def create_order(customer_id: int) -> int:
    async with get_pool(read_only=False).connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute(
                "INSERT INTO orders (customer_id) VALUES (%s) RETURNING id",
                (customer_id,),
            )
            row = await cur.fetchone()
            return row[0]

Read-after-write: распространённая ловушка

Типичный сценарий: пользователь создаёт заказ, и приложение сразу показывает ему список заказов. Запись уходит на мастер, а чтение — на реплику, которая ещё не получила свежий WAL. Только что созданный заказ не появится в ответе.

createOrder(req)   → мастер  ✓
listOrders(userId) → реплика ✗  (заказ может отсутствовать)

Три способа это обойти:

Читать с мастера после записи

Самый простой вариант для страниц, где пользователь ожидает свежих данных сразу после своего действия.

@Transactional   // без readOnly=true — пойдёт на мастер
public List<Order> myOrdersFromMaster(long customerId) {
    return orderRepo.findByCustomerId(customerId);
}
// ctx без WithReadOnly — выбирается пул мастера
func (r *OrderRepo) MyOrdersFromMaster(ctx context.Context, customerID int64) ([]Order, error) {
    rows, err := r.db.Pool(ctx).Query(ctx,
        "SELECT id, status FROM orders WHERE customer_id = $1", customerID)
    // ...
}
// getPool(false) — явно мастер
export async function myOrdersFromMaster(customerId: bigint): Promise<Order[]> {
    const { rows } = await getPool(false).query<Order>(
        'SELECT id, status FROM orders WHERE customer_id = $1',
        [customerId],
    );
    return rows;
}
# read_only=False — явно мастер
async def my_orders_from_master(customer_id: int) -> list[dict]:
    async with get_pool(read_only=False).connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute(
                "SELECT id, status FROM orders WHERE customer_id = %s",
                (customer_id,),
            )
            return await cur.fetchall()

Вернуть данные сразу из операции записи

Данные уже в памяти после INSERT — не нужно делать отдельный SELECT. RETURNING в PostgreSQL возвращает вставленную строку прямо в рамках той же транзакции на мастере.

// jOOQ: INSERT ... RETURNING возвращает запись мастера
public OrderResponse createOrder(CreateOrderCommand cmd) {
    OrdersRecord saved = dsl
        .insertInto(ORDERS)
        .set(ORDERS.CUSTOMER_ID, cmd.customerId())
        .returning()
        .fetchOne();
    return OrderResponse.from(saved);
}
func (r *OrderRepo) CreateOrder(ctx context.Context, cmd CreateOrderCmd) (*Order, error) {
    var o Order
    err := r.db.Pool(ctx).QueryRow(ctx,
        `INSERT INTO orders (customer_id) VALUES ($1)
         RETURNING id, customer_id, created_at`,
        cmd.CustomerID,
    ).Scan(&o.ID, &o.CustomerID, &o.CreatedAt)
    return &o, err
}
export async function createOrder(cmd: CreateOrderCmd): Promise<Order> {
    const { rows } = await getPool(false).query<Order>(
        `INSERT INTO orders (customer_id) VALUES ($1)
         RETURNING id, customer_id, created_at`,
        [cmd.customerId],
    );
    return rows[0];
}
async def create_order(customer_id: int) -> dict:
    async with get_pool(read_only=False).connection() as conn:
        async with conn.cursor(row_factory=dict_row) as cur:
            await cur.execute(
                """INSERT INTO orders (customer_id) VALUES (%s)
                   RETURNING id, customer_id, created_at""",
                (customer_id,),
            )
            return await cur.fetchone()

Подождать, пока реплика догонит мастер

Более сложный подход: после записи получить текущую позицию WAL на мастере (LSN) и опрашивать реплику, пока она не проиграет до этой позиции. Подходит для редких специфических случаев, когда ни первый, ни второй способ неприменимы.

String lsn = masterJdbc.queryForObject(
    "SELECT pg_current_wal_lsn()", String.class);

do {
    String replayLsn = replicaJdbc.queryForObject(
        "SELECT pg_last_wal_replay_lsn()", String.class);
    if (lsnGte(replayLsn, lsn)) break;
    Thread.sleep(50);
} while (true);
func waitForReplica(ctx context.Context, db *DB, lsn string) error {
    for {
        var replayLSN string
        err := db.Replica.QueryRow(ctx,
            "SELECT pg_last_wal_replay_lsn()").Scan(&replayLSN)
        if err != nil {
            return err
        }
        if lsnGte(replayLSN, lsn) {
            return nil
        }
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(50 * time.Millisecond):
        }
    }
}
async function waitForReplica(lsn: string, timeoutMs = 5000): Promise<void> {
    const deadline = Date.now() + timeoutMs;
    while (Date.now() < deadline) {
        const { rows } = await getPool(true).query<{ replay: string }>(
            'SELECT pg_last_wal_replay_lsn() AS replay',
        );
        if (lsnGte(rows[0].replay, lsn)) return;
        await new Promise(r => setTimeout(r, 50));
    }
    throw new Error('replica catch-up timeout');
}
async def wait_for_replica(lsn: str, timeout: float = 5.0) -> None:
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while loop.time() < deadline:
        async with get_pool(read_only=True).connection() as conn:
            async with conn.cursor() as cur:
                await cur.execute("SELECT pg_last_wal_replay_lsn()")
                (replay_lsn,) = await cur.fetchone()
        if lsn_gte(str(replay_lsn), lsn):
            return
        await asyncio.sleep(0.05)
    raise TimeoutError("replica catch-up timeout")

Synchronous replication

По умолчанию мастер отвечает клиенту сразу после записи в WAL, не дожидаясь реплики. Можно включить синхронный режим:

# postgresql.conf на мастере
synchronous_commit = on
synchronous_standby_names = 'replica1'

В этом режиме мастер ждёт подтверждения от реплики перед ответом на COMMIT. Гарантия сильнее, но цена — latency каждой транзакции увеличивается на сетевой round-trip и fsync реплики (1–5 мс локально, десятки миллисекунд при геораспределении).

Для большинства задач синхронная репликация не нужна: асинхронная схема с правильным роутингом покрывает 99% случаев. Sync имеет смысл только там, где данные критически важны и потеря даже миллисекунды записи неприемлема.

Failover

Если мастер падает, инструменты вроде Patroni или repmgr обнаруживают это и повышают реплику до нового мастера. DNS или балансировщик переключаются на новый адрес, пулы соединений переподключаются.

На время переключения (обычно 10–60 секунд) записи завершаются ошибками. Для критичных операций стоит добавить retry с экспоненциальной задержкой:

@Retryable(
    retryFor = SQLException.class,
    maxAttempts = 5,
    backoff = @Backoff(delay = 1000, multiplier = 2)
)
public OrderId createOrder(CreateOrderCommand cmd) {
    // ...
}
func withRetry(ctx context.Context, maxAttempts int, fn func() error) error {
    delay := time.Second
    for attempt := range maxAttempts {
        err := fn()
        if err == nil {
            return nil
        }
        if attempt == maxAttempts-1 {
            return err
        }
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(delay):
            delay *= 2
        }
    }
    return nil
}
async function withRetry<T>(
    fn: () => Promise<T>,
    maxAttempts = 5,
    delayMs = 1000,
): Promise<T> {
    for (let attempt = 0; attempt < maxAttempts; attempt++) {
        try {
            return await fn();
        } catch (err) {
            if (attempt === maxAttempts - 1) throw err;
            await new Promise(r => setTimeout(r, delayMs * 2 ** attempt));
        }
    }
    throw new Error('unreachable');
}
from tenacity import retry, stop_after_attempt, wait_exponential
from psycopg import OperationalError

@retry(
    retry=retry_if_exception_type(OperationalError),
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=1, max=16),
)
async def create_order(customer_id: int) -> int:
    async with get_pool(read_only=False).connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute(
                "INSERT INTO orders (customer_id) VALUES (%s) RETURNING id",
                (customer_id,),
            )
            row = await cur.fetchone()
            return row[0]

Logical replication

Помимо streaming replication в PostgreSQL есть logical replication. Она копирует не весь WAL-поток, а изменения по конкретным таблицам — можно реплицировать подмножество таблиц, менять схему, направлять данные в другую систему.

Типичные применения:

  • Перекачка данных из PostgreSQL в аналитическое хранилище или Kafka.
  • Онлайн-миграция между двумя экземплярами PostgreSQL.
  • Мультимастер с разрешением конфликтов.

Для задачи «разгрузить мастер через read-replica» лучше подходит обычная streaming replication — она проще и быстрее. Logical имеет больший накладной расход.

Мониторинг отставания реплики

Отставание реплики можно смотреть прямо в PostgreSQL.

На мастере — состояние всех реплик:

SELECT application_name, state, replay_lag
FROM pg_stat_replication;

На самой реплике — сколько прошло с последней проигранной транзакции:

SELECT now() - pg_last_xact_replay_timestamp() AS replication_lag;

Стоит настроить алерт, если отставание превышает 30 секунд или в очереди накопилось более 1 ГБ WAL — это признак проблемы с производительностью или сетью.

Коротко

  • Streaming replication: мастер пишет WAL, реплика проигрывает. Отставание в норме — 50–500 мс.
  • Реплика снимает нагрузку тяжёлых SELECT с мастера и служит резервом при сбое.
  • Приложение держит два пула соединений; выбор мастер/реплика делается по признаку read-only.
  • Выбор источника нужно откладывать до первого запроса, а не до открытия соединения.
  • Read-after-write через реплику не работает: реплика отстаёт. Решения — читать с мастера, возвращать данные через RETURNING, или ждать catch-up по LSN.
  • Синхронная репликация замедляет каждый COMMIT — нужна только в редких критических случаях.
  • Мониторинг: pg_stat_replication на мастере, pg_last_xact_replay_timestamp() на реплике, алерт при lag > 30 сек.

Что почитать дальше

  • Пулы соединений в PostgreSQL — как настроить два пула для мастера и реплики.
  • Уровни изоляции транзакций — как read-only транзакции взаимодействуют с изоляцией.
  • WAL и производительность записи — что именно реплицируется и как это влияет на скорость.