There actually is a trick how to execute a parallel operation in a specific fork-join pool. If you execute it as a task in a fork-join pool, it stays there and does not use the common one.
final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
forkJoinPool = new ForkJoinPool(parallelism);
final List<Integer> primes = forkJoinPool.submit(() ->
// Parallel task here, for example
IntStream.range(1, 1_000_000).parallel()
.filter(PrimesPrint::isPrime)
.boxed().collect(Collectors.toList())
).get();
System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown();
}
}
The trick is based on ForkJoinTask.fork which specifies: "Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool()"
There actually is a trick how to execute a parallel operation in a specific fork-join pool. If you execute it as a task in a fork-join pool, it stays there and does not use the common one.
final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
forkJoinPool = new ForkJoinPool(parallelism);
final List<Integer> primes = forkJoinPool.submit(() ->
// Parallel task here, for example
IntStream.range(1, 1_000_000).parallel()
.filter(PrimesPrint::isPrime)
.boxed().collect(Collectors.toList())
).get();
System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown();
}
}
The trick is based on ForkJoinTask.fork which specifies: "Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool()"
The parallel streams use the default ForkJoinPool.commonPool which by default has one less threads as you have processors, as returned by Runtime.getRuntime().availableProcessors() (This means that parallel streams leave one processor for the calling thread).
For applications that require separate or custom pools, a ForkJoinPool may be constructed with a given target parallelism level; by default, equal to the number of available processors.
This also means if you have nested parallel streams or multiple parallel streams started concurrently, they will all share the same pool. Advantage: you will never use more than the default (number of available processors). Disadvantage: you may not get "all the processors" assigned to each parallel stream you initiate (if you happen to have more than one). (Apparently you can use a ManagedBlocker to circumvent that.)
To change the way parallel streams are executed, you can either
- submit the parallel stream execution to your own ForkJoinPool:
yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get();or - you can change the size of the common pool using system properties:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20")for a target parallelism of 20 threads.
Example of the latter on my machine which has 8 processors. If I run the following program:
long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
try { Thread.sleep(100); } catch (Exception ignore) {}
System.out.print((System.currentTimeMillis() - start) + " ");
});
The output is:
215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416
So you can see that the parallel stream processes 8 items at a time, i.e. it uses 8 threads. However, if I uncomment the commented line, the output is:
215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216
This time, the parallel stream has used 20 threads and all 20 elements in the stream have been processed concurrently.
Is it possible to use virtual threads with parallel streams?
Custom Thread Pools In Java Parallel Streams
Alternating between Java streams and parallel streams at runtime - Software Engineering Stack Exchange
Pitfalls Of Java Parallel Streams
Videos
All parallel streams share the same Fork-Join pool yeah? Can we configure that pool to use virtual threads so we can have multiple parallel streams without any of them being blocked by each other? Is there something I'm missing that doesn't make this a good idea?
You can define a custom thread pool by implementing the (Executor) interface that increases or decreases the number of threads in the pool as needed. You can submit your parallelStream chain to it as shown here using a ForkJoinPool:
I've created a working example which prints the threads that are doing the work:
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
public class TestParallel
{
public static void main(String... args) throws InterruptedException, ExecutionException
{
testParallel();
}
static Long sum(long a, long b)
{
System.out.println(Thread.currentThread() + " - sum: " + a + " " + b);
return a + b;
}
public static void testParallel()
throws InterruptedException, ExecutionException {
long firstNum = 1;
long lastNum = 10;
List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
.collect(Collectors.toList());
System.out.println("custom: ");
System.out.println();
ForkJoinPool customThreadPool = new ForkJoinPool(4);
long totalCustom = customThreadPool.submit(
() -> aList.parallelStream().reduce(0L, TestParallel::sum)).get();
System.out.println();
System.out.println("standard: ");
System.out.println();
long totalStandard = aList.parallelStream().reduce(0L, TestParallel::sum);
System.out.println();
System.out.println(totalCustom + " " + totalStandard);
}
}
Personally, if you want to get to that level of control, I'm not sure the streaming API is worth bothering with. It's not doing anything you can't do with Executors and concurrent libs. It's just a simplified facade to those features with limited capabilities.
Streams are kind of nice when you need to lay out a simple multi-step process in a little bit of code. But if all you are doing is using them to manage parallelism of tasks, the Executors and ExecutorService are more straightforward IMO. One thing I would avoid is pushing the number of threads above your machine's native thread count unless you have IO-bound processing. And if that's the case NIO is the more efficient solution.
What I'm not sure about is what the logic is that decides when to use multiple threads and when to use one. You'd have to better explain what factors come into play.
I don't know if this is useful but there is a design pattern called Bridge that decouples the abstraction from its implementation so you can, at runtime change between implementations.
A simple example would be a stack. For stacks where the total amount of data stored at one time is relatively small, it is more efficient to use an array. When the amount of data hits a certain point, it becomes better to use a linked-list. The stack implementation determines when it switches from one to the other.
For your case, it sounds like the processing would be behind some interface and based on the volume (do you know it before you start the processing?) your Processor class could use streams or parallel streams as appropriate.