Пътеводител за потоците Akka

1. Общ преглед

В тази статия ще разгледаме библиотеката на akka-streams, която е изградена върху рамката на Akka актьор, която се придържа към манифеста за реактивните потоци. API на Akka Streams ни позволява лесно да съставяме потоци за трансформация на данни от независими стъпки.

Освен това цялата обработка се извършва по реактивен, неблокиращ и асинхронен начин.

2. Зависимости на Maven

За да започнем, трябва да добавим библиотеките akka-stream и akka-stream-testkit в нашия pom.xml:

 com.typesafe.akka akka-stream_2.11 2.5.2   com.typesafe.akka akka-stream-testkit_2.11 2.5.2 

3. API на Akka Streams

За да работим с Akka Streams, трябва да сме наясно с основните концепции на API:

  • Source - входна точка за обработка в библиотеката на akka-stream - можем да създадем екземпляр на този клас от множество източници; например, можем да използваме метода single () , ако искаме да създадем източник от един низ или да създадем източник от итерируеми елементи
  • Flow - основният градивен блок за обработка - всекиекземпляр на Flow има една входна и една изходна стойност
  • Материализатор - можем да го използваме, ако искаме нашият поток да има някои странични ефекти като регистриране или запазване на резултати ; най-често ще предаваме псевдонима NotUsed като Материализатор, за да покажем, че нашият поток не трябва да има странични ефекти
  • Операция с мивка - когато изграждаме поток, тя не се изпълнява, докато не регистрираме операция с мивка върху нея - това е терминална операция, която задейства всички изчисления в целия поток

4. Създаване на потоци в потоци Akka

Нека започнем с изграждането на прост пример, където ще покажем как да създадем и комбинираме множество потоци s - за да обработим поток от цели числа и да изчислим средния променлив прозорец на целочислени двойки от потока.

Ще анализираме разделен с точка и запетая низ от цели числа като вход, за да създадем нашия източник на akka-stream за примера.

4.1. Използване на поток за синтактичен вход

Първо, нека създадем клас DataImporter, който ще вземе екземпляр на ActorSystem , който ще използваме по-късно, за да създадем нашия поток :

public class DataImporter { private ActorSystem actorSystem; // standard constructors, getters... }

След това нека създадем метод parseLine , който ще генерира Списък на цяло число от нашия делимитиран входен низ. Имайте предвид, че тук използваме Java Stream API само за синтактичен анализ:

private List parseLine(String line) { String[] fields = line.split(";"); return Arrays.stream(fields) .map(Integer::parseInt) .collect(Collectors.toList()); }

Нашият първоначален поток ще приложи parseLine към нашия вход, за да създаде поток с входен тип String и изходен тип Integer :

private Flow parseContent() { return Flow.of(String.class) .mapConcat(this::parseLine); }

Когато извикаме метода parseLine () , компилаторът знае, че аргументът на тази ламбда функция ще бъде низ - същият като типа на въвеждане за нашия поток .

Имайте предвид, че ние сме с помощта на mapConcat () метод - еквивалент на Java 8 flatMap () метод - защото искаме да се изглади Списъка на Integer върнати от parseLine () в потока на цяло число , така че следващите стъпки в нашия обработка не се нуждаят от за справяне със Списъка .

4.2. Използване на поток за извършване на изчисления

Към този момент имаме нашия поток от анализирани цели числа. Сега трябва да приложим логика, която да групира всички входни елементи по двойки и да изчисли средната стойност на тези двойки .

Сега ще се създаде поток от Integer ите и ги групирате, като се използва групирани () метод .

След това искаме да изчислим средна стойност.

Тъй като ние не се интересуват от реда, по който ще се обработват тези средни стойности, можем да имаме изчислените средни стойности паралелно използване на множество нишки с помощта на mapAsyncUnordered () метода , минавайки на броя на теми като аргумент за този метод.

Действието, което ще бъде предадено като ламбда на потока, трябва да върне CompletableFuture, защото това действие ще бъде изчислено асинхронно в отделната нишка:

private Flow computeAverage() { return Flow.of(Integer.class) .grouped(2) .mapAsyncUnordered(8, integers -> CompletableFuture.supplyAsync(() -> integers.stream() .mapToDouble(v -> v) .average() .orElse(-1.0))); }

Изчисляваме средните стойности в осем паралелни нишки. Имайте предвид, че използваме Java 8 Stream API за изчисляване на средна стойност.

4.3. Съставяне на множество потоци в един поток

Най- Flow API е перфектен абстракция, която ни позволява да композира множество потоци случаи да се постигне крайната си цел обработка . Можем да имаме гранулирани потоци, където един например анализира JSON, друг прави някаква трансформация, а друг събира статистика.

Такава детайлност ще ни помогне да създадем по-тестваем код, защото можем да тестваме всяка стъпка на обработка независимо.

Създадохме два потока по-горе, които могат да работят независимо един от друг. Сега искаме да ги съставим заедно.

Първо искаме да анализираме нашия входен низ , а след това искаме да изчислим средна стойност за поток от елементи.

Можем да съставяме нашите потоци, използвайки метода via () :

Flow calculateAverage() { return Flow.of(String.class) .via(parseContent()) .via(computeAverage()); }

Създадохме поток с входен тип String и два други потока след него. В parseContent () Flow отнема String вход и връща цяло число , както е изход. В computeAverage () Flow поема че Цяло число и изчислява средно връщане Двойна като типа на изхода.

5. Добавяне на мивка към потока

Както споменахме, до този момент целият Поток все още не е изпълнен, защото е мързелив. За да започнем изпълнението на потока , трябва да дефинираме мивка . В Мивка операция може, например, спасяване на данни в база данни или резултати Изпрати на някои външни уеб услуга.

Да предположим, че имаме клас AverageRepository със следния метод save () , който записва резултати в нашата база данни:

CompletionStage save(Double average) { return CompletableFuture.supplyAsync(() -> { // write to database return average; }); }

Сега искаме да създадем операция Sink , която използва този метод, за да запази резултатите от нашата обработка на потока . За да създадем нашата мивка, първо трябва да създадем поток, който приема резултата от нашата обработка като тип вход . След това искаме да запишем всички наши резултати в базата данни.

Отново не ни интересува подреждането на елементите, така че можем да изпълняваме операциите save () паралелно, използвайки метода mapAsyncUnordered () .

За да създадем мивка от потока , трябва да извикаме toMat () със Sink.ignore () като първи аргумент и Keep.right () като втори, защото искаме да върнем състояние на обработката:

private Sink
    
      storeAverages() { return Flow.of(Double.class) .mapAsyncUnordered(4, averageRepository::save) .toMat(Sink.ignore(), Keep.right()); }
    

6. Определяне на източник за потока

Последното нещо, което трябва да направим, е да създадем източник от входния низ . Можем да приложим поток CalcuAverage () към този източник, използвайки метода via () .

След това, за да добавим Sink към обработката, трябва да извикаме метода runWith () и да предадем мивката storeAverage () , която току-що създадохме:

CompletionStage calculateAverageForContent(String content) { return Source.single(content) .via(calculateAverage()) .runWith(storeAverages(), ActorMaterializer.create(actorSystem)) .whenComplete((d, e) -> { if (d != null) { System.out.println("Import finished "); } else { e.printStackTrace(); } }); }

Note that when the processing is finished we are adding the whenComplete() callback, in which we can perform some action depending on the outcome of the processing.

7. Testing Akka Streams

We can test our processing using the akka-stream-testkit.

The best way to test the actual logic of the processing is to test all Flow logic and use TestSink to trigger the computation and assert on the results.

In our test, we are creating the Flow that we want to test, and next, we are creating a Source from the test input content:

@Test public void givenStreamOfIntegers_whenCalculateAverageOfPairs_thenShouldReturnProperResults() { // given Flow tested = new DataImporter(actorSystem).calculateAverage(); String input = "1;9;11;0"; // when Source flow = Source.single(input).via(tested); // then flow .runWith(TestSink.probe(actorSystem), ActorMaterializer.create(actorSystem)) .request(4) .expectNextUnordered(5d, 5.5); }

We are checking that we are expecting four input arguments, and two results that are averages can arrive in any order because our processing is done in the asynchronous and parallel way.

8. Conclusion

In this article, we were looking at the akka-stream library.

We defined a process that combines multiple Flows to calculate moving average of elements. Then, we defined a Source that is an entry point of the stream processing and a Sink that triggers the actual processing.

Finally, we wrote a test for our processing using the akka-stream-testkit.

The implementation of all these examples and code snippets can be found in the GitHub project – this is a Maven project, so it should be easy to import and run as it is.