W3docs

Streams paralelos en Java

Procesa streams de Java en paralelo para ganar velocidad: cuándo parallelStream ayuda y cuándo empeora las cosas.

Un stream paralelo es el mismo pipeline de stream que has estado escribiendo, pero la JVM puede dividir la fuente en fragmentos y procesarlos en múltiples hilos. El cambio en el punto de llamada es mínimo:

long total = nums.parallelStream().mapToLong(n -> heavy(n)).sum();
//              ^^^^^^^^^^^^^^^^^

o:

long total = nums.stream().parallel().mapToLong(n -> heavy(n)).sum();

La estructura del pipeline, las operaciones, el resultado — todo sin cambios. Lo que cambia es quién lo ejecuta: en lugar de un solo hilo recorriendo la fuente, varios trabajadores del ForkJoinPool común (uno por núcleo de CPU, menos uno) dividen el trabajo y un coordinador fusiona sus resultados parciales. Cuando el trabajo por elemento es suficientemente pesado y la fuente se divide bien, el pipeline termina en aproximadamente tiempo de reloj / núcleos. Cuando no es así, el paralelo es más lento que el secuencial — y a veces incorrecto. Este capítulo trata sobre distinguir ambos casos.

Qué hace realmente "parallel"

Un stream secuencial pasa un elemento por el pipeline, luego el siguiente. Un stream paralelo:

  1. Divide la fuente en sub-streams mediante el Spliterator de la fuente. Los arrays, ArrayList, IntStream.range y fuentes similares se dividen eficientemente en O(1). LinkedList, Files.lines, Stream.iterate y Stream.generate o se dividen mal o se niegan a dividirse.
  2. Ejecuta la cadena de operaciones intermedias de cada sub-stream en un hilo trabajador del pool común.
  3. Fusiona los resultados parciales — para reduce y collect, para eso sirve el combiner.

forEach en un stream paralelo llama a tu Consumer desde múltiples hilos de forma concurrente y en orden no especificado. forEachOrdered preserva el orden de encuentro al costo de sincronización. findFirst en paralelo es más costoso que findAny por la misma razón — tiene que coordinar para identificar la primera coincidencia.

El contrato — qué debe cumplir tu pipeline

El paralelo solo da una respuesta correcta cuando el pipeline obedece tres reglas. El código secuencial que las ignora sigue funcionando; el código paralelo que las ignora produce resultados sin sentido de forma silenciosa.

  1. El reductor debe ser asociativo. f(f(a, b), c) == f(a, f(b, c)). +, *, max, min, unión de conjuntos, concatenación de listas — todos califican. La resta, la división, "primera coincidencia" y "agregar a lista con orden" no. Si pasas un BinaryOperator no asociativo a reduce o Collectors.reducing, la respuesta depende de cómo la JVM decida dividir.
  2. El pipeline debe ser sin estado. Tus lambdas no deben leer ni escribir estado mutable compartido. Una lambda que captura y muta un ArrayList externo, incrementa un int[] externo, o usa cualquier contador no atómico tendrá condiciones de carrera en paralelo.
  3. El pipeline debe estar libre de efectos secundarios. El logging está bien; persistir a través de un destino seguro para hilos está bien; todo lo demás es un error esperando que un trabajador lo intercale de forma diferente.

Los collectors incorporados en Collectors cumplen 1–3 por construcción (cuando se usan como se documenta). Tus propias lambdas dentro de map, filter, reduce y peek son las que debes vigilar.

Cuándo ayuda el paralelo (y cuándo no)

Un stream paralelo gana solo cuando el trabajo por elemento es lo suficientemente grande como para superar el costo de coordinación — división, planificación, fusión y la administración del framework. Un modelo mental aproximado:

  • Fuente grande + trabajo por elemento limitado por CPU + fusión barata + fuente divisible = el paralelo suele ganar. Procesamiento de imágenes por píxel, análisis por registro, hashing por archivo — casos clásicos.
  • Fuente pequeña = gana el secuencial. El inicio del pool es más costoso que todo el cálculo.
  • Trabajo por elemento barato = gana el secuencial. nums.stream().mapToInt(Integer::intValue).sum() es más rápido que su versión parallelStream() hasta que nums tiene millones de elementos; en tamaños pequeños domina la sobrecarga del framework.
  • E/S bloqueante por elemento = los streams paralelos son la herramienta equivocada. El ForkJoinPool común está dimensionado para trabajo de CPU; una llamada de E/S bloqueante ocupa un trabajador y deja sin recursos a todos los demás streams paralelos en la JVM (incluidos los de las bibliotecas). Usa CompletableFuture con un executor acotado para la distribución de E/S.
  • Fuente no divisible = el paralelo o bien recurre al secuencial o se divide mal. Files.lines, Stream.iterate, Stream.generate y LinkedList.stream() son los que dividen peor; los arrays, ArrayList e IntStream.range son los que dividen mejor.

El consejo honesto: usa secuencial por defecto; cambia a paralelo solo cuando tengas una razón medida, con números de jmh o tiempos de reloj en mano.

Operaciones que se complican en paralelo

Algunas operaciones cuyo significado cambia cuando el pipeline va en paralelo:

  • forEach — se ejecuta desde múltiples hilos, en orden no especificado. Si el orden importa, usa forEachOrdered (que tiene costo de sincronización).
  • findFirst — debe coordinarse entre trabajadores para identificar la primera coincidencia en el orden de encuentro. Usa findAny si no te importa cuál gana.
  • limit / skip — bien definidos en streams ordenados, pero más costosos en paralelo porque la JVM debe respetar el orden. En un stream paralelo donde el orden no importa, stream.parallel().unordered().limit(n) es más barato.
  • distinct / sorted — deben coordinarse entre trabajadores; el buffer que mantienen es compartido.
  • reduce con la sobrecarga de 3 argumentos usa el combiner para fusionar las salidas de los trabajadores. Con la sobrecarga de 2 argumentos, la JVM usa la identidad dos veces más el acumulador — mismo contrato, misma regla de asociatividad.
  • collect — los Collectors están diseñados para ser seguros en paralelo; la advertencia es que el contenedor de resultado puede ser un HashMap o ArrayList normal, y la colección paralela coordina internamente para mantenerlo seguro. Tus collectors downstream deben obedecer el contrato.

