There are several issues going on here in parallel, as it were.
The first is that solving a problem in parallel always involves performing more actual work than doing it sequentially. Overhead is involved in splitting the work among several threads and joining or merging the results. Problems like converting short strings to lower-case are small enough that they are in danger of being swamped by the parallel splitting overhead.
The second issue is that benchmarking Java program is very subtle, and it is very easy to get confusing results. Two common issues are JIT compilation and dead code elimination. Short benchmarks often finish before or during JIT compilation, so they're not measuring peak throughput, and indeed they might be measuring the JIT itself. When compilation occurs is somewhat non-deterministic, so it may cause results to vary wildly as well.
For small, synthetic benchmarks, the workload often computes results that are thrown away. JIT compilers are quite good at detecting this and eliminating code that doesn't produce results that are used anywhere. This probably isn't happening in this case, but if you tinker around with other synthetic workloads, it can certainly happen. Of course, if the JIT eliminates the benchmark workload, it renders the benchmark useless.
I strongly recommend using a well-developed benchmarking framework such as JMH instead of hand-rolling one of your own. JMH has facilities to help avoid common benchmarking pitfalls, including these, and it's pretty easy to set up and run. Here's your benchmark converted to use JMH:
package com.stackoverflow.questions;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.*;
public class SO23170832 {
@State(Scope.Benchmark)
public static class BenchmarkState {
static String[] array;
static {
array = new String[1000000];
Arrays.fill(array, "AbabagalamagA");
}
}
@GenerateMicroBenchmark
@OutputTimeUnit(TimeUnit.SECONDS)
public List<String> sequential(BenchmarkState state) {
return
Arrays.stream(state.array)
.map(x -> x.toLowerCase())
.collect(Collectors.toList());
}
@GenerateMicroBenchmark
@OutputTimeUnit(TimeUnit.SECONDS)
public List<String> parallel(BenchmarkState state) {
return
Arrays.stream(state.array)
.parallel()
.map(x -> x.toLowerCase())
.collect(Collectors.toList());
}
}
I ran this using the command:
java -jar dist/microbenchmarks.jar ".*SO23170832.*" -wi 5 -i 5 -f 1
(The options indicate five warmup iterations, five benchmark iterations, and one forked JVM.) During its run, JMH emits lots of verbose messages, which I've elided. The summary results are as follows.
Benchmark Mode Samples Mean Mean error Units
c.s.q.SO23170832.parallel thrpt 5 4.600 5.995 ops/s
c.s.q.SO23170832.sequential thrpt 5 1.500 1.727 ops/s
Note that results are in ops per second, so it looks like the parallel run was about three times faster than the sequential run. But my machine has only two cores. Hmmm. And the mean error per run is actually larger than the mean runtime! WAT? Something fishy is going on here.
This brings us to a third issue. Looking more closely at the workload, we can see that it allocates a new String object for each input, and it also collects the results into a list, which involves lots of reallocation and copying. I'd guess that this will result in a fair amount of garbage collection. We can see this by rerunning the benchmark with GC messages enabled:
java -verbose:gc -jar dist/microbenchmarks.jar ".*SO23170832.*" -wi 5 -i 5 -f 1
This gives results like:
[GC (Allocation Failure) 512K->432K(130560K), 0.0024130 secs]
[GC (Allocation Failure) 944K->520K(131072K), 0.0015740 secs]
[GC (Allocation Failure) 1544K->777K(131072K), 0.0032490 secs]
[GC (Allocation Failure) 1801K->1027K(132096K), 0.0023940 secs]
# Run progress: 0.00% complete, ETA 00:00:20
# VM invoker: /Users/src/jdk/jdk8-b132.jdk/Contents/Home/jre/bin/java
# VM options: -verbose:gc
# Fork: 1 of 1
[GC (Allocation Failure) 512K->424K(130560K), 0.0015460 secs]
[GC (Allocation Failure) 933K->552K(131072K), 0.0014050 secs]
[GC (Allocation Failure) 1576K->850K(131072K), 0.0023050 secs]
[GC (Allocation Failure) 3075K->1561K(132096K), 0.0045140 secs]
[GC (Allocation Failure) 1874K->1059K(132096K), 0.0062330 secs]
# Warmup: 5 iterations, 1 s each
# Measurement: 5 iterations, 1 s each
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: com.stackoverflow.questions.SO23170832.parallel
# Warmup Iteration 1: [GC (Allocation Failure) 7014K->5445K(132096K), 0.0184680 secs]
[GC (Allocation Failure) 7493K->6346K(135168K), 0.0068380 secs]
[GC (Allocation Failure) 10442K->8663K(135168K), 0.0155600 secs]
[GC (Allocation Failure) 12759K->11051K(139776K), 0.0148190 secs]
[GC (Allocation Failure) 18219K->15067K(140800K), 0.0241780 secs]
[GC (Allocation Failure) 22167K->19214K(145920K), 0.0208510 secs]
[GC (Allocation Failure) 29454K->25065K(147456K), 0.0333080 secs]
[GC (Allocation Failure) 35305K->30729K(153600K), 0.0376610 secs]
[GC (Allocation Failure) 46089K->39406K(154624K), 0.0406060 secs]
[GC (Allocation Failure) 54766K->48299K(164352K), 0.0550140 secs]
[GC (Allocation Failure) 71851K->62725K(165376K), 0.0612780 secs]
[GC (Allocation Failure) 86277K->74864K(184320K), 0.0649210 secs]
[GC (Allocation Failure) 111216K->94203K(185856K), 0.0875710 secs]
[GC (Allocation Failure) 130555K->114932K(199680K), 0.1030540 secs]
[GC (Allocation Failure) 162548K->141952K(203264K), 0.1315720 secs]
[Full GC (Ergonomics) 141952K->59696K(159232K), 0.5150890 secs]
[GC (Allocation Failure) 105613K->85547K(184832K), 0.0738530 secs]
1.183 ops/s
Note: the lines beginning with # are normal JMH output lines. All the rest are GC messages. This is just the first of the five warmup iterations, which precedes five benchmark iterations. The GC messages continued in the same vein during the rest of the iterations. I think it's safe to say that the measured performance is dominated by GC overhead and that the results reported should not be believed.
At this point it's unclear what to do. This is purely a synthetic workload. It clearly involves very little CPU time doing actual work compared to allocation and copying. It's hard to say what you really are trying to measure here. One approach would be to come up with a different workload that is in some sense more "real." Another approach would be to change the heap and GC parameters to avoid GC during the benchmark run.
Answer from Stuart Marks on Stack OverflowThere are several issues going on here in parallel, as it were.
The first is that solving a problem in parallel always involves performing more actual work than doing it sequentially. Overhead is involved in splitting the work among several threads and joining or merging the results. Problems like converting short strings to lower-case are small enough that they are in danger of being swamped by the parallel splitting overhead.
The second issue is that benchmarking Java program is very subtle, and it is very easy to get confusing results. Two common issues are JIT compilation and dead code elimination. Short benchmarks often finish before or during JIT compilation, so they're not measuring peak throughput, and indeed they might be measuring the JIT itself. When compilation occurs is somewhat non-deterministic, so it may cause results to vary wildly as well.
For small, synthetic benchmarks, the workload often computes results that are thrown away. JIT compilers are quite good at detecting this and eliminating code that doesn't produce results that are used anywhere. This probably isn't happening in this case, but if you tinker around with other synthetic workloads, it can certainly happen. Of course, if the JIT eliminates the benchmark workload, it renders the benchmark useless.
I strongly recommend using a well-developed benchmarking framework such as JMH instead of hand-rolling one of your own. JMH has facilities to help avoid common benchmarking pitfalls, including these, and it's pretty easy to set up and run. Here's your benchmark converted to use JMH:
package com.stackoverflow.questions;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.*;
public class SO23170832 {
@State(Scope.Benchmark)
public static class BenchmarkState {
static String[] array;
static {
array = new String[1000000];
Arrays.fill(array, "AbabagalamagA");
}
}
@GenerateMicroBenchmark
@OutputTimeUnit(TimeUnit.SECONDS)
public List<String> sequential(BenchmarkState state) {
return
Arrays.stream(state.array)
.map(x -> x.toLowerCase())
.collect(Collectors.toList());
}
@GenerateMicroBenchmark
@OutputTimeUnit(TimeUnit.SECONDS)
public List<String> parallel(BenchmarkState state) {
return
Arrays.stream(state.array)
.parallel()
.map(x -> x.toLowerCase())
.collect(Collectors.toList());
}
}
I ran this using the command:
java -jar dist/microbenchmarks.jar ".*SO23170832.*" -wi 5 -i 5 -f 1
(The options indicate five warmup iterations, five benchmark iterations, and one forked JVM.) During its run, JMH emits lots of verbose messages, which I've elided. The summary results are as follows.
Benchmark Mode Samples Mean Mean error Units
c.s.q.SO23170832.parallel thrpt 5 4.600 5.995 ops/s
c.s.q.SO23170832.sequential thrpt 5 1.500 1.727 ops/s
Note that results are in ops per second, so it looks like the parallel run was about three times faster than the sequential run. But my machine has only two cores. Hmmm. And the mean error per run is actually larger than the mean runtime! WAT? Something fishy is going on here.
This brings us to a third issue. Looking more closely at the workload, we can see that it allocates a new String object for each input, and it also collects the results into a list, which involves lots of reallocation and copying. I'd guess that this will result in a fair amount of garbage collection. We can see this by rerunning the benchmark with GC messages enabled:
java -verbose:gc -jar dist/microbenchmarks.jar ".*SO23170832.*" -wi 5 -i 5 -f 1
This gives results like:
[GC (Allocation Failure) 512K->432K(130560K), 0.0024130 secs]
[GC (Allocation Failure) 944K->520K(131072K), 0.0015740 secs]
[GC (Allocation Failure) 1544K->777K(131072K), 0.0032490 secs]
[GC (Allocation Failure) 1801K->1027K(132096K), 0.0023940 secs]
# Run progress: 0.00% complete, ETA 00:00:20
# VM invoker: /Users/src/jdk/jdk8-b132.jdk/Contents/Home/jre/bin/java
# VM options: -verbose:gc
# Fork: 1 of 1
[GC (Allocation Failure) 512K->424K(130560K), 0.0015460 secs]
[GC (Allocation Failure) 933K->552K(131072K), 0.0014050 secs]
[GC (Allocation Failure) 1576K->850K(131072K), 0.0023050 secs]
[GC (Allocation Failure) 3075K->1561K(132096K), 0.0045140 secs]
[GC (Allocation Failure) 1874K->1059K(132096K), 0.0062330 secs]
# Warmup: 5 iterations, 1 s each
# Measurement: 5 iterations, 1 s each
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: com.stackoverflow.questions.SO23170832.parallel
# Warmup Iteration 1: [GC (Allocation Failure) 7014K->5445K(132096K), 0.0184680 secs]
[GC (Allocation Failure) 7493K->6346K(135168K), 0.0068380 secs]
[GC (Allocation Failure) 10442K->8663K(135168K), 0.0155600 secs]
[GC (Allocation Failure) 12759K->11051K(139776K), 0.0148190 secs]
[GC (Allocation Failure) 18219K->15067K(140800K), 0.0241780 secs]
[GC (Allocation Failure) 22167K->19214K(145920K), 0.0208510 secs]
[GC (Allocation Failure) 29454K->25065K(147456K), 0.0333080 secs]
[GC (Allocation Failure) 35305K->30729K(153600K), 0.0376610 secs]
[GC (Allocation Failure) 46089K->39406K(154624K), 0.0406060 secs]
[GC (Allocation Failure) 54766K->48299K(164352K), 0.0550140 secs]
[GC (Allocation Failure) 71851K->62725K(165376K), 0.0612780 secs]
[GC (Allocation Failure) 86277K->74864K(184320K), 0.0649210 secs]
[GC (Allocation Failure) 111216K->94203K(185856K), 0.0875710 secs]
[GC (Allocation Failure) 130555K->114932K(199680K), 0.1030540 secs]
[GC (Allocation Failure) 162548K->141952K(203264K), 0.1315720 secs]
[Full GC (Ergonomics) 141952K->59696K(159232K), 0.5150890 secs]
[GC (Allocation Failure) 105613K->85547K(184832K), 0.0738530 secs]
1.183 ops/s
Note: the lines beginning with # are normal JMH output lines. All the rest are GC messages. This is just the first of the five warmup iterations, which precedes five benchmark iterations. The GC messages continued in the same vein during the rest of the iterations. I think it's safe to say that the measured performance is dominated by GC overhead and that the results reported should not be believed.
At this point it's unclear what to do. This is purely a synthetic workload. It clearly involves very little CPU time doing actual work compared to allocation and copying. It's hard to say what you really are trying to measure here. One approach would be to come up with a different workload that is in some sense more "real." Another approach would be to change the heap and GC parameters to avoid GC during the benchmark run.
When doing benchmarks, you should pay attention to the JIT compilation, and that timing behaviors can change, based on the amount of JIT compiled code paths. If I add a warm-up phase to your test program, the parallel version is bit a faster than the sequential version. Here are the results:
Warmup...
Benchmark...
Run 0: sequential 0.12s - parallel 0.11s
Run 1: sequential 0.13s - parallel 0.08s
Run 2: sequential 0.15s - parallel 0.08s
Run 3: sequential 0.12s - parallel 0.11s
Run 4: sequential 0.13s - parallel 0.08s
The following code fragment contains the complete source code that I have used for this test.
public static void main(String... args) {
String[] array = new String[1000000];
Arrays.fill(array, "AbabagalamagA");
System.out.println("Warmup...");
for (int i = 0; i < 100; ++i) {
sequential(array);
parallel(array);
}
System.out.println("Benchmark...");
for (int i = 0; i < 5; ++i) {
System.out.printf("Run %d: sequential %s - parallel %s\n",
i,
test(() -> sequential(array)),
test(() -> parallel(array)));
}
}
private static void sequential(String[] array) {
Arrays.stream(array).map(String::toLowerCase).collect(Collectors.toList());
}
private static void parallel(String[] array) {
Arrays.stream(array).parallel().map(String::toLowerCase).collect(Collectors.toList());
}
private static String test(Runnable runnable) {
long start = System.currentTimeMillis();
runnable.run();
long elapsed = System.currentTimeMillis() - start;
return String.format("%4.2fs", elapsed / 1000.0);
}
A surprising pain point regarding Parallel Java Streams (featuring mailing list discussion with Viktor Klang).
The Java Stream Parallel
Java streams are black magic !
How fast are the Java 8 Streams compared to for-loops?
Videos
First off, apologies for being AWOL. Been (and still am) juggling a lot of emergencies, both work and personal.
My team was in crunch time to respond to a pretty ridiculous client ask. In order to get things in in time, we had to ignore performance, and kind of just took the "shoot first, look later" approach. We got surprisingly lucky, except in one instance where we were using Java Streams.
It was a seemingly simple task -- download a file, split into several files based on an attribute, and then upload those split files to a new location.
But there is one catch -- both the input and output files were larger than the amount of RAM and hard disk available on the machine. Or at least, I was told to operate on that assumption when developing a solution.
No problem, I thought. We can just grab the file in batches and write out the batches.
This worked out great, but the performance was not good enough for what we were doing. In my overworked and rushed mind, I thought it would be a good idea to just turn on parallelism for that stream. That way, we could run N times faster, according to the number of cores on that machine, right?
Before I go any further, this is (more or less) what the stream looked like.
try (final Stream<String> myStream = SomeClass.openStream(someLocation)) {
myStream
.parallel()
//insert some intermediate operations here
.gather(Gatherers.windowFixed(SOME_BATCH_SIZE))
//insert some more intermediate operations here
.forEach(SomeClass::upload)
;
}So, running this sequentially, it worked just fine on both smaller and larger files, albeit, slower than we needed.
So I turned on parallelism, ran it on a smaller file, and the performance was excellent. Exactly what we wanted.
So then I tried running a larger file in parallel.
OutOfMemoryError
I thought, ok, maybe the batch size is too large. Dropped it down to 100k lines (which is tiny in our case).
OutOfMemoryError
Getting frustrated, I dropped my batch size down to 1 single, solitary line.
OutOfMemoryError
Losing my mind, I boiled down my stream to the absolute minimum possible functionality possible to eliminate any chance of outside interference. I ended up with the following stream.
final AtomicLong rowCounter = new AtomicLong();
myStream
.parallel()
//no need to batch because I am literally processing this file each line at a time, albeit, in parallel.
.forEach(eachLine -> {
final long rowCount = rowCounter.getAndIncrement();
if (rowCount % 1_000_000 == 0) { //This will log the 0 value, so I know when it starts.
System.out.println(rowCount);
}
})
;And to be clear, I specifically designed that if statement so that the 0 value would be printed out. I tested it on a small file, and it did exactly that, printing out 0, 1000000, 2000000, etc.
And it worked just fine on both small and large files when running sequentially. And it worked just fine on a small file in parallel too.
Then I tried a larger file in parallel.
OutOfMemoryError
And it didn't even print out the 0. Which means, it didn't even process ANY of the elements AT ALL. It just fetched so much data and then died without hitting any of the pipeline stages.
At this point, I was furious and panicking, so I just turned my original stream sequential and upped my batch size to a much larger number (but still within our RAM requirements). This ended up speeding up performance pretty well for us because we made fewer (but larger) uploads. Which is not surprising -- each upload has to go through that whole connection process, and thus, we are paying a tax for each upload we do.
Still, this just barely met our performance needs, and my boss told me to ship it.
Weeks later, when things finally calmed down enough that I could breathe, I went onto the mailing list to figure out what on earth was happening with my stream.
Here is the start of the mailing list discussion.
https://mail.openjdk.org/pipermail/core-libs-dev/2024-November/134508.html
As it turns out, when a stream turns parallel, the intermediate and terminal operations you do on that stream will decide the fetching behaviour the stream uses on the source.
In our case, that meant that, if MY parallel stream used the forEach terminal operation, then the stream decides that the smartest thing to do to speed up performance is to fetch the entire dataset ahead of time and store it into an internal buffer in RAM before doing ANY PROCESSING WHATSOEVER. Resulting in an OutOfMemoryError.
And to be fair, that is not stupid at all. It makes good sense from a performance stand point. But it makes things risky from a memory standpoint.
Anyways, this is a very sharp and painful corner about parallel streams that i did not know about, so I wanted to bring it up here in case it would be useful for folks. I intend to also make a StackOverflow post to explain this in better detail.
Finally, as a silver-lining, Viktor Klang let me know that, a .gather() immediately followed by a .collect(), is immune to this pre-fetching behaviour mentioned above. Therefore, I could just create a custom Collector that does what I was doing in my forEach(). Doing it that way, I could run things in parallel safely without any fear of the dreaded OutOfMemoryError.
(and tbh, forEach() wasn't really the best idea for that operation). You can read more about it in the mailing list link above.
Please let me know if there are any questions, comments, or concerns.
EDIT -- Some minor clarifications. There are 2 issues interleaved here that makes it difficult to track the error.
-
Gatherers don't (currently) play well with some of the other terminal operations when running in parallel.
-
Iterators are parallel-unfriendly when operatiing as a stream source.
When I tried to boil things down to the simplistic scenario in my code above, I was no longer afflicted by problem 1, but was now afflicted by problem 2. My stream source was the source of the problem in that completely boiled down scenario.
Now that said, that only makes this problem less likely to occur than it appears. The simple reality is, it worked when running sequentially, but failed when running in parallel. And the only way I could find out that my stream source was "bad" was by diving into all sorts of libraries that create my stream. It wasn't until then that I realized the danger I was in.
https://daniel.avery.io/writing/the-java-streams-parallel
I made this "expert-friendly" doc, to orient all who find themselves probing the Java Streams source code in despair. It culminates in the "Stream planner" - a little tool I made to simulate how (parallel) stream operations affect memory usage and execution paths.
Go forth, use (parallel) streams with confidence, and don't run out of memory.