← назад к разделу

Когда программа ждёт ответа от сети или базы данных, основной поток стоит без дела — тратит время, которое мог бы потратить на другую работу. CompletableFuture решает эту проблему: он позволяет запустить задачу в фоне и продолжить работу, не останавливая всё приложение.

Что такое асинхронность простыми словами

Представьте, что вы заказали пиццу. Синхронный подход — стоять у двери и ждать курьера, ничего не делая. Асинхронный — оставить дверь открытой, заняться своими делами и вернуться только когда позвонят.

В программировании синхронный вызов блокирует поток: он не делает ничего, пока не получит результат. Асинхронный вызов отправляет задачу на выполнение, а поток немедленно освобождается для другой работы.

До Java 8 для асинхронных задач использовали Future — но он умел только два действия: проверить, готово ли (isDone()), и заблокироваться в ожидании (get()). Выстроить цепочку из нескольких шагов или обработать ошибку без оборачивания в try-catch было неудобно.

CompletableFuture — это расширенный Future. Он реализует интерфейс CompletionStage, что позволяет строить цепочки обработки, комбинировать несколько асинхронных задач и обрабатывать ошибки декларативно.

Запуск задачи: supplyAsync и runAsync

Самый простой способ запустить задачу асинхронно — supplyAsync. Он принимает Supplier<T> и возвращает CompletableFuture<T>:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // эта задача выполнится в фоновом потоке
    return fetchUserFromDatabase(userId);
});

Если возвращаемое значение не нужно — используйте runAsync:

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    sendNotificationEmail(userId);
});

На каком пуле выполняется задача

По умолчанию CompletableFuture использует общий пул ForkJoinPool.commonPool(). Этот пул разделяется между всеми CompletableFuture в JVM. Для CPU-интенсивных задач это нормально, но для операций ввода-вывода (запросы к базе, HTTP-вызовы) лучше передавать собственный Executor:

ExecutorService ioPool = Executors.newFixedThreadPool(20);

CompletableFuture<String> future = CompletableFuture.supplyAsync(
    () -> fetchUserFromDatabase(userId),
    ioPool  // задача уйдёт именно в этот пул
);

Почему важно разделять? Если заблокировать все потоки commonPool ожиданием сетевых ответов, другие CompletableFuture в JVM встанут в очередь — включая те, что вообще не связаны с вашим кодом.

Цепочки обработки: thenApply и thenAccept

Получив результат асинхронной задачи, вы часто хотите что-то с ним сделать. Для этого есть операторы-трансформации:

  • thenApply(Function<T, R>) — преобразует результат, возвращает новый CompletableFuture<R>.
  • thenAccept(Consumer<T>) — потребляет результат, возвращает CompletableFuture<Void>.
  • thenRun(Runnable) — запускает действие после завершения, результат предыдущего шага игнорируется.
CompletableFuture<String> result = CompletableFuture
    .supplyAsync(() -> fetchUserFromDatabase(userId))   // возвращает User
    .thenApply(user -> user.getEmail())                 // достаём email
    .thenApply(email -> email.toLowerCase());           // приводим к нижнему регистру

Каждый thenApply создаёт новый этап цепочки. Эти этапы выполняются в том же потоке, что завершил предыдущий шаг — это важно понимать при отладке.

Если хотите, чтобы следующий этап гарантированно выполнился в отдельном потоке из пула, используйте вариант с суффиксом Async:

.thenApplyAsync(user -> heavyTransformation(user), ioPool)

Композиция задач: thenCompose

Представьте, что по результату первой асинхронной задачи нужно запустить вторую — тоже асинхронную. Наивный подход даст CompletableFuture<CompletableFuture<T>> — вложенный future внутри future.

thenCompose разворачивает это в плоскую цепочку:

CompletableFuture<Order> result = CompletableFuture
    .supplyAsync(() -> findUser(userId))          // CompletableFuture<User>
    .thenCompose(user ->
        fetchLastOrder(user.getId())              // возвращает CompletableFuture<Order>
    );                                            // результат — CompletableFuture<Order>, не вложенный

Короткая формула: thenApply — когда следующий шаг синхронный; thenCompose — когда следующий шаг сам возвращает CompletableFuture.

Объединение результатов: thenCombine и allOf

Иногда нужно запустить две задачи параллельно и объединить их результаты. thenCombine ждёт завершения обоих CompletableFuture и применяет функцию:

CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(() -> fetchUser(userId));
CompletableFuture<Wallet> walletFuture = CompletableFuture.supplyAsync(() -> fetchWallet(userId));

CompletableFuture<String> combined = userFuture.thenCombine(
    walletFuture,
    (user, wallet) -> user.getName() + " — баланс: " + wallet.getBalance()
);

Обе задачи выполняются параллельно, результат собирается только когда обе готовы.

Для ожидания нескольких задач без объединения значений есть allOf:

CompletableFuture<Void> all = CompletableFuture.allOf(future1, future2, future3);
all.join(); // дождаться всех

А anyOf возвращает результат первой завершившейся задачи — полезно для реализации паттерна «кто первый ответит».

Обработка ошибок: exceptionally и handle

Если в одном из этапов цепочки выброшено исключение, оно «оборачивается» и передаётся дальше по цепочке как причина провала CompletableFuture. По умолчанию промежуточные .thenApply пропускают провальный future без вызова функции — это ловушка для новичков: ошибка тихо проходит через всю цепочку.

Чтобы явно обработать ошибку, используйте:

exceptionally(Function<Throwable, T>) — обработчик вызывается только при ошибке, в норме шаг прозрачно пропускается:

CompletableFuture<String> result = CompletableFuture
    .supplyAsync(() -> fetchUserFromDatabase(userId))
    .thenApply(User::getEmail)
    .exceptionally(ex -> {
        log.error("Не удалось получить пользователя", ex);
        return "unknown@example.com"; // значение по умолчанию
    });

handle(BiFunction<T, Throwable, R>) — вызывается всегда, независимо от успеха или ошибки; один из аргументов будет null:

.handle((user, ex) -> {
    if (ex != null) {
        return fallbackUser();
    }
    return user;
})

Грабли: блокирующий get и проглоченные исключения

Ловушка 1: блокирующий get

Вызов .get() блокирует текущий поток до готовности результата. Если сделать это в потоке ForkJoinPool.commonPool(), можно заблокировать весь пул:

// Плохо: блокируем поток пула ради синхронного ожидания
CompletableFuture.supplyAsync(() -> fetchUser(userId))
    .thenApply(user -> {
        // этот код выполняется в потоке commonPool
        String email = CompletableFuture.supplyAsync(() -> fetchEmail(userId)).get(); // БЛОКИРОВКА
        return email;
    });

Правильное решение — thenCompose вместо вложенного .get().

.join() делает то же самое, что .get(), но бросает непроверяемое исключение (CompletionException) вместо проверяемого ExecutionException. Для цепочек не подходит по той же причине — только для верхнего уровня, уже за пределами пула.

Ловушка 2: проглоченные исключения

Если к CompletableFuture не подключить обработчик ошибок и нигде не вызвать .get() или .join() — исключение просто пропадает. Особенно часто это случается с runAsync:

CompletableFuture.runAsync(() -> {
    throw new RuntimeException("Что-то сломалось");
    // исключение нигде не видно, если не вызвать .join() или не добавить .exceptionally()
});

Правило: всегда завершайте цепочку либо .exceptionally(), либо .handle(), либо явным .join() там, где ошибку можно поймать.

Коротко

  • CompletableFuture — асинхронная задача с поддержкой цепочек и обработки ошибок, в отличие от старого Future.
  • supplyAsync / runAsync — запуск задачи; без Executor-аргумента — в commonPool, для I/O лучше передавать свой пул.
  • thenApply — трансформация результата (синхронный следующий шаг); thenCompose — когда следующий шаг сам возвращает CompletableFuture.
  • thenCombine — объединить два параллельных результата; allOf — дождаться всех.
  • exceptionally — обработчик ошибки с подстановкой значения; handle — обработчик, вызываемый всегда.
  • Блокирующий .get() внутри потока пула ведёт к дедлоку — заменяйте на thenCompose.
  • Необработанные исключения в CompletableFuture молча теряются — всегда добавляйте обработчик в конец цепочки.

Что почитать дальше

  • ExecutorService и пулы потоков — как устроены пулы, на которых работают асинхронные задачи.
  • Виртуальные потоки — альтернатива CompletableFuture для высококонкурентного I/O в Java 21.
  • Типичные ошибки многопоточности — дедлоки, гонки и другие проблемы, которые легко допустить в асинхронном коде.