There is actually a trick to execute a parallel operation in a specific fork-join pool: if you run it as a task in that fork-join pool, it stays there and does not use the common pool.
final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = forkJoinPool.submit(() ->
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                 .filter(PrimesPrint::isPrime)
                 .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}
The trick is based on ForkJoinTask.fork which specifies: "Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool()"
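A quick way to confirm the trick is to record the names of the threads that actually execute the stream. The sketch below is illustrative (the class name, pool size of 4, and range are assumptions, not from the original answer); workers of a custom pool are named `ForkJoinPool-N-worker-M`, whereas common-pool workers carry `commonPool` in their names:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class CustomPoolDemo {
    // Runs a parallel stream inside a dedicated pool and records
    // which threads executed the stream's work.
    static Set<String> workerNames() throws InterruptedException, ExecutionException {
        ForkJoinPool pool = new ForkJoinPool(4); // parallelism chosen arbitrarily
        try {
            Set<String> names = ConcurrentHashMap.newKeySet();
            pool.submit(() ->
                    IntStream.range(0, 10_000).parallel()
                             .forEach(i -> names.add(Thread.currentThread().getName()))
            ).get();
            return names;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Expect only "ForkJoinPool-1-worker-M" style names, no commonPool workers.
        System.out.println(workerNames());
    }
}
```

If the stream had fallen back to the common pool, the printed set would contain names like `ForkJoinPool.commonPool-worker-M`.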
Parallel streams use the default ForkJoinPool.commonPool, which by default has one fewer thread than you have processors, as returned by Runtime.getRuntime().availableProcessors() (this means that parallel streams leave one processor for the calling thread).
For applications that require separate or custom pools, a ForkJoinPool may be constructed with a given target parallelism level; by default, equal to the number of available processors.
This also means if you have nested parallel streams or multiple parallel streams started concurrently, they will all share the same pool. Advantage: you will never use more than the default (number of available processors). Disadvantage: you may not get "all the processors" assigned to each parallel stream you initiate (if you happen to have more than one). (Apparently you can use a ManagedBlocker to circumvent that.)
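You can check the relationship between availableProcessors() and the common pool's parallelism directly. A minimal sketch (the class name is arbitrary):

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolInfo {
    public static void main(String[] args) {
        int procs = Runtime.getRuntime().availableProcessors();
        int parallelism = ForkJoinPool.commonPool().getParallelism();
        // By default parallelism is max(1, procs - 1), unless overridden via the
        // java.util.concurrent.ForkJoinPool.common.parallelism system property.
        System.out.println("processors = " + procs
                + ", common-pool parallelism = " + parallelism);
    }
}
```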
To change the way parallel streams are executed, you can either
- submit the parallel stream execution to your own ForkJoinPool:
  yourFJP.submit(() -> stream.parallel().forEach(doSomething)).get();
- or change the size of the common pool using system properties:
  System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20")
  for a target parallelism of 20 threads.
Here is an example of the latter on my machine, which has 8 processors. If I run the following program:
long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
    try { Thread.sleep(100); } catch (Exception ignore) {}
    System.out.print((System.currentTimeMillis() - start) + " ");
});
The output is:
215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416
So you can see that the parallel stream processes 8 items at a time, i.e. it uses 8 threads. However, if I uncomment the commented line, the output is:
215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216
This time, the parallel stream has used 20 threads and all 20 elements in the stream have been processed concurrently.
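One caveat: the property is read only once, when the common pool is first initialized, so it must be set before the first parallel stream (or any other use of ForkJoinPool.commonPool()) in the JVM. A minimal sketch (the class name is arbitrary):

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolSize {
    public static void main(String[] args) {
        // Set before the common pool is first touched; once the pool has been
        // initialized, changing the property has no effect.
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
        System.out.println(ForkJoinPool.commonPool().getParallelism()); // expected: 20
    }
}
```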
ForkJoinPool and Nested (Parallel) Streams - Or why are inner streams faster with new pools ?
java - Why does the parallel stream not use all the threads of the ForkJoinPool? - Stack Overflow
java - StreamEx.parallel().forEach() does not run in parallel after .map() - Stack Overflow
concurrency - Why does parallelStream use a ForkJoinPool, not a normal thread pool? - Stack Overflow
Hi all, so I've been doing some benchmarking at work to suss out how good (or bad) things are at various places. One of the instances I benchmarked recently was an instance where someone had coded up two nested parallelStreams. Something like so:
inputStream.parallel().forEach(
    streamElement -> someList.stream().parallel()
        .forEach(innerEle -> {
            // some work here using streamElement and innerEle
        })
);

My immediate thought was that since all parallelStreams draw from ForkJoinPool.commonPool() they'd end up fighting for resources and potentially make the whole thing slower.
But my next thought was...how much slower ?
So I went ahead and made a benchmark with JMH where I tested 3 conditions:
Nested parallel streams
Outer parallel stream and inner sequential stream
Nested parallel streams but with a new forkJoinPool for the inner stream so that it doesn't compete with the common pool. There's no real reason for me adding this in other than sheer curiosity.
The results are ... interesting. Here's my benchmarking code:
public class ParallelPerf {

    @State(Scope.Benchmark)
    public static class StateData {
        public static final List<Integer> outerLoop = IntStream.range(0, 32).boxed().toList();
        public static final List<Integer> innerLoop = IntStream.range(0, 32).boxed().toList();
    }

    private static void runInNewPool(Runnable task) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.submit(task).join();
        } finally {
            pool.shutdown();
        }
    }

    private static void innerParallelLoop() {
        StateData.innerLoop.parallelStream().unordered().forEach(i -> {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    private static void innerSequentialLoop() {
        StateData.innerLoop.stream().unordered().forEach(i -> {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    @Benchmark
    public void testingNewPool(Blackhole bh) {
        StateData.outerLoop.parallelStream().unordered().forEach(i -> {
            runInNewPool(ParallelPerf::innerParallelLoop);
            bh.consume(i);
        });
    }

    @Benchmark
    public void testingCommonPoolWithSequentialInner(Blackhole bh) {
        StateData.outerLoop.parallelStream().unordered().forEach(i -> {
            innerSequentialLoop();
            bh.consume(i);
        });
    }

    @Benchmark
    public void testingCommonPool(Blackhole bh) {
        StateData.outerLoop.parallelStream().unordered().forEach(i -> {
            innerParallelLoop();
            bh.consume(i);
        });
    }
}

And here are the results on my system:
Benchmark                                           Mode  Cnt   Score   Error  Units
ParallelPerf.testingCommonPool                     thrpt   25   1.992 ± 0.018  ops/s
ParallelPerf.testingCommonPoolWithSequentialInner  thrpt   25   1.802 ± 0.015  ops/s
ParallelPerf.testingNewPool                        thrpt   25  23.136 ± 1.738  ops/s
Assuming my benching code is correct and I haven't screwed anything up, I'm quite surprised that the code with new pools is around 20x faster than the others. Why is it so much faster ?
One potential reason I could think of (caveat - I haven't verified this at all) is that maybe the new pool is able to grab one of the waiting threads from the common pool ? But this would indicate that the threads within commonPool are unable to do so, which doesn't seem right.
So fellow redditors - any guesses/insights as to what might be happening here ?
Update
Originally this answer was an elaborate explanation claiming that the ForkJoinPool applies back-pressure and doesn't even reach the prescribed parallelism level, because there are always idle workers available to process the stream.
That's incorrect.
The actual answer is provided in the original question to which this was marked as duplicate - using a custom ForkJoinPool for stream processing is not officially supported, and when using forEach, the default pool parallelism is used to determine the stream spliterator behavior.
Here's an example of how, when tasks are manually submitted to a custom ForkJoinPool, the pool's active thread count easily reaches its parallelism level:

Set<String> thNames = ConcurrentHashMap.newKeySet(); // worker-thread names seen so far
ForkJoinPool forkJoinPool = new ForkJoinPool(4);     // any parallelism level works here
for (int i = 0; i < 1_000_000; ++i) {
    forkJoinPool.submit(() -> {
        try {
            Thread.sleep(1);
            thNames.add(Thread.currentThread().getName());
            System.out.println("Size: " + thNames.size()
                    + " activeCount: " + forkJoinPool.getActiveThreadCount()
                    + " parallelism: " + forkJoinPool.getParallelism());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    });
}
Thanks to Stuart Marks for pointing this out and to Sotirios Delimanolis for arguing that my original answer is wrong :)
It seems to me that when you submit a lambda to the FJP, the lambda will use the common pool and not the FJP. Sotirios Delimanolis proved this with his comment above. What you are submitting is a Task that runs in your FJP.
Try profiling this code to see what threads are actually being used.
You cannot name the threads within the FJP.
Simple answer: it's a bug. I filed and fixed it. It was overlooked because the tests only check that all the operations are executed in the specified pool, but don't check whether different threads of the pool are used (sometimes it's ok if parallelization does not work, e.g. for a stream of one element only).
A fix is available in the 0.6.4 release. In previous releases, to work around the problem, you may consider using .parallel().parallel(fjp): it should parallelize correctly.
Please consider reporting StreamEx problems to official StreamEx issue tracker. I visit StackOverflow only occasionally these days, so may overlook the problems reported here.
The problem is that as soon as you call the map(...) method, StreamEx creates the underlying Java 8 stream with the sequential/parallel configuration as of that point, (i.e. sequential), and calling parallel(...) after that does not appear to update the underlying Java 8 stream.
The solution depends on what you are trying to achieve. If you're happy for your map(...) operation to be run in parallel as well, then just move the parallel(...) operation up so that it is the first thing after the of(...).
However, if you want some operations to be carried out sequentially, before some parallel operations, then you'd be better using two streams. For example, following the style of your sample code:
public static void executeAsParallelAfterMapV2() {
    logger.info("executeAsParallelAfterMapV2():");
    List<String> testList = getTestList();
    StreamEx<String> sequentialStream = StreamEx
            .of(testList)
            .map(item -> {
                logger.info("Mapping {}", item);
                return item + "_mapped";
            });
    logger.info("sequentialStream.isParallel(): {}", sequentialStream.isParallel());
    List<String> afterSequentialProcessing = sequentialStream.toList();
    StreamEx<String> streamOfItems = StreamEx.of(afterSequentialProcessing)
            .parallel(s3ThreadPool);
    logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
    streamOfItems.forEach(item -> handleItem(item));
}
This gives something like:
20:43:36.835 [main] INFO scott.streams.ParallelExample - executeAsParallelAfterMapV2():
20:43:36.883 [main] INFO scott.streams.ParallelExample - sequentialStream.isParallel(): false
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_0
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_1
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_2
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_3
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_4
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_5
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_6
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_7
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_8
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_9
20:43:36.886 [main] INFO scott.streams.ParallelExample - streamOfItems.isParallel(): true
20:43:36.889 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_6_mapped
20:43:36.889 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_2_mapped
20:43:36.890 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_8_mapped
20:43:36.890 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_5_mapped
20:43:36.890 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_4_mapped
20:43:36.890 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_9_mapped
20:43:36.890 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_1_mapped
20:43:36.890 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_3_mapped
20:43:36.890 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_7_mapped
20:43:36.890 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_0_mapped
An Aside...
Out of interest, if you create a Java 8 stream directly, (not using StreamEx), and put the parallel() operation below the map(...), then it does update the type of the (whole) stream to be parallel:
public static void executeAsParallelAfterMapJava8Stream() throws InterruptedException {
    logger.info("executeAsParallelAfterMapJava8Stream():");
    List<String> testList = getTestList();
    s3ThreadPool.submit(() -> {
        Stream<String> streamOfItems = testList.stream()
                .map(item -> {
                    logger.info("Mapping {}", item);
                    return item + "_mapped";
                })
                .parallel();
        logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
        streamOfItems.forEach(item -> handleItem(item));
    }).join();
}
If you run this in a similar unit test, you get something like:
20:36:23.469 [main] INFO scott.streams.ParallelExample - executeAsParallelAfterMapJava8Stream():
20:36:23.517 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - streamOfItems.isParallel(): true
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_6
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - Mapping item_2
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - Mapping item_8
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_6_mapped
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_2_mapped
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_8_mapped
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_5
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - Mapping item_4
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - Mapping item_9
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_5_mapped
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_4_mapped
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_9_mapped
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_1
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - Mapping item_3
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - Mapping item_7
20:36:23.521 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_1_mapped
20:36:23.521 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_3_mapped
20:36:23.521 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_7_mapped
20:36:23.521 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_0
20:36:23.521 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_0_mapped