How not to get into trouble writing a custom reduction operation (`reduce()`) on (parallel) streams with Java 8+ (including Java 14).
Introduction
Recently…, well, this article has spent years in my freezer of blog post drafts. I originally encountered this problem back in 2014 while preparing my Java 8 training session (recently renamed to Java - functional thinking). Back then, I noticed that it is possible to achieve interesting results with a custom `reduce` operation implementation when a parallel stream is used. For various reasons, the partially written article waited around 6 years for its completion. Time flies; however, the case is still valid with the latest - as of the time of writing - Java 14. So, let's check it out.
Our case - simple number concatenation using reduce
As a short introduction, a reduction operation (also called a fold) takes input elements (here, elements of a stream) and combines them into a single summary result. Some well-known specialized reduction variants available in the Stream API include, inter alia, `sum()`, `min()` and `count()`. Nevertheless, we can write our own custom, non-standard reduction operation (if needed).
Let's assume we would like to concatenate a list of numbers into a string. It is a case which could be achieved more easily in other ways, but it is simple enough to let us focus not on the business logic, but on the nuances of the reduction operation itself. The `reduce` method on a stream can be used.
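A minimal sketch of such an implementation - assuming an input `List<Integer>` with the values 1 to 4 - could look like this:

```java
import java.util.Arrays;
import java.util.List;

public class NumberConcatenation {

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4);

        // reduce(identity, accumulator, combiner) - written with method references
        String concatenated = numbers.stream()
                .reduce(new StringBuilder(), StringBuilder::append, StringBuilder::append)
                .toString();

        System.out.println(concatenated);   // 1234
    }
}
```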
Very compact, and it does its job. The numbers are concatenated. Similar implementations are common on the internet, and everything is fine unless you try to use the same code with a parallel stream.
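For example, in a test like this one (sketched with JUnit 5 and an AssertJ assertion - the test libraries are my assumption):

```java
// assumes: import static org.assertj.core.api.Assertions.assertThat;
@Test
void shouldConcatenateNumbersWithParallelStream() {
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4);

    String concatenated = numbers.parallelStream()
            .reduce(new StringBuilder(), StringBuilder::append, StringBuilder::append)
            .toString();

    assertThat(concatenated).isEqualTo("1234");
}
```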
This test fails with an assertion error similar to the one below (the exact garbled value varies from run to run):
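```
java.lang.AssertionError:
Expecting:
 <"3421342134213421">
to be equal to:
 <"1234">
but was not.
```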
It looks strange. Very often, incomprehensible things like this are caused by some race condition in parallel code, which matches our use of `parallelStream()`. But first, let's analyze what happens in a reduction operation step by step (if you are well versed in sequential reduction operations, skip to the subsequent section).
Reduction in sequential stream
If you are not very familiar with method references, the example can look a little bit cryptic. Let's rewrite it using lambda expressions (instead of method references).
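An equivalent sketch:

```java
String concatenated = numbers.stream()
        .reduce(new StringBuilder(),              // identity
                (acc, x) -> acc.append(x),        // accumulator
                (sb1, sb2) -> sb1.append(sb2))    // combiner
        .toString();
```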
The new `StringBuilder` is used as an initial value (the identity). For every element of the input list, the lambda expression passed as the second argument - the accumulator `(acc, x) -> acc.append(x)` - is called, with `acc` containing the already processed (concatenated) elements and `x` being the current number. `x` is appended to the previously concatenated elements.
From the algorithm analysis, the expected values in subsequent iterations should be as follows:
| iteration | 0 | 1 | 2 | 3 |
|---|---|---|---|---|
| acc | "" | "1" | "12" | "123" |
| x | 1 | 2 | 3 | 4 |
| returned | "1" | "12" | "123" | "1234" |
Is this right? Let's verify it in code.
As debugging (potentially) concurrent code is hard, especially in a blog post, let's use plain old debug outputs, well known from C.
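A debug-enhanced variant could look like this (the exact log format is my own choice):

```java
String concatenated = numbers.stream()
        .reduce(new StringBuilder(),
                (acc, x) -> {
                    System.out.printf("accumulator called: acc='%s', x=%d [%s]%n",
                            acc, x, Thread.currentThread().getName());
                    return acc.append(x);
                },
                (sb1, sb2) -> {
                    System.out.printf("combiner called: '%s' + '%s' [%s]%n",
                            sb1, sb2, Thread.currentThread().getName());
                    return sb1.append(sb2);
                })
        .toString();
```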
We have complicated the code slightly, but only for testing (debugging) purposes. The output is as expected:
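```
accumulator called: acc='', x=1 [main]
accumulator called: acc='1', x=2 [main]
accumulator called: acc='12', x=3 [main]
accumulator called: acc='123', x=4 [main]
```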
Let's skip, for a while, the log statement added to the combining operation (the third argument of `reduce`) - the combiner is not used in sequential streams, so it produces no output here.
In the output, we can also clearly see that all operations take place in the main thread, which is expected for sequential, synchronous stream processing.
Reduction in parallel stream - behavior in working implementation
In a parallel stream, accumulation operations can be performed in different threads. The partial results are later combined using the function passed as the third argument of the `reduce` method (the combiner).
Let's take a look at the diagnostic messages emitted during a sample execution with a parallel stream.
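The run below uses a debug-enhanced variant of the correct (non-mutating) implementation presented later in this post; the thread names and the interleaving of the messages can differ between executions:

```
accumulator called: acc='', x=1 [ForkJoinPool.commonPool-worker-3]
accumulator called: acc='', x=2 [main]
accumulator called: acc='', x=3 [ForkJoinPool.commonPool-worker-5]
combiner called: '1' + '2' [main]
accumulator called: acc='', x=4 [ForkJoinPool.commonPool-worker-5]
combiner called: '3' + '4' [ForkJoinPool.commonPool-worker-5]
combiner called: '12' + '34' [ForkJoinPool.commonPool-worker-5]
```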
Let’s analyze it line by line.
In the first 2 lines:
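```
accumulator called: acc='', x=1 [ForkJoinPool.commonPool-worker-3]
accumulator called: acc='', x=2 [main]
```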
the accumulation operation is performed in parallel on the first two elements of the list. In the fourth line, those partial results are combined into a larger partial result:
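```
combiner called: '1' + '2' [main]
```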
The same happens for the rest of the elements, in lines 3, 5 and 6:
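```
accumulator called: acc='', x=3 [ForkJoinPool.commonPool-worker-5]
accumulator called: acc='', x=4 [ForkJoinPool.commonPool-worker-5]
combiner called: '3' + '4' [ForkJoinPool.commonPool-worker-5]
```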
At the end, the two partial results - '12' and '34' - are combined together to provide the final result:
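```
combiner called: '12' + '34' [ForkJoinPool.commonPool-worker-5]
```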
Do you have - after looking at the listings - any suspicion about what could be wrong with the original implementation? With the debug statements it is much easier to spot.
Reduction in parallel stream with widespread (broken) implementation
Let's recall the widespread (broken when running in parallel) implementation once more, together with its debug-enhanced variant:
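```java
String concatenated = numbers.parallelStream()
        .reduce(new StringBuilder(),
                (acc, x) -> {
                    System.out.printf("accumulator called: acc='%s', x=%d [%s]%n",
                            acc, x, Thread.currentThread().getName());
                    return acc.append(x);    // mutates the shared identity instance
                },
                (sb1, sb2) -> {
                    System.out.printf("combiner called: '%s' + '%s' [%s]%n",
                            sb1, sb2, Thread.currentThread().getName());
                    return sb1.append(sb2);  // both arguments end up being that very same instance
                })
        .toString();
```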
The output from one sample parallel execution (every run can interleave differently):
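```
accumulator called: acc='3', x=4 [ForkJoinPool.commonPool-worker-3]
accumulator called: acc='342', x=1 [main]
accumulator called: acc='', x=3 [ForkJoinPool.commonPool-worker-5]
accumulator called: acc='34', x=2 [ForkJoinPool.commonPool-worker-7]
combiner called: '3421' + '3421' [main]
combiner called: '34213421' + '34213421' [ForkJoinPool.commonPool-worker-3]
combiner called: '3421342134213421' + '3421342134213421' [main]
```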
The strange things start in the first line. Instead of an empty accumulator (the `acc` variable) there is "3". In the second line there is already "342" (still instead of an empty `StringBuilder`). We discover an empty accumulator only in the 3rd line, which could just as well have been the 1st line (it is a matter of how log messages from multiple threads interleave - they were executed at "the same moment"). Nonetheless, all of the first 4 lines should have an empty accumulator. This clearly shows there is a problem with a mutable data structure shared across threads, in both the accumulation and combining phases.
A look at the `StringBuilder` documentation assures us that `append` adds the provided value to the `StringBuilder` and returns the same instance (aka `this`). Mutating input arguments is not recommended in stream processing; however, the root of the problem is the fact that the initial "neutral" value (the identity), passed to every initial accumulation operation, is one and the same `StringBuilder` instance. It is passed as the first argument of the `reduce` method call.
Someone could say: "let's use `StringBuffer` - its operations are thread safe". However, it will not fix the situation here. Having multiple threads operating on the same structure (even a thread-safe one) is still broken, because the algorithm assumes that every initial accumulation operates on a private accumulator instance, producing an independent partial result which is concatenated with the others in the following steps.
Reduction in parallel stream - working implementation
Main case
With `StringBuilder` and string concatenation, the only required modification is to stop mutating the input objects. A sketch:
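```java
String concatenated = numbers.parallelStream()
        .reduce(new StringBuilder(),
                (acc, x) -> new StringBuilder(acc).append(x),      // a fresh instance, no mutation
                (sb1, sb2) -> new StringBuilder(sb1).append(sb2))  // a fresh instance, no mutation
        .toString();
```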
In every iteration a new `StringBuilder` instance is created. Thanks to that, there is no mutation of the initial identity element shared across threads. In general, mutating the stream elements (or causing side effects of any kind) can lead to unpredictable situations, especially in parallel execution.
Java API limitations
The above change solves the problem (at the cost of a greatly increased number of created objects, but let's skip that for a moment). Unfortunately, many structures, especially those from the Java Collections Framework, which dates back to Java 1.2 (released in 1998 - over 20 years ago!), do not provide an API friendly enough to perform operations with the method chaining syntax/approach.
Let's take a look at an (artificial) working example of using the `java.util.List` API in a reduce operation, simulating a mapping which doubles every element of the input list. A sketch of how it could look:
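```java
// assumes: import java.util.ArrayList; import java.util.Collections; import java.util.List;
List<Integer> doubled = numbers.parallelStream()
        .reduce(Collections.emptyList(),
                (acc, x) -> {
                    // List.add() returns boolean, so no method chaining is possible
                    List<Integer> result = new ArrayList<>(acc);
                    result.add(x * 2);
                    return result;
                },
                (left, right) -> {
                    // the same applies to List.addAll()
                    List<Integer> result = new ArrayList<>(left);
                    result.addAll(right);
                    return result;
                });
// doubled == [2, 4, 6, 8]
```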
True one-liners with method chaining would make this code more readable.
Immutable collections
In addition to the more verbose syntax, Java collections are in general mutable. This makes them harder (more error prone) to use in - to share an instance between - multiple threads. Thread-safe collections might seem to help, but as I already mentioned, in this case they will not (as the initial empty collection is distributed across different threads). However, in more functional languages such as Scala, not to mention Haskell, it is quite popular to use immutable collections.
An immutable list typically has a "first" element, called the head, and the rest, called the tail. Adding a new element before the head is cheap, as the new list can share the old one as its tail (which is immutable, remember?). Of course, in some other use cases (such as inserting elements between other elements) immutable collections cannot keep up with their classic mutable counterparts. Nevertheless, they might be an alternative worth considering. For Java, there is - among others - the collections API in Vavr.
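Just to give a taste, a sketch using Vavr's immutable `List` (the Vavr dependency is assumed here):

```java
import io.vavr.collection.List;

List<Integer> original = List.of(2, 3, 4);
List<Integer> extended = original.prepend(1);  // a new list; 'original' stays untouched

System.out.println(extended);  // List(1, 2, 3, 4)
System.out.println(original);  // List(2, 3, 4)
```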
Mutable reduction operation
To deal with cases where mutable operations are much more memory/CPU efficient, a dedicated `Collector` can be written which performs a mutable reduction operation. More info about writing a custom `Collector` can be found in this interesting blog post by Tomasz Nurkiewicz.
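As a side note, our concatenation case does not even require a handwritten `Collector` - the three-argument `collect()` method on a stream performs a mutable reduction which creates a fresh container for every partial result, so the shared-identity trap from `reduce` does not apply. A sketch:

```java
String concatenated = numbers.parallelStream()
        .collect(StringBuilder::new,      // a fresh container per partial result
                 StringBuilder::append,   // accumulate an element into the container
                 StringBuilder::append)   // merge two partial containers
        .toString();
```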
Summary
I hope that you were persistent enough to get with me to the end of this (quite long and quite low-level) blog post :-).
Using this example with `reduce`, I wanted to show - and explain - one of the possible problems you can encounter while dealing with parallel execution. It is not a bug in the JDK, but rather an assumption in the (parallel) Stream API which - as I observe during my training classes - is not emphasised enough to be common knowledge. It is good to know your tools and use them properly, as well as to extend one's awareness of new and alternative solutions. And remember, state mutation is problematic. Prefer immutable data structures whenever possible (and feasible).
Lead photo by Free-Photos, published in Pixabay, Pixabay License.