Streams paralelos en Java
Procesa streams de Java en paralelo para ganar velocidad: cuándo parallelStream ayuda y cuándo empeora las cosas.
Un stream paralelo es el mismo pipeline de stream que has estado escribiendo, pero la JVM puede dividir la fuente en fragmentos y procesarlos en múltiples hilos. El cambio en el punto de llamada es mínimo:
long total = nums.parallelStream().mapToLong(n -> heavy(n)).sum();
// ^^^^^^^^^^^^^^^^^o:
long total = nums.stream().parallel().mapToLong(n -> heavy(n)).sum();La estructura del pipeline, las operaciones, el resultado — todo sin cambios. Lo que cambia es quién lo ejecuta: en lugar de un solo hilo recorriendo la fuente, varios trabajadores del ForkJoinPool común (uno por núcleo de CPU, menos uno) dividen el trabajo y un coordinador fusiona sus resultados parciales. Cuando el trabajo por elemento es suficientemente pesado y la fuente se divide bien, el pipeline termina en aproximadamente tiempo de reloj / núcleos. Cuando no es así, el paralelo es más lento que el secuencial — y a veces incorrecto. Este capítulo trata sobre distinguir ambos casos.
Qué hace realmente "parallel"
Un stream secuencial pasa un elemento por el pipeline, luego el siguiente. Un stream paralelo:
- Divide la fuente en sub-streams mediante el
Spliteratorde la fuente. Los arrays,ArrayList,IntStream.rangey fuentes similares se dividen eficientemente en O(1).LinkedList,Files.lines,Stream.iterateyStream.generateo se dividen mal o se niegan a dividirse. - Ejecuta la cadena de operaciones intermedias de cada sub-stream en un hilo trabajador del pool común.
- Fusiona los resultados parciales — para
reduceycollect, para eso sirve elcombiner.
forEach en un stream paralelo llama a tu Consumer desde múltiples hilos de forma concurrente y en orden no especificado. forEachOrdered preserva el orden de encuentro al costo de sincronización. findFirst en paralelo es más costoso que findAny por la misma razón — tiene que coordinar para identificar la primera coincidencia.
El contrato — qué debe cumplir tu pipeline
El paralelo solo da una respuesta correcta cuando el pipeline obedece tres reglas. El código secuencial que las ignora sigue funcionando; el código paralelo que las ignora produce resultados sin sentido de forma silenciosa.
- El reductor debe ser asociativo.
f(f(a, b), c) == f(a, f(b, c)).+,*,max,min, unión de conjuntos, concatenación de listas — todos califican. La resta, la división, "primera coincidencia" y "agregar a lista con orden" no. Si pasas unBinaryOperatorno asociativo areduceoCollectors.reducing, la respuesta depende de cómo la JVM decida dividir. - El pipeline debe ser sin estado. Tus lambdas no deben leer ni escribir estado mutable compartido. Una lambda que captura y muta un
ArrayListexterno, incrementa unint[]externo, o usa cualquier contador no atómico tendrá condiciones de carrera en paralelo. - El pipeline debe estar libre de efectos secundarios. El logging está bien; persistir a través de un destino seguro para hilos está bien; todo lo demás es un error esperando que un trabajador lo intercale de forma diferente.
Los collectors incorporados en Collectors cumplen 1–3 por construcción (cuando se usan como se documenta). Tus propias lambdas dentro de map, filter, reduce y peek son las que debes vigilar.
Cuándo ayuda el paralelo (y cuándo no)
Un stream paralelo gana solo cuando el trabajo por elemento es lo suficientemente grande como para superar el costo de coordinación — división, planificación, fusión y la administración del framework. Un modelo mental aproximado:
- Fuente grande + trabajo por elemento limitado por CPU + fusión barata + fuente divisible = el paralelo suele ganar. Procesamiento de imágenes por píxel, análisis por registro, hashing por archivo — casos clásicos.
- Fuente pequeña = gana el secuencial. El inicio del pool es más costoso que todo el cálculo.
- Trabajo por elemento barato = gana el secuencial.
nums.stream().mapToInt(Integer::intValue).sum()es más rápido que su versiónparallelStream()hasta quenumstiene millones de elementos; en tamaños pequeños domina la sobrecarga del framework. - E/S bloqueante por elemento = los streams paralelos son la herramienta equivocada. El
ForkJoinPoolcomún está dimensionado para trabajo de CPU; una llamada de E/S bloqueante ocupa un trabajador y deja sin recursos a todos los demás streams paralelos en la JVM (incluidos los de las bibliotecas). UsaCompletableFuturecon un executor acotado para la distribución de E/S. - Fuente no divisible = el paralelo o bien recurre al secuencial o se divide mal.
Files.lines,Stream.iterate,Stream.generateyLinkedList.stream()son los que dividen peor; los arrays,ArrayListeIntStream.rangeson los que dividen mejor.
El consejo honesto: usa secuencial por defecto; cambia a paralelo solo cuando tengas una razón medida, con números de jmh o tiempos de reloj en mano.
Operaciones que se complican en paralelo
Algunas operaciones cuyo significado cambia cuando el pipeline va en paralelo:
forEach— se ejecuta desde múltiples hilos, en orden no especificado. Si el orden importa, usaforEachOrdered(que tiene costo de sincronización).findFirst— debe coordinarse entre trabajadores para identificar la primera coincidencia en el orden de encuentro. UsafindAnysi no te importa cuál gana.limit/skip— bien definidos en streams ordenados, pero más costosos en paralelo porque la JVM debe respetar el orden. En un stream paralelo donde el orden no importa,stream.parallel().unordered().limit(n)es más barato.distinct/sorted— deben coordinarse entre trabajadores; el buffer que mantienen es compartido.reducecon la sobrecarga de 3 argumentos usa elcombinerpara fusionar las salidas de los trabajadores. Con la sobrecarga de 2 argumentos, la JVM usa la identidad dos veces más el acumulador — mismo contrato, misma regla de asociatividad.collect— losCollectorsestán diseñados para ser seguros en paralelo; la advertencia es que el contenedor de resultado puede ser unHashMapoArrayListnormal, y la colección paralela coordina internamente para mantenerlo seguro. Tus collectors downstream deben obedecer el contrato.
La trampa del estado compartido, de forma concreta
El error más común en código paralelo de principiantes:
// WRONG -- looks fine, races in parallel
List<String> shouts = new ArrayList<>();
words.parallelStream().forEach(w -> shouts.add(w.toUpperCase()));ArrayList.add no es seguro para hilos; los trabajadores concurrentes pueden perder elementos, agregar duplicados, lanzar ArrayIndexOutOfBoundsException o corromper la lista en silencio. La versión correcta expresa el resultado como la salida del pipeline, no un efecto secundario de él:
List<String> shouts = words.parallelStream().map(String::toUpperCase).toList();toList(), como cualquier otro collector y terminal que produce un valor, está diseñado para uso paralelo. En el momento en que uses forEach para mutar una variable externa, habrás abandonado el camino seguro.
Si realmente necesitas un destino seguro para hilos para forEach, usa ConcurrentLinkedQueue, AtomicLong, LongAdder o Collections.synchronizedList(...). Pero casi siempre, la respuesta correcta es "no usar forEach para acumulación; dejar que el pipeline construya el resultado".
ForkJoinPool y por qué importa
Por defecto, todos los streams paralelos en tu JVM comparten el pool común, dimensionado a Runtime.getRuntime().availableProcessors() - 1 hilos trabajadores. Esto tiene dos consecuencias:
- Un stream paralelo de larga duración monopoliza el pool. Cualquier otro stream paralelo — incluidos los de las bibliotecas — hará cola detrás de él.
- Un stream paralelo que bloquea (E/S, locks,
Thread.sleep) ocupa un hilo trabajador sin hacer ningún trabajo, reduciendo a la mitad el tamaño efectivo del pool mientras espera.
Puedes dedicar un pool privado para un pipeline específico:
try (var pool = new java.util.concurrent.ForkJoinPool(4)) {
long total = pool.submit(() ->
nums.parallelStream().mapToLong(n -> heavy(n)).sum()
).get();
}Esta es la decisión correcta para cómputo de larga duración que no quieres compartir con el resto de la JVM. Sigue siendo la decisión incorrecta para E/S bloqueante — cambia a hilos virtuales o una cadena explícita de CompletableFuture en un executor de E/S acotado.
Un ejemplo práctico: aceleración paralela, la trampa del estado compartido y un error de asociatividad
El programa a continuación mide el tiempo secuencial vs. paralelo para una suma IntStream limitada por CPU, demuestra la condición de carrera de estado compartido con forEach, muestra la versión correcta basada en collectors, y contrasta los reductores asociativos (Integer::sum) con los no asociativos ((a, b) -> a - b) bajo paralelo.
Qué destacar de la ejecución:
- La suma paralela produjo el mismo resultado que la secuencial y (en cualquier máquina multinúcleo) terminó en una fracción del tiempo de reloj. La llamada
heavypor elemento está limitada por CPU y la fuente (unint[]) se divide bien — los dos ingredientes que el paralelo necesita. - El
forEachque mutóbadSinko perdió elementos o falló. No hay solución que añada unsynchronizedaquí sin hacer la versión paralela más lenta que la secuencial. La solución es no escribirforEachpara acumulación — usa un collector o un terminal que produzca el resultado. Integer::sumes asociativo; la reducción paralela produjo la misma respuesta que la secuencial. El(a, b) -> a - bno asociativo produjo respuestas diferentes en secuencial vs. paralelo porque la JVM es libre de dividir y fusionar en cualquier orden equivalente-asociativo. El mismo código, dos respuestas — el síntoma que todo error de streams paralelos termina produciendo.parallel().forEach(...)imprimió0..15en algún orden no monótono;parallel().forEachOrdered(...)los imprimió en orden al costo de sincronización entre trabajadores. Si tuforEachse preocupa por el orden, lo estás pagando.- El
ForkJoinPool(2)privado ejecutó el pipeline contra un pool dedicado. Úsalo cuando tengas un trabajo de cómputo de larga duración y no quieras que comparta el pool común con el resto de la JVM. No lo uses como parche para E/S bloqueante — ese es un problema diferente con una herramienta diferente.
Qué sigue
Ahora puedes razonar sobre cualquier pipeline de stream: cuándo escribir uno, cómo construirlo, qué es lazy, qué hace cortocircuito, qué se ejecuta en paralelo de forma segura y qué no. Todavía queda una abstracción central sobre la mesa — la que permite a un pipeline expresar "este valor puede estar ausente" sin un solo null. El siguiente capítulo, Java Optional, cubre Optional<T> — qué es, dónde la API de streams deja sus cabos sueltos, y cómo usar map, flatMap, orElse e ifPresent para escribir código que es null-safe por construcción.