There actually is a trick how to execute a parallel operation in a specific fork-join pool. If you execute it as a task in a fork-join pool, it stays there and does not use the common one.

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = forkJoinPool.submit(() ->
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

The trick is based on ForkJoinTask.fork which specifies: "Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool()"

Answer from Lukáš Křečan on Stack Overflow
Top answer
1 of 16
520

There actually is a trick how to execute a parallel operation in a specific fork-join pool. If you execute it as a task in a fork-join pool, it stays there and does not use the common one.

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = forkJoinPool.submit(() ->
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

The trick is based on ForkJoinTask.fork which specifies: "Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool()"

2 of 16
251

The parallel streams use the default ForkJoinPool.commonPool which by default has one less threads as you have processors, as returned by Runtime.getRuntime().availableProcessors() (This means that parallel streams leave one processor for the calling thread).

For applications that require separate or custom pools, a ForkJoinPool may be constructed with a given target parallelism level; by default, equal to the number of available processors.

This also means if you have nested parallel streams or multiple parallel streams started concurrently, they will all share the same pool. Advantage: you will never use more than the default (number of available processors). Disadvantage: you may not get "all the processors" assigned to each parallel stream you initiate (if you happen to have more than one). (Apparently you can use a ManagedBlocker to circumvent that.)

To change the way parallel streams are executed, you can either

  • submit the parallel stream execution to your own ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get(); or
  • you can change the size of the common pool using system properties: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20") for a target parallelism of 20 threads.

Example of the latter on my machine which has 8 processors. If I run the following program:

long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
    try { Thread.sleep(100); } catch (Exception ignore) {}
    System.out.print((System.currentTimeMillis() - start) + " ");
});

The output is:

215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416

So you can see that the parallel stream processes 8 items at a time, i.e. it uses 8 threads. However, if I uncomment the commented line, the output is:

215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216

This time, the parallel stream has used 20 threads and all 20 elements in the stream have been processed concurrently.

Discussions

