1. Общ преглед
API на Stream предоставя богат репертоар от междинни, редукционни и терминални функции, които също поддържат успоредяване.
По-конкретно, операциите за намаляване на потока ни позволяват да създадем един-единствен резултат от поредица от елементи , чрез многократно прилагане на комбинирана операция към елементите в последователността.
В този урок ще разгледаме операцията Stream.reduce () с общо предназначение и ще я видим в някои конкретни случаи на използване.
2. Ключовите понятия: Идентичност, акумулатор и комбиниран
Преди да разгледаме по-задълбочено използването на операцията Stream.reduce () , нека разделим елементите на участниците в операцията на отделни блокове. По този начин ще разберем по-лесно ролята, която играе всеки от тях:
- Identity - елемент, който е първоначалната стойност на операцията за намаляване и резултата по подразбиране, ако потокът е празен
- Акумулатор - функция, която приема два параметъра: частичен резултат от операцията за намаляване и следващия елемент на потока
- Комбинатор - функция, използвана за комбиниране на частичния резултат от операцията за намаляване, когато намаляването е успоредно или когато има несъответствие между типовете аргументи на акумулатора и типовете на изпълнението на акумулатора
3. Използване на Stream.reduce ()
За да разберем по-добре функционалността на елементите за идентичност, акумулатор и комбинатор, нека разгледаме някои основни примери:
List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); int result = numbers .stream() .reduce(0, (subtotal, element) -> subtotal + element); assertThat(result).isEqualTo(21);
В този случай, на цяло число стойност 0 е идентичността. Той съхранява първоначалната стойност на операцията за намаляване, както и резултата по подразбиране, когато потокът от целочислени стойности е празен.
По същия начин, ламбда изразът :
subtotal, element -> subtotal + element
е акумулаторът , тъй като отнема частичната сума от целочислени стойности и следващия елемент в потока.
За да направим кода още по-кратък, можем да използваме препратка към метод, вместо ламбда израз:
int result = numbers.stream().reduce(0, Integer::sum); assertThat(result).isEqualTo(21);
Разбира се, можем да използваме операция намаляване () за потоци, съдържащи други видове елементи.
Например можем да използваме reduce () за масив от String елементи и да ги обединим в един резултат:
List letters = Arrays.asList("a", "b", "c", "d", "e"); String result = letters .stream() .reduce("", (partialString, element) -> partialString + element); assertThat(result).isEqualTo("abcde");
По същия начин можем да преминем към версията, която използва препратка към метод:
String result = letters.stream().reduce("", String::concat); assertThat(result).isEqualTo("abcde");
Нека използваме операцията намаляване () за присъединяване на главни елементи на масива от букви :
String result = letters .stream() .reduce( "", (partialString, element) -> partialString.toUpperCase() + element.toUpperCase()); assertThat(result).isEqualTo("ABCDE");
В допълнение можем да използваме reduce () в паралелизиран поток (повече за това по-късно):
List ages = Arrays.asList(25, 30, 45, 28, 32); int computedAges = ages.parallelStream().reduce(0, a, b -> a + b, Integer::sum);
Когато потокът се изпълнява паралелно, изпълнението на Java разделя потока на множество подпотоци. В такива случаи трябва да използваме функция, която да комбинира резултатите от подпотоците в една единствена . Това е ролята на комбинатора - в горния фрагмент това е препратката към метода Integer :: sum .
Забавно, този код няма да се компилира:
List users = Arrays.asList(new User("John", 30), new User("Julie", 35)); int computedAges = users.stream().reduce(0, (partialAgeResult, user) -> partialAgeResult + user.getAge());
В този случай имаме поток от потребителски обекти, а типовете на аргументите на акумулатора са Integer и User. Реализацията на акумулатора обаче е сбор от цели числа, така че компилаторът просто не може да изведе типа на потребителския параметър.
Можем да разрешим този проблем, като използваме комбинатор:
int result = users.stream() .reduce(0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum); assertThat(result).isEqualTo(65);
Казано по-просто, ако използваме последователни потоци и видовете аргументи на акумулатора и типовете на неговото изпълнение съвпадат, не е нужно да използваме комбинатор .
4. Намаляване в паралел
Както научихме преди, можем да използваме reduce () за паралелизирани потоци.
When we use parallelized streams, we should make sure that reduce() or any other aggregate operations executed on the streams are:
- associative: the result is not affected by the order of the operands
- non-interfering: the operation doesn't affect the data source
- stateless and deterministic: the operation doesn't have state and produces the same output for a given input
We should fulfill all these conditions to prevent unpredictable results.
As expected, operations performed on parallelized streams, including reduce(), are executed in parallel, hence taking advantage of multi-core hardware architectures.
For obvious reasons, parallelized streams are much more performant than the sequential counterparts. Even so, they can be overkill if the operations applied to the stream aren't expensive, or the number of elements in the stream is small.
Of course, parallelized streams are the right way to go when we need to work with large streams and perform expensive aggregate operations.
Let's create a simple JMH (the Java Microbenchmark Harness) benchmark test and compare the respective execution times when using the reduce() operation on a sequential and a parallelized stream:
@State(Scope.Thread) private final List userList = createUsers(); @Benchmark public Integer executeReduceOnParallelizedStream() { return this.userList .parallelStream() .reduce( 0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum); } @Benchmark public Integer executeReduceOnSequentialStream() { return this.userList .stream() .reduce( 0, (partialAgeResult, user) -> partialAgeResult + user.getAge(), Integer::sum); }
In the above JMH benchmark, we compare execution average times. We simply create a List containing a large number of User objects. Next, we call reduce() on a sequential and a parallelized stream and check that the latter performs faster than the former (in seconds-per-operation).
These are our benchmark results:
Benchmark Mode Cnt Score Error Units JMHStreamReduceBenchMark.executeReduceOnParallelizedStream avgt 5 0,007 ± 0,001 s/op JMHStreamReduceBenchMark.executeReduceOnSequentialStream avgt 5 0,010 ± 0,001 s/op
5. Throwing and Handling Exceptions While Reducing
In the above examples, the reduce() operation doesn't throw any exceptions. But it might, of course.
For instance, say that we need to divide all the elements of a stream by a supplied factor and then sum them:
List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); int divider = 2; int result = numbers.stream().reduce(0, a / divider + b / divider);
This will work, as long as the divider variable is not zero. But if it is zero, reduce() will throw an ArithmeticException exception: divide by zero.
We can easily catch the exception and do something useful with it, such as logging it, recovering from it and so forth, depending on the use case, by using a try/catch block:
public static int divideListElements(List values, int divider) { return values.stream() .reduce(0, (a, b) -> { try { return a / divider + b / divider; } catch (ArithmeticException e) { LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero"); } return 0; }); }
While this approach will work, we polluted the lambda expression with the try/catch block. We no longer have the clean one-liner that we had before.
To fix this issue, we can use the extract function refactoring technique, and extract the try/catch block into a separate method:
private static int divide(int value, int factor) { int result = 0; try { result = value / factor; } catch (ArithmeticException e) { LOGGER.log(Level.INFO, "Arithmetic Exception: Division by Zero"); } return result }
Now, the implementation of the divideListElements() method is again clean and streamlined:
public static int divideListElements(List values, int divider) { return values.stream().reduce(0, (a, b) -> divide(a, divider) + divide(b, divider)); }
Assuming that divideListElements() is a utility method implemented by an abstract NumberUtils class, we can create a unit test to check the behavior of the divideListElements() method:
List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); assertThat(NumberUtils.divideListElements(numbers, 1)).isEqualTo(21);
Let's also test the divideListElements() method, when the supplied List of Integer values contains a 0:
List numbers = Arrays.asList(0, 1, 2, 3, 4, 5, 6); assertThat(NumberUtils.divideListElements(numbers, 1)).isEqualTo(21);
Finally, let's test the method implementation when the divider is 0, too:
List numbers = Arrays.asList(1, 2, 3, 4, 5, 6); assertThat(NumberUtils.divideListElements(numbers, 0)).isEqualTo(0);
6. Complex Custom Objects
We can also use Stream.reduce() with custom objects that contain non-primitive fields. To do so, we need to provide a relevant identity, accumulator, and combiner for the data type.
Suppose our User is part of a review website. Each of our Users can possess one Rating, which is averaged over many Reviews.
First, let's start with our Review object. Each Review should contain a simple comment and score:
public class Review { private int points; private String review; // constructor, getters and setters }
Next, we need to define our Rating, which will hold our reviews alongside a points field. As we add more reviews, this field will increase or decrease accordingly:
public class Rating { double points; List reviews = new ArrayList(); public void add(Review review) { reviews.add(review); computeRating(); } private double computeRating() { double totalPoints = reviews.stream().map(Review::getPoints).reduce(0, Integer::sum); this.points = totalPoints / reviews.size(); return this.points; } public static Rating average(Rating r1, Rating r2) { Rating combined = new Rating(); combined.reviews = new ArrayList(r1.reviews); combined.reviews.addAll(r2.reviews); combined.computeRating(); return combined; } }
We have also added an average function to compute an average based on the two input Ratings. This will work nicely for our combiner and accumulator components.
Next, let's define a list of Users, each with their own sets of reviews.
User john = new User("John", 30); john.getRating().add(new Review(5, "")); john.getRating().add(new Review(3, "not bad")); User julie = new User("Julie", 35); john.getRating().add(new Review(4, "great!")); john.getRating().add(new Review(2, "terrible experience")); john.getRating().add(new Review(4, "")); List users = Arrays.asList(john, julie);
Сега, когато се отчитат Джон и Джули, нека използваме Stream.reduce (), за да изчислим средна оценка за двамата потребители. Като идентичност , нека върнем нов рейтинг, ако нашият списък за въвеждане е празен :
Rating averageRating = users.stream() .reduce(new Rating(), (rating, user) -> Rating.average(rating, user.getRating()), Rating::average);
Ако правим математика, трябва да открием, че средната оценка е 3,6:
assertThat(averageRating.getPoints()).isEqualTo(3.6);
7. Заключение
В този урок научихме как да използваме операцията Stream.reduce () . Освен това научихме как да извършваме редукции на последователни и паралелизирани потоци и как да боравим с изключения, докато намаляваме .
Както обикновено, всички примерни кодове, показани в този урок, са достъпни в GitHub.