This page looks best with JavaScript enabled

Mysteriously broken reduction operation on parallel streams in Java (explained)

 ·  ☕ 11 min read

How to not get into trouble writing custom reduction operation (.reduce()) on (parallel) streams with Java 8+ (including Java 14).

5 parallel streams of waterfall

Introduction

Recently…, well this article has spent years in my freezer for the blog posts. Originally, I have encountered this problem back in 2014 while preparing my Java 8 training session (recently renamed to Java - functional thinking ). Then, I noticed that it is possible to achieve interesting results writing a custom reduce operation implementation when a parallel stream is used. For various reasons, the partially written article was waiting for its completion around 6 years. Time flies, however, the case is still valid with the latest - as a time of writing - Java 14. So, let’s check it out.

Our case - simple number concatenation using reduce

As a short introduction, a reduction operation (also called a fold) takes input elements (here of a stream) and collects them into combined summary result. Some well know specialized reduction variants, available in the Stream API, include, inter alia, sum(), min() and count(). Nevertheless, we can write our custom, non-standard reduction operation (if needed).

Let’s assume we would like to concatenate a list of numbers into a string. A case which could be easier to achieve in other ways, but simple enough to do not focus on realized logic, but rather a nuances of a reduction operation itself. A reduce method on stream can be used:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
@Test
public void shouldConcatNumbersUsingReduce() {
    //given
    final List<Integer> input = Arrays.asList(1, 2, 3, 4);

    //when
    final String result = input.stream()
            .reduce(new StringBuilder(),
                    StringBuilder::append,
                    StringBuilder::append)
            .toString();

    //then
    assertThat(result).isEqualToIgnoringCase("1234");   //passes
}

Very compact and does its work. Numbers are concatenated. Similar implementations are common in internet and everything is ok unless you try to use it with the parallel stream:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
@Test
public void shouldConcatNumbersUsingReduceParallel() {   //broken!
    //given
    List<Integer> input = Arrays.asList(1, 2, 3, 4);

    //when
    String result = input.parallelStream()
            .reduce(new StringBuilder(),
                    StringBuilder::append,
                    StringBuilder::append)
            .toString();

    //then
    assertThat(result).isEqualToIgnoringCase("1234");   //ups!
}

This test fails with the assertion error similar to that:

1
2
3
4
5
java.lang.AssertionError: 
Expecting:
 <"343421343421343421343421">
to be equal to:
 <"1234">

It looks strange. Very often incomprehensible things are caused by some race condition in the parallel code which matches parallelStream(). But first let’s analyze what happens in a reduction operation step by step (if you are savvy in sequential reduction operations skip to the subsequent section).

Reduction in sequential stream

If you are not very familiar with method references the example can look a little bit cryptic. Let’s rewrite it using lambda expressions (instead method references):

1
2
3
4
5
String result = input.stream()
        .reduce(new StringBuilder(),
                (acc, x) -> acc.append(x),      //accumulation
                (sb1, sb2) -> sb1.append(sb2))  //combining
        .toString();

The new StringBuilder is used as an initial value (identity). For every element from an input list the second line in reduce will be called ((acc, x) -> acc.append(x)) with acc (an accumulator) containing concatenated already processed elements and x the current number. x is appended to the previously concatenated elements.

From the algorithm analysis, the expected values in subsequent iterations should be as follows:

iteration 0 1 2 3
acc "” “1” “12” “123”
x 1 2 3 4
returned “1” “12” “123” “1234”

Is this ok? Let’s verify it in code.

As debugging (prospectively) concurrent code is hard, especially in a blog post, let’s use plain old debug outputs, well known from C.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
String result = input.stream()
    .reduce(
        new StringBuilder(),
        (acc, x) -> {
            String accString = acc.toString();
            StringBuilder ret = acc.append(x);
            log.info("Accumulation: '{}', x: '{}', returned: '{}'", format("%4s", accString), 
                                     format("%4s", x), format("%4s", ret));
            return ret;
        },
        (sb1, sb2) -> {
            String sb1String = sb1.toString();
            StringBuilder ret = sb1.append(sb2);
            log.info("Combining: sb1: '{}', sb2: '{}', returned: '{}'", format("%4s", sb1String),
                                 format("%4s", sb2), format("%4s", ret));
            return ret;
        })
    .toString();

We slightly complicated the code, but it is only for the test (debug) purpose. The output is as expected:

1
2
3
4
18:09:05.839 [main] - Accumulation: acc: '    ', x: '   1', returned: '   1'
18:09:05.844 [main] - Accumulation: acc: '   1', x: '   2', returned: '  12'
18:09:05.844 [main] - Accumulation: acc: '  12', x: '   3', returned: ' 123'
18:09:05.844 [main] - Accumulation: acc: ' 123', x: '   4', returned: '1234'