La trampa del estado compartido, de forma concreta

El error más común en código paralelo de principiantes:

// WRONG -- looks fine, races in parallel
List<String> shouts = new ArrayList<>();
words.parallelStream().forEach(w -> shouts.add(w.toUpperCase()));

ArrayList.add no es seguro para hilos; los trabajadores concurrentes pueden perder elementos, agregar duplicados, lanzar ArrayIndexOutOfBoundsException o corromper la lista en silencio. La versión correcta expresa el resultado como la salida del pipeline, no un efecto secundario de él:

List<String> shouts = words.parallelStream().map(String::toUpperCase).toList();

toList(), como cualquier otro collector y terminal que produce un valor, está diseñado para uso paralelo. En el momento en que uses forEach para mutar una variable externa, habrás abandonado el camino seguro.

Si realmente necesitas un destino seguro para hilos para forEach, usa ConcurrentLinkedQueue, AtomicLong, LongAdder o Collections.synchronizedList(...). Pero casi siempre, la respuesta correcta es "no usar forEach para acumulación; dejar que el pipeline construya el resultado".

ForkJoinPool y por qué importa

Por defecto, todos los streams paralelos en tu JVM comparten el pool común, dimensionado a Runtime.getRuntime().availableProcessors() - 1 hilos trabajadores. Esto tiene dos consecuencias:

  • Un stream paralelo de larga duración monopoliza el pool. Cualquier otro stream paralelo — incluidos los de las bibliotecas — hará cola detrás de él.
  • Un stream paralelo que bloquea (E/S, locks, Thread.sleep) ocupa un hilo trabajador sin hacer ningún trabajo, reduciendo a la mitad el tamaño efectivo del pool mientras espera.

Puedes dedicar un pool privado para un pipeline específico:

try (var pool = new java.util.concurrent.ForkJoinPool(4)) {
    long total = pool.submit(() ->
        nums.parallelStream().mapToLong(n -> heavy(n)).sum()
    ).get();
}

Esta es la decisión correcta para cómputo de larga duración que no quieres compartir con el resto de la JVM. Sigue siendo la decisión incorrecta para E/S bloqueante — cambia a hilos virtuales o una cadena explícita de CompletableFuture en un executor de E/S acotado.

Un ejemplo práctico: aceleración paralela, la trampa del estado compartido y un error de asociatividad

El programa a continuación mide el tiempo secuencial vs. paralelo para una suma IntStream limitada por CPU, demuestra la condición de carrera de estado compartido con forEach, muestra la versión correcta basada en collectors, y contrasta los reductores asociativos (Integer::sum) con los no asociativos ((a, b) -> a - b) bajo paralelo.

java— editable, runs on the server

Qué destacar de la ejecución:

  • La suma paralela produjo el mismo resultado que la secuencial y (en cualquier máquina multinúcleo) terminó en una fracción del tiempo de reloj. La llamada heavy por elemento está limitada por CPU y la fuente (un int[]) se divide bien — los dos ingredientes que el paralelo necesita.
  • El forEach que mutó badSink o perdió elementos o falló. No hay solución que añada un synchronized aquí sin hacer la versión paralela más lenta que la secuencial. La solución es no escribir forEach para acumulación — usa un collector o un terminal que produzca el resultado.
  • Integer::sum es asociativo; la reducción paralela produjo la misma respuesta que la secuencial. El (a, b) -> a - b no asociativo produjo respuestas diferentes en secuencial vs. paralelo porque la JVM es libre de dividir y fusionar en cualquier orden equivalente-asociativo. El mismo código, dos respuestas — el síntoma que todo error de streams paralelos termina produciendo.
  • parallel().forEach(...) imprimió 0..15 en algún orden no monótono; parallel().forEachOrdered(...) los imprimió en orden al costo de sincronización entre trabajadores. Si tu forEach se preocupa por el orden, lo estás pagando.
  • El ForkJoinPool(2) privado ejecutó el pipeline contra un pool dedicado. Úsalo cuando tengas un trabajo de cómputo de larga duración y no quieras que comparta el pool común con el resto de la JVM. No lo uses como parche para E/S bloqueante — ese es un problema diferente con una herramienta diferente.

Qué sigue

Ahora puedes razonar sobre cualquier pipeline de stream: cuándo escribir uno, cómo construirlo, qué es lazy, qué hace cortocircuito, qué se ejecuta en paralelo de forma segura y qué no. Todavía queda una abstracción central sobre la mesa — la que permite a un pipeline expresar "este valor puede estar ausente" sin un solo null. El siguiente capítulo, Java Optional, cubre Optional<T> — qué es, dónde la API de streams deja sus cabos sueltos, y cómo usar map, flatMap, orElse e ifPresent para escribir código que es null-safe por construcción.

Práctica

Práctica
`nums.parallelStream().reduce(0, (a, b) -> a - b)` devuelve una respuesta diferente a su contraparte con `stream()`. ¿Por qué?
`nums.parallelStream().reduce(0, (a, b) -> a - b)` devuelve una respuesta diferente a su contraparte con `stream()`. ¿Por qué?
Was this page helpful?