Answer from Louis Wasserman on Stack Overflow
You need to use forEachOrdered, not forEach.
As per the forEach doc:
For parallel stream pipelines, this operation does not guarantee to respect the encounter order of the stream, as doing so would sacrifice the benefit of parallelism. For any given element, the action may be performed at whatever time and in whatever thread the library chooses. If the action accesses shared state, it is responsible for providing the required synchronization.
In addition, you can read more about parallelism and forEachOrdered, with a very nice example, in the Oracle Java Tutorials (Parallelism lesson). In summary, using forEachOrdered in a parallel stream may cost you the benefits of parallelism.
Here is the example from the same resource:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class ParallelismExample {
    public static void main(String[] args) {
        Integer[] intArray = {1, 2, 3, 4, 5, 6, 7, 8};
        List<Integer> listOfIntegers =
            new ArrayList<>(Arrays.asList(intArray));

        System.out.println("listOfIntegers:");
        listOfIntegers
            .stream()
            .forEach(e -> System.out.print(e + " "));
        System.out.println();

        System.out.println("listOfIntegers sorted in reverse order:");
        Comparator<Integer> normal = Integer::compare;
        Comparator<Integer> reversed = normal.reversed();
        Collections.sort(listOfIntegers, reversed);
        listOfIntegers
            .stream()
            .forEach(e -> System.out.print(e + " "));
        System.out.println();

        // forEach on a parallel stream: the order is unspecified and varies by run
        System.out.println("Parallel stream:");
        listOfIntegers
            .parallelStream()
            .forEach(e -> System.out.print(e + " "));
        System.out.println();

        System.out.println("Another parallel stream:");
        listOfIntegers
            .parallelStream()
            .forEach(e -> System.out.print(e + " "));
        System.out.println();

        // forEachOrdered on a parallel stream: the encounter order is kept
        System.out.println("With forEachOrdered:");
        listOfIntegers
            .parallelStream()
            .forEachOrdered(e -> System.out.print(e + " "));
        System.out.println();
    }
}
And the output is (the two parallel-stream lines will differ from run to run):
listOfIntegers:
1 2 3 4 5 6 7 8
listOfIntegers sorted in reverse order:
8 7 6 5 4 3 2 1
Parallel stream:
3 4 1 6 2 5 7 8
Another parallel stream:
6 3 1 5 7 8 4 2
With forEachOrdered:
8 7 6 5 4 3 2 1
The fifth pipeline uses the method forEachOrdered, which processes the elements of the stream in the order specified by its source, regardless of whether you executed the stream in serial or parallel. Note that you may lose the benefits of parallelism if you use operations like forEachOrdered with parallel streams.
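The difference can also be checked programmatically instead of by eye. The sketch below is illustrative only (the class and method names are made up for this example): it collects a parallel range into a list once with forEachOrdered and once with forEach. With forEachOrdered, the Javadoc guarantees that the action for one element happens-before the action for the next, so a plain ArrayList is safe and the order is kept; with forEach the action may run on several threads at once, so the target must be thread-safe and only the set of elements, not their order, is predictable.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

public class ForEachOrderedDemo {

    // forEachOrdered: elements arrive one at a time in encounter order,
    // so an unsynchronized ArrayList is sufficient here.
    static List<Integer> collectWithForEachOrdered(int n) {
        List<Integer> out = new ArrayList<>();
        IntStream.range(0, n).parallel().boxed().forEachOrdered(out::add);
        return out;
    }

    // forEach: the action may run concurrently on several threads, so the
    // list is wrapped to be thread-safe, and the resulting order is unspecified.
    static List<Integer> collectWithForEach(int n) {
        List<Integer> out = Collections.synchronizedList(new ArrayList<>());
        IntStream.range(0, n).parallel().boxed().forEach(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collectWithForEachOrdered(20)); // 0 to 19, in order
        System.out.println(collectWithForEach(20));        // same elements, order may vary
    }
}
```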
TL;DR
Yes, the order is guaranteed.
Stream.collect() API documentation
The starting place is to look at what determines whether a reduction is concurrent or not. Stream.collect()'s description says the following:
If the stream is parallel, and the Collector is concurrent, and either the stream is unordered or the collector is unordered, then a concurrent reduction will be performed (see Collector for details on concurrent reduction.)
The first condition is satisfied: the stream is parallel. How about the second and third: is the Collector concurrent and unordered?
Collectors.toList() API documentation
toList()'s documentation reads:
Returns a Collector that accumulates the input elements into a new List. There are no guarantees on the type, mutability, serializability, or thread-safety of the List returned; if more control over the returned List is required, use toCollection(Supplier).
Returns:
a Collector which collects all the input elements into a List, in encounter order
A collector that works in encounter order produces its result in the source's original order, and this holds even when the stream runs in parallel.
Implementation code
Inspecting the implementation of Collectors.java confirms that toList() does not include the CONCURRENT or UNORDERED traits.
public static <T>
Collector<T, ?, List<T>> toList() {
    return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
                               (left, right) -> { left.addAll(right); return left; },
                               CH_ID);
}
// ...
static final Set<Collector.Characteristics> CH_ID
        = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH));
Notice how the collector has the CH_ID trait set, which has only the single IDENTITY_FINISH trait. CONCURRENT and UNORDERED are not there, so the reduction cannot be concurrent.
A non-concurrent reduction means that, if the stream is parallel, collection can proceed in parallel, but it will be split into several thread-confined intermediate results which are then combined. This ensures the combined result is in encounter order.
See also: Why parallel stream get collected sequentially in Java 8
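Both points in this answer can be checked at runtime. The following is a small sketch (class and method names are hypothetical): it reads the characteristics reported by Collectors.toList() through the public Collector.characteristics() method, and collects a parallel range with toList() so you can confirm that the result follows the source's encounter order.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ToListOrderDemo {

    // Characteristics reported by Collectors.toList(); per the answer above,
    // neither CONCURRENT nor UNORDERED should be present.
    static Set<Collector.Characteristics> toListCharacteristics() {
        return Collectors.<Integer>toList().characteristics();
    }

    // Collect a parallel stream with toList(); the partial lists are combined
    // in encounter order, so the result should be 0, 1, 2, ...
    static List<Integer> parallelToList(int n) {
        return IntStream.range(0, n).parallel().boxed().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(toListCharacteristics());
        System.out.println(parallelToList(10)); // the numbers 0 to 9, in order
    }
}
```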
You are guaranteed to get the elements in encounter order.
From documentation of toList:
Returns: a Collector which collects all the input elements into a List, in encounter order
See the java.util.stream package summary for more information on the term "encounter order".
Furthermore, the List#spliterator documentation requires that all implementations of List produce spliterators that are ORDERED:
The Spliterator reports Spliterator.SIZED and Spliterator.ORDERED. Implementations should document the reporting of additional characteristic values.
Oddly enough, while the List interface requires iterator() to produce elements in "proper sequence", spliterator() is only required to be ordered but not specifically required to follow the list's natural ordering.
So, to answer your question, the list produced by toList is guaranteed to contain the elements exactly as the source list's spliterator orders them. It does not matter whether the stream is parallel or sequential.
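As a quick sanity check of the ORDERED requirement, the sketch below (class and method names are illustrative) asks the spliterators of two common List implementations whether they report Spliterator.ORDERED, and round-trips an unsorted list through a parallel stream and toList(). The expectation is that the result keeps the source order, not sorted order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Collectors;

public class ListSpliteratorDemo {

    // True if the list's spliterator reports ORDERED, as the List#spliterator
    // contract quoted above requires.
    static boolean reportsOrdered(List<?> list) {
        return list.spliterator().hasCharacteristics(Spliterator.ORDERED);
    }

    // Round-trip a list through a parallel stream and Collectors.toList().
    static <T> List<T> roundTrip(List<T> source) {
        return source.parallelStream().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("d", "a", "c", "b");
        System.out.println(reportsOrdered(new ArrayList<>(source)));  // true
        System.out.println(reportsOrdered(new LinkedList<>(source))); // true
        System.out.println(roundTrip(source)); // [d, a, c, b]: source order, not sorted
    }
}
```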
I assume that your problem is purely educational and experimental, without any practical application, as there are much more efficient ways to sort elements in Java. If you want to utilize the Stream API here, you may create a spliterator which performs bubble sorting and a collector which performs merge sorting in its combiner.
Here's the spliterator:
import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Static nested class: place it inside an enclosing class.
static class BubbleSpliterator<T> implements Spliterator<T> {
    private final Comparator<? super T> cmp;
    private final Spliterator<T> source;
    private T[] data;   // sorted copy of this chunk, created lazily
    private int offset; // index of the next element to emit

    public BubbleSpliterator(Spliterator<T> source, Comparator<? super T> cmp) {
        this.source = source;
        this.cmp = cmp;
    }

    // On first traversal, drain the source chunk into an array and sort it.
    @SuppressWarnings("unchecked")
    private void init() {
        if (data != null)
            return;
        Stream.Builder<T> buf = Stream.builder();
        source.forEachRemaining(buf);
        data = (T[]) buf.build().toArray();
        bubble(data, cmp);
    }

    // Simple O(n^2) exchange sort: after pass i, data[i] holds the smallest
    // of the remaining elements.
    private static <T> void bubble(T[] data, Comparator<? super T> cmp) {
        for (int i = 0; i < data.length - 1; i++)
            for (int j = i + 1; j < data.length; j++) {
                if (cmp.compare(data[i], data[j]) > 0) {
                    T tmp = data[i];
                    data[i] = data[j];
                    data[j] = tmp;
                }
            }
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        init();
        if (offset >= data.length)
            return false;
        action.accept(data[offset++]);
        return true;
    }

    @Override
    public void forEachRemaining(Consumer<? super T> action) {
        init();
        for (int i = offset; i < data.length; i++)
            action.accept(data[i]);
        offset = data.length;
    }

    // Splitting is delegated to the source, but only before sorting has started.
    @Override
    public Spliterator<T> trySplit() {
        if (data != null)
            return null;
        Spliterator<T> prefix = source.trySplit();
        return prefix == null ? null : new BubbleSpliterator<>(prefix, cmp);
    }

    @Override
    public long estimateSize() {
        if (data != null)
            return data.length - offset;
        return source.estimateSize();
    }

    @Override
    public int characteristics() {
        return source.characteristics();
    }

    public static <T> Stream<T> stream(Stream<T> source,
                                       Comparator<? super T> comparator) {
        Spliterator<T> spltr = source.spliterator();
        return StreamSupport.stream(new BubbleSpliterator<>(spltr, comparator),
                source.isParallel()).onClose(source::close);
    }
}
It takes a source spliterator and delegates splitting to it, but when elements are first requested, it dumps the source elements into an array and bubble-sorts them. You may check it like this:
int[] data = new Random(1).ints(100, 0, 1000).toArray();
Comparator<Integer> comparator = Comparator.naturalOrder();
List<Integer> list = BubbleSpliterator.stream(Arrays.stream(data).parallel().boxed(), comparator).collect(
Collectors.toList());
System.out.println(list);
The result depends on the number of hardware threads on your machine and may look like this:
[254, 313, 588, 847, 904, 985, 434, 473, 569, 606, 748, 978, 234, 262, 263, 317, 562, 592, 99, 189, 310,...]
Here you can see that the output consists of several sorted sequences. The number of such sequences corresponds to the number of parallel tasks the Stream API creates.
Now, to combine the sorted sequences via merge sorting, you may write a special collector like this:
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collector;

// Merges two lists that are each already sorted by cmp.
static <T> List<T> merge(List<T> l1, List<T> l2, Comparator<? super T> cmp) {
    List<T> result = new ArrayList<>(l1.size() + l2.size());
    int i = 0, j = 0;
    while (i < l1.size() && j < l2.size()) {
        if (cmp.compare(l1.get(i), l2.get(j)) <= 0) {
            result.add(l1.get(i++));
        } else {
            result.add(l2.get(j++));
        }
    }
    result.addAll(l1.subList(i, l1.size()));
    result.addAll(l2.subList(j, l2.size()));
    return result;
}

// Accumulates like toList(); the combiner merges two sorted partial results.
static <T> Collector<T, ?, List<T>> mergeSorting(Comparator<? super T> cmp) {
    return Collector.<T, List<T>>of(ArrayList::new, List::add,
            (l1, l2) -> merge(l1, l2, cmp));
}
In sequential mode it works just like Collectors.toList(), but in parallel mode its combiner performs a merge, assuming that both input lists are already sorted. My mergeSorting implementation is probably suboptimal; you may write something better.
So to sort everything via Stream API, you can use BubbleSpliterator and mergeSorting collector together:
int[] data = new Random(1).ints(100, 0, 1000).toArray();
Comparator<Integer> comparator = Comparator.naturalOrder();
List<Integer> list = BubbleSpliterator.stream(Arrays.stream(data).parallel().boxed(), comparator).collect(
mergeSorting(comparator));
System.out.println(list);
The result will be completely sorted.
This implementation performs unnecessary copying of the input data several times, so I guess a custom bubble+merge implementation could beat this one in terms of performance.
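For comparison, here is a hypothetical, self-contained variant of the same idea that drops the custom spliterator; it is not the answer's method. Each partial list is kept sorted inside the accumulator by binary-search insertion (an insertion sort swapped in for the bubble sort), and the combiner is the same two-way merge. The merge method is copied from above so the block compiles on its own. Insertion into an ArrayList shifts elements, so each chunk is still O(n^2) in the worst case; like the original, this is a learning exercise, not a replacement for sorted() or Arrays.parallelSort().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Random;
import java.util.stream.Collector;

public class InsertionMergeCollector {

    // Hypothetical collector: the accumulator keeps each partial list sorted,
    // and the combiner merges two sorted partial lists.
    static <T> Collector<T, ?, List<T>> sortingCollector(Comparator<? super T> cmp) {
        return Collector.<T, List<T>>of(
                ArrayList::new,
                (list, e) -> {
                    int pos = Collections.binarySearch(list, e, cmp);
                    // binarySearch returns -(insertion point) - 1 when e is absent
                    list.add(pos < 0 ? -pos - 1 : pos, e);
                },
                (l1, l2) -> merge(l1, l2, cmp));
    }

    // Two-way merge of two lists that are each already sorted by cmp.
    static <T> List<T> merge(List<T> l1, List<T> l2, Comparator<? super T> cmp) {
        List<T> result = new ArrayList<>(l1.size() + l2.size());
        int i = 0, j = 0;
        while (i < l1.size() && j < l2.size()) {
            if (cmp.compare(l1.get(i), l2.get(j)) <= 0) {
                result.add(l1.get(i++));
            } else {
                result.add(l2.get(j++));
            }
        }
        result.addAll(l1.subList(i, l1.size()));
        result.addAll(l2.subList(j, l2.size()));
        return result;
    }

    public static void main(String[] args) {
        List<Integer> list = new Random(1).ints(100, 0, 1000).parallel().boxed()
                .collect(sortingCollector(Comparator.<Integer>naturalOrder()));
        System.out.println(list); // fully sorted
    }
}
```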
If you want to sort an array in parallel using the Java 8 Stream API, this may help you:
IntStream randomIntegers = ThreadLocalRandom.current().ints(100, 0, 100);
int[] sortedArray = randomIntegers
.parallel() // (1)
.sorted() // (2)
.toArray();
System.out.println(Arrays.toString(sortedArray));
No matter what type of Stream you have, just invoke parallel() and then sorted().
(The order of these two calls is not important: the most recent parallel() or sequential() call applies to the whole pipeline.)
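To see that the position of parallel() does not matter, the sketch below (class and method names are made up for illustration) sorts the same data with parallel() placed before and after sorted(), and checks that a stream with parallel() called last still reports isParallel().

```java
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

public class ParallelSortedDemo {

    // parallel() before sorted()
    static int[] parallelThenSorted(int[] data) {
        return Arrays.stream(data).parallel().sorted().toArray();
    }

    // sorted() before parallel(): parallel() switches the execution mode of
    // the entire pipeline, so the sort still runs in parallel mode
    static int[] sortedThenParallel(int[] data) {
        return Arrays.stream(data).sorted().parallel().toArray();
    }

    // No terminal operation is needed just to query the mode
    static boolean isParallelAfterLateCall(int[] data) {
        return Arrays.stream(data).sorted().parallel().isParallel();
    }

    public static void main(String[] args) {
        int[] data = ThreadLocalRandom.current().ints(20, 0, 100).toArray();
        System.out.println(Arrays.toString(parallelThenSorted(data)));
        System.out.println(Arrays.toString(sortedThenParallel(data)));
        System.out.println(isParallelAfterLateCall(data)); // true
    }
}
```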
By tracing the code, we find that:
final class SortedOps {
    private static final class OfInt extends IntPipeline.StatefulOp<Integer> {
        //...
        @Override
        public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
                                                       Spliterator<P_IN> spliterator,
                                                       IntFunction<Integer[]> generator) {
            if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
                return helper.evaluate(spliterator, false, generator);
            } else {
                Node.OfInt n = (Node.OfInt) helper.evaluate(spliterator, true, generator);
                int[] content = n.asPrimitiveArray();
                Arrays.parallelSort(content); // <== this
                return Nodes.node(content);
            }
        }
    }
    private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
        //...
        @Override
        public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
                                                 Spliterator<P_IN> spliterator,
                                                 IntFunction<T[]> generator) {
            // If the input is already naturally sorted and this operation
            // naturally sorts then collect the output
            if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) {
                return helper.evaluate(spliterator, false, generator);
            }
            else {
                // @@@ Weak two-pass parallel implementation; parallel collect, parallel sort
                T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);
                Arrays.parallelSort(flattenedData, comparator); // <== this
                return Nodes.node(flattenedData);
            }
        }
    }
}
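In both the primitive (OfInt) and reference (OfRef) paths, unless the input is already known to be sorted, the elements are first collected into an array, and Arrays.parallelSort() is then used to sort that backing array.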
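If you already have an array and do not need a stream, you can call the same method directly. This small sketch (class and method names are illustrative) uses the primitive overload, as in the OfInt path, and the Comparator overload, as in the OfRef path; each method sorts a copy so the input is left unchanged.

```java
import java.util.Arrays;
import java.util.Comparator;

public class ParallelSortDemo {

    // Primitive overload, as used by SortedOps.OfInt
    static int[] sortInts(int[] data) {
        int[] copy = data.clone();
        Arrays.parallelSort(copy);
        return copy;
    }

    // Comparator overload, as used by SortedOps.OfRef
    static String[] sortByLength(String[] data) {
        String[] copy = data.clone();
        Arrays.parallelSort(copy, Comparator.comparingInt(String::length));
        return copy;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(sortInts(new int[]{5, 3, 9, 1})));                    // [1, 3, 5, 9]
        System.out.println(Arrays.toString(sortByLength(new String[]{"ccc", "a", "dddd", "bb"}))); // [a, bb, ccc, dddd]
    }
}
```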