Let’s skip for a while the log from the combining operation (line 14). It is not used in sequential streams.

In the output, we also clearly see that all operations take place in the main thread. Which is expected for sequential synchronous stream processing.

Reduction in parallel stream - behavior in working implementation

In a parallel stream, accumulation operations can be performed in different threads. Those partial results are later combined using lambda expression passed as the third argument (a combiner) in a reduce method.

Let’s take a look at the diagnostic messages emitted during a sample execution with parallel stream.

1
2
3
4
5
6
7
[ForkJoinPool.commonPool-worker-3] - Accumulation: acc: '    ', x:   '   1', returned: '   1'
[ForkJoinPool.commonPool-worker-1] - Accumulation: acc: '    ', x:   '   2', returned: '   2'
[main]                             - Accumulation: acc: '    ', x:   '   3', returned: '   3'
[ForkJoinPool.commonPool-worker-1] - Combining   : sb1: '   1', sb2: '   2', returned: '  12'
[ForkJoinPool.commonPool-worker-2] - Accumulation: acc: '    ', x:   '   4', returned: '   4'
[ForkJoinPool.commonPool-worker-2] - Combining   : sb1: '   3', sb2: '   4', returned: '  34'
[ForkJoinPool.commonPool-worker-2] - Combining   : sb1: '  12', sb2: '  34', returned: '1234'

Let’s analyze it line by line.

In the first 2 lines:

1
2
[ForkJoinPool.commonPool-worker-3] - Accumulation: acc: '    ', x:   '   1', returned: '   1'
[ForkJoinPool.commonPool-worker-1] - Accumulation: acc: '    ', x:   '   2', returned: '   2'

the accumulation operation is performed in parallel on the two first elements on the list. In the forth line those partial results are combined into a larger list:

4
[ForkJoinPool.commonPool-worker-1] - Combining   : sb1: '   1', sb2: '   2', returned: '  12'

The same situation, for the rest of the items, takes place in the line 3, 5 and 6.

3
4
5
6
[main]                             - Accumulation: acc: '    ', x:   '   3', returned: '   3'
...
[ForkJoinPool.commonPool-worker-2] - Accumulation: acc: '    ', x:   '   4', returned: '   4'
[ForkJoinPool.commonPool-worker-2] - Combining   : sb1: '   3', sb2: '   4', returned: '  34'

And the end, two partial result: ‘12’ and ‘34’ are combined together to provide the final result:

7
[ForkJoinPool.commonPool-worker-2] - Combining   : sb1: '  12', sb2: '  34', returned: '1234'

Do you have - after logging at the listings - any suspicious what could be wrong with the original implementation? With the debug statements it much easier to spot it.

Reduction in parallel stream with widespread (broken) implementation

Let’s recall the widespread (broken when running in parallel) implementation once more:

1
2
3
4
5
String result = input.parallelStream()
        .reduce(new StringBuilder(),
                (acc, x) -> acc.append(x),      //accumulation
                (sb1, sb2) -> sb1.append(sb2))  //combining
        .toString();

together with its debug enhanced variant:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
String result = input.parallelStream()
    .reduce(
        new StringBuilder(),
        (acc, x) -> {
            String accString = acc.toString();
            StringBuilder ret = acc.append(x);
            log.info("Accumulation: '{}', x: '{}', returned: '{}'", format("%4s", accString), 
                                     format("%4s", x), format("%4s", ret));
            return ret;
        },
        (sb1, sb2) -> {
            String sb1String = sb1.toString();
            StringBuilder ret = sb1.append(sb2);
            log.info("Combining: sb1: '{}', sb2: '{}', returned: '{}'", format("%4s", sb1String),
                                 format("%4s", sb2), format("%4s", ret));
            return ret;
        })
    .toString();

The output from the parallel processing execution:

1
2
3
4
5
6
7
10:42:53.334 [ForkJoinPool.commonPool-worker-5]  - Accumulation: acc: '   3', x: '   4', returned: '  34'
10:42:53.334 [ForkJoinPool.commonPool-worker-23] - Accumulation: acc: ' 342', x: '   1', returned: '3421'
10:42:53.334 [ForkJoinPool.commonPool-worker-19] - Accumulation: acc :'  34', x: '   2', returned: '3421'
10:42:53.334 [main]                              - Accumulation: acc :'    ', x: '   3', returned: '   3'
10:42:53.339 [ForkJoinPool.commonPool-worker-19] - Combining: sb1: '3421', sb2: '342134213421', returned: '342134213421'
10:42:53.339 [main]                              - Combining: sb1: '3421', sb2: '342134213421', returned: '342134213421'
10:42:53.340 [main]                              - Combining: sb1: '342134213421', sb2: '342134213421342134213421', returned: '342134213421342134213421'

