One and two use ForkJoinPool, which is designed exactly for parallel processing of one task, while ThreadPoolExecutor is used for concurrent processing of independent tasks. So one and two are supposed to be faster.

Answer from Evgeniy Dorofeev on Stack Overflow
Reddit
r/java on Reddit: Fork/Join Framework vs. Parallel Streams vs. ExecutorService: The Ultimate Fork/Join Benchmark
January 20, 2015 - This benchmark is a little flawed, Fork-Join is just a specialized executor service and parallel streams use fork-join....all that is being compared is the developer's efficiency at using each API type.
Top answer, 1 of 2 (score 5): the Evgeniy Dorofeev answer quoted above.

2 of 2 (score 3):

When you use .filter(element -> f(element)).collect(Collectors.toList()), it will collect the matching elements into a List, whereas .collect(Collectors.partitioningBy(element -> f(element))) will collect all elements into either of two lists, followed by you dropping one of them and only retrieving the list of matches via .get(true).

It should be obvious that the second variant can only be on par with the first in the best case, i.e. if all elements match the predicate anyway or when the JVM’s optimizer is capable of removing the redundant work. In the worst case, e.g. when no element matches, the second variant collects a list of all elements only to drop it afterwards, whereas the first variant would not collect any element.

The third variant is not comparable, as you didn’t show an actual implementation but just a sketch. There is no point in comparing a hypothetical implementation with an actual one. The logic you describe is the same as the logic of the parallel stream implementation, so you’re just reinventing the wheel. It may happen that you do something slightly better than the reference implementation, or better tailored to your specific task, but chances are much higher that you overlook things which the Stream API implementors already considered during a development process that lasted several years.

So I wouldn’t place any bets on your third variant. If we add the time you will need to complete the third variant’s implementation, it can never be more efficient than just using either of the other variants.

So the first variant is the most efficient, especially as it is also the simplest and most readable, directly expressing your intent.
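To make the two variants concrete, here is a minimal self-contained sketch (the predicate f and the sample data are invented for illustration). Both produce the same list, but the partitioningBy variant also builds and then discards the list of non-matches:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class FilterVsPartition {

    // Hypothetical predicate standing in for f(element).
    static boolean f(int n) {
        return n % 2 == 0;
    }

    public static void main(String[] args) {
        List<Integer> source = IntStream.range(0, 10).boxed().collect(Collectors.toList());

        // Variant one: collect only the matching elements.
        List<Integer> matches = source.stream()
                .filter(FilterVsPartition::f)
                .collect(Collectors.toList());

        // Variant two: collect ALL elements into two lists, then drop the misses.
        Map<Boolean, List<Integer>> both = source.stream()
                .collect(Collectors.partitioningBy(FilterVsPartition::f));
        List<Integer> matchesToo = both.get(true);

        // Same result either way; the second variant just did extra work.
        System.out.println(matches.equals(matchesToo));
    }
}
```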

Harness
Fork/Join Framework vs. Parallel Streams vs. ExecutorService
1 month ago - Syntactic sugar aside (lambdas! we didn’t mention lambdas), we’ve seen parallel streams perform better than the Fork/Join and the ExecutorService implementations. 6GB of text indexed in 24.33 seconds.
DZone
Fork/Join Framework vs. Parallel Streams vs. ExecutorService: The Ultimate Fork/Join Benchmark
April 1, 2015 - Syntactic sugar aside (lambdas! we didn’t mention lambdas), we’ve seen parallel streams perform better than the Fork/Join and the ExecutorService implementations. 6GB of text indexed in 24.33 seconds.
Reddit
r/java on Reddit: A surprising pain point regarding Parallel Java Streams (featuring mailing list discussion with Viktor Klang).
November 20, 2024 -

First off, apologies for being AWOL. Been (and still am) juggling a lot of emergencies, both work and personal.

My team was in crunch time to respond to a pretty ridiculous client ask. In order to get things in in time, we had to ignore performance, and kind of just took the "shoot first, look later" approach. We got surprisingly lucky, except in one instance where we were using Java Streams.

It was a seemingly simple task -- download a file, split into several files based on an attribute, and then upload those split files to a new location.

But there is one catch -- both the input and output files were larger than the amount of RAM and hard disk available on the machine. Or at least, I was told to operate on that assumption when developing a solution.

No problem, I thought. We can just grab the file in batches and write out the batches.

This worked out great, but the performance was not good enough for what we were doing. In my overworked and rushed mind, I thought it would be a good idea to just turn on parallelism for that stream. That way, we could run N times faster, according to the number of cores on that machine, right?

Before I go any further, this is (more or less) what the stream looked like.

try (final Stream<String> myStream = SomeClass.openStream(someLocation)) {
    myStream
        .parallel()
        //insert some intermediate operations here
        .gather(Gatherers.windowFixed(SOME_BATCH_SIZE))
        //insert some more intermediate operations here
        .forEach(SomeClass::upload)
        ;
}

So, running this sequentially, it worked just fine on both smaller and larger files, albeit, slower than we needed.

So I turned on parallelism, ran it on a smaller file, and the performance was excellent. Exactly what we wanted.

So then I tried running a larger file in parallel.

OutOfMemoryError

I thought, ok, maybe the batch size is too large. Dropped it down to 100k lines (which is tiny in our case).

OutOfMemoryError

Getting frustrated, I dropped my batch size down to 1 single, solitary line.

OutOfMemoryError

Losing my mind, I boiled my stream down to the absolute minimum functionality possible to eliminate any chance of outside interference. I ended up with the following stream.

final AtomicLong rowCounter = new AtomicLong();
myStream
    .parallel()
    //no need to batch because I am literally processing this file each line at a time, albeit, in parallel.
    .forEach(eachLine -> {
        final long rowCount = rowCounter.getAndIncrement();
        if (rowCount % 1_000_000 == 0) { //This will log the 0 value, so I know when it starts.
            System.out.println(rowCount);
        }
    })
    ;

And to be clear, I specifically designed that if statement so that the 0 value would be printed out. I tested it on a small file, and it did exactly that, printing out 0, 1000000, 2000000, etc.

And it worked just fine on both small and large files when running sequentially. And it worked just fine on a small file in parallel too.

Then I tried a larger file in parallel.

OutOfMemoryError

And it didn't even print out the 0. Which means, it didn't even process ANY of the elements AT ALL. It just fetched so much data and then died without hitting any of the pipeline stages.

At this point, I was furious and panicking, so I just turned my original stream sequential and upped my batch size to a much larger number (but still within our RAM requirements). This ended up speeding up performance pretty well for us because we made fewer (but larger) uploads. Which is not surprising -- each upload has to go through that whole connection process, and thus, we are paying a tax for each upload we do.

Still, this just barely met our performance needs, and my boss told me to ship it.

Weeks later, when things finally calmed down enough that I could breathe, I went onto the mailing list to figure out what on earth was happening with my stream.

Here is the start of the mailing list discussion.

https://mail.openjdk.org/pipermail/core-libs-dev/2024-November/134508.html

As it turns out, when a stream turns parallel, the intermediate and terminal operations you do on that stream will decide the fetching behaviour the stream uses on the source.

In our case, that meant that if MY parallel stream used the forEach terminal operation, the stream decided that the smartest thing to do to speed up performance was to fetch the entire dataset ahead of time and store it in an internal buffer in RAM before doing ANY PROCESSING WHATSOEVER, resulting in an OutOfMemoryError.

And to be fair, that is not stupid at all. It makes good sense from a performance standpoint. But it makes things risky from a memory standpoint.

Anyways, this is a very sharp and painful corner of parallel streams that I did not know about, so I wanted to bring it up here in case it is useful for folks. I intend to also make a StackOverflow post to explain this in better detail.

Finally, as a silver lining, Viktor Klang let me know that a .gather() immediately followed by a .collect() is immune to the pre-fetching behaviour mentioned above. Therefore, I could just create a custom Collector that does what I was doing in my forEach(). Doing it that way, I could run things in parallel safely without any fear of the dreaded OutOfMemoryError.

(and tbh, forEach() wasn't really the best idea for that operation). You can read more about it in the mailing list link above.
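For what it's worth, here is a rough, self-contained sketch of the collector idea. It is not the poster's actual code: the upload is replaced by a simple counter, and the Gatherers.windowFixed batching step (which needs a recent JDK) is left out so the sketch runs on older JDKs. The point is just that a side-effecting Collector can take over the work the forEach() was doing:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collector;
import java.util.stream.LongStream;

public class UploadCollectorSketch {

    // Hypothetical stand-in for the real upload: here it only counts calls.
    static void upload(long line, AtomicLong uploads) {
        uploads.incrementAndGet();
    }

    // A side-effecting Collector that handles each element as it arrives
    // instead of buffering results; the finisher reports how many it saw.
    static Collector<Long, AtomicLong, Long> uploading(AtomicLong uploads) {
        return Collector.of(
                AtomicLong::new,
                (count, line) -> {
                    upload(line, uploads);
                    count.incrementAndGet();
                },
                (a, b) -> {
                    a.addAndGet(b.get());
                    return a;
                },
                AtomicLong::get);
    }

    public static void main(String[] args) {
        AtomicLong uploads = new AtomicLong();
        long handled = LongStream.rangeClosed(1, 1_000).boxed()
                .parallel()
                .collect(uploading(uploads));
        System.out.println(handled + " " + uploads.get());
    }
}
```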

Please let me know if there are any questions, comments, or concerns.

EDIT -- Some minor clarifications. There are 2 issues interleaved here that make it difficult to track the error.

  1. Gatherers don't (currently) play well with some of the other terminal operations when running in parallel.

  2. Iterators are parallel-unfriendly when operating as a stream source.

When I tried to boil things down to the simplistic scenario in my code above, I was no longer afflicted by problem 1, but was now afflicted by problem 2. My stream source was the source of the problem in that completely boiled down scenario.

Now that said, this only means the problem is less likely to occur than it first appears. The simple reality is, it worked when running sequentially, but failed when running in parallel. And the only way I could find out that my stream source was "bad" was by diving into all sorts of libraries that create my stream. It wasn't until then that I realized the danger I was in.
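As a rough illustration of problem 2, the sketch below drives a parallel stream from an iterator-backed source and records how far ahead of processing the source gets pulled. The batching sizes are JDK implementation details, so the sketch only prints the observed lead rather than asserting any particular number:

```java
import java.util.Iterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.StreamSupport;

public class IteratorSourceSketch {

    public static void main(String[] args) {
        final long total = 100_000;
        AtomicLong pulled = new AtomicLong();

        // An iterator source that counts how many elements have been pulled.
        Iterator<Long> source = new Iterator<Long>() {
            long i = 0;
            public boolean hasNext() { return i < total; }
            public Long next() { pulled.incrementAndGet(); return i++; }
        };

        AtomicLong processed = new AtomicLong();
        AtomicLong maxLead = new AtomicLong();
        StreamSupport.stream(Spliterators.spliteratorUnknownSize(source, 0), true)
                .forEach(x -> {
                    long lead = pulled.get() - processed.incrementAndGet();
                    maxLead.accumulateAndGet(lead, Math::max);
                });

        // Every element was processed, but the source is typically drained
        // in growing array batches well ahead of the terminal operation.
        System.out.println(processed.get());
        System.out.println("max observed lead: " + maxLead.get());
    }
}
```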

Top answer, 1 of 5 (score 40):
I did want to follow up about one point Viktor made later on in the conversation: https://mail.openjdk.org/pipermail/core-libs-dev/2024-November/134542.html

Here is the quote:

"In a potential future where all intermediate operations are Gatherer-based, and all terminal operations are Collector-based, it would just work as expected. But with that said, I'm not sure it is practically achievable because some operations might not have the same performance-characteristics as before."

Me personally, I would GLADLY accept a flag on a stream (similar to parallel() or unordered()) that would allow me to guarantee that my stream never pre-fetches, even if I take a massive performance hit. If that can be accomplished by making all intermediate operations be implemented by a Gatherer under the hood, that is A-OK with me.

The reality is, not all streams are compute bound. Some are IO bound, but are otherwise a great fit for streams. Having a method that allows us to optimize for that fact is a new type of performance enhancement that I would greatly appreciate, even if it degrades performance in other ways.
2 of 5 (score 11):
This was a fascinating read. Thank you for sharing. I guess it is kinda bad when higher-level non-trivial APIs, like streams or fork-join, do not expose lower-level operations as user-overridable constructs, like an iteration strategy for streams in this example, or the underlying executor of a fork-join pool. Seems like an obvious thing to have, because nobody knows better how a thing will be used than the end user.
Medium
Java parallel in practice. As the title suggests, this article… | by Jhumper | Medium
September 9, 2023 - Some of them are designed for a specific set of tasks, while others seem to be suitable for any. And how to evaluate the performance of the Reactor vs. ExecutorService, or using the Stream API. Suppose you are developing a simple service. You need to fetch data from a source, map it in parallel with another service, and put the result in a third one.
Coderanch
ParallelStream and jdbc connection (Threads forum at Coderanch)
February 2, 2024 - I see more than one question here: 1. Should I use a Stream and call parallelStream for the purpose of creating several threads to run parallel database updates? 2. Should I try to do these database tasks in several threads? 3. Is running database tasks in several threads going to improve performance? Let me try to respond to those: 1. Personally I would use a more obvious way of creating several threads -- create an ExecutorService and have it create the threads and run the tasks for you.
Top answer, 1 of 3 (score 5):

You can define a custom thread pool by implementing the Executor interface so that it increases or decreases the number of threads in the pool as needed. You can submit your parallelStream chain to it, as shown below using a ForkJoinPool:

I've created a working example which prints the threads that are doing the work:

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class TestParallel
{
  public static void main(String... args) throws InterruptedException, ExecutionException
  {
    testParallel();
  }
  
  
  static Long sum(long a, long b)
  {
    System.out.println(Thread.currentThread() + " - sum: " + a + " " + b);
    return a + b;
  }
  
  public static void testParallel() 
      throws InterruptedException, ExecutionException {
        
        long firstNum = 1;
        long lastNum = 10;

        List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
          .collect(Collectors.toList());

        System.out.println("custom: ");
        System.out.println();
        
        ForkJoinPool customThreadPool = new ForkJoinPool(4);
        long totalCustom = customThreadPool.submit(
          () -> aList.parallelStream().reduce(0L, TestParallel::sum)).get();
        
        System.out.println();
        System.out.println("standard: ");
        System.out.println();
        
        long totalStandard = aList.parallelStream().reduce(0L, TestParallel::sum);
        
        System.out.println();
        System.out.println(totalCustom + " " + totalStandard);
    }
}

Personally, if you want to get to that level of control, I'm not sure the streaming API is worth bothering with. It's not doing anything you can't do with Executors and concurrent libs. It's just a simplified facade to those features with limited capabilities.

Streams are kind of nice when you need to lay out a simple multi-step process in a little bit of code. But if all you are doing is using them to manage parallelism of tasks, the Executors and ExecutorService are more straightforward IMO. One thing I would avoid is pushing the number of threads above your machine's native thread count unless you have IO-bound processing. And if that's the case, NIO is the more efficient solution.

What I'm not sure about is what the logic is that decides when to use multiple threads and when to use one. You'd have to better explain what factors come into play.

2 of 3 (score 2):

I don't know if this is useful, but there is a design pattern called Bridge that decouples an abstraction from its implementation so you can change between implementations at runtime.

A simple example would be a stack. For stacks where the total amount of data stored at one time is relatively small, it is more efficient to use an array. When the amount of data hits a certain point, it becomes better to use a linked-list. The stack implementation determines when it switches from one to the other.
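A toy sketch of that idea, with the threshold and the two backing structures invented for illustration: a stack that starts on an array-backed implementation and migrates to a linked list once it grows past a threshold.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedList;

public class SwitchingStack<T> {

    // Made-up cut-over point for illustration.
    private static final int THRESHOLD = 1_000;

    // The "implementation" side of the bridge; starts array-backed.
    private Deque<T> impl = new ArrayDeque<>();

    public void push(T value) {
        if (impl.size() == THRESHOLD && impl instanceof ArrayDeque) {
            // Migrate to the other implementation, preserving order.
            impl = new LinkedList<>(impl);
        }
        impl.push(value);
    }

    public T pop() { return impl.pop(); }

    public int size() { return impl.size(); }

    public static void main(String[] args) {
        SwitchingStack<Integer> s = new SwitchingStack<>();
        for (int i = 0; i < 2_000; i++) {
            s.push(i);
        }
        // Still behaves like one stack, despite the internal switch.
        System.out.println(s.size() + " " + s.pop());
    }
}
```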

For your case, it sounds like the processing would be behind some interface and based on the volume (do you know it before you start the processing?) your Processor class could use streams or parallel streams as appropriate.

4Comprehension
Parallel Collection Processing: Without Parallel Streams (1/3) - { 4Comprehension }
February 24, 2020 - Before we dig into contemporary techniques, let’s quickly revisit how something like this could be achieved using legacy Java versions. Let’s start with a point of reference, which is how sequential processing would look:

List<Integer> results = new ArrayList<>();
for (Integer integer : integers) {
    results.add(Utils.process(integer));
}

As mentioned above, to be able to process anything asynchronously, we need a thread/thread-pool on which we can schedule our operations:

ExecutorService executor = Executors.newFixedThreadPool(10);
Reddit
r/java on Reddit: Do you wish you could use Stream API with Virtual Threads?
February 2, 2024 -

Now you can do that with 3.0.0 of https://github.com/pivovarit/parallel-collectors

Yes, toList() is the standard Collector

You can do it in the blocking way:

Process in parallel, block and stream results as they arrive

You can do the same while preserving the original order:

Process in parallel, block and stream results as they arrive but in the original order

You want to configure everything by yourself? I've got you covered.

Process in parallel on a custom Executor and with custom parallelism, expose results as CompletableFuture

Addressing this question: https://www.reddit.com/r/java/comments/11ykqm4/is_it_possible_to_use_virtual_threads_with/

by u/TheKingOfSentries

Medium
Concurrency in Java: Threads, Executors, and Parallel Streams | by Dinushan Sriskandaraja | Medium
March 21, 2025 - Here’s a quick recap: Threads allow manual creation of concurrent tasks. Executors help manage multiple threads efficiently. Parallel Streams provide a simple way to process large datasets in parallel.
Top answer, 1 of 16 (score 520):

There actually is a trick for executing a parallel operation in a specific fork-join pool: if you execute it as a task in a fork-join pool, it stays there and does not use the common one.

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = forkJoinPool.submit(() ->
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

The trick is based on ForkJoinTask.fork which specifies: "Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool()"

2 of 16 (score 251):

The parallel streams use the default ForkJoinPool.commonPool, which by default has one fewer thread than you have processors, as returned by Runtime.getRuntime().availableProcessors() (this means that parallel streams leave one processor for the calling thread).

For applications that require separate or custom pools, a ForkJoinPool may be constructed with a given target parallelism level; by default, equal to the number of available processors.

This also means if you have nested parallel streams or multiple parallel streams started concurrently, they will all share the same pool. Advantage: you will never use more than the default (number of available processors). Disadvantage: you may not get "all the processors" assigned to each parallel stream you initiate (if you happen to have more than one). (Apparently you can use a ManagedBlocker to circumvent that.)
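The sizing rule described above can be checked directly (assuming the java.util.concurrent.ForkJoinPool.common.parallelism system property has not been overridden):

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolSize {
    public static void main(String[] args) {
        int procs = Runtime.getRuntime().availableProcessors();
        int parallelism = ForkJoinPool.getCommonPoolParallelism();
        System.out.println(procs + " processors, common pool parallelism " + parallelism);
        // By default the common pool targets one fewer thread than there
        // are processors, with a minimum of 1.
        System.out.println(parallelism == Math.max(1, procs - 1));
    }
}
```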

To change the way parallel streams are executed, you can either

  • submit the parallel stream execution to your own ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(doSomething)).get(); or
  • you can change the size of the common pool using system properties: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20") for a target parallelism of 20 threads.

Example of the latter on my machine which has 8 processors. If I run the following program:

long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
    try { Thread.sleep(100); } catch (Exception ignore) {}
    System.out.print((System.currentTimeMillis() - start) + " ");
});

The output is:

215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416

So you can see that the parallel stream processes 8 items at a time, i.e. it uses 8 threads. However, if I uncomment the commented line, the output is:

215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216

This time, the parallel stream has used 20 threads and all 20 elements in the stream have been processed concurrently.

DEV Community
Java 8 Parallel Stream with ThreadPool - DEV Community
February 10, 2020 - import com.google.common.base.Throwables; import com.google.common.util.concurrent.ExecutionError; import com.google.common.util.concurrent.UncheckedExecutionException; import com.google.common.util.concurrent.UncheckedTimeoutException; import java.time.Duration; import java.util.concurrent.*; import java.util.function.Consumer; import java.util.function.Function; public class ThreadExecutor { public static <T, R> R execute(int parallelism, String forkJoinWorkerThreadName, T source, Function<T, R> parallelStream) { return execute(parallelism, forkJoinWorkerThreadName, source, 0, null, parallel
DZone
Think Twice Before Using Java 8 Parallel Streams
August 13, 2019 - What's worse, you can not specify thread pool for parallel streams; the whole class loader has to use the same one. ... private void run() throws InterruptedException { ExecutorService es = Executors.newCachedThreadPool(); // Simulating multiple threads in the system // if one of them is executing a long-running task.
Baeldung
Custom Thread Pools in Java Parallel Streams | Baeldung
December 28, 2023 - In this quick tutorial, we’ll look at one of the biggest limitations of Stream API and see how to make a parallel stream work with a custom ThreadPool instance, alternatively – there’s a library that handles this. Let’s start with a simple example – calling the parallelStream method on any of the Collection types – which will return a possibly parallel Stream:
Blogger
fahd.blog: Java 8: CompletableFuture vs Parallel Stream
June 26, 2016 -

public static void useCompletableFutureWithExecutor(List<MyTask> tasks) {
    long start = System.nanoTime();
    ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
    List<CompletableFuture<Integer>> futures = tasks.stream()
        .map(t -> CompletableFuture.supplyAsync(() -> t.calculate(), executor))
        .collect(Collectors.toList());
    List<Integer> result = futures.stream()
        .map(CompletableFuture::join)
        .collect(Collectors.toList());
    long duration = (System.nanoTime() - start) / 1_000_000;
    System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
    System.out.println(result);
    executor.shutdown();
}
Stack Abuse
Java 8 Streams: Definitive Guide to Parallel Streaming with parallel()
October 24, 2023 - The class is a concrete implementation of ExecutorService. And, it extended the interface by adding the aspect of work stealing, thus, setting up parallelism for increased efficiency. With ForkJoinPool, idle tasks aim to relieve busy tasks of some of their load. Starting with Java 8, the aspect of streams ...