Answer from Holger on Stack Overflow
Top answer
1 of 3
11

You say “but I don't quite know in which order things are evaluated and where buffering occurs”, which is precisely what parallel streams are about. The order of evaluation is unspecified.

A critical aspect of your example is the .limit(100_000_000). This implies that the implementation can’t just sum up arbitrary values, but must sum up the first 100,000,000 numbers. Note that in the reference implementation, .unordered().limit(100_000_000) doesn’t change the outcome, which indicates that there’s no special implementation for the unordered case, but that’s an implementation detail.

Now, when worker threads process the elements, they can’t just sum them up, as they have to know which elements they are allowed to consume, which depends on how many elements precede their specific workload. Since this stream doesn’t know its size, this can only be known once the prefix elements have been processed, which never happens for an infinite stream. So the worker threads keep buffering until this information becomes available.
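
For context, the pipeline under discussion (reconstructed from the example further down, just without the artificial slowdown) is presumably along these lines; run in parallel, it typically either never terminates or fails with an OutOfMemoryError because of the buffering described above:

System.out.println(IntStream
    .iterate(1, i -> i+1)
    .parallel()
    .flatMap(n -> IntStream.iterate(n, i -> i+n))
    .limit(100_000_000)
    .sum()
);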

In principle, when a worker thread knows that it processes the leftmost¹ work-chunk, it could sum up the elements immediately, count them, and signal the end when reaching the limit. So the Stream could terminate, but this depends on a lot of factors.

In your case, a plausible scenario is that the other worker threads are faster in allocating buffers than the leftmost job is counting. In this scenario, subtle changes to the timing could make the stream occasionally return with a value.

When we slow down all worker threads except the one processing the leftmost chunk, we can make the stream terminate (at least in most runs):

System.out.println(IntStream
    .iterate(1, i -> i+1)
    .parallel()
    // delay every outer element except the first, so only the leftmost chunk runs at full speed
    .peek(i -> { if(i != 1) LockSupport.parkNanos(1_000_000_000); })
    .flatMap(n -> IntStream.iterate(n, i -> i+n))
    .limit(100_000_000)
    .sum()
);

¹ I’m following a suggestion by Stuart Marks to use left-to-right order when talking about the encounter order rather than the processing order.

2 of 3
5

My best guess is that adding parallel() changes the internal behavior of flatMap(), which already had problems with being evaluated lazily before.

The OutOfMemoryError that you are getting was reported in [JDK-8202307] Getting a java.lang.OutOfMemoryError: Java heap space when calling Stream.iterator().next() on a stream which uses an infinite/very big Stream in flatMap. If you look at the ticket, it's more or less the same stack trace that you are getting. The ticket was closed as Won't Fix with the following reason:

The iterator() and spliterator() methods are "escape hatches" to be used when it's not possible to use other operations. They have some limitations because they turn what is a push model of the stream implementation into a pull model. Such a transition requires buffering in certain cases, such as when an element is (flat) mapped to two or more elements. It would significantly complicate the stream implementation, likely at the expense of common cases, to support a notion of back-pressure to communicate how many elements to pull through nested layers of element production.
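
A minimal sketch of the scenario that bug report describes (my own reconstruction from the ticket title, not the reporter's exact code): pulling a single element through iterator() from a stream whose flatMap produces an infinite inner stream. The push-to-pull transition buffers the inner stream's elements before the iterator sees any of them, so this can exhaust the heap instead of simply returning 0:

Integer first = Stream.of("ignored")
    .flatMap(s -> Stream.iterate(0, i -> i + 1)) // infinite inner stream
    .iterator()                                  // pull-model "escape hatch"
    .next();
System.out.println(first);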

🌐
GitHub
github.com › aws › aws-sdk-java › issues › 3067
Direct memory leak when getting object metadata inside parallelStream with Java 17 · Issue #3067 · aws/aws-sdk-java
December 7, 2023 - When calling client.getObjectMetadata(bucket, key) inside a parallelStream(), we notice the direct memory usage going up when using Java 17 (and there are multiple objects in the bucket). This does not happen with Java 11 or when using stream(). Direct memory shouldn't be going up.
Author   winzsanchez
Discussions

java 8 - Memory usage for a parallel stream from File.lines() - Stack Overflow
I am reading lines from large files (8GB+) using Files.lines(). If processing sequentially it works great, with a very low memory footprint. As soon as I add parallel() to the stream it seems to ... More on stackoverflow.com
🌐 stackoverflow.com
A surprising pain point regarding Parallel Java Streams (featuring mailing list discussion with Viktor Klang).
I did want to follow up about one point Viktor made later on in the conversation. https://mail.openjdk.org/pipermail/core-libs-dev/2024-November/134542.html … More on reddit.com
🌐 r/java
94
223
November 20, 2024
How to prevent heap space error when using large parallel Java 8 stream - Stack Overflow
So it does not matter where I add the .parallel() call. But still: how to not run into an OutOfMemoryError during execution? ... Maybe giving it more memory is just throwing gasoline on the fire :), are you giving your JVM any parameters for the maximum memory size? See this SO post: stackoverflow.com/questions/14763079/… ... Increasing the limits is not a real solution, cause there would be another size of the stream ... More on stackoverflow.com
🌐 stackoverflow.com
February 13, 2015
Java 8 stream objects significant memory usage - Stack Overflow
In looking at some profiling results, I noticed that using streams within a tight loop (used instead of another nested loop) incurred a significant memory overhead of objects of types java.util.str... More on stackoverflow.com
🌐 stackoverflow.com
🌐
Medium
nagahemanthkotha.medium.com › how-fork-join-pool-caused-memory-leak-dcbdcf606749
How Fork Join Pool caused Memory Leak!! | by Naga Hemanth Kotha | Medium
February 14, 2021 - First thing, parallel streams use the common pool, so if you are thinking that using parallel streams everywhere will make your code run faster, you are wrong. After a certain number of tasks are given to the common pool (based on the cores in that system) it becomes blocking, and remember that Java uses the common pool internally, so you can't use parallel streams everywhere and assume it will be faster
🌐
OpenJDK
bugs.openjdk.org › browse › JDK-8191559
[JDK-8191559] Spliterators have a memory leak
FULL PRODUCT VERSION : java version ... When using StreamSupport.stream(Spliterators.spliteratorUnknownSize(someLargeIterator, 0), true) the JVM crashes as it runs out of HEAP space....
Top answer
1 of 2
5

Tracing through the code, my guess is that the Spliterator used by Files.lines() is Spliterators.IteratorSpliterator, whose trySplit() method has this comment:

        /*
         * Split into arrays of arithmetically increasing batch
         * sizes.  This will only improve parallel performance if
         * per-element Consumer actions are more costly than
         * transferring them into an array.  The use of an
         * arithmetic progression in split sizes provides overhead
         * vs parallelism bounds that do not particularly favor or
         * penalize cases of lightweight vs heavyweight element
         * operations, across combinations of #elements vs #cores,
         * whether or not either are known.  We generate
         * O(sqrt(#elements)) splits, allowing O(sqrt(#cores))
         * potential speedup.
         */

The code then splits into batches that are multiples of 1024 records (lines): the first split reads 1024 lines, the next one 2048 lines, and so on, each split reading a larger and larger batch.

If your file is really big, it will eventually hit the maximum batch size of 33,554,432, which is 1<<25. Remember that's lines, not bytes, which will probably cause an out-of-memory error, especially once multiple threads are each reading that many.

That also explains the slowdown: those lines are read ahead of time before the thread can process them.

So I would either not use parallel() at all or, if you must because the per-line computations are expensive, write your own Spliterator that doesn't split like this. Probably just always using a batch of 1024 is fine.

2 of 2
1

As mentioned by dkatzel, this problem is caused by Spliterators.IteratorSpliterator, which batches the elements in your stream; the batch size starts at 1024 elements and grows to 33,554,432 elements.

Another solution is to use the FixedBatchSpliteratorBase proposed in the article Faster parallel processing in Java using Streams and a spliterator.
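
As an illustration of the idea (a sketch of my own, not the actual FixedBatchSpliteratorBase from that article), a spliterator over an iterator that always splits off fixed-size batches instead of arithmetically growing ones could look roughly like this:

import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Always hands off batches of BATCH_SIZE elements, unlike Spliterators.IteratorSpliterator,
// whose batches grow arithmetically up to 33,554,432 elements.
final class FixedBatchSpliterator<T> extends Spliterators.AbstractSpliterator<T> {
    private static final int BATCH_SIZE = 1024;
    private final Iterator<T> source;

    FixedBatchSpliterator(Iterator<T> source) {
        super(Long.MAX_VALUE, Spliterator.ORDERED);
        this.source = source;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (!source.hasNext()) return false;
        action.accept(source.next());
        return true;
    }

    @Override
    public Spliterator<T> trySplit() {
        // Copy at most BATCH_SIZE elements into an array and hand that prefix to another thread.
        Object[] batch = new Object[BATCH_SIZE];
        int n = 0;
        while (n < BATCH_SIZE && source.hasNext()) {
            batch[n++] = source.next();
        }
        return n == 0 ? null : Spliterators.spliterator(batch, 0, n, Spliterator.ORDERED);
    }

    static <T> Stream<T> parallelStream(Iterator<T> source) {
        return StreamSupport.stream(new FixedBatchSpliterator<>(source), true);
    }
}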

🌐
Reddit
reddit.com › r/java › a surprising pain point regarding parallel java streams (featuring mailing list discussion with viktor klang).
r/java on Reddit: A surprising pain point regarding Parallel Java Streams (featuring mailing list discussion with Viktor Klang).
November 20, 2024 -

First off, apologies for being AWOL. Been (and still am) juggling a lot of emergencies, both work and personal.

My team was in crunch time to respond to a pretty ridiculous client ask. In order to get things in in time, we had to ignore performance, and kind of just took the "shoot first, look later" approach. We got surprisingly lucky, except in one instance where we were using Java Streams.

It was a seemingly simple task -- download a file, split into several files based on an attribute, and then upload those split files to a new location.

But there is one catch -- both the input and output files were larger than the amount of RAM and hard disk available on the machine. Or at least, I was told to operate on that assumption when developing a solution.

No problem, I thought. We can just grab the file in batches and write out the batches.

This worked out great, but the performance was not good enough for what we were doing. In my overworked and rushed mind, I thought it would be a good idea to just turn on parallelism for that stream. That way, we could run N times faster, according to the number of cores on that machine, right?

Before I go any further, this is (more or less) what the stream looked like.

try (final Stream<String> myStream = SomeClass.openStream(someLocation)) {
    myStream
        .parallel()
        //insert some intermediate operations here
        .gather(Gatherers.windowFixed(SOME_BATCH_SIZE))
        //insert some more intermediate operations here
        .forEach(SomeClass::upload)
        ;
}

So, running this sequentially, it worked just fine on both smaller and larger files, albeit, slower than we needed.

So I turned on parallelism, ran it on a smaller file, and the performance was excellent. Exactly what we wanted.

So then I tried running a larger file in parallel.

OutOfMemoryError

I thought, ok, maybe the batch size is too large. Dropped it down to 100k lines (which is tiny in our case).

OutOfMemoryError

Getting frustrated, I dropped my batch size down to 1 single, solitary line.

OutOfMemoryError

Losing my mind, I boiled my stream down to the absolute minimum functionality possible to eliminate any chance of outside interference. I ended up with the following stream.

final AtomicLong rowCounter = new AtomicLong();
myStream
    .parallel()
    //no need to batch because I am literally processing this file each line at a time, albeit, in parallel.
    .forEach(eachLine -> {
        final long rowCount = rowCounter.getAndIncrement();
        if (rowCount % 1_000_000 == 0) { //This will log the 0 value, so I know when it starts.
            System.out.println(rowCount);
        }
    })
    ;

And to be clear, I specifically designed that if statement so that the 0 value would be printed out. I tested it on a small file, and it did exactly that, printing out 0, 1000000, 2000000, etc.

And it worked just fine on both small and large files when running sequentially. And it worked just fine on a small file in parallel too.

Then I tried a larger file in parallel.

OutOfMemoryError

And it didn't even print out the 0, which means it didn't process ANY of the elements AT ALL. It just fetched so much data and then died without hitting any of the pipeline stages.

At this point, I was furious and panicking, so I just turned my original stream sequential and upped my batch size to a much larger number (but still within our RAM requirements). This ended up speeding up performance pretty well for us because we made fewer (but larger) uploads. Which is not surprising -- each upload has to go through that whole connection process, and thus, we are paying a tax for each upload we do.

Still, this just barely met our performance needs, and my boss told me to ship it.

Weeks later, when things finally calmed down enough that I could breathe, I went onto the mailing list to figure out what on earth was happening with my stream.

Here is the start of the mailing list discussion.

https://mail.openjdk.org/pipermail/core-libs-dev/2024-November/134508.html

As it turns out, when a stream turns parallel, the intermediate and terminal operations you do on that stream will decide the fetching behaviour the stream uses on the source.

In our case, that meant that, if MY parallel stream used the forEach terminal operation, then the stream decides that the smartest thing to do to speed up performance is to fetch the entire dataset ahead of time and store it into an internal buffer in RAM before doing ANY PROCESSING WHATSOEVER. Resulting in an OutOfMemoryError.

And to be fair, that is not stupid at all. It makes good sense from a performance standpoint. But it makes things risky from a memory standpoint.

Anyways, this is a very sharp and painful corner of parallel streams that I did not know about, so I wanted to bring it up here in case it would be useful for folks. I intend to also make a StackOverflow post to explain this in better detail.

Finally, as a silver lining, Viktor Klang let me know that a .gather() immediately followed by a .collect() is immune to the pre-fetching behaviour mentioned above. Therefore, I could just create a custom Collector that does what I was doing in my forEach(). Doing it that way, I could run things in parallel safely without any fear of the dreaded OutOfMemoryError.

(and tbh, forEach() wasn't really the best idea for that operation). You can read more about it in the mailing list link above.
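
For illustration, a minimal sketch of that workaround, reusing the placeholder names (SomeClass, someLocation, SOME_BATCH_SIZE) from the snippet above and assuming a JDK recent enough to have Gatherers: the side effect moves out of forEach() and into a Collector accumulator, so the pipeline ends in gather(...).collect(...):

try (final Stream<String> myStream = SomeClass.openStream(someLocation)) {
    myStream
        .parallel()
        //insert some intermediate operations here
        .gather(Gatherers.windowFixed(SOME_BATCH_SIZE))
        .collect(Collector.of(
            Object::new,                                  // dummy state, nothing to accumulate
            (ignored, batch) -> SomeClass.upload(batch),  // upload each window as it arrives
            (a, b) -> a));                                // nothing meaningful to merge
}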

Please let me know if there are any questions, comments, or concerns.

EDIT -- Some minor clarifications. There are 2 issues interleaved here that makes it difficult to track the error.

  1. Gatherers don't (currently) play well with some of the other terminal operations when running in parallel.

  2. Iterators are parallel-unfriendly when operating as a stream source.

When I tried to boil things down to the simplistic scenario in my code above, I was no longer afflicted by problem 1, but was now afflicted by problem 2. My stream source was the source of the problem in that completely boiled down scenario.

Now that said, that only makes this problem less likely to occur than it appears. The simple reality is, it worked when running sequentially, but failed when running in parallel. And the only way I could find out that my stream source was "bad" was by diving into all sorts of libraries that create my stream. It wasn't until then that I realized the danger I was in.

Top answer
1 of 5
40
I did want to follow up about one point Viktor made later on in the conversation: https://mail.openjdk.org/pipermail/core-libs-dev/2024-November/134542.html

And here is the quote:

In a potential future where all intermediate operations are Gatherer-based, and all terminal operations are Collector-based, it would just work as expected. But with that said, I'm not sure it is practically achievable because some operations might not have the same performance-characteristics as before.

Me personally, I would GLADLY accept a flag on stream (similar to parallel() or unordered()) that would allow me to guarantee that my stream never pre-fetches, even if I take a massive performance hit. If that can be accomplished by making all intermediate operations be implemented by a Gatherer under the hood, that is A-OK with me.

The reality is, not all streams are compute bound. Some are IO bound, but are otherwise a great fit for streams. Having a method that allows us to optimize for that fact is a new type of performance enhancement that I would greatly appreciate, even if it degrades performance in other ways.
2 of 5
11
This was a fascinating read. Thank you for sharing. I guess it is kinda bad when higher-level non-trivial APIs, like streams or fork-join, do not expose lower-level operations as user-overridable constructs. Like in this example, an iteration strategy for streams, or the underlying executor of a fork-join pool. Seems like an obvious thing to have, because nobody knows better how things will be used than the end user.
Top answer
1 of 2
3

The problem is that you are using constructs which are hard to parallelize.

First, Stream.iterate(…) creates a sequence of numbers where each calculation depends on the previous value, hence, it offers no room for parallel computation. Even worse, it creates an infinite stream which will be handled by the implementation like a stream with unknown size. For splitting the stream, the values have to be collected into arrays before they can be handed over to other computation threads.

Second, providing a limit(…) doesn’t improve the situation; it makes it even worse. Applying a limit removes the size information which the implementation has just gathered for the array fragments. The reason is that the stream is ordered, so a thread processing an array fragment doesn’t know whether it may process all of its elements, as that depends on how many previous elements other threads are processing. This is documented:

“… it can be quite expensive on ordered parallel pipelines, especially for large values of maxSize, since limit(n) is constrained to return not just any n elements, but the first n elements in the encounter order.”

That’s a pity as we perfectly know that the combination of an infinite sequence returned by iterate with a limit(…) actually has an exactly known size. But the implementation doesn’t know. And the API doesn’t provide a way to create an efficient combination of the two. But we can do it ourselves:

static DoubleStream iterate(double seed, DoubleUnaryOperator f, long limit) {
    return StreamSupport.doubleStream(new Spliterators.AbstractDoubleSpliterator(limit,
            Spliterator.ORDERED|Spliterator.SIZED|Spliterator.IMMUTABLE|Spliterator.NONNULL) {
        long remaining = limit;
        double value = seed;
        @Override
        public boolean tryAdvance(DoubleConsumer action) {
            if (remaining == 0) return false;
            double d = value;
            if (--remaining > 0) value = f.applyAsDouble(d);
            action.accept(d);
            return true;
        }
    }, false);
}

Once we have such an iterate-with-limit method we can use it like

iterate(1d, d -> -(d+2*(Math.abs(d)/d)), 999999999L).parallel().map(d->4.0d/d).sum()

This still doesn’t benefit much from parallel execution due to the sequential nature of the source, but it works. On my four-core machine it managed to get roughly a 20% gain.

2 of 2
-1

This is because the default ForkJoinPool implementation used by the parallel() method does not limit the number of threads that get created. The solution is to provide a custom implementation of a ForkJoinPool that is limited to the number of threads that it executes in parallel. This can be achieved as mentioned below:

ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
// submit() returns a ForkJoinTask; call join() or get() on it to obtain the computed sum
forkJoinPool.submit(() -> DoubleStream.iterate(1d, d->-(d+2*(Math.abs(d)/d))).parallel().limit(999999999L).map(d->4.0d/d).sum());

🌐
Usi
dag.inf.usi.ch › wp-content › uploads › iceccs22-wosp.pdf pdf
Optimizing Parallel Java Streams - Dynamic Analysis Group
analysis to characterize stream pipelines, detect parallel streams, and apply transformations removing the abstraction overhead. We evaluate our method on a set of benchmarks, showing that · our approach significantly reduces execution time and memory
Top answer
1 of 2
27

Using the Stream API you indeed allocate more memory, though your experimental setup is somewhat questionable. I've never used JFR, but my findings using JOL are quite similar to yours.

Note that you measure not only the heap allocated while querying the ArrayList, but also the heap allocated while creating and populating it. The allocations during the creation and population of a single ArrayList should look like this (64-bit, compressed OOPs, via JOL):

 COUNT       AVG       SUM   DESCRIPTION
     1       416       416   [Ljava.lang.Object;
     1        24        24   java.util.ArrayList
     1        32        32   java.util.Random
     1        24        24   java.util.concurrent.atomic.AtomicLong
     4                 496   (total)

So the most memory allocated is the Object[] array used inside ArrayList to store the data. AtomicLong is a part of Random class implementation. If you perform this 100_000_000 times, then you should have at least 496*10^8/2^30 = 46.2 Gb allocated in both tests. Nevertheless this part could be skipped as it should be identical for both tests.
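
(For reference, a footprint table in that format can be produced with the JOL library, roughly like the sketch below; it assumes the org.openjdk.jol:jol-core dependency, and the class name is illustrative.)

import java.util.ArrayList;
import java.util.Random;
import org.openjdk.jol.info.GraphLayout;

// Prints a JOL footprint table (COUNT / AVG / SUM / DESCRIPTION) for an ArrayList
// plus the Random used to populate it, similar to the table shown above.
public class FootprintDemo {
    public static void main(String[] args) {
        Random random = new Random();
        ArrayList<Integer> list = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            list.add(random.nextInt());
        }
        System.out.println(GraphLayout.parseInstance(list, random).toFootprint());
    }
}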

Another interesting thing here is inlining. JIT is smart enough to inline the whole getIndexOfNothingManualImpl (via java -XX:+UnlockDiagnosticVMOptions -XX:+PrintCompilation -XX:+PrintInlining StreamMemoryTest):

  StreamMemoryTest::main @ 13 (59 bytes)
     ...
     @ 30   StreamMemoryTest::getIndexOfNothingManualImpl (43 bytes)   inline (hot)
       @ 1   java.util.ArrayList::iterator (10 bytes)   inline (hot)
        \-> TypeProfile (2132/2132 counts) = java/util/ArrayList
         @ 6   java.util.ArrayList$Itr::<init> (6 bytes)   inline (hot)
           @ 2   java.util.ArrayList$Itr::<init> (26 bytes)   inline (hot)
             @ 6   java.lang.Object::<init> (1 bytes)   inline (hot)
       @ 8   java.util.ArrayList$Itr::hasNext (20 bytes)   inline (hot)
        \-> TypeProfile (215332/215332 counts) = java/util/ArrayList$Itr
         @ 8   java.util.ArrayList::access$100 (5 bytes)   accessor
       @ 17   java.util.ArrayList$Itr::next (66 bytes)   inline (hot)
         @ 1   java.util.ArrayList$Itr::checkForComodification (23 bytes)   inline (hot)
         @ 14   java.util.ArrayList::access$100 (5 bytes)   accessor
       @ 28   StreamMemoryTest$$Lambda$1/791452441::test (8 bytes)   inline (hot)
        \-> TypeProfile (213200/213200 counts) = StreamMemoryTest$$Lambda$1
         @ 4   StreamMemoryTest::lambda$main$0 (13 bytes)   inline (hot)
           @ 1   java.lang.Integer::intValue (5 bytes)   accessor
       @ 8   java.util.ArrayList$Itr::hasNext (20 bytes)   inline (hot)
         @ 8   java.util.ArrayList::access$100 (5 bytes)   accessor
     @ 33   StreamMemoryTest::consume (19 bytes)   inline (hot)

Disassembly actually shows that no allocation of the iterator is performed after warm-up: because escape analysis successfully tells the JIT that the iterator object does not escape, it's simply scalarized. Were the Iterator actually allocated, it would take an additional 32 bytes:

 COUNT       AVG       SUM   DESCRIPTION
     1        32        32   java.util.ArrayList$Itr
     1                  32   (total)

Note that the JIT could also remove the iteration altogether. Your blackhole is false by default, so doing blackhole = blackhole && value does not change it regardless of value, and the value calculation could be eliminated entirely, as it has no side effects. I'm not sure whether it actually did this (reading disassembly is quite hard for me), but it's possible.

However while getIndexOfNothingStreamImpl also seems to inline everything inside, escape analysis fails as there are too many interdependent objects inside the stream API, so actual allocations occur. Thus it really adds five additional objects (the table is composed manually from JOL outputs):

 COUNT       AVG       SUM   DESCRIPTION
     1        32        32   java.util.ArrayList$ArrayListSpliterator
     1        24        24   java.util.stream.FindOps$FindSink$OfRef
     1        64        64   java.util.stream.ReferencePipeline$2
     1        24        24   java.util.stream.ReferencePipeline$2$1
     1        56        56   java.util.stream.ReferencePipeline$Head
     5                 200   (total)

So every invocation of this particular stream actually allocates 200 additional bytes. As you perform 100_000_000 iterations, in total the Stream version should allocate 10^8*200/2^30 = 18.62 Gb more than the manual version, which is close to your result. I think the AtomicLong inside Random is scalarized as well, but both the Iterator and the AtomicLong are present during the warm-up iterations (until the JIT actually creates the most optimized version). This would explain the minor discrepancies in the numbers.

This additional 200-byte allocation does not depend on the stream size, but it does depend on the number of intermediate stream operations (in particular, every additional filter step would add 64+24=88 bytes more). However, note that these objects are usually short-lived, allocated quickly, and can be collected by a minor GC. In most real-life applications you probably don't have to worry about this.

2 of 2
7

Not only does the Stream API use more memory due to the infrastructure needed to build it, but it might also be slower in terms of speed (at least for such small inputs).

There is a presentation from one of the developers at Oracle (it is in Russian, but that is not the point) that shows a trivial example (not much more complicated than yours) where the speed of execution is 30% worse with Streams vs. loops. He says that's pretty normal.

One thing I've noticed that not a lot of people realize is that using Streams (lambdas and method references, to be more precise) will also (potentially) create a lot of classes that you will not know about.

Try to run your example with:

  -Djdk.internal.lambda.dumpProxyClasses=/Some/Path/Of/Yours

And see how many additional classes will be created by your code and by the code that Streams need (via ASM).
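
For example, a hypothetical invocation (reusing the StreamMemoryTest class name from the example above; the dump directory is illustrative) might look like:

  java -Djdk.internal.lambda.dumpProxyClasses=/tmp/lambda-classes StreamMemoryTest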

🌐
GitHub
github.com › reactor › reactor-core › issues › 3177
Apparent memory leak in pipeline combining flatMaps ParallelFlux and map to same object type · Issue #3177 · reactor/reactor-core
August 31, 2022 - Interestingly, if we map to a different type of object the leak does not seem to take place. import java.util.stream.Stream; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuple2; import reactor.util.function.Tuples; class MemLeakTest { @Test void memLeakTest() { Flux.just("a", "b", "c", "d", "e", "f", "g") .flatMap(id -> generateTuplesFor(id, (Integer.MAX_VALUE / 7))) .parallel(6) .runOn(Schedulers.parallel()) .flatMap(this::generatePairsFor) .map(this::mapPair) .sequential() .map(this::mapPair) // If you remove this call, it seems to run without leaking.
Author   pmaria
Top answer
1 of 15
11

I want to give advice on how to monitor an application for the memory leaks with the tools that are available in the JVM. It doesn't show how to generate the memory leak, but explains how to detect it with the minimum tools available.

You need to monitor Java memory consumption first.

The simplest way to do this is to use the jstat utility that comes with JVM:

jstat -gcutil <process_id> <timeout>

It will report memory utilization for each generation (eden, survivor spaces, and old) and garbage collection times (young and full).

As soon as you spot that full garbage collections are executed too often and take too much time, you can assume that the application is leaking memory.

Then you need to create a memory dump using the jmap utility:

jmap -dump:live,format=b,file=heap.bin <process_id>

Then you need to analyse the heap.bin file with a memory analyser, Eclipse Memory Analyzer (MAT) for example.

MAT will analyze the memory dump and provide you with a report of suspected memory leaks.

2 of 15
10

I think that a valid example could be using ThreadLocal variables in an environment where threads are pooled.

For instance, using ThreadLocal variables in servlets to communicate with other web components, where the threads are created by the container and idle ones are kept in a pool. ThreadLocal variables, if not correctly cleaned up, will live there until, possibly, the same web component overwrites their values.

Of course, once identified, the problem can be solved easily.
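
A minimal, illustrative sketch of that situation (names are hypothetical; a plain ExecutorService stands in for the container's thread pool):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// A ThreadLocal populated on pooled threads and never removed: the pool keeps its
// threads alive, so each thread holds a strong reference to its byte[] until some
// later task overwrites the value or calls remove().
public class ThreadLocalLeakDemo {
    private static final ThreadLocal<byte[]> REQUEST_BUFFER = new ThreadLocal<>();

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int i = 0; i < 1_000; i++) {
            pool.submit(() -> {
                REQUEST_BUFFER.set(new byte[10 * 1024 * 1024]); // 10 MB of "per request" state
                // ... work with the buffer ...
                // missing: REQUEST_BUFFER.remove(); -- without it, the value outlives the task
            });
        }
        pool.shutdown();
    }
}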

Top answer
1 of 3
42

It's important to be precise in the use of the term 'memory leak' with respect to Java.

In Java, a memory leak happens when your code holds a reference permanently, so that some object is never garbage collected.

Failing to close a stream is not a memory leak in this sense. Streams that have native resources have finalizers; the GC will eventually close them down. Unless you hold a reference to the unclosed stream it isn't a leak.

However, there are other kinds of leaks besides memory leaks. Most operating systems limit the number of open files. If you fail to close your streams, the GC may take a very long time to close them for you; the net result may be that you run out of system file descriptors and your code fails to open one more file. Some people will call this a leak, but it's not accurate to call it a memory leak.
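
A small sketch of that second kind of leak (hypothetical path, illustrative only): each stream below becomes unreachable right away, so this is not a memory leak in the strict sense, yet its file descriptor stays open until the GC eventually finalizes it, and opening new files can start failing with "Too many open files" long before that:

import java.io.FileInputStream;
import java.io.IOException;

public class DescriptorLeakDemo {
    public static void main(String[] args) throws IOException {
        for (int i = 0; i < 1_000_000; i++) {
            // opened, read once, never closed
            new FileInputStream("/tmp/some-file.txt").read();
        }
    }
}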

2 of 3
9

(at least Java 1.6 and earlier did have this problem).

ANY version of Java, and for that matter, ANY language, has this problem; it is not specific to Java.

If you get hold of an input or output handle which needs system resources, you need to release them no matter what.

Java has Closeable to signify that the instance you hold may, or may not, hold system resources; but in the event that it does, if you don't .close() it you'll leak resources; it's that simple.

For instance, you may have a method which has an InputStream as an argument; OK, fine, but what is this InputStream? Is it a FileInputStream or a ByteArrayInputStream, or even something else? You cannot know. The first needs to be closed properly but the second doesn't need to.

So what? .close() them all anyway, you have nothing to lose. Do you really want to take the chance?

With Java 6, you'll want to use Guava's Closer, since this is the safest way to have all your resources closed (and the JDK does not provide such a tool):

final Closer closer = Closer.create();

try {
    final InputStream in = closer.register(openInHere());
    final InputStream in2 = closer.register(...);
    final OutputStream out = closer.register(...);
    // do work with in, in2, out, other
} catch (WhateverException e) {
    // IF WhateverException is not an IOException and you want to rethrow...
    // throw closer.rethrow(e, WhateverException.class);
} finally {
    closer.close(); // always safe
}

With Java 7, you have try-with-resources which works with all AutoCloseable resources (which Closeable extends):

try (
    final InputStream in = ...;
    // etc
) {
    // work with AutoCloseable resources
} catch (WhateverException e) {
    // deal with e, rethrow if necessary
}

The main difference between Closer and try-with-resources is that with the latter, resources will be closed before catch whereas Closer will close them in finally.

But again: don't take a chance. Close them all.

🌐
Baeldung
baeldung.com › home › java › core java › understanding memory leaks in java
Understanding Memory Leaks in Java | Baeldung
February 25, 2026 - Learn what memory leaks are in Java, how to recognize them at runtime, what causes them, and strategies for preventing them.
🌐
GitHub
github.com › spring-projects › spring-framework › issues › 30955
Java 17: resource.isReadable() with concurrency leaks large amounts of non-heap memory · Issue #30955 · spring-projects/spring-framework
July 26, 2023 - Scenario: converting application from java 8 to java 17 a large increase in non heap memory occurred. determined critical region: during startup, application scans all classes looking for annotations. This is based on configuration setti...
Author   peterdnight
🌐
DZone
dzone.com › coding › java › think twice before using java 8 parallel streams
Think Twice Before Using Java 8 Parallel Streams
August 13, 2019 - The other option is to not use parallel streams and wait until Oracle allows us to specify the thread pool to be used for parallel streams. ... If you enjoyed this article and want to learn more about Java Streams, check out this collection of tutorials and articles on all things Java Streams.
🌐
Medium
medium.com › @mesfandiari77 › parallel-stream-in-java-ac47c54176e0
Parallel Stream in java. Parallel Stream is a feature introduced… | by MEsfandiari | Medium
June 22, 2023 - 1. High memory consumption: Using Parallel Stream can lead to higher memory usage compared to traditional methods, as creating new threads to run parallel operations incurs additional memory overhead.
🌐
yCrash
blog.ycrash.io › home › untangling deadlocks caused by java’s parallelstream
Untangling Deadlocks Caused by Java's parallelStream - yCrash
August 31, 2025 - Deadlocks, those insidious hiccups in the world of multithreaded programming, can bring even the most robust application to its knees. It describes a situation where two or more threads are blocked forever, waiting for each other.
🌐
Stack Overflow
stackoverflow.com › questions › 29718497 › java-memory-leak-multithreading
Java memory leak multithreading - Stack Overflow
This is a small service which is leaking somewhere. Recently I've refactored my code where I've added parallel job execution and after release noticed that my heap is growing and GC is not able to free application memory.