The strange things start to happen from the first line. Instead of having an empty accumulator (the “acc” variable) there is “3”. In the second line there is already “342” (still instead of an empty StringBuilder). We discover empty accumulator in the 3rd line, which anyway could be the 1st line (it is a matter of logging operations from multiple threads - they were executed at “the same moment”). Nonetheless, all the first 4 lines should have the accumulator empty. It clearly shows there is a problem with a mutable data structure shared across the threads, in both accumulation and combining phrases.

A look at the StringBuilder documentation ensures us that “append” adds a provided value to the StringBuilder and returns its instance (aka this). Mutating input variables is not recommended in the stream processing, however the root of the problem is a fact that initial “neutral” value (identity) passes to every initial accumulation operation is one and the same StringBuilder instance. It is passed as the first element of the “reduce” method execution.

Someone could say: “let’s use StringBuffer - its operations are thread safe”. However, it will not fix the situation here. Having multiple threads operating on the same structure (even thread safe) is also broken assuming that all initial operations assumes a private accumulator instance to create an independent list of elements which will be concatenated into the biggest one in the following steps.

Reduction in parallel stream - working implementation

Main case

With StringBuffer and string concatenation the only required modification is not mutating the input objects:

1
2
3
4
5
6
String result = input.parallelStream()
        .reduce(
                new StringBuilder(),
                (acc, x) -> new StringBuilder(acc).append(x),
                (sb1, sb2) -> new StringBuilder(sb1).append(sb2))
        .toString();

In every iteration the new StringBuilder instance is created. Thanks to that there is no mutation of the initial identity element shared across threads. In general mutation of the stream elements (or doing side effects in general) can lead to some unpredictable situations, especially in the parallel execution.

Java API limitations

The above change solves the problem (at the cost of greatly increased number of created objects, but let’s skip it for a moment). Unfortunately, many structures, especially those from Java Collection Framework which remembers Java 1.2 (releases 1998 - over 20 years ago!), does not provide so friendly API to perform operations with the method chaining syntax/approach.

Let’s take a look at the (artificial) working example of using the java.util.List API in a reduce operation simulating mapping to double every element of the input list:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
List<Integer> doubledValues = stream
    .reduce(Collections.emptyList(),
            (list, x) -> {
                List<Integer> resultList = new ArrayList<>(list);
                resultList.add(x * 2);
                return resultList;
            },
            (list1, list2) -> {
                List<Integer> resultList = new ArrayList<>(list1);
                resultList.addAll(list2);
                return resultList;
            });

True one-liners with a method chaining would make this code more readable.

Immutable collections

In addition to more verbose syntax, Java collections are in general mutable. This makes it harder (more error prone) to use them in (to share the instance between) multiple threads. Thread safety for collections might help, but as I already mentioned in that case it will not (as the initial empty collection is distributed across diffrent threads). However, in some more functional languages such as Scala, not to mention Haskel, it is quite popular to use immutable collections.

They typically has the “first” element, called head and the rest called tail. Adding a new element before head is cheap as the new head can share the old one as tail (which is immutable, remember?). Of course, in some other use cases (such as putting elements between other elements) they cannot keep up with a classic mutable counterparts. and it can be used in different use cases. Nevertheless, it might be an alternative worth consideration. For Java, there is - among others - the collections API in Vavr .

Mutable reduction operation

To deal with cases where mutable operation are much more memory/CPU effective a dedicated Collector can be written which realizes mutable reduction operation . More info about writing a custom Collector can be found in this interesting blog post by Tomasz Nurkiewicz.

Summary

I hope that you were hard enough to get with me to the end of this (quite long and quite low level) blog post :-).

Using this example with reduce, I wanted to show one of the possible problems while dealing with parallel execution and its explaination. It is not a bug in JDK, but rather a presumption in the (parallel) stream API which - as I observe during my testing classes - are not emphasised enough to be common knowledge. It is good to know your tools and use it properly, as well as extend ones awareness about new and alternative solutions. And remember, state mutation is problematic. Prefer immutable data structures if only possible (and feasible).

Lead photo by Free-Photos, published in Pixabay, Pixabay License.
Share on

Marcin Zajączkowski
WRITTEN BY
Marcin Zajączkowski
Software Craftsman & Solution Architect
An experienced architect aiming for high quality solutions. Very engaged in evangelising Software Craftsmanship, Clean Code and Test-Driven Development as a conference speaker and a trainer. A specialist in Continuous Delivery and Continuous Inspection of Code Quality. An enthusiast of Reactive Systems and broadly defined concurrency.

Besides, open source author and contributor, a proud Linux user.


Don't want to use the Utterance bot? Comments can be also placed directly on GitHub.
What's on this Page