Java 8 – don’t be afraid of streams.
23.01.2018

The release of Java 8 in 2014 was a really big leap forward. After versions 6 and 7 (let’s call ‘em evolutionary), it again brought major changes to the language itself, including a paradigm shift in various areas. In this article, we’ll cover the Streams API and some basics of lambda expressions, as the two topics are related.

What is the Stream API?

It is a new approach in Java to working with collections of objects. It allows us to write collection handling in a functional-like, more declarative style that resembles MapReduce. A declarative way of programming is usually safer and less error-prone than the imperative one. The code is also more readable (even if it does not look so at first).

In Java 8, the Collection interface gained the default methods stream() and parallelStream(), so every collection provides them (yes, you can get parallel processing for free - but wait). Both return a Stream<T> instance, which is a stream you can perform actions on. At the end, you build the final result from the stream (be it another collection or a single object).
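To make this more tangible, here is a minimal sketch of the whole round trip: collection, stream, a few operations, and a collection again. The class and method names are made up for illustration, and the lambda syntax used here is explained later in the article.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class StreamBasics {
    // Keeps only the names starting with the given prefix and upper-cases them.
    static List<String> upperCaseWithPrefix(List<String> names, String prefix) {
        return names.stream()                       // Collection -> Stream<String>
                .filter(n -> n.startsWith(prefix))  // stream in, stream out
                .map(String::toUpperCase)           // stream in, stream out
                .collect(Collectors.toList());      // final result: a new List
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("anna", "bob", "alice");
        System.out.println(upperCaseWithPrefix(names, "a")); // prints [ANNA, ALICE]
    }
}
```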

Performing operations on streams

Basically, there are 2 kinds of operations (with the exception of the forEach() method). The first kind takes a stream as input, does something and returns a stream as output. The second kind takes a stream as input and aggregates or reduces it to a final object.

In the first category, we can use the following (not complete) list of operations: map(), flatMap() and filter().

The second category contains mainly: collect(), reduce() and shorthands such as average(), sum(), min() and max().

And of course, there is also the forEach() operation, which does not belong to either of the categories above. We’ll explain the operations in the next section. But let’s start with lambda expressions.

Details

Lambda expressions

We have to start by explaining what lambda expressions are. The name comes from the theoretical computational model of lambda calculus. For our purposes, we can simply say that a lambda expression is an unnamed function. We can also store a lambda expression in a variable and pass it as an argument to a method or to another lambda expression. But due to static typing in Java, it is not as straightforward as it is, for example, in JavaScript.

The syntax of a lambda expression is quite simple.

    (arg1, arg2, ...) -> {
        ... //do something here
        return ...;
    }

In the first parentheses (arg1, arg2, ...) we declare the parameters of the expression. They are similar to the parameters of a method. The only difference is that we don’t need to declare the data type (instead of String a you type only a), as the compiler is able to infer the data types from the context.

If the body is only a single expression (and often it is), we can write the lambda expression in a simpler form.

    (arg1, arg2, ...) -> ... //do something here

The result of evaluating such a lambda is the value of the expression in its body. No explicit return statement is needed.

And finally, if our lambda expression takes only one argument, we can omit the parentheses around it.

    arg1 -> ... //do something here

As said before, we can also store a lambda expression in a variable and use it later. Because Java is a statically typed language, the variable needs a type, and that type is a functional interface (an interface with a single abstract method). If our lambda expression takes one argument, there is the Function<T, R> interface, where T is the type of the argument and R is the type of the result. For example, a lambda expression computing the square of an integer can be stored in a variable as:

    Function<Integer, Integer> square = x -> x * x;

For a lambda expression with 2 arguments, we can similarly use the BiFunction<T, U, R> interface. T is the type of the first argument, U is the type of the second argument and R is the type of the result. Let’s see it in code:

    BiFunction<Integer, Integer, Integer> twoSum = (x, y) -> x + y;

When we need to store a lambda expression with more arguments, we have to create our own interface. But that is a rare case. Remember, lambda expressions are not a replacement for regular methods. They allow us to write small snippets of code and pass them around as arguments.
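A small sketch of how stored lambdas are called and passed around. TriFunction is a hypothetical interface written just for this example (the JDK does not ship one), and the class and constant names are illustrative as well.

```java
import java.util.function.BiFunction;
import java.util.function.Function;

class LambdaStorage {
    // Hypothetical interface for three-argument lambdas; not part of the JDK.
    @FunctionalInterface
    interface TriFunction<A, B, C, R> {
        R apply(A a, B b, C c);
    }

    static final Function<Integer, Integer> SQUARE = x -> x * x;
    static final BiFunction<Integer, Integer, Integer> TWO_SUM = (x, y) -> x + y;
    static final TriFunction<Integer, Integer, Integer, Integer> THREE_SUM =
            (x, y, z) -> x + y + z;

    // A lambda can be passed to a method like any other argument.
    static int applyTwice(Function<Integer, Integer> f, int value) {
        return f.apply(f.apply(value));
    }

    public static void main(String[] args) {
        System.out.println(SQUARE.apply(4));           // prints 16
        System.out.println(TWO_SUM.apply(2, 3));       // prints 5
        System.out.println(THREE_SUM.apply(1, 2, 3));  // prints 6
        System.out.println(applyTwice(SQUARE, 3));     // prints 81
    }
}
```

Note that a stored lambda is invoked through the single method of its interface, here apply().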

Now we know the basics of lambda expressions, so we can start with operations on streams.

Common rules

There are some rules for lambda expressions which we have to follow to keep our code manageable or even compilable.

First, a lambda expression is lexically scoped: names in its body are resolved in the scope of the enclosing code rather than in a completely separate scope of its own. Local variables from the outer scope that are used in a lambda expression must be final or effectively final (they behave like final even if not explicitly declared as final). For example

    int a = 0;
    BiFunction<Integer, Integer, Integer> sum = (x, y) -> x + y + a;

is ok, because the variable a is effectively final. We could also declare it as final int a = 0;. In contrast

    int a = 0;
    a = 1;
    BiFunction<Integer, Integer, Integer> sum = (x, y) -> x + y + a;

will not compile, because a is neither final nor effectively final.

Second, although it is perfectly possible to use another lambda inside the current lambda, like

    ...
    .stream()
        .forEach(o -> o.getParts().stream()
            .forEach(p -> ...));

I strongly recommend splitting the functionality into an additional method, like

    ...
    .stream()
        .forEach(o -> doSomethingWith(o));

    void doSomethingWith(Object o) {
        o.getParts().stream()
            .forEach(...);
    }

It is more expressive and we can see at first sight what is going on, compared to the first example, which can be easily misinterpreted (imagine even more deeply chained operations).

Third, there is a shortcut which can replace some lambdas. If we use the example from the previous rule

    ...
    .stream()
        .forEach(o -> doSomethingWith(o));

    void doSomethingWith(Object o) {
        o.getParts().stream()
            .forEach(...);
    }

the lambda o -> doSomethingWith(o) can be rewritten with the :: operator, which creates a reference to a method of a particular object or class. Let’s see the usage

    ...
    .stream()
        .forEach(this::doSomethingWith);

    void doSomethingWith(Object o) {
        o.getParts().stream()
            .forEach(...);
    }

Of course, we can use :: with a different class, for example List::stream.
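The following sketch shows three common flavors of method references side by side, each with the equivalent lambda in a comment. All class and method names are made up for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class MethodReferenceKinds {
    static int twice(int x) {
        return x * 2;
    }

    // Static method reference: same as x -> MethodReferenceKinds.twice(x)
    static List<Integer> doubled(List<Integer> numbers) {
        return numbers.stream()
                .map(MethodReferenceKinds::twice)
                .collect(Collectors.toList());
    }

    // Instance method of the stream element itself: same as w -> w.length()
    static List<Integer> lengths(List<String> words) {
        return words.stream()
                .map(String::length)
                .collect(Collectors.toList());
    }

    // Instance method of one particular object: same as n -> greeting.concat(n)
    static List<String> greet(List<String> names) {
        String greeting = "Hello ";
        return names.stream()
                .map(greeting::concat)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(doubled(Arrays.asList(1, 2, 3)));    // prints [2, 4, 6]
        System.out.println(lengths(Arrays.asList("a", "bcd"))); // prints [1, 3]
        System.out.println(greet(Arrays.asList("Ann", "Bob"))); // prints [Hello Ann, Hello Bob]
    }
}
```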

Mappers and filters

It is typical for this category of operations that they perform an operation on a stream and return a stream again as the result. In the context of MapReduce, these operations are the Map part. Another important fact is that all these operations are lazily evaluated. That means that nothing runs until some finalizer (be it a collector or a reducer) is invoked, and then all operations are applied in a single pass. We can therefore stack mappers and filters without paying for an extra pass over the data for each of them:

    collection.stream()
        .filter(...)
        .map(...)
        .flatMap(...)
        .collect(...)

The pipeline above makes only one pass over the elements (even if we are tempted to say that 3 passes would be needed). It is really important to understand this concept, as it leads to more manageable code. We definitely don’t have to cram everything into one .map() as we would do with a for loop.
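To see the laziness in action, here is a small experiment that logs every call of the filter and map lambdas. Logging from inside the lambdas is a side effect used only for demonstration on a sequential stream; the class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyPipeline {
    // Records the order in which filter and map are invoked for each element.
    static List<String> trace(List<Integer> input) {
        List<String> log = new ArrayList<>();
        input.stream()
                .filter(x -> { log.add("filter " + x); return x % 2 == 1; })
                .map(x -> { log.add("map " + x); return x * 10; })
                .collect(Collectors.toList());
        return log;
    }

    // Builds a pipeline but never calls a finalizer, so the lambda never runs.
    static int callsWithoutFinalizer(List<Integer> input) {
        List<String> log = new ArrayList<>();
        Stream<Integer> pipeline = input.stream()
                .filter(x -> { log.add("filter " + x); return true; });
        return log.size();
    }

    public static void main(String[] args) {
        // prints [filter 1, map 1, filter 2, filter 3, map 3]
        System.out.println(trace(Arrays.asList(1, 2, 3)));
        System.out.println(callsWithoutFinalizer(Arrays.asList(1, 2, 3))); // prints 0
    }
}
```

Each element travels through the whole pipeline before the next one is touched, and nothing happens at all without a finalizer.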

map()

The name of the map() operation gives us a hint of what the operation is supposed to do. It takes a lambda expression and applies it to every element of the stream. The lambda expression takes one argument (one element of the stream) and performs some action on it. Evaluating the expression returns another object, which is the result for that particular element. The result of the whole map operation is a stream consisting of the results of evaluating the lambda expression for every element.

For example, let’s have a stream integers consisting of the numbers [1, 2, 3]. Now we perform the following mapping:

    integers.map(x -> x * 2);

As a result, we obtain a stream containing [2, 4, 6]. So we simply doubled all the numbers in the original stream. Let’s go through the whole process step by step.

  1. We start with an empty result stream [] and with the first item in the source [(1), 2, 3]. x -> x * 2 becomes 1 * 2 and the result 2 is stored in the result stream.
  2. The result stream is [2], we process the second item from the source [1, (2), 3]. x -> x * 2 => 2 * 2 => 4 is stored in the result stream.
  3. The result stream is [2, 4], we process the third item from the source [1, 2, (3)]. x -> x * 2 => 3 * 2 => 6 is stored in the result stream.
  4. The final result stream is [2, 4, 6].

An important fact is that the result of map is a stream. Therefore we can process that stream with further mapping or filtering as we want.
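The doubling walk-through can be written as a complete, runnable snippet roughly like this. The class and method names are illustrative, and the second method shows how one mapping can be chained after another.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class MapExample {
    // Doubles every number, as in the step-by-step walk-through.
    static List<Integer> doubleAll(List<Integer> integers) {
        return integers.stream()
                .map(x -> x * 2)
                .collect(Collectors.toList());
    }

    // map() returns a stream, so another map() can follow; the element type may change.
    static List<String> doubleAndLabel(List<Integer> integers) {
        return integers.stream()
                .map(x -> x * 2)
                .map(x -> "#" + x)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(doubleAll(Arrays.asList(1, 2, 3)));      // prints [2, 4, 6]
        System.out.println(doubleAndLabel(Arrays.asList(1, 2, 3))); // prints [#2, #4, #6]
    }
}
```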

flatMap()

In some cases, we are processing data that contains other collections of data. Imagine we have factories and every factory has employees. Now we want to do something with all employees. Suppose that every Factory instance has a getEmployees() method, which returns a Collection of the employees belonging to the factory.

In the following text, I will use this notation: [] stands for a stream and () stands for a collection.

We start with the collection factories, which consists of (fac1, fac2, fac3).

    factories.stream()
        .map(Factory::getEmployees)

The code above leads to the following stream: [(e1, e2, e3), (e4), (e5, e6)]. As you can see, we already have all employees in the stream, but there is one layer of indirection. Instead of a simple stream of employees, we obtained a stream of collections of employees. We need to ‘flatten’ it, and this is what flatMap() is for. The main principle is similar to map(): it applies the given lambda expression to every element of the stream. But in contrast to map(), the output of the lambda expression for flatMap() must be a stream. And second, that output is not simply appended to the resulting stream as one element. Instead, its elements are joined to the intermediate result stream. Enough talking, let’s go for the code:

    factories.stream()
        .map(Factory::getEmployees)
        .flatMap(Collection::stream)

Now the result looks the way we want: [e1, e2, e3, e4, e5, e6]. The first map creates [(e1, e2, e3), (e4), (e5, e6)], so we have a stream of 3 collections. Then we apply Collection::stream to every collection and join the resulting streams together.
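For completeness, a self-contained sketch of the factory example. The Factory class here is a minimal stand-in I made up (employees are plain strings); only the getEmployees() method comes from the description above.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

class FlatMapExample {
    // Minimal stand-in for the Factory from the text; employees are just names here.
    static class Factory {
        private final List<String> employees;

        Factory(String... employees) {
            this.employees = Arrays.asList(employees);
        }

        Collection<String> getEmployees() {
            return employees;
        }
    }

    static List<String> allEmployees(Collection<Factory> factories) {
        return factories.stream()
                .map(Factory::getEmployees)   // [(e1, e2, e3), (e4), (e5, e6)]
                .flatMap(Collection::stream)  // [e1, e2, e3, e4, e5, e6]
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Factory> factories = Arrays.asList(
                new Factory("e1", "e2", "e3"),
                new Factory("e4"),
                new Factory("e5", "e6"));
        System.out.println(allEmployees(factories)); // prints [e1, e2, e3, e4, e5, e6]
    }
}
```

The two steps can also be merged into a single call, .flatMap(f -> f.getEmployees().stream()), which gives the same result.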

filter()

Filtering is a useful concept when we want to get rid of some elements of a collection. It requires a lambda expression which returns either true or false for a given object. When true is returned, the object stays in the resulting stream; when false, the object is thrown away.

Let’s say we want to filter out the odd numbers from the stream [3, 4, 5, 6, 7].

    .filter(x -> x % 2 == 0)

And we’ll get the result stream [4, 6]. Of course, it can also be used, for example, to filter out null values, which is pretty useful in various situations. It is so useful that the predicate is already implemented as Objects::nonNull. We’ll show this later in the real world examples.
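A runnable sketch combining both filters, the null check and the even-number check. The class and method names are illustrative; the null filter goes first, because x % 2 would fail on a null element.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

class FilterExample {
    // Drops null values first, then keeps only the even numbers.
    static List<Integer> evenNonNull(List<Integer> numbers) {
        return numbers.stream()
                .filter(Objects::nonNull)
                .filter(x -> x % 2 == 0)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(3, null, 4, 5, 6, 7);
        System.out.println(evenNonNull(numbers)); // prints [4, 6]
    }
}
```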

Aggregators or reducers and collectors (finalizer)

These operations take a stream as input and produce an aggregation or reduction of it. For example, the sum() operation can take a stream of integers and return the sum of all integers in that stream. In the context of MapReduce, these operations are the Reduce part. We can call them finalizers too, as these methods finalize the work with the stream and return the result.

collect()

Usually we need to get the data after processing in some better suited format than a stream. Streams can be consumed only once and they do not store data themselves. We can also process data during collecting. That’s why there is a collect() function. There are a lot of predefined collectors, which cover most of the needs of a typical developer. For example, if we simply want to get a List out of the stream, we use:

    ... .stream()
    .collect(Collectors.toList());

To obtain a Set, use:

    ... .stream()
    .collect(Collectors.toSet());

And so on.

It is also possible to implement our own custom collector. To do that, we have to implement the Collector<T, A, R> interface.
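One way to get a custom collector without writing a whole class is the Collector.of() factory method. The sketch below is only an illustration under my own naming: it joins strings with a comma, which in real code would be better served by the built-in Collectors.joining(", ").

```java
import java.util.StringJoiner;
import java.util.stream.Collector;
import java.util.stream.Stream;

class CustomCollectorExample {
    // Collects strings into one comma-separated string.
    static Collector<String, StringJoiner, String> commaSeparated() {
        return Collector.of(
                () -> new StringJoiner(", "), // supplier: creates the intermediate container
                StringJoiner::add,            // accumulator: adds one element to it
                StringJoiner::merge,          // combiner: merges two containers (parallel streams)
                StringJoiner::toString);      // finisher: turns the container into the result
    }

    public static void main(String[] args) {
        String joined = Stream.of("a", "b", "c").collect(commaSeparated());
        System.out.println(joined); // prints a, b, c
    }
}
```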

reduce()

The reduce operation is especially useful when you want to get one value (not a collection) out of the stream. Imagine you want to sum all values of an Integer stream. That’s a perfect situation for the reduce operation. We will illustrate the process of reduction by summing up a stream of integers.

In the following explanation, I will use this notation: [] stands for a stream and () stands for the accumulated value (accumulator).

Let’s sum up a stream of integers. First, we identify the neutral element of the operation. For the + operation it is 0, as 0 + x = x. This neutral element will serve as the starting value.

  1. Set the accumulator to the starting value: () <- 0.
  2. Apply the + operation to the accumulator and every element of the stream in turn, storing the result in the accumulator: () <-+ x.
  3. The result is in the accumulator and it is returned as the return value of the whole operation.

For [1, 2, 3] it looks like this

    ()  <-  0
    (0) <-+ 1
    (1) <-+ 2
    (3) <-+ 3
    return (6)

Back to Java. A call of .reduce(<starter>, (<accumulator>, <value>) -> ...) has the following parameters: <starter> is the neutral element of the operation, as described above, and the second parameter is a lambda expression with 2 arguments, where <accumulator> contains the current accumulated value and <value> contains the current value from the stream. For our summing example, we would write

    ... .stream()
    .reduce(0, (acc, x) -> acc + x);

We can use the same functionality, for example, for concatenating strings or for other operations (with different arguments, of course) which combine multiple values into one.
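A short sketch of reduce() with three different operations, each with its own neutral element: 0 for +, 1 for * and the empty string for concatenation. The class and method names are made up for this example.

```java
import java.util.Arrays;
import java.util.List;

class ReduceExample {
    static int sum(List<Integer> numbers) {
        return numbers.stream().reduce(0, (acc, x) -> acc + x);
    }

    // The neutral element of multiplication is 1, because 1 * x = x.
    static int product(List<Integer> numbers) {
        return numbers.stream().reduce(1, (acc, x) -> acc * x);
    }

    // The neutral element of string concatenation is the empty string.
    static String concat(List<String> words) {
        return words.stream().reduce("", (acc, w) -> acc + w);
    }

    public static void main(String[] args) {
        System.out.println(sum(Arrays.asList(1, 2, 3)));         // prints 6
        System.out.println(product(Arrays.asList(2, 3, 4)));     // prints 24
        System.out.println(concat(Arrays.asList("a", "b", "c"))); // prints abc
    }
}
```

On an empty stream, reduce() simply returns the starting value, which is one more reason to pick the neutral element correctly.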

average(), sum(), min(), max(), …

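These functions are shorthands for predefined reducers and the functionality is the same as described just above. We can use them instead of implementing our own reducers.

We have to be aware that sum() and average() are available only on the primitive streams (IntStream, LongStream and DoubleStream), where min() and max() also take no arguments. On a general Stream<T>, only count() and the comparator-based min() and max() are available, so for numeric aggregation we convert the general stream to an IntStream like this: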

    List<Integer> nums = Arrays.asList(1, 2, 3);
    int sum = nums.stream()
            .mapToInt(i -> i)
            .sum();

The lambda expression in mapToInt() is used to extract an integer value out of the general stream’s objects. In our case, it is just the identity operation, because our original stream already contains integer values.
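A sketch of the other shorthands on IntStream. Note that average() and max() return an optional value (OptionalDouble and OptionalInt), because the stream may be empty; the fallback values passed to orElse() below are my own arbitrary choice, as are the class and method names.

```java
import java.util.Arrays;
import java.util.IntSummaryStatistics;
import java.util.List;

class IntStreamShorthands {
    static double average(List<Integer> nums) {
        return nums.stream().mapToInt(Integer::intValue).average().orElse(0.0);
    }

    static int max(List<Integer> nums) {
        return nums.stream().mapToInt(Integer::intValue).max().orElse(Integer.MIN_VALUE);
    }

    // Computes count, sum, min, max and average in a single pass.
    static IntSummaryStatistics stats(List<Integer> nums) {
        return nums.stream().mapToInt(Integer::intValue).summaryStatistics();
    }

    public static void main(String[] args) {
        List<Integer> nums = Arrays.asList(1, 2, 3);
        System.out.println(average(nums));         // prints 2.0
        System.out.println(max(nums));             // prints 3
        System.out.println(stats(nums).getSum());  // prints 6
    }
}
```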

forEach()

forEach() does not fit into any of the categories above. Mapping and reducing always return some value as a result and (ideally) do not rely on side effects. forEach(), on the contrary, relies only on side effects and does not return anything. It resembles the traditional for loop. We should always try to use map and collect / reduce where possible and use forEach() only when needed. Because of those side effects, it is harder to test and it can cause, for example, a race condition when running on a parallel stream.

In our example, we want to print out all numbers from the stream. It is perfectly ok to use forEach() here, as we want a side effect (printing) anyway.

    numbers.stream()
        .forEach(System.out::println);

That’s straightforward. A classical for loop rewritten for streams.

Parallelism

As we said before, streams also introduce an easy way to process data in parallel. This is getting more important, as modern processors almost always have more than one core. Before Java 8, we were also able to do parallel processing in Java. But we had to take care of threads manually, which led to code that was hard to manage and potentially buggy. We also had to guard against race conditions, deadlocks and so on by ourselves.

What do we need to make our stream code run in parallel? Almost nothing. Just call parallelStream() instead of stream() on the collection, or call stream().parallel(). It is easy, isn’t it? But it is not all bells and whistles. We have to be aware of the following:

  1. If our code relies on the order in which elements are processed, we can’t simply switch to a parallel stream. That’s logical: elements are processed simultaneously, so nothing guarantees the order of processing.
  2. If our code relies on side effects (especially in forEach()), it can lead to unpredictable behavior. Imagine, for example, putting several objects with the same key into a map simultaneously.
  3. By default, the JVM prepares and manages a shared thread pool behind the scenes (the common ForkJoinPool), which is used for parallel stream processing. Basically, the number of threads corresponds to the number of cores available in the system. That’s fine in a single user application. But in a multi user environment (like a web application), we are in a trap. For example, 10 users want to start an operation which uses a parallel stream. The first user starts the operation and takes all available threads (remember, the thread pool for streams is JVM scoped, not application or session scoped). All the remaining 9 users have to wait until at least one thread is free. That’s a pretty annoying bottleneck.
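For points 1 and 2, the usual way out is to avoid side effects and let a collector assemble the result. A minimal sketch with illustrative names: the elements may be processed in any order on different threads, but Collectors.toList() and Collectors.groupingBy() still put the results together in the order of the source list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class ParallelWithoutSideEffects {
    // No shared mutable state: the collector assembles the list safely.
    static List<Integer> doubled(List<Integer> numbers) {
        return numbers.parallelStream()
                .map(x -> x * 2)
                .collect(Collectors.toList());
    }

    // Instead of calling map.put() from forEach(), let groupingBy() build the map.
    static Map<Integer, List<String>> byLength(List<String> words) {
        return words.parallelStream()
                .collect(Collectors.groupingBy(String::length));
    }

    public static void main(String[] args) {
        System.out.println(doubled(Arrays.asList(1, 2, 3, 4, 5))); // prints [2, 4, 6, 8, 10]
        System.out.println(byLength(Arrays.asList("a", "bb", "c", "dd")));
    }
}
```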

There is a workaround for point 3. We can define a custom thread pool for a particular stream processing if necessary.

    ForkJoinPool threadPool = new ForkJoinPool(<number_of_threads>);
    int sum = threadPool.submit(<lambda_to_evaluate>).get();

<number_of_threads> defines how many threads should be used for execution and <lambda_to_evaluate> is the expression which will be evaluated in the context of the defined thread pool. Actual code for a parallel sum of integers (stored in the numbers collection) using 2 threads could look like:

    ForkJoinPool threadPool = new ForkJoinPool(2);
    int sum = threadPool.submit(
            () -> numbers.parallelStream()
                .reduce(0, (acc, x) -> acc + x)).get();
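A more complete variant of the same idea might look like the sketch below. It adds two things the short snippet leaves out: handling of the checked exceptions thrown by get() and shutting the pool down afterwards. The class and method names are illustrative. Keep in mind that running a parallel stream inside a custom pool works in practice but, as far as I know, it is not a documented guarantee of the Stream API.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class CustomPoolSum {
    // Sums the numbers on a parallel stream executed inside a dedicated pool.
    static int parallelSum(List<Integer> numbers, int threads) {
        ForkJoinPool threadPool = new ForkJoinPool(threads);
        try {
            return threadPool.submit(
                    () -> numbers.parallelStream()
                            .reduce(0, (acc, x) -> acc + x)).get();
        } catch (InterruptedException | ExecutionException e) {
            // Wrapped into an unchecked exception to keep the example short.
            throw new IllegalStateException("Parallel sum failed", e);
        } finally {
            threadPool.shutdown(); // release the pool's threads
        }
    }

    public static void main(String[] args) {
        List<Integer> numbers = IntStream.rangeClosed(1, 100)
                .boxed()
                .collect(Collectors.toList());
        System.out.println(parallelSum(numbers, 2)); // prints 5050
    }
}
```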

Anyway, I would recommend using parallel streams in a multi user environment only when we have a good reason to do so. We need to consider more scenarios compared to standard, serial streams.

Real world examples

It is time to show how to rewrite some ‘old’ code into the ‘new’ one. For every case, there will be 2 examples - one with a traditional for loop and one with streams.

Sum of numbers, skipping null values:

    // traditional for loop
    Integer sum = 0;
    for (Integer value : numbers) {
        if (value != null) {
            sum = sum + value;
        }
    }

    // streams
    Integer sum = numbers.stream()
            .filter(Objects::nonNull)
            .reduce(0, (acc, v) -> acc + v);

Joining words into one string, skipping null values:

    // traditional for loop
    String wordList = "";
    for (String word : words) {
        if (word != null) {
            wordList = wordList + word + " ";
        }
    }

    // streams
    String wordList = words.stream()
            .filter(Objects::nonNull)
            .reduce("", (acc, w) -> acc + w + " ");

Map of categories by name, keeping the first category for every name:

    // traditional for loop
    Map<String, Category> categoryMap = new HashMap<>();
    for (Category cat : categories) {
        if (cat != null) {
            categoryMap.putIfAbsent(cat.getCategory(), cat);
        }
    }

    // streams
    Map<String, Category> categoryMap = categories.stream()
            .filter(Objects::nonNull)
            .collect(Collectors.toMap(Category::getCategory, Function.identity(),
                (c1, c2) -> c1));

Map of all categories grouped by name:

    // traditional for loop
    Map<String, List<Category>> categoryMap = new HashMap<>();
    for (Category cat : categories) {
        if (cat != null) {
            List<Category> catList = categoryMap.get(cat.getCategory());
            if (catList == null) {
                catList = new ArrayList<>();
            }
            catList.add(cat);
            categoryMap.put(cat.getCategory(), catList);
        }
    }

    // streams
    Map<String, List<Category>> categoryMap = categories.stream()
            .filter(Objects::nonNull)
            .collect(Collectors.groupingBy(Category::getCategory));

As we can see, the code with streams is more declarative and therefore it can be considered safer. It’s harder to make an error, and that’s the main benefit.

Conclusion

As the Stream API is well established in Java nowadays, it makes sense to use it regularly. It can bring more readability, safety and expressiveness into our code. In some cases, we can also take advantage of parallel code execution without dealing with threads directly. Parallel processing will probably become more and more important in the future.