There is actually a trick to execute a parallel operation in a specific fork-join pool: if you execute it as a task submitted to that pool, it stays there and does not use the common one.

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = forkJoinPool.submit(() ->
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

The trick is based on ForkJoinTask.fork, which specifies: "Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool()".
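To verify that the trick really keeps the work in the custom pool, here is a small self-contained check (my own sketch, not part of the quoted answer; it relies on the default worker naming scheme, where custom-pool workers are named "ForkJoinPool-N-worker-M" and common-pool workers "ForkJoinPool.commonPool-worker-M"):

```java
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PoolCheck {
    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            // Collect the name of every thread that processed a stream element.
            Set<String> names = pool.submit(() ->
                IntStream.range(0, 10_000).parallel()
                        .mapToObj(i -> Thread.currentThread().getName())
                        .collect(Collectors.toSet())
            ).get();
            // Custom-pool workers start with "ForkJoinPool-" (with a dash);
            // common-pool workers would start with "ForkJoinPool.commonPool".
            boolean allCustom = names.stream()
                    .allMatch(n -> n.startsWith("ForkJoinPool-"));
            System.out.println("ran-in-custom-pool: " + allCustom);
        } finally {
            pool.shutdown();
        }
    }
}
```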

Answer from Lukáš Křečan on Stack Overflow
Top answer
1 of 16
520


2 of 16
251

Parallel streams use the default ForkJoinPool.commonPool, which by default has one thread fewer than the number of processors reported by Runtime.getRuntime().availableProcessors() (this leaves one processor free for the calling thread, which joins in the stream's work).

For applications that require separate or custom pools, a ForkJoinPool may be constructed with a given target parallelism level; by default, equal to the number of available processors.

This also means that if you have nested parallel streams, or multiple parallel streams started concurrently, they will all share the same pool. Advantage: you will never use more than the default number of threads (roughly the number of available processors). Disadvantage: you may not get "all the processors" assigned to each parallel stream you initiate (if you happen to have more than one). (Apparently you can use a ManagedBlocker to circumvent that.)
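On the ManagedBlocker mention above: here is a minimal sketch (my own, with assumed names) of how a blocking call can be wrapped so the pool is told a worker is about to block and may compensate with a spare thread:

```java
import java.util.concurrent.ForkJoinPool;

public class SleepBlocker implements ForkJoinPool.ManagedBlocker {
    private final long millis;
    private boolean done;

    SleepBlocker(long millis) { this.millis = millis; }

    @Override
    public boolean block() throws InterruptedException {
        Thread.sleep(millis);   // the actual blocking call
        done = true;
        return true;            // no further blocking is necessary
    }

    @Override
    public boolean isReleasable() {
        return done;            // true once the blocking call has completed
    }

    public static void main(String[] args) {
        ForkJoinPool.commonPool().submit(() -> {
            try {
                // Tells the pool this worker is about to block, so it may
                // activate a spare thread to keep the parallelism level up.
                ForkJoinPool.managedBlock(new SleepBlocker(50));
                System.out.println("managed-block-done: true");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).join();
    }
}
```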

To change the way parallel streams are executed, you can either

  • submit the parallel stream execution to your own ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(doSomething)).get(); or
  • you can change the size of the common pool using system properties: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20") for a target parallelism of 20 threads.

Example of the latter on my machine, which has 8 processors. If I run the following program:

long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
    try { Thread.sleep(100); } catch (Exception ignore) {}
    System.out.print((System.currentTimeMillis() - start) + " ");
});

The output is:

215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416

So you can see that the parallel stream processes 8 items at a time, i.e. it uses 8 threads. However, if I uncomment the commented line, the output is:

215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216

This time, the parallel stream has used 20 threads and all 20 elements in the stream have been processed concurrently.
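One caveat worth adding: the common-pool property is read only once, when the common pool is initialized, so it must be set before any code in the JVM touches the common pool. A minimal sketch:

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolSize {
    public static void main(String[] args) {
        // The property is consulted when the common pool is first created,
        // so setting it later in a program's lifetime has no effect.
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "3");
        System.out.println("common-parallelism: " + ForkJoinPool.commonPool().getParallelism());
    }
}
```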

Discussions

ForkJoinPool and Nested (Parallel) Streams - Or why are inner streams faster with new pools ?
🌐 r/javahelp
8
3
December 3, 2024

Hi all, so I've been doing some benchmarking at work to suss out how good (or bad) things are at various places. One of the instances I benchmarked recently was an instance where someone had coded up two nested parallelStreams. Something like so:

inputStream.parallel().forEach(
    streamElement -> someList.stream().parallel()
        .forEach(innerEle -> {
            // some work here using streamElement and innerEle
        })
);

My immediate thought was that since all parallelStreams draw from ForkJoinPool.commonPool() they'd end up fighting for resources and potentially make the whole thing slower.

But my next thought was...how much slower ?

So I went ahead and made a benchmark with JMH where I tested 3 conditions:

  • Nested parallel streams

  • Outer parallel stream and inner sequential stream

  • Nested parallel streams but with a new forkJoinPool for the inner stream so that it doesn't compete with the common pool. There's no real reason for me adding this in other than sheer curiosity.

The results are ... interesting. Here's my benchmarking code:

public class ParallelPerf {
  @State(Scope.Benchmark)
  public static class StateData{
    public static final List<Integer> outerLoop = IntStream.range(0, 32).boxed().toList();
    public static final List<Integer> innerLoop = IntStream.range(0, 32).boxed().toList();
  }
  private static void runInNewPool(Runnable task) {
    ForkJoinPool pool = new ForkJoinPool();
    try {
      pool.submit(task).join();
    } finally {
      pool.shutdown();
    }
  }
  private static void innerParallelLoop() {
    StateData.innerLoop.parallelStream().unordered().forEach(i -> {
      try {
        Thread.sleep(5);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
  }
  private static void innerSequentialLoop() {
    StateData.innerLoop.stream().unordered().forEach(i -> {
      try {
        Thread.sleep(5);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
  }
  @Benchmark
  public void testingNewPool(Blackhole bh){
    StateData.outerLoop.parallelStream().unordered().forEach(i -> {
      runInNewPool(ParallelPerf::innerParallelLoop);
      bh.consume(i);
    });
  }

  @Benchmark
  public void testingCommonPoolWithSequentialInner(Blackhole bh){
    StateData.outerLoop.parallelStream().unordered().forEach(i -> {
      innerSequentialLoop();
      bh.consume(i);
    });
  }
  @Benchmark
  public void testingCommonPool(Blackhole bh){
    StateData.outerLoop.parallelStream().unordered().forEach(i -> {
      innerParallelLoop();
      bh.consume(i);
    });
  }
}

And here are the results on my system:

Benchmark                                           Mode  Cnt   Score   Error  Units
ParallelPerf.testingCommonPool                     thrpt   25   1.992 ± 0.018  ops/s
ParallelPerf.testingCommonPoolWithSequentialInner  thrpt   25   1.802 ± 0.015  ops/s
ParallelPerf.testingNewPool                        thrpt   25  23.136 ± 1.738  ops/s

Assuming my benching code is correct and I haven't screwed anything up, I'm quite surprised that the code with new pools is around 20x faster than the others. Why is it so much faster ?

One potential reason I could think of (caveat - I haven't verified this at all) is that maybe the new pool is able to grab one of the waiting threads from the common pool ? But this would indicate that the threads within commonPool are unable to do so, which doesn't seem right.

So fellow redditors - any guesses/insights as to what might be happening here ?

Top answer
1 of 2
2
It takes time to spin up and shut down a new thread pool.
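A complementary way to read the numbers (my own sketch, not from the thread): each fresh pool brings its own workers, so 32 extra pools supply far more threads to sit in Thread.sleep concurrently than the single shared common pool ever could, since a sleeping worker is simply parked and unavailable:

```java
import java.util.concurrent.ForkJoinPool;

public class PoolMath {
    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // Nested streams on the common pool share the same small worker set,
        // and each Thread.sleep parks one of those workers.
        int commonWorkers = ForkJoinPool.commonPool().getParallelism();

        // One fresh pool per outer element: each pool adds its own workers.
        ForkJoinPool[] pools = new ForkJoinPool[4];
        int total = 0;
        for (int i = 0; i < pools.length; i++) {
            pools[i] = new ForkJoinPool(); // default parallelism = #cores
            total += pools[i].getParallelism();
        }
        for (ForkJoinPool p : pools) p.shutdown();

        System.out.println("extra-pools-scale-linearly: " + (total == 4 * cores));
        System.out.println("common-pool-is-fixed: " + (commonWorkers <= cores));
    }
}
```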

Answer from Stack Overflow, "Why does the parallel stream not use all the threads of the ForkJoinPool?"
Top answer
1 of 2
11

Update

Originally this answer was an elaborate explanation claiming that the ForkJoinPool applies back-pressure and doesn't even reach the prescribed parallelism level, because there are always idle workers available to process the stream.

That's incorrect.

The actual answer is provided in the original question to which this was marked as duplicate - using a custom ForkJoinPool for stream processing is not officially supported, and when using forEach, the default pool parallelism is used to determine the stream spliterator behavior.

Here's an example of how, when tasks are manually submitted to a custom ForkJoinPool, the pool's active thread count easily reaches its parallelism level (the pool and the concurrent set of worker-thread names are declared up front to make the snippet self-contained):

ForkJoinPool forkJoinPool = new ForkJoinPool(4);
Set<String> thNames = ConcurrentHashMap.newKeySet();
for (int i = 0; i < 1_000_000; ++i) {
   forkJoinPool.submit(() -> {
      try {
         Thread.sleep(1);
         thNames.add(Thread.currentThread().getName());
         System.out.println("Size: " + thNames.size()
               + " activeCount: " + forkJoinPool.getActiveThreadCount()
               + " parallelism: " + forkJoinPool.getParallelism());
      } catch (Exception e) {
         throw new RuntimeException(e);
      }
   });
}

Thanks to Stuart Marks for pointing this out and to Sotirios Delimanolis for arguing that my original answer is wrong :)

2 of 2
-5

It seems to me that when you submit a lambda to the FJP, that lambda will use the common pool and not the FJP. Sotirios Delimanolis proved this with his comment above. What you are submitting is a Task that runs in your FJP.

Try profiling this code to see what threads are actually being used.

You cannot name the threads within the FJP.
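For completeness (an aside of my own, not from the answer above): worker threads of a custom pool can in fact be named, by supplying a ForkJoinWorkerThreadFactory; the "my-worker-" prefix below is a made-up name for illustration:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedPool {
    public static void main(String[] args) throws Exception {
        AtomicInteger counter = new AtomicInteger();
        // Delegate thread creation to the default factory, then rename.
        ForkJoinWorkerThreadFactory factory = pool -> {
            ForkJoinWorkerThread t =
                ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
            t.setName("my-worker-" + counter.incrementAndGet());
            return t;
        };
        ForkJoinPool pool = new ForkJoinPool(2, factory, null, false);
        String name = pool.submit(() -> Thread.currentThread().getName()).get();
        System.out.println("custom-name: " + name.startsWith("my-worker-"));
        pool.shutdown();
    }
}
```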


Answer from Stack Overflow, "StreamEx.parallel().forEach() does not run in parallel after .map()"
Top answer
1 of 2
6

Simple answer: it's a bug. I filed and fixed it. It was overlooked because the tests only check that all the operations are executed in the specified pool, but don't check whether different threads of the pool are used (sometimes it's fine if parallelization does not kick in, e.g. for a stream of only one element).

A fix is available in the 0.6.4 release. In previous releases, to work around the problem, you may consider using .parallel().parallel(fjp): it should parallelize correctly.

Please consider reporting StreamEx problems to official StreamEx issue tracker. I visit StackOverflow only occasionally these days, so may overlook the problems reported here.

2 of 2
3

The problem is that as soon as you call the map(...) method, StreamEx creates the underlying Java 8 stream with the sequential/parallel configuration as of that point (i.e. sequential), and calling parallel(...) after that does not appear to update the underlying Java 8 stream.

The solution depends on what you are trying to achieve. If you're happy for your map(...) operation to be run in parallel as well, then just move the parallel(...) operation up so that it is the first thing after the of(...).

However, if you want some operations to be carried out sequentially, before some parallel operations, then you'd be better using two streams. For example, following the style of your sample code:

public static void executeAsParallelAfterMapV2() {
    logger.info("executeAsParallelAfterMapV2():");
    List<String> testList = getTestList();
    StreamEx<String> sequentialStream = StreamEx
            .of(testList)
            .map(item -> {
                logger.info("Mapping {}", item);
                return item + "_mapped";
            });
    logger.info("sequentialStream.isParallel(): {}", sequentialStream.isParallel());

    List<String> afterSequentialProcessing = sequentialStream.toList();
    StreamEx<String> streamOfItems = StreamEx.of(afterSequentialProcessing)
            .parallel(s3ThreadPool);
    logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
    streamOfItems.forEach(item -> handleItem(item));
}

This gives something like:

20:43:36.835 [main] INFO scott.streams.ParallelExample - executeAsParallelAfterMapV2():
20:43:36.883 [main] INFO scott.streams.ParallelExample - sequentialStream.isParallel(): false
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_0
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_1
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_2
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_3
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_4
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_5
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_6
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_7
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_8
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_9
20:43:36.886 [main] INFO scott.streams.ParallelExample - streamOfItems.isParallel(): true
20:43:36.889 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_6_mapped
20:43:36.889 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_2_mapped
20:43:36.890 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_8_mapped
20:43:36.890 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_5_mapped
20:43:36.890 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_4_mapped
20:43:36.890 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_9_mapped
20:43:36.890 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_1_mapped
20:43:36.890 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_3_mapped
20:43:36.890 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_7_mapped
20:43:36.890 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_0_mapped

An Aside...

Out of interest, if you create a Java 8 stream directly, (not using StreamEx), and put the parallel() operation below the map(...), then it does update the type of the (whole) stream to be parallel:

public static void executeAsParallelAfterMapJava8Stream() throws InterruptedException {
    logger.info("executeAsParallelAfterMapJava8Stream():");
    List<String> testList = getTestList();

    s3ThreadPool.submit(() -> {
        Stream<String> streamOfItems = testList.stream()
                .map(item -> {
                    logger.info("Mapping {}", item);
                    return item + "_mapped";
                })
                .parallel();
        logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
        streamOfItems.forEach(item -> handleItem(item));
    }).join();
}

If you create a similar unit test then you get something similar to:

20:36:23.469 [main] INFO scott.streams.ParallelExample - executeAsParallelAfterMapJava8Stream():
20:36:23.517 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - streamOfItems.isParallel(): true
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_6
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - Mapping item_2
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - Mapping item_8
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_6_mapped
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_2_mapped
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_8_mapped
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_5
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - Mapping item_4
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - Mapping item_9
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_5_mapped
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_4_mapped
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_9_mapped
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_1
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - Mapping item_3
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - Mapping item_7
20:36:23.521 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_1_mapped
20:36:23.521 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_3_mapped
20:36:23.521 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_7_mapped
20:36:23.521 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_0
20:36:23.521 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_0_mapped

Answer from Stack Overflow, "Why does parallelStream use a ForkJoinPool, not a normal thread pool?"
Top answer
1 of 1
8

One important thing is that a ForkJoinPool can execute "normal" tasks (e.g. Runnable, Callable) as well, so it's not just meant for recursively-created tasks.

Another important thing is that a ForkJoinPool has multiple task queues, one per worker thread, where a normal executor (e.g. ThreadPoolExecutor) has just one. This has a significant impact on what kinds of tasks each should run.

The smaller and more numerous the tasks a normal executor has to execute, the higher the synchronization overhead of distributing them to the workers. If most of the tasks are small, the workers will hit the shared internal task queue often, which leads to contention.

Here's where the ForkJoinPool shines with its multiple queues. Every worker takes tasks from its own queue, which most of the time doesn't require blocking synchronization; if its queue is empty, it can steal a task from another worker, but from the other end of that queue, which also rarely causes synchronization overhead because work-stealing is supposed to be rare.

Now what does that have to do with parallel streams? The streams framework is designed to be easy to use. Parallel streams are meant for easily splitting work into many concurrent tasks, where all tasks are rather small and simple. That's the point where the ForkJoinPool is the reasonable choice: it provides better performance on huge numbers of smaller tasks, and it can handle longer tasks as well, if it has to.
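The recursively-created tasks the answer alludes to can be illustrated with a classic divide-and-conquer RecursiveTask (a minimal example of my own): each task forks one half onto its own deque, where idle workers may steal it, and computes the other half directly.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class SumTask extends RecursiveTask<Long> {
    private final long[] values;
    private final int from, to;

    SumTask(long[] values, int from, int to) {
        this.values = values;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= 1_000) {           // small slice: compute directly
            long sum = 0;
            for (int i = from; i < to; i++) sum += values[i];
            return sum;
        }
        int mid = (from + to) >>> 1;
        SumTask left = new SumTask(values, from, mid);
        SumTask right = new SumTask(values, mid, to);
        left.fork();                        // queued on this worker's deque
        return right.compute() + left.join(); // other workers may steal "left"
    }

    public static void main(String[] args) {
        long[] values = new long[100_000];
        for (int i = 0; i < values.length; i++) values[i] = i;
        long sum = new ForkJoinPool().invoke(new SumTask(values, 0, values.length));
        System.out.println("sum: " + sum);
    }
}
```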