I looked into the Stream source code. The result of a map operation is just fed into the next operation. So there is almost no difference between one big map() call or several small map() calls.
The same holds for a parallel Stream: each input object is carried through all consecutive map() stages by the same thread, so splitting the mapping into several calls makes no difference there either.
Also note: A parallel Stream only splits up the work if the operation chain allows it and there is enough data to process. So for a small Collection or a Collection that allows no random access, a parallel Stream behaves like a sequential Stream.
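To make the "same thread" claim concrete, here is a minimal sketch that records which thread ran the first map() stage for each element and compares it with the thread running the second stage. The class and method names are made up for illustration, and the side effects inside map() exist only for the demonstration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

final class MapFusionDemo {

    /** Returns true if every element passed through both map() stages on the same thread. */
    static boolean sameThreadForBothStages(int elementCount) {
        Map<Integer, Thread> firstStageThread = new ConcurrentHashMap<>();
        Map<Integer, Boolean> sameThread = new ConcurrentHashMap<>();

        IntStream.range(0, elementCount).boxed().parallel()
            // Stage 1: remember the thread that handled this element.
            .map(i -> { firstStageThread.put(i, Thread.currentThread()); return i; })
            // Stage 2: check whether we are still on that thread.
            .map(i -> { sameThread.put(i, firstStageThread.get(i) == Thread.currentThread()); return i; })
            .forEach(i -> { });

        return sameThread.size() == elementCount && !sameThread.containsValue(false);
    }

    public static void main(String[] args) {
        System.out.println(sameThreadForBothStages(10_000));
    }
}
```

Different elements may well land on different threads; the point is only that a single element is not handed from one thread to another between consecutive map() calls.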
I don't think it will do any better if you chain it with multiple maps. In case your code is not very complex I would prefer to use a single big map.
To understand this we have to check the code inside the map function.
    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }
As you can see, quite a lot happens behind the scenes: each map() call creates a new StatelessOp stage and, when the pipeline runs, a new Sink wrapper, and every element then passes through one more accept() call. All of this is repeated for each chained map() call.
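If you like the readability of small steps but want a single map() stage, one option is to compose the functions with Function.andThen. The sketch below uses three made-up steps (trim, parse, square) and shows that both variants produce the same list. It is an illustration only, not a benchmark.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

final class ChainedVsSingleMap {
    // Hypothetical processing steps, chosen only for illustration.
    static final Function<String, String> TRIM = String::trim;
    static final Function<String, Integer> PARSE = Integer::parseInt;
    static final Function<Integer, Integer> SQUARE = n -> n * n;

    /** Three pipeline stages, one per step. */
    static List<Integer> chained(List<String> input) {
        return input.stream().map(TRIM).map(PARSE).map(SQUARE).collect(Collectors.toList());
    }

    /** One pipeline stage that applies the composed function. */
    static List<Integer> single(List<String> input) {
        return input.stream().map(TRIM.andThen(PARSE).andThen(SQUARE)).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(" 1", "2 ", " 3 ");
        System.out.println(chained(input)); // [1, 4, 9]
        System.out.println(single(input));  // [1, 4, 9]
    }
}
```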
Now coming back to parallel streams: they work on the concept of parallelism (see the Streams documentation).
A parallel stream is a stream that splits its elements into multiple chunks, processing each chunk with a different thread. Thus, you can automatically partition the workload of a given operation on all the cores of your multicore processor and keep all of them equally busy.
Parallel streams internally use the common ForkJoinPool, whose default parallelism is derived from the number of processors returned by Runtime.getRuntime().availableProcessors() (one less than that number, with a minimum of one, because the thread that invokes the terminal operation also takes part in the work). You can change the size of this pool using the system property java.util.concurrent.ForkJoinPool.common.parallelism.
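The pool size discussed above can be inspected at runtime. A minimal sketch (the class name is made up) that prints the processor count next to the common pool's parallelism; on your machine the two numbers may differ, since the calling thread also helps with the work:

```java
import java.util.concurrent.ForkJoinPool;

final class CommonPoolInfo {

    /** Parallelism level of the common pool that parallel streams use by default. */
    static int commonPoolParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        System.out.println("processors:  " + Runtime.getRuntime().availableProcessors());
        System.out.println("parallelism: " + commonPoolParallelism());
        // null unless the property was set explicitly on the command line
        System.out.println("property:    "
            + System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism"));
    }
}
```

To change the value, pass the property at JVM startup, for example `java -Djava.util.concurrent.ForkJoinPool.common.parallelism=8 CommonPoolInfo.java`.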
ParallelStream calls spliterator() on the collection object, which returns a Spliterator implementation that provides the logic for splitting a task. Every source or collection has its own spliterator implementation. Using these spliterators, a parallel stream keeps splitting the task for as long as possible; once a task becomes too small, it is executed sequentially, and the partial results from all the subtasks are merged.
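The splitting step can be observed directly by calling trySplit() on a spliterator yourself. This is a small sketch with made-up names, not what the stream framework literally does internally; it splits a source once and reports the estimated sizes of the two halves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class SplitDemo {

    /** Splits the list's spliterator once and returns {prefixSize, remainingSize}. */
    static long[] splitOnce(List<Integer> list) {
        Spliterator<Integer> rest = list.spliterator();
        Spliterator<Integer> prefix = rest.trySplit(); // null if the source refuses to split
        long prefixSize = (prefix == null) ? 0 : prefix.estimateSize();
        return new long[] { prefixSize, rest.estimateSize() };
    }

    public static void main(String[] args) {
        List<Integer> data = IntStream.range(0, 8).boxed()
            .collect(Collectors.toCollection(ArrayList::new));
        long[] sizes = splitOnce(data);
        System.out.println(sizes[0] + " + " + sizes[1]); // an ArrayList of 8 splits into 4 + 4
    }
}
```

Passing a LinkedList instead is a quick way to see how differently another source splits.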
So I would prefer parallelStream when
- I have a huge amount of data to process at a time
- I have multiple cores to process the data
- I have performance issues with the existing implementation
- I don't already have a multi-threaded process running, as it would add to the complexity.
Performance Implications
- Overhead: Sometimes, when the dataset is small, converting a sequential stream into a parallel one results in worse performance. The overhead of managing threads, sources and results is a more expensive operation than doing the actual work.
- Splitting: Arrays can split cheaply and evenly, while LinkedList has none of these properties. TreeMap and HashSet split better than LinkedList but not as well as arrays.
- Merging: The merge operation is really cheap for some operations, such as reduction and addition, but merge operations like grouping to sets or maps can be quite expensive.
Conclusion: A large amount of data and many computations done per element indicate that parallelism could be a good option.
First off, apologies for being AWOL. Been (and still am) juggling a lot of emergencies, both work and personal.
My team was in crunch time to respond to a pretty ridiculous client ask. In order to get things done in time, we had to ignore performance, and kind of just took the "shoot first, look later" approach. We got surprisingly lucky, except in one instance where we were using Java Streams.
It was a seemingly simple task -- download a file, split into several files based on an attribute, and then upload those split files to a new location.
But there is one catch -- both the input and output files were larger than the amount of RAM and hard disk available on the machine. Or at least, I was told to operate on that assumption when developing a solution.
No problem, I thought. We can just grab the file in batches and write out the batches.
This worked out great, but the performance was not good enough for what we were doing. In my overworked and rushed mind, I thought it would be a good idea to just turn on parallelism for that stream. That way, we could run N times faster, according to the number of cores on that machine, right?
Before I go any further, this is (more or less) what the stream looked like.
    try (final Stream<String> myStream = SomeClass.openStream(someLocation)) {
        myStream
            .parallel()
            //insert some intermediate operations here
            .gather(Gatherers.windowFixed(SOME_BATCH_SIZE))
            //insert some more intermediate operations here
            .forEach(SomeClass::upload)
            ;
    }

So, running this sequentially, it worked just fine on both smaller and larger files, albeit, slower than we needed.
So I turned on parallelism, ran it on a smaller file, and the performance was excellent. Exactly what we wanted.
So then I tried running a larger file in parallel.
OutOfMemoryError
I thought, ok, maybe the batch size is too large. Dropped it down to 100k lines (which is tiny in our case).
OutOfMemoryError
Getting frustrated, I dropped my batch size down to 1 single, solitary line.
OutOfMemoryError
Losing my mind, I boiled down my stream to the absolute minimum functionality possible to eliminate any chance of outside interference. I ended up with the following stream.
    final AtomicLong rowCounter = new AtomicLong();
    myStream
        .parallel()
        //no need to batch because I am literally processing this file each line at a time, albeit, in parallel.
        .forEach(eachLine -> {
            final long rowCount = rowCounter.getAndIncrement();
            if (rowCount % 1_000_000 == 0) { //This will log the 0 value, so I know when it starts.
                System.out.println(rowCount);
            }
        })
        ;

And to be clear, I specifically designed that if statement so that the 0 value would be printed out. I tested it on a small file, and it did exactly that, printing out 0, 1000000, 2000000, etc.
And it worked just fine on both small and large files when running sequentially. And it worked just fine on a small file in parallel too.
Then I tried a larger file in parallel.
OutOfMemoryError
And it didn't even print out the 0. Which means, it didn't even process ANY of the elements AT ALL. It just fetched so much data and then died without hitting any of the pipeline stages.
At this point, I was furious and panicking, so I just turned my original stream sequential and upped my batch size to a much larger number (but still within our RAM requirements). This ended up speeding up performance pretty well for us because we made fewer (but larger) uploads. Which is not surprising -- each upload has to go through that whole connection process, and thus, we are paying a tax for each upload we do.
Still, this just barely met our performance needs, and my boss told me to ship it.
Weeks later, when things finally calmed down enough that I could breathe, I went onto the mailing list to figure out what on earth was happening with my stream.
Here is the start of the mailing list discussion.
https://mail.openjdk.org/pipermail/core-libs-dev/2024-November/134508.html
As it turns out, when a stream turns parallel, the intermediate and terminal operations you do on that stream will decide the fetching behaviour the stream uses on the source.
In our case, that meant that, if MY parallel stream used the forEach terminal operation, then the stream decides that the smartest thing to do to speed up performance is to fetch the entire dataset ahead of time and store it into an internal buffer in RAM before doing ANY PROCESSING WHATSOEVER. Resulting in an OutOfMemoryError.
And to be fair, that is not stupid at all. It makes good sense from a performance standpoint. But it makes things risky from a memory standpoint.
Anyways, this is a very sharp and painful corner of parallel streams that I did not know about, so I wanted to bring it up here in case it would be useful for folks. I intend to also make a StackOverflow post to explain this in better detail.
Finally, as a silver-lining, Viktor Klang let me know that, a .gather() immediately followed by a .collect(), is immune to this pre-fetching behaviour mentioned above. Therefore, I could just create a custom Collector that does what I was doing in my forEach(). Doing it that way, I could run things in parallel safely without any fear of the dreaded OutOfMemoryError.
(and tbh, forEach() wasn't really the best idea for that operation). You can read more about it in the mailing list link above.
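As a rough illustration of the "custom Collector instead of forEach()" idea, here is a minimal sketch. Everything in it is hypothetical: `upload` is a stand-in that only records batches rather than doing any I/O, and the collector simply runs an action per element and counts how many it handled. To keep the sketch runnable on older JDKs it is fed pre-built batches rather than the output of `.gather(Gatherers.windowFixed(...))`; in the real pipeline, the `collect(...)` call would directly follow the `gather(...)` call.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import java.util.stream.Collector;

final class UploadingCollector {
    /** Hypothetical stand-in for SomeClass::upload; records batches instead of uploading them. */
    static final Queue<List<String>> UPLOADED = new ConcurrentLinkedQueue<>();

    static void upload(List<String> batch) {
        UPLOADED.add(batch);
    }

    /** A collector that runs the action on every element and returns how many elements it saw. */
    static <T> Collector<T, LongAdder, Long> forEachCounting(Consumer<? super T> action) {
        return Collector.of(
            LongAdder::new,                                               // one counter per thread-local container
            (count, item) -> { action.accept(item); count.increment(); }, // side effect plus count
            (left, right) -> { left.add(right.sum()); return left; },     // merge partial counts
            LongAdder::sum);
    }

    /** Uploads every batch in parallel and returns the number of batches handled. */
    static long uploadAll(List<List<String>> batches) {
        return batches.parallelStream().collect(forEachCounting(UploadingCollector::upload));
    }

    public static void main(String[] args) {
        long n = uploadAll(Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c")));
        System.out.println(n + " batches uploaded");
    }
}
```

Whether this avoids the pre-fetching on a given JDK is exactly what the mailing list thread discusses, so it is worth testing against a large input before relying on it.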
Please let me know if there are any questions, comments, or concerns.
EDIT -- Some minor clarifications. There are 2 issues interleaved here that make it difficult to track the error.
1. Gatherers don't (currently) play well with some of the other terminal operations when running in parallel.
2. Iterators are parallel-unfriendly when operating as a stream source.
When I tried to boil things down to the simplistic scenario in my code above, I was no longer afflicted by problem 1, but was now afflicted by problem 2. My stream source was the source of the problem in that completely boiled down scenario.
Now that said, that only makes this problem less likely to occur than it appears. The simple reality is, it worked when running sequentially, but failed when running in parallel. And the only way I could find out that my stream source was "bad" was by diving into all sorts of libraries that create my stream. It wasn't until then that I realized the danger I was in.
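One way to see why an iterator-backed source is parallel-unfriendly is to split its spliterator by hand. The sketch below (names are made up) wraps a counting iterator with Spliterators.spliteratorUnknownSize and calls trySplit() once. Because an iterator can only be walked forward, the split has to copy a batch of elements into memory before any of them is processed. The exact batch size is a JDK implementation detail, so the sketch only reports it.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicLong;

final class IteratorSplitDemo {

    /** Splits an iterator-backed spliterator once; returns {elementsPulledFromIterator, prefixSize}. */
    static long[] splitOnce(int totalElements) {
        AtomicLong pulled = new AtomicLong();
        Iterator<Integer> counting = new Iterator<Integer>() {
            private int next = 0;

            @Override public boolean hasNext() { return next < totalElements; }

            @Override public Integer next() {
                if (!hasNext()) throw new NoSuchElementException();
                pulled.incrementAndGet(); // count every element the split reads eagerly
                return next++;
            }
        };

        Spliterator<Integer> source =
            Spliterators.spliteratorUnknownSize(counting, Spliterator.ORDERED);
        Spliterator<Integer> prefix = source.trySplit(); // buffers a batch of elements
        long prefixSize = (prefix == null) ? 0 : prefix.estimateSize();
        return new long[] { pulled.get(), prefixSize };
    }

    public static void main(String[] args) {
        long[] result = splitOnce(100_000);
        System.out.println("pulled " + result[0] + " elements to create a prefix of " + result[1]);
    }
}
```

No downstream operation has seen a single element at the point where the batch has already been read, which is consistent with the "died before printing 0" symptom described above, though the actual source in that story may have behaved differently.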
A similar question is asked in Should I always use a parallel stream when possible? Note the second answer is given by Brian Goetz, a Java language architect at Oracle who was involved in the design of the Stream API, so his answer may be considered authoritative.
Top answers are quick to point out that parallel streams include additional overhead necessary for coordination and thus will only increase performance in scenarios where the amount of individual processing per stream is significant enough that the gain from parallel processing overcomes that initial overhead.
Unsurprisingly, as with any question of performance, the advice is to measure rather than guess. Start with a sequential stream, and if you have a large number of elements each requiring complex computation, measure the performance difference of switching to parallel streams.
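A very rough way to follow the "measure, don't guess" advice is sketched below. The per-element `work` function is a made-up placeholder for your real computation, and a single System.nanoTime() measurement like this is noisy (no warm-up, no repetition); a proper harness such as JMH gives far more trustworthy numbers.

```java
import java.util.stream.LongStream;

final class SeqVsParallelTiming {

    /** Hypothetical per-element computation; replace with the real work. */
    static long work(long n) {
        long h = n;
        for (int i = 0; i < 200; i++) {
            h = h * 31 + i;
        }
        return h;
    }

    /** Runs the same pipeline either sequentially or in parallel and returns the sum. */
    static long run(boolean parallel, int elementCount) {
        LongStream numbers = LongStream.range(0, elementCount);
        if (parallel) {
            numbers = numbers.parallel();
        }
        return numbers.map(SeqVsParallelTiming::work).sum();
    }

    public static void main(String[] args) {
        for (boolean parallel : new boolean[] { false, true }) {
            long start = System.nanoTime();
            long result = run(parallel, 2_000_000);
            long millis = (System.nanoTime() - start) / 1_000_000;
            System.out.println((parallel ? "parallel:   " : "sequential: ")
                + millis + " ms (result " + result + ")");
        }
    }
}
```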
Additional guidelines, such as those listed in the OP, may be helpful; but people are notoriously bad at identifying performance bottlenecks, so any guidelines are likely to fail eventually in the face of actual measurements.
This looks like a nice explanation of where and why parallel computing is useful: https://computing.llnl.gov/tutorials/parallel_comp/#WhyUse I personally see no interesting cases in user-centered web applications.
The fork/join framework is a really cool low-level API. Many other higher-level frameworks use it under the hood very successfully. I've used it for test data generation, cache bootstrapping, data processing, etc. In many cases you get a really good performance boost; in others it's just unnecessary overhead.
https://daniel.avery.io/writing/the-java-streams-parallel
I made this "expert-friendly" doc, to orient all who find themselves probing the Java Streams source code in despair. It culminates in the "Stream planner" - a little tool I made to simulate how (parallel) stream operations affect memory usage and execution paths.
Go forth, use (parallel) streams with confidence, and don't run out of memory.
You can define a custom thread pool by implementing the Executor interface that increases or decreases the number of threads in the pool as needed. You can submit your parallelStream chain to it, as shown here using a ForkJoinPool:
I've created a working example which prints the threads that are doing the work:
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.stream.Collectors;
    import java.util.stream.LongStream;

    public class TestParallel
    {
        public static void main(String... args) throws InterruptedException, ExecutionException
        {
            testParallel();
        }

        // Prints the thread doing the work so the two pools can be told apart.
        static Long sum(long a, long b)
        {
            System.out.println(Thread.currentThread() + " - sum: " + a + " " + b);
            return a + b;
        }

        public static void testParallel()
                throws InterruptedException, ExecutionException {
            long firstNum = 1;
            long lastNum = 10;
            List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
                    .collect(Collectors.toList());

            System.out.println("custom: ");
            System.out.println();
            ForkJoinPool customThreadPool = new ForkJoinPool(4);
            long totalCustom;
            try {
                // The stream runs in the pool that executes the submitted task.
                totalCustom = customThreadPool.submit(
                        () -> aList.parallelStream().reduce(0L, TestParallel::sum)).get();
            } finally {
                customThreadPool.shutdown(); // release the custom pool's threads
            }
            System.out.println();

            System.out.println("standard: ");
            System.out.println();
            long totalStandard = aList.parallelStream().reduce(0L, TestParallel::sum);
            System.out.println();
            System.out.println(totalCustom + " " + totalStandard);
        }
    }
Personally, if you want to get to that level of control, I'm not sure the streaming API is worth bothering with. It's not doing anything you can't do with Executors and concurrent libs. It's just a simplified facade to those features with limited capabilities.
Streams are kind of nice when you need to lay out a simple multi-step process in a little bit of code. But if all you are doing is using them to manage parallelism of tasks, the Executors and ExecutorService are more straightforward IMO. One thing I would avoid is pushing the number of threads above your machine's native thread count unless you have IO-bound processing. And if that's the case NIO is the more efficient solution.
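For comparison, this is roughly what the same kind of summation looks like with a plain ExecutorService. It is a sketch under simple assumptions (a fixed pool, equally sized chunks, made-up class and method names), not a drop-in replacement for any particular stream pipeline.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ExecutorSum {

    /** Sums the numbers by handing one chunk of the list to each task of a fixed-size pool. */
    static long sumInChunks(List<Long> numbers, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            int chunk = Math.max(1, (numbers.size() + threads - 1) / threads);
            List<Callable<Long>> tasks = new ArrayList<>();
            for (int from = 0; from < numbers.size(); from += chunk) {
                List<Long> slice = numbers.subList(from, Math.min(from + chunk, numbers.size()));
                tasks.add(() -> slice.stream().mapToLong(Long::longValue).sum());
            }
            long total = 0;
            for (Future<Long> partial : pool.invokeAll(tasks)) { // blocks until all tasks finish
                total += partial.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while summing", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a chunk failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Long> numbers = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L);
        System.out.println(sumInChunks(numbers, 4)); // 55
    }
}
```

Here the thread count and the chunking are explicit, which is the extra control this answer is referring to, at the cost of more code.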
What I'm not sure about is what the logic is that decides when to use multiple threads and when to use one. You'd have to better explain what factors come into play.
I don't know if this is useful, but there is a design pattern called Bridge that decouples the abstraction from its implementation so that you can change between implementations at runtime.
A simple example would be a stack. For stacks where the total amount of data stored at one time is relatively small, it is more efficient to use an array. When the amount of data hits a certain point, it becomes better to use a linked-list. The stack implementation determines when it switches from one to the other.
For your case, it sounds like the processing would be behind some interface and based on the volume (do you know it before you start the processing?) your Processor class could use streams or parallel streams as appropriate.
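A minimal sketch of that idea, assuming the volume is known up front as the size of a collection. The class name and the threshold are invented for illustration; a sensible threshold would have to come from measurements on your own workload.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class VolumeAwareProcessor {
    private final int parallelThreshold; // hypothetical cut-over point, to be tuned by measurement

    VolumeAwareProcessor(int parallelThreshold) {
        this.parallelThreshold = parallelThreshold;
    }

    /** Picks a sequential or parallel stream depending on how much data there is. */
    <T> Stream<T> streamFor(Collection<T> data) {
        return data.size() >= parallelThreshold ? data.parallelStream() : data.stream();
    }

    /** Callers only see this method; the choice of implementation stays hidden behind it. */
    <T, R> List<R> process(Collection<T> data, Function<T, R> step) {
        return streamFor(data).map(step).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        VolumeAwareProcessor processor = new VolumeAwareProcessor(3);
        System.out.println(processor.process(Arrays.asList(1, 2, 3, 4), n -> n * 10)); // [10, 20, 30, 40]
    }
}
```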