Because ArrayList is not a thread-safe collection. Using a thread-safe collection like CopyOnWriteArrayList would make it correct but not necessarily efficient.
Using a Collector instead would be much simpler and correct. e.g.
source.parallelStream().collect(Collectors.toList())
Answer from Sleiman Jneidi on Stack Overflow
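Here is a minimal sketch comparing the two fixes mentioned above. The class and method names are made up for illustration. `CopyOnWriteArrayList` makes `forEach` correct, but it copies its whole backing array on every `add()`. `collect(Collectors.toList())` lets each worker fill its own list and merges the results.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CollectVsForEach {

    // Thread-safe but potentially slow: every add() copies the entire backing array.
    static List<Integer> viaCopyOnWrite(List<Integer> source) {
        List<Integer> target = new CopyOnWriteArrayList<>();
        source.parallelStream().forEach(target::add);
        return target;
    }

    // Preferred: the framework gives each worker its own container and merges them,
    // keeping the source's encounter order.
    static List<Integer> viaCollect(List<Integer> source) {
        return source.parallelStream().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> source = IntStream.range(0, 10_000).boxed().collect(Collectors.toList());
        System.out.println(viaCopyOnWrite(source).size()); // 10000, but in no particular order
        System.out.println(viaCollect(source).equals(source)); // true
    }
}
```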
The forEach operation of the parallel stream adds elements to an unsynchronized Collection (an ArrayList) from multiple threads. Therefore, the operation is not thread-safe and produces unexpected results.
Using forEachOrdered() instead of forEach() will ensure all the elements of the source List are added to the destination List.
However, as mentioned in the other answer, using collect(Collectors.toList()) is the correct way to produce an output List from a Stream.
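The `forEachOrdered()` variant could look like the following sketch, with a hypothetical class name. The spec only guarantees that the action for one element happens-before the action for the next element in encounter order. That guarantee is what makes a plain `ArrayList` safe here, but it also serializes the terminal step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ForEachOrderedDemo {

    // forEachOrdered applies the action to one element at a time, in encounter order,
    // so adding to an unsynchronized ArrayList is safe (any upstream stages may still
    // run in parallel, but this final step is effectively sequential).
    static List<Integer> copyOrdered(List<Integer> source) {
        List<Integer> target = new ArrayList<>();
        source.parallelStream().forEachOrdered(target::add);
        return target;
    }

    public static void main(String[] args) {
        List<Integer> source = IntStream.range(0, 10_000).boxed().collect(Collectors.toList());
        System.out.println(copyOrdered(source).equals(source)); // true
    }
}
```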
java - Does Stream.forEach() always work in parallel? - Stack Overflow
A surprising pain point regarding Parallel Java Streams (featuring mailing list discussion with Viktor Klang).
java - Should I always use a parallel stream when possible? - Stack Overflow
java 8 parallelStream().forEach Result data loss - Stack Overflow
No, forEach() doesn't parallelize if the stream isn't parallel. I think he simplified the example for the sake of discussion.
As evidence, this code is inside the evaluate method of the AbstractPipeline class (which forEach calls):
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
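One way to check this behavior empirically is to record which threads actually run the `forEach` action. The following is a small sketch with illustrative names. On a sequential stream, only the calling thread should show up.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class ForEachThreads {

    // Returns the names of all threads that executed the forEach action.
    static Set<String> threadsUsed(boolean parallel) {
        List<Integer> data = IntStream.range(0, 100_000).boxed().collect(Collectors.toList());
        Set<String> names = ConcurrentHashMap.newKeySet();
        Stream<Integer> stream = parallel ? data.parallelStream() : data.stream();
        stream.forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        // Sequential: only the calling thread (e.g. "main").
        System.out.println("sequential: " + threadsUsed(false));
        // Parallel: typically the caller plus ForkJoinPool common-pool workers,
        // depending on how many cores are available.
        System.out.println("parallel:   " + threadsUsed(true));
    }
}
```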
The whole quote goes as follows:
Just as reduction can parallelize safely provided the combining function is associative and free of interfering side effects, mutable reduction with Stream.collect() can parallelize safely if it meets certain simple consistency requirements (outlined in the specification for collect()).
And then what you've quoted:
The key difference is that, with the forEach() version, multiple threads are trying to access a single result container simultaneously, whereas with parallel collect(), each thread has its own local result container, the results of which are merged afterward.
Since the first sentence clearly speaks of parallelization, my understanding is that both forEach() and collect() are spoken of in the context of parallel streams.
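The "each thread has its own local result container" idea is easiest to see with the three-argument form of `collect`. The following sketch uses illustrative names and follows the `ArrayList::new, ArrayList::add, ArrayList::addAll` idiom from the `Stream.collect` documentation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

public class MutableReduction {

    static List<Integer> evens(int n) {
        return IntStream.range(0, n)
                .boxed()
                .parallel()
                .filter(i -> i % 2 == 0)
                // supplier: a fresh ArrayList for each sub-task
                // accumulator: add into that sub-task's own list (nothing is shared)
                // combiner: append one partial list to another, preserving encounter order
                .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
    }

    public static void main(String[] args) {
        System.out.println(evens(20)); // [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    }
}
```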
First off, apologies for being AWOL. Been (and still am) juggling a lot of emergencies, both work and personal.
My team was in crunch time to respond to a pretty ridiculous client ask. To deliver in time, we had to ignore performance and kind of just took the "shoot first, look later" approach. We got surprisingly lucky, except in one instance where we were using Java Streams.
It was a seemingly simple task -- download a file, split into several files based on an attribute, and then upload those split files to a new location.
But there is one catch -- both the input and output files were larger than the amount of RAM and hard disk available on the machine. Or at least, I was told to operate on that assumption when developing a solution.
No problem, I thought. We can just grab the file in batches and write out the batches.
This worked out great, but the performance was not good enough for what we were doing. In my overworked and rushed mind, I thought it would be a good idea to just turn on parallelism for that stream. That way, we could run N times faster, where N is the number of cores on the machine, right?
Before I go any further, this is (more or less) what the stream looked like.
try (final Stream<String> myStream = SomeClass.openStream(someLocation)) {
    myStream
        .parallel()
        //insert some intermediate operations here
        .gather(Gatherers.windowFixed(SOME_BATCH_SIZE))
        //insert some more intermediate operations here
        .forEach(SomeClass::upload)
        ;
}
So, running this sequentially, it worked just fine on both smaller and larger files, albeit slower than we needed.
So I turned on parallelism, ran it on a smaller file, and the performance was excellent. Exactly what we wanted.
So then I tried running a larger file in parallel.
OutOfMemoryError
I thought, ok, maybe the batch size is too large. Dropped it down to 100k lines (which is tiny in our case).
OutOfMemoryError
Getting frustrated, I dropped my batch size down to 1 single, solitary line.
OutOfMemoryError
Losing my mind, I boiled down my stream to the absolute minimum functionality possible to eliminate any chance of outside interference. I ended up with the following stream.
final AtomicLong rowCounter = new AtomicLong();
myStream
    .parallel()
    //no need to batch because I am literally processing this file one line at a time, albeit in parallel.
    .forEach(eachLine -> {
        final long rowCount = rowCounter.getAndIncrement();
        if (rowCount % 1_000_000 == 0) { //This will log the 0 value, so I know when it starts.
            System.out.println(rowCount);
        }
    })
    ;
And to be clear, I specifically designed that if statement so that the 0 value would be printed out. I tested it on a small file, and it did exactly that, printing out 0, 1000000, 2000000, etc.
And it worked just fine on both small and large files when running sequentially. And it worked just fine on a small file in parallel too.
Then I tried a larger file in parallel.
OutOfMemoryError
And it didn't even print out the 0. Which means it didn't process ANY of the elements AT ALL. It just fetched so much data and then died without hitting any of the pipeline stages.
At this point, I was furious and panicking, so I switched my original stream back to sequential and raised my batch size to a much larger number (but still within our RAM limits). This sped things up quite a bit because we made fewer (but larger) uploads. Which is not surprising -- each upload has to go through that whole connection process, so we pay a tax for each upload we do.
Still, this just barely met our performance needs, and my boss told me to ship it.
Weeks later, when things finally calmed down enough that I could breathe, I went onto the mailing list to figure out what on earth was happening with my stream.
Here is the start of the mailing list discussion.
https://mail.openjdk.org/pipermail/core-libs-dev/2024-November/134508.html
As it turns out, when a stream runs in parallel, the intermediate and terminal operations you use on it determine how the stream fetches elements from its source.
In our case, that meant that if MY parallel stream used the forEach terminal operation, the stream decided that the smartest way to speed things up was to fetch the entire dataset ahead of time and store it in an internal buffer in RAM before doing ANY PROCESSING WHATSOEVER, resulting in an OutOfMemoryError.
And to be fair, that is not stupid at all. It makes good sense from a performance standpoint. But it makes things risky from a memory standpoint.
Anyways, this is a very sharp and painful corner about parallel streams that I did not know about, so I wanted to bring it up here in case it would be useful for folks. I intend to also make a StackOverflow post to explain this in better detail.
Finally, as a silver lining, Viktor Klang let me know that a .gather() immediately followed by a .collect() is immune to the pre-fetching behaviour mentioned above. Therefore, I could just create a custom Collector that does what I was doing in my forEach(). Doing it that way, I could run things in parallel safely without any fear of the dreaded OutOfMemoryError.
(and tbh, forEach() wasn't really the best idea for that operation). You can read more about it in the mailing list link above.
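The following is a minimal sketch of what such a custom Collector might look like. It is not the author's actual code. The `upload` callback is a hypothetical stand-in for `SomeClass::upload`, and it must itself be thread-safe because it can be called from several workers. To keep the sketch runnable on JDKs without Gatherers, a small pre-batched stream stands in for `.gather(Gatherers.windowFixed(n))`.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collector;
import java.util.stream.Stream;

public class UploadingCollector {

    // Hypothetical: wraps a per-batch side effect (e.g. an upload) in a Collector.
    // Each sub-task gets its own long[] counter; the combiner adds the counts together,
    // and the finisher returns the total number of batches handled.
    static Collector<List<String>, long[], Long> uploading(Consumer<List<String>> upload) {
        return Collector.of(
                () -> new long[1],
                (count, batch) -> { upload.accept(batch); count[0]++; },
                (a, b) -> { a[0] += b[0]; return a; },
                count -> count[0]);
    }

    public static void main(String[] args) {
        AtomicInteger linesUploaded = new AtomicInteger();
        // On JDK 24+ the batches could come from .gather(Gatherers.windowFixed(n)).collect(...).
        long batches = Stream.of(List.of("a", "b"), List.of("c", "d"), List.of("e"))
                .parallel()
                .collect(uploading(batch -> linesUploaded.addAndGet(batch.size())));
        System.out.println(batches + " batches, " + linesUploaded.get() + " lines"); // 3 batches, 5 lines
    }
}
```

Returning a count is only one possible design choice. It gives the Collector a meaningful result while the real work happens in the side effect.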
Please let me know if there are any questions, comments, or concerns.
EDIT -- Some minor clarifications. There are 2 issues interleaved here that make it difficult to track down the error.
1. Gatherers don't (currently) play well with some of the other terminal operations when running in parallel.
2. Iterators are parallel-unfriendly when operating as a stream source.
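To see why an Iterator-backed source is parallel-unfriendly, you can split it by hand. The following sketch wraps an Iterator with `Spliterators.spliteratorUnknownSize`, which is how iterator-based sources such as `BufferedReader.lines()` are commonly built. The class and method names are illustrative. Because an Iterator cannot be divided, each `trySplit()` eagerly copies a batch of elements into an in-memory array. In OpenJDK, that batch grows with every split.

```java
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.IntStream;

public class IteratorSplitting {

    // Counts the elements held by a spliterator by draining it.
    static long drain(Spliterator<Integer> s) {
        long[] n = {0};
        s.forEachRemaining(x -> n[0]++);
        return n[0];
    }

    // Splits an iterator-backed spliterator twice and reports how many elements
    // each split pulled off the iterator (and buffered) before any processing.
    static long[] firstTwoSplits(int total) {
        Iterator<Integer> it = IntStream.range(0, total).boxed().iterator();
        Spliterator<Integer> s = Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED);
        long first = drain(s.trySplit());   // already copied into an array by trySplit()
        long second = drain(s.trySplit());  // the next buffered batch is larger
        return new long[] {first, second};
    }

    public static void main(String[] args) {
        long[] sizes = firstTwoSplits(1_000_000);
        System.out.println("first split: " + sizes[0] + ", second split: " + sizes[1]);
    }
}
```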
When I tried to boil things down to the simplistic scenario in my code above, I was no longer afflicted by problem 1, but was now afflicted by problem 2. My stream source was the source of the problem in that completely boiled down scenario.
That said, this only means the problem is less likely to occur than it first appears. The simple reality is, it worked when running sequentially, but failed when running in parallel. And the only way I could find out that my stream source was "bad" was by diving into all sorts of libraries that create my stream. It wasn't until then that I realized the danger I was in.
A parallel stream has much higher overhead than a sequential one. Coordinating the threads takes a significant amount of time. I would use sequential streams by default and only consider parallel ones if:
- I have a massive amount of items to process (or the processing of each item takes time and is parallelizable)
- I have a performance problem in the first place
- I don't already run the process in a multithreaded environment (for example, in a web container: if I already have many requests to process in parallel, adding another layer of parallelism inside each request could do more harm than good)
In your example, the performance will anyway be driven by the synchronized access to System.out.println(), and making this process parallel will not have any effect, or even a negative one.
Moreover, remember that parallel streams don't magically solve all synchronization problems. If the predicates and functions used in the process share a resource, you'll have to make sure that everything is thread-safe. In particular, side effects are things you really have to worry about if you go parallel.
In any case, measure, don't guess! Only a measurement will tell you if the parallelism is worth it or not.
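To illustrate the side-effect warning above, here is a sketch with illustrative names. It counts multiples of 3 in three ways. The first uses an unsynchronized shared cell and can lose increments. The second uses `LongAdder`, which is built for concurrent updates. The third avoids shared state and lets the stream do the counting.

```java
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.IntStream;

public class SharedCounter {

    // Unsafe: several threads do read-modify-write on the same array cell with no
    // synchronization, so increments can be lost when the stream runs in parallel.
    static long racyCount(int n) {
        long[] count = {0};
        IntStream.range(0, n).parallel().filter(i -> i % 3 == 0).forEach(i -> count[0]++);
        return count[0];
    }

    // Thread-safe: LongAdder is designed for concurrent increments.
    static long adderCount(int n) {
        LongAdder count = new LongAdder();
        IntStream.range(0, n).parallel().filter(i -> i % 3 == 0).forEach(i -> count.increment());
        return count.sum();
    }

    // Simplest: no shared state at all; the stream performs the reduction itself.
    static long streamCount(int n) {
        return IntStream.range(0, n).parallel().filter(i -> i % 3 == 0).count();
    }

    public static void main(String[] args) {
        int n = 10_000_000;
        System.out.println("racy:   " + racyCount(n)); // may come out too low
        System.out.println("adder:  " + adderCount(n));
        System.out.println("stream: " + streamCount(n));
    }
}
```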
The Stream API was designed to make it easy to write computations in a way that was abstracted away from how they would be executed, making switching between sequential and parallel easy.
However, just because it’s easy, doesn't mean it’s always a good idea, and in fact, it is a bad idea to just drop .parallel() all over the place simply because you can.
First, note that parallelism offers no benefits other than the possibility of faster execution when more cores are available. A parallel execution will always involve more work than a sequential one, because in addition to solving the problem, it also has to perform dispatching and coordinating of sub-tasks. The hope is that you'll be able to get to the answer faster by breaking up the work across multiple processors; whether this actually happens depends on a lot of things, including the size of your data set, how much computation you are doing on each element, the nature of the computation (specifically, does the processing of one element interact with processing of others?), the number of processors available, and the number of other tasks competing for those processors.
Further, note that parallelism often exposes nondeterminism in the computation that sequential implementations hide; sometimes this doesn't matter, or it can be mitigated by constraining the operations involved (e.g., reduction operators must be stateless and associative).
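The associativity requirement can be seen in a small sketch with illustrative names. Summing with `Integer::sum` gives the same answer however the range is split. Reducing with subtraction, which is not associative and so violates the `reduce` contract, has a well-defined sequential answer, but a parallel run may return something else.

```java
import java.util.stream.IntStream;

public class Associativity {

    static IntStream oneToThousand(boolean parallel) {
        IntStream s = IntStream.rangeClosed(1, 1000);
        return parallel ? s.parallel() : s;
    }

    // Addition is associative and 0 is its identity, so every split yields 500500.
    static int sum(boolean parallel) {
        return oneToThousand(parallel).reduce(0, Integer::sum);
    }

    // Subtraction is NOT associative: sequentially this is -500500, but in parallel
    // the result depends on how the range happened to be split.
    static int difference(boolean parallel) {
        return oneToThousand(parallel).reduce(0, (a, b) -> a - b);
    }

    public static void main(String[] args) {
        System.out.println(sum(false) + " " + sum(true));
        System.out.println(difference(false) + " " + difference(true));
    }
}
```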
In reality, sometimes parallelism will speed up your computation, sometimes it will not, and sometimes it will even slow it down. It is best to develop first using sequential execution and then apply parallelism where
(A) you know that there's actually benefit to increased performance and
(B) that it will actually deliver increased performance.
(A) is a business problem, not a technical one. If you are a performance expert, you'll usually be able to look at the code and determine (B), but the smart path is to measure. (And, don't even bother until you're convinced of (A); if the code is fast enough, better to apply your brain cycles elsewhere.)
The simplest performance model for parallelism is the "NQ" model, where N is the number of elements, and Q is the computation per element. In general, you need the product NQ to exceed some threshold before you start getting a performance benefit. For a low-Q problem like "add up numbers from 1 to N", you will generally see a breakeven between N=1000 and N=10000. With higher-Q problems, you'll see breakevens at lower thresholds.
But the reality is quite complicated. So until you achieve experthood, first identify when sequential processing is actually costing you something, and then measure if parallelism will help.
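In the spirit of "measure", here is a rough sketch that times a low-Q workload (identity map plus sum) and a high-Q workload at the same N. The `heavy` function is made-up busy work that raises Q. The timing is crude wall-clock measurement with no JIT warm-up, so treat the numbers as indicative only; a benchmark harness such as JMH is far more reliable.

```java
import java.util.function.LongUnaryOperator;
import java.util.stream.LongStream;

public class NQTiming {

    // Sums f(i) for i in [0, n), either sequentially or in parallel.
    static long run(long n, LongUnaryOperator f, boolean parallel) {
        LongStream s = LongStream.range(0, n);
        return (parallel ? s.parallel() : s).map(f).sum();
    }

    // Crude wall-clock timing in milliseconds.
    static long millis(Runnable r) {
        long start = System.nanoTime();
        r.run();
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Hypothetical expensive per-element function: arbitrary arithmetic to increase Q.
    static long heavy(long x) {
        long h = x;
        for (int k = 0; k < 10_000; k++) {
            h = h * 6364136223846793005L + 1442695040888963407L;
        }
        return h & 1;
    }

    public static void main(String[] args) {
        long n = 10_000;
        LongUnaryOperator light = x -> x;
        System.out.printf("low Q : seq %d ms, par %d ms%n",
                millis(() -> run(n, light, false)), millis(() -> run(n, light, true)));
        System.out.printf("high Q: seq %d ms, par %d ms%n",
                millis(() -> run(n, NQTiming::heavy, false)), millis(() -> run(n, NQTiming::heavy, true)));
    }
}
```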
ArrayList isn't thread-safe. You need to use
List<String> strings = Collections.synchronizedList(new ArrayList<>());
or
List<String> strings = new Vector<>();
to ensure all updates are synchronized, or switch to
List<String> strings = src.parallelStream()
.filter(integer -> (integer % 2) == 0)
.map(integer -> integer + "")
.collect(Collectors.toList());
and leave the list building to the Streams framework. Note that Collectors.toList() makes no guarantee about whether the returned list is modifiable, so if you need to modify it afterwards, use Collectors.toCollection(ArrayList::new) instead.
In terms of performance, Stream.collect is likely to be much faster than using Stream.forEach to add to a synchronized collection, since the Streams framework can collect values in each thread separately without synchronization and combine the results at the end in a thread-safe fashion.
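Here is a self-contained version of the two options above, with hypothetical class and method names. The synchronized list is correct, but every `add()` contends for one lock and the resulting order depends on scheduling. The `collect` version keeps encounter order, and `toCollection(ArrayList::new)` guarantees a modifiable `ArrayList`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class EvenStrings {

    // Correct but contended: every add() takes the list's single lock,
    // and element order depends on thread scheduling.
    static List<String> viaSynchronizedList(List<Integer> src) {
        List<String> strings = Collections.synchronizedList(new ArrayList<>());
        src.parallelStream()
                .filter(integer -> integer % 2 == 0)
                .map(String::valueOf)
                .forEach(strings::add);
        return strings;
    }

    // No shared container: partial lists are merged by the framework in encounter
    // order, and toCollection(ArrayList::new) guarantees a modifiable ArrayList.
    static ArrayList<String> viaCollect(List<Integer> src) {
        return src.parallelStream()
                .filter(integer -> integer % 2 == 0)
                .map(String::valueOf)
                .collect(Collectors.toCollection(ArrayList::new));
    }

    public static void main(String[] args) {
        List<Integer> src = IntStream.range(0, 10).boxed().collect(Collectors.toList());
        System.out.println(viaSynchronizedList(src).size()); // 5
        ArrayList<String> evens = viaCollect(src);
        evens.add("extra"); // allowed: the result is a plain ArrayList
        System.out.println(evens); // [0, 2, 4, 6, 8, extra]
    }
}
```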
ArrayList isn't thread-safe. While one thread sees a list with 30 elements, another might still see 29 and overwrite the 30th position (losing one element).
Another issue can arise when the array backing the list needs to be resized. A new, larger array (about 1.5 times the size in OpenJDK) is created, and the elements of the original array are copied into it. Other threads might have added elements that the resizing thread never saw, or several threads might resize at the same time, with only one of them winning.
When using multiple threads, you need to either synchronize access to the list OR use a thread-safe list (for example, by wrapping it with Collections.synchronizedList or by using a CopyOnWriteArrayList, to mention two possible solutions). Even better would be to use the collect method on the stream to put everything into a list.
For example: let's say you have a Set or Map or something where you are using a parallel stream to add to that resource from a different collection. Basically, adding a bunch of elements to a set at once.
Theoretically, there are no issues with collisions, because there's no race condition if you are just adding to a set; it doesn't matter in which order the threads add to it. But is there any mutex-type blocking happening, such that the parallel stream would effectively be no better than a sequential stream, since each stream would block the resource until it's done adding anyway?
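One caveat on the premise: adding to a plain `HashSet` from several threads is still a data race, even if the order of additions doesn't matter. The following sketch shows two safe options, with illustrative names. `ConcurrentHashMap.newKeySet()` is shared but does not lock the whole table for each insert. `Collectors.toSet()` gives each worker its own set and merges the results, so no shared structure is involved.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelSetFill {

    // Shared concurrent set: thread-safe, and ConcurrentHashMap does not lock the
    // whole table per insert, so threads adding different keys rarely block each other.
    static Set<Integer> viaConcurrentSet(List<Integer> src) {
        Set<Integer> target = ConcurrentHashMap.newKeySet();
        src.parallelStream().forEach(target::add);
        return target;
    }

    // No shared state: each worker fills its own set, and the partial sets are merged.
    static Set<Integer> viaCollect(List<Integer> src) {
        return src.parallelStream().collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        // 1,000,000 elements but only 1,000 distinct values.
        List<Integer> src = IntStream.range(0, 1_000_000).map(i -> i % 1000).boxed()
                .collect(Collectors.toList());
        System.out.println(viaConcurrentSet(src).size()); // 1000
        System.out.println(viaCollect(src).size());       // 1000
    }
}
```

Whether either option beats a sequential loop still depends on N and the per-element cost, so it is worth measuring.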