Java CompletableFuture
Compone cómputos asíncronos con CompletableFuture: thenApply, thenCompose, allOf, exceptionally y los errores más comunes a evitar.
Future es un manejador de resultado de un solo uso: envías, esperas, lees. No puede encadenarse. Si quieres "hacer A, luego con el resultado de A hacer B, luego combinar B con C y pasarlo a D" sin escribir una máquina de estados a mano, necesitas CompletableFuture — el rediseño de Java 8 de la idea de resultado asíncrono centrado en la composición.
CompletableFuture<V> implementa Future<V>, por lo que toda la API antigua sigue disponible. La novedad es la API de combinadores: más de treinta métodos que permiten construir grafos de flujo de datos de trabajo asíncrono — aplicar funciones, ejecutar efectos secundarios, combinar múltiples futuros, recuperarse de excepciones, aplicar tiempos límite — sin bloquear nunca un hilo para esperar un resultado intermedio.
Los métodos de inicio
Normalmente no construyes un CompletableFuture directamente. Comienzas una cadena con uno de estos:
CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 42);
CompletableFuture<Void> b = CompletableFuture.runAsync(() -> log("hello"));
CompletableFuture<String> c = CompletableFuture.completedFuture("ready");
CompletableFuture<String> d = CompletableFuture.failedFuture(new IOException("nope"));| Inicio | Comportamiento |
|---|---|
supplyAsync(Supplier) | Ejecuta un Supplier en el pool común y devuelve su valor |
runAsync(Runnable) | Ejecuta un Runnable en el pool común, sin valor |
completedFuture(v) | Un futuro ya resuelto con el valor dado |
failedFuture(t) | Un futuro ya fallido con el throwable dado |
supplyAsync y runAsync tienen sobrecargas que aceptan un Executor explícito. Casi siempre querrás pasar uno. El valor por defecto es ForkJoinPool.commonPool() — un pool compartido dimensionado según el número de CPUs, adecuado para trabajo CPU corto, pero desastroso si le asignas I/O (una llamada lenta bloquea un núcleo para todos). Pasa siempre un executor explícito para I/O o trabajo de coste indeterminado.
Encadenamiento: thenApply, thenAccept, thenRun
Los combinadores más simples transforman un futuro en otro:
CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 42);
CompletableFuture<String> b = a.thenApply(n -> "value is " + n); // transform
CompletableFuture<Void> c = a.thenAccept(n -> System.out.println(n)); // consume, no result
CompletableFuture<Void> d = a.thenRun(() -> System.out.println("done")); // side-effect, ignore value| Método | Tipo de lambda | Devuelve |
|---|---|---|
thenApply | Function<T,U> | CompletableFuture<U> |
thenAccept | Consumer<T> | CompletableFuture<Void> |
thenRun | Runnable | CompletableFuture<Void> |
Cada método tiene tres variantes:
thenApply(fn)— se ejecuta en el hilo que completó la etapa anteriorthenApplyAsync(fn)— se ejecuta en el pool comúnthenApplyAsync(fn, executor)— se ejecuta en un executor específico
La forma sin Async es la más rápida (sin cambio de hilo), pero significa que tu fn se ejecuta en cualquier hilo que haya completado la etapa anterior — posiblemente el hilo de I/O que no quieres ocupar con trabajo CPU. Las formas *Async son la opción más segura por defecto en pipelines heterogéneas.
thenCompose — aplanar un futuro de un futuro
thenApply funciona bien cuando la función devuelve un valor simple. Cuando devuelve otro CompletableFuture, no quieres un CompletableFuture<CompletableFuture<V>> — quieres thenCompose:
CompletableFuture<User> user = lookupUser(id);
CompletableFuture<Profile> profile = user.thenCompose(u -> loadProfile(u.profileId()));
// ^ Function<User, CompletableFuture<Profile>>thenCompose es flatMap para futuros. Úsalo cuando el siguiente paso sea en sí mismo asíncrono; usa thenApply cuando no lo sea.
Combinar dos futuros: thenCombine
Cuando tienes dos valores asíncronos independientes y quieres combinarlos:
CompletableFuture<Integer> price = fetchPrice(symbol);
CompletableFuture<Integer> shares = fetchShares(account);
CompletableFuture<Integer> total = price.thenCombine(shares, (p, s) -> p * s);thenCombine espera a que ambas entradas completen y luego aplica un BiFunction a sus resultados. Los dos futuros se ejecutan en paralelo — price y shares ya están en vuelo cuando se registra thenCombine. El combinador se ejecuta en el hilo que complete en segundo lugar.
La versión "any", applyToEither, toma el primer resultado e ignora el segundo.
Muchos futuros: allOf y anyOf
Cuando el paralelismo abarca una colección de futuros:
List<CompletableFuture<String>> all = ids.stream()
.map(this::fetchAsync)
.toList();
CompletableFuture<Void> doneAll = CompletableFuture.allOf(all.toArray(new CompletableFuture[0]));
CompletableFuture<Object> firstOne = CompletableFuture.anyOf(all.toArray(new CompletableFuture[0]));allOf completa cuando todas las entradas han terminado. Devuelve CompletableFuture<Void> — para obtener la lista de resultados, hay que usar thenApply y extraerlos:
CompletableFuture<List<String>> results = doneAll.thenApply(v ->
all.stream().map(CompletableFuture::join).toList()); // .join() never blocks here — they're all completeanyOf devuelve el valor del primer input que complete (como Object — no hay forma de expresar "cualquiera de estos futuros tipados" con un único tipo de retorno).
Manejo de errores: exceptionally y handle
Un CompletableFuture puede fallar (cualquier etapa que lance una excepción produce un futuro fallido aguas abajo). Los combinadores que recuperan o transforman:
CompletableFuture<String> safe = riskyAsync()
.exceptionally(ex -> "fallback for: " + ex.getMessage());
CompletableFuture<String> either = riskyAsync()
.handle((value, ex) -> ex == null ? value : "fallback");| Método | Cuándo se ejecuta | Qué devuelve |
|---|---|---|
exceptionally(fn) | Solo en caso de fallo; recibe la causa | Valor recuperado |
handle(bi) | Siempre; recibe (value, ex) (uno es null) | Valor transformado |
whenComplete(bi) | Siempre; recibe (value, ex) | El mismo futuro, solo efecto secundario |
exceptionally es la ruta simple de "capturar y reemplazar". handle es la opción más general de "ejecutar siempre, decidir según el resultado" — útil cuando quieres registrar cada finalización independientemente del éxito.
orTimeout y completeOnTimeout
Java 9 añadió tiempos límite directamente a la API de futuros:
CompletableFuture<String> withDeadline = riskyAsync()
.orTimeout(2, TimeUnit.SECONDS); // completes exceptionally if not done in 2s
CompletableFuture<String> withDefault = riskyAsync()
.completeOnTimeout("fallback", 2, TimeUnit.SECONDS);Estos permiten expresar plazos sin escribir tu propio vigilante. Usan hilos programados internos, por lo que son baratos de adjuntar.
No bloquees en etapas asíncronas
El mayor error con CompletableFuture: llamar a .get() o .join() dentro de una etapa Async. Eso es un hilo del pool del executor sentado inactivo esperando a otro hilo del mismo pool — bajo carga, puedes bloquear todo el pool.
// WRONG — joining inside an async stage on the common pool
CompletableFuture.supplyAsync(() -> {
Integer x = anotherFuture().join(); // blocks a pool thread
return x * 2;
});
// RIGHT — compose instead of join
anotherFuture().thenApply(x -> x * 2);Si te encuentras usando .get() dentro de una etapa Async, en realidad querías thenCompose/thenApply.
Usa tu propio executor
El pool común por defecto es adecuado para trabajo CPU corto. Para I/O o cualquier cosa que pueda bloquearse, usa el tuyo propio:
ExecutorService io = Executors.newFixedThreadPool(50, namedFactory("io"));
ExecutorService cpu = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), namedFactory("cpu"));
CompletableFuture.supplyAsync(this::loadFromDb, io)
.thenApplyAsync(this::transform, cpu)
.thenAcceptAsync(this::sendToClient, io);Cada paso se ejecuta en el pool correcto. El pool común queda libre para parallelStream y otros usos del framework. Combinarlos así es el núcleo de un Java asíncrono bien comportado.
Ejemplo práctico: una pipeline asíncrona pequeña
El programa siguiente obtiene un "usuario" y un "perfil" en paralelo, los combina, aplica un plazo y se recupera de una ruta de fallo.
Lo que se puede observar en la ejecución:
- La sección 1 usó
thenCombinesobre dos obtenciones independientes. Se ejecutaron en paralelo —name(50 ms) yage(80 ms) ya estaban en vuelo antes de adjuntar el combinador. El futuro combinado completó poco después de que el más lento terminara. Esa es la clave del paralelismo: una pipeline asíncrona no espera en cada paso, compone los pasos como un grafo. - La sección 2 usó
thenComposepara encadenar pasos donde cada uno es en sí mismo asíncrono.thenApplyhabría dadoCompletableFuture<CompletableFuture<String>>— inútil.thenComposeaplana, comoflatMaphace con streams yOptional. - La sección 3 usó
allOfsobre una lista y luegothenApplypara extraer los valores. El propioallOfdevuelveVoid; la recolección de resultados es un stream separado sobre los futuros (ya completos) usandojoin(). Las llamadas ajoin()no bloquean aquí porqueallOfya ha completado. - La sección 4 mostró
exceptionallyrecuperándose de una tarea que lanzó una excepción. El futuro aguas arriba falló; el futuro aguas abajo devolvió la cadena de reserva. Sinexceptionally(ohandle), el fallo se propagaría a.join()como unCompletionException. - La sección 5 usó
orTimeoutpara aplicar un plazo de 100 ms a una tarea de 500 ms. El futuro completó excepcionalmente conTimeoutException; eljoinlo relanzó dentro deCompletionException. Esta es la forma correcta de expresar "quiero este resultado, pero solo si llega lo suficientemente rápido". - La sección 6 usó
handlepara bifurcar según éxito/fallo en un único paso.handlesiempre se ejecuta y recibe ambos(value, ex)— uno es null. Útil cuando quieres una cola uniforme de la pipeline independientemente de si el trabajo tuvo éxito.
Qué sigue
El siguiente capítulo, Java Fork/Join, cubre el ForkJoinPool — el pool de robo de trabajo que respalda los streams paralelos y el pool común de CompletableFuture, y la herramienta adecuada para trabajo CPU divide y vencerás.