Custom Thread Pools In Java Parallel Streams
Wish I read this before starting my current project. I was getting crazy memory leaks before I found out you had to shutdown the thread pools. Good article for beginners in parallel processing More on reddit.com
🌐 r/java
6
36
February 14, 2023
Is it possible to use virtual threads with parallel streams?
Parallel streams don't have to use the global fork join pool. If you use them in the context of your own fork join pool, that's the one they will use More on reddit.com
🌐 r/java
20
37
March 22, 2023
Pitfalls Of Java Parallel Streams
This is one of my main frustrations with Java streams. The API is harder to use than it could be to ensure that operations can be parallelised, but I almost never want parallel execution. More on reddit.com
🌐 r/programming
12
10
June 4, 2021
A surprising pain point regarding Parallel Java Streams (featuring mailing list discussion with Viktor Klang).
I did want to follow up about one point Viktor made later on in the conversation. https://mail.openjdk.org/pipermail/core-libs-dev/2024-November/134542.html And here is the quote. In a potential future where all intermediate operations are Gatherer-based, and all terminal operations are Collector-based, it would just work as expected. But with that said, I'm not sure it is practically achievable because some operations might not have the same performance-characteristics as before. Me personally, I would GLADLY accept a flag on stream (similar to parallel() or unordered()) that would allow me to guarantee that my stream never pre-fetches, even if I take a massive performance hit. If that can be accomplished by making all intermediate operations be implemented by a Gatherer under the hood, that is A-OK with me. The reality is, not all streams are compute bound. Some are IO bound, but are otherwise, a great fit for streams. Having a method that allows us to optimize for that fact is a new type of performance enhancement that I would greatly appreciate, even if it degrades performance in other ways. More on reddit.com
🌐 r/java
94
223
November 20, 2024
🌐
Java Code Geeks
javacodegeeks.com › home › core java
Java 8 Parallel Streams - Custom Thread Pools Examples - Java Code Geeks
April 28, 2021 - Examples on how to use custom pools with the Parallel streams API which avoids common thread pool usage. In this tutorial, You’ll learn how to create custom thread pools in Java 8 for bulk data processing with parallel streams powerful API.
🌐
Javaprogramto
javaprogramto.com › 2020 › 05 › java-8-parallel-streams-custom-threadpool.html
Java 8 Parallel Streams - Custom Thread Pools Examples JavaProgramTo.com
May 2, 2021 - In these cases, It is good to go with the custom thread pools with the parallel streams combination. Look at the below program, that runs with 5 threads using ForkJoinPool and inside creating a new parallel stream to find the sum of all numbers for the given range. package com.javaprogramto.java8.streams.parallel.streams; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.stream.Collectors; import java.util.stream.IntStream; public class CustomPoolParallelStreams { public static void main(String[] args) throws Execu
🌐
Reddit
reddit.com › r/java › custom thread pools in java parallel streams
r/java on Reddit: Custom Thread Pools In Java Parallel Streams
February 14, 2023 - News, Technical discussions, research papers and assorted things of interest related to the Java programming language NO programming help, NO learning Java related questions, NO installing or downloading Java questions, NO JVM languages - Exclusively Java ... Archived post. New comments cannot be posted and votes cannot be cast. Share ... Wish I read this before starting my current project. I was getting crazy memory leaks before I found out you had to shutdown the thread pools. Good article for beginners in parallel processing
🌐
Javacodemonk
javacodemonk.com › java-8-parallel-stream-custom-threadpool-48643a91
Java 8 Parallel Stream custom ThreadPool
August 23, 2020 - There are two approaches to configure custom sized thread pool for Java 8 parallel stream operations - configure default common pool and running parallel stream operation inside ForkJoinPool.
🌐
DEV Community
dev.to › igalhaddad › java-8-parallel-stream-with-threadpool-32kd
Java 8 Parallel Stream with ThreadPool - DEV Community
February 10, 2020 - The creation of the thread pool is done by the utility. We control the number of threads in the pool (int parallelism), the name of the threads in the pool (useful when investigating threads dump), and optionally a timeout limit.
Find elsewhere
🌐
Glassthought
glassthought.com › notes › lc2wo9z3twpqypqncncbgzd
Parallel Streams with Custom Thread Pool - GlassThought.com
December 21, 2023 - import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.stream.Collectors; public class ParallelStreamExample { public static void main(String[] args) { List<Integer> data = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); int numberOfThreads = 4; // Specify the number of threads ForkJoinPool customThreadPool = new ForkJoinPool(numberOfThreads); try { // Submit the parallel stream execution to the custom thread pool List<Integer> result = customThreadPool.submit( () -> data.parallelStream() .map(ParallelStreamExample::process) .collect(Collectors.toList()) ).join(); // This will wait for all tasks to complete System.out.println("Processed data: " + result); } finally { customThreadPool.shutdown(); // Always shutdown the thread pool } } private static int process(int input) { // Example processing (this could be any operation) return input * input; } }
🌐
Geeky Hacker
geekyhacker.com › home › java › control threads number in java parallel stream
Control threads number in Java parallel stream - Geeky Hacker
March 30, 2024 - Then we created a custom fork-join pool with a size of five. Finally, we executed the parallelStream statement inside of the submit block. This control threads number in Java parallel stream and enforces the stream to process five records max at the same time.
🌐
GitHub
github.com › ferstl › parallel-stream-support
GitHub - ferstl/parallel-stream-support: Parallel streams in Java with a custom ForkJoinPool · GitHub
Parallel streams in Java with a custom ForkJoinPool - ferstl/parallel-stream-support
Starred by 31 users
Forked by 3 users
Languages   Java
🌐
javaspring
javaspring.net › blog › custom-thread-pool-in-java-8-parallel-stream
Is It Possible to Specify a Custom Thread Pool for Java 8 Parallel Streams? A Guide for Multi-Threaded Server Applications — javaspring.net
Avoid Shared Mutable State: Parallel streams require thread-safe operations (use ConcurrentHashMap or immutable objects). While Java 8 parallel streams do not directly support custom thread pools, you can work around this by submitting stream ...
🌐
Medium
medium.com › geekculture › pitfalls-of-java-parallel-streams-731fe0c1eb5f
Internals Of Java Parallel Streams | by Thameena S | Geek Culture | Medium
July 1, 2021 - When tasks are run in parallel using Java parallel streams, it internally uses threads from the default thread pool of ForkJoinPool called the commonPool(), which is a static thread pool.
🌐
Blogger
minborgsjavapot.blogspot.com › 2016 › 11 › work-with-parallel-database-streams.html
Minborg's Java Pot: Work with Parallel Database Streams using Custom Thread Pools
November 10, 2016 - final ForkJoinPool forkJoinPool = new ForkJoinPool(3); forkJoinPool.submit(() -> candidatesHigh.stream() .parallel() .filter(PrimeCandidate.PRIME.isNull()) .map(pc -> pc.setPrime(PrimeUtil.isPrime(pc.getValue()))) .forEach(candidatesHigh.updater()); ); try { forkJoinPool.shutdown(); forkJoinPool.awaitTermination(1, TimeUnit.HOURS); } catch (InterruptedException ie) { ie.printStackTrace(); } The application code is unmodified, but wrapped into a custom ForkJoinPool that we can control ourselves. In the example above, we setup a thread pool with just three worker threads.
🌐
Java2Blog
java2blog.com › home › core java › java 8 › java parallel stream
Java Parallel Stream - Java2Blog
January 26, 2021 - To solve this issue, you can create own thread pool while processing the stream. ... This will create ForkJoinPool with target parallelism level. If you don’t pass parallelism, it will equal to the number of processors by default.
🌐
Java crumbs
blog.krecan.net › 2014 › 03 › 18 › how-to-specify-thread-pool-for-java-8-parallel-streams
How to specify thread-pool for Java 8 parallel streams | Java crumbs
March 18, 2014 - Just by calling the parallel() method, I will ensure that the stream library will split the stream to smaller chunks which will be processed in parallel. Great. There is only one drawback. It is not possible to specify the thread-pool to be used.
🌐
Codementor
codementor.io › community › controlling parallelism of java 8 collection streams
Controlling Parallelism of Java 8 Collection Streams | Codementor
January 23, 2017 - final int parallelism = 10; ForkJoinPool forkJoinPool = null; try { forkJoinPool = new ForkJoinPool(parallelism); forkJoinPool.submit(() -> //parallel stream invoked here feedUrls.parallelStream() .map(this::printFeedInfoStart) .map(new FeedParser()::parseFeed) .map(firebasePersistor::persist) .map(this::printFeedInfoEnd) .collect(Collectors.toList()) ).get(); //this makes it an overall blocking call } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } finally { if (forkJoinPool != null) { forkJoinPool.shutdown(); //always remember to shutdown the pool } }
🌐
Medium
michaelbespalov.medium.com › parallel-stream-pitfalls-and-how-to-avoid-them-91f11808a16c
Parallel stream pitfalls and how to avoid them | by Michael Bespalov | Medium
December 20, 2017 - Now we can create a custom pool and use it to run the task above: ForkJoinPool pool = new ForkJoinPool(POOL_SIZE); int result = pool.submit(task).get(); Running the code above with pool size of 8, we can now verify that threads ForkJoinPool-1-worker-0 to ForkJoinPool-1-worker-7 are used. This way we essentially force the parallel stream to have a ForkJoin as a parent thread, instead of a regular thread, so ForkJoinPool.commonPool is not used.