There actually is a trick how to execute a parallel operation in a specific fork-join pool. If you execute it as a task in a fork-join pool, it stays there and does not use the common one.
final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
forkJoinPool = new ForkJoinPool(parallelism);
final List<Integer> primes = forkJoinPool.submit(() ->
// Parallel task here, for example
IntStream.range(1, 1_000_000).parallel()
.filter(PrimesPrint::isPrime)
.boxed().collect(Collectors.toList())
).get();
System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown();
}
}
The trick is based on ForkJoinTask.fork which specifies: "Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool()"
There actually is a trick how to execute a parallel operation in a specific fork-join pool. If you execute it as a task in a fork-join pool, it stays there and does not use the common one.
final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
forkJoinPool = new ForkJoinPool(parallelism);
final List<Integer> primes = forkJoinPool.submit(() ->
// Parallel task here, for example
IntStream.range(1, 1_000_000).parallel()
.filter(PrimesPrint::isPrime)
.boxed().collect(Collectors.toList())
).get();
System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown();
}
}
The trick is based on ForkJoinTask.fork which specifies: "Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool()"
The parallel streams use the default ForkJoinPool.commonPool which by default has one less threads as you have processors, as returned by Runtime.getRuntime().availableProcessors() (This means that parallel streams leave one processor for the calling thread).
For applications that require separate or custom pools, a ForkJoinPool may be constructed with a given target parallelism level; by default, equal to the number of available processors.
This also means if you have nested parallel streams or multiple parallel streams started concurrently, they will all share the same pool. Advantage: you will never use more than the default (number of available processors). Disadvantage: you may not get "all the processors" assigned to each parallel stream you initiate (if you happen to have more than one). (Apparently you can use a ManagedBlocker to circumvent that.)
To change the way parallel streams are executed, you can either
- submit the parallel stream execution to your own ForkJoinPool:
yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get();or - you can change the size of the common pool using system properties:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20")for a target parallelism of 20 threads.
Example of the latter on my machine which has 8 processors. If I run the following program:
long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
try { Thread.sleep(100); } catch (Exception ignore) {}
System.out.print((System.currentTimeMillis() - start) + " ");
});
The output is:
215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416
So you can see that the parallel stream processes 8 items at a time, i.e. it uses 8 threads. However, if I uncomment the commented line, the output is:
215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216
This time, the parallel stream has used 20 threads and all 20 elements in the stream have been processed concurrently.
The Oracle's implementation[1] of parallel stream uses the current thread and in addition to that, if needed, also the threads that compose the default fork join pool ForkJoinPool.commonPool(), which has a default size equal to one less than the number of cores of your CPU.
That default size of the common pool can be changed with this property:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=8
Alternatively, you can use your own pool:
ForkJoinPool myPool = new ForkJoinPool(8);
myPool.submit(() ->
list.parallelStream().forEach(/* Do Something */);
).get();
Regarding the order, jobs will be executed as soon as a thread is available, in no specific order.
As correctly pointed out by @Holger this is an implementation specific detail (with just one vague reference at the bottom of a document), both approaches will work on Oracle's JVM but are definitely not guaranteed to work on JVMs from other vendors, the property could not exist in a non-Oracle implementation and Streams could not even use a ForkJoinPool internally rendering the alternative based on the behavior of ForkJoinTask.fork completely useless (see here for details on this).
While @uraimo is correct, the answer depends on exactly what "Do Something" does. The parallel.streams API uses the CountedCompleter Class which has some interesting problems. Since the F/J framework does not use a separate object to hold results, long chains may result in an OOME. Also those long chains can sometimes cause a Stack Overflow. The answer to those problems is the use of the Paraquential technique as I pointed out in this article.
The other problem is excessive thread creation when using nested parallel forEach.
Given a large number of tasks, are threads or parallel streams more performant?
Is it possible to use virtual threads with parallel streams?
A surprising pain point regarding Parallel Java Streams (featuring mailing list discussion with Viktor Klang).
Which way it is thread-safe in Java Stream?
Videos
Is it better to make a thread for each element of an array or to use a parallel stream? (The array is indexed).
All parallel streams share the same Fork-Join pool yeah? Can we configure that pool to use virtual threads so we can have multiple parallel streams without any of them being blocked by each other? Is there something I'm missing that doesn't make this a good idea?