Java 9 реактивни потоци

1. Общ преглед

В тази статия ще разгледаме Java 9 Reactive Streams. Просто казано, ще можем да използваме класа Flow , който затваря основните модули за изграждане на логика за обработка на реактивен поток.

Reactive Streams е стандарт за асинхронна обработка на потоци с неблокиращо обратно налягане. Тази спецификация е дефинирана в Reactive Manifesto и има различни реализации от нея, например RxJava или Akka-Streams.

2. Общ преглед на реактивния API

За да изградим поток , можем да използваме три основни абстракции и да ги съставим в асинхронна логика за обработка.

Всеки поток трябва да обработва събития, които са публикувани в него от издателски екземпляр ; на издателя има един метод - абонирате ().

Ако някой от абонатите иска да получава събития, публикувани от него, той трябва да се абонира за дадения Издател.

Получателят на съобщения трябва да внедри интерфейса на абоната . Обикновено това е краят на всяка обработка на потока, тъй като екземплярът му не изпраща съобщения допълнително.

Можем да мислим за абоната като мивка. Това има четири метода, които трябва да бъдат отменени - onSubscribe (), onNext (), onError () и onComplete (). Ще ги разгледаме в следващия раздел.

Ако искаме да трансформираме входящото съобщение и да го предадем на следващия абонат, трябва да внедрим интерфейса на процесора . Това действа както като абонат, тъй като получава съобщения, така и като издател, защото обработва тези съобщения и ги изпраща за по-нататъшна обработка.

3. Публикуване и консумиране на съобщения

Да предположим, че искаме да създадем прост поток, в който имаме издател, публикуващ съобщения, и прост абонат, консумиращ съобщения, когато те пристигнат - едно по едно.

Нека създадем клас EndSubscriber . Трябва да приложим интерфейса на абоната . След това ще заменим необходимите методи.

Методът onSubscribe () се извиква преди да започне обработката. Екземплярът на Абонамента се предава като аргумент. Това е клас, който се използва за контрол на потока от съобщения между Абонат и Издател:

public class EndSubscriber implements Subscriber { private Subscription subscription; public List consumedElements = new LinkedList(); @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; subscription.request(1); } }

Също така инициализирахме празен списък на consumedElements, който ще бъде използван в тестовете.

Сега трябва да приложим останалите методи от интерфейса на абоната . Основният метод тук е onNext () - той се извиква всеки път, когато издателят публикува ново съобщение:

@Override public void onNext(T item) { System.out.println("Got : " + item); subscription.request(1); }

Имайте предвид, че когато стартирахме абонамента в метода onSubscribe () и когато обработихме съобщение, трябва да извикаме метода request () в Абонамента, за да сигнализираме, че текущият Абонат е готов да консумира повече съобщения.

И накрая, трябва да приложим onError () - който се извиква всеки път, когато в обработката се добави някакво изключение, както и onComplete () - извиква се, когато Publisher е затворен:

@Override public void onError(Throwable t) { t.printStackTrace(); } @Override public void onComplete() { System.out.println("Done"); }

Нека напишем тест за процеса на обработка . Ще използваме класа SubmissionPublisher - конструкция от java.util.concurrent - която реализира интерфейса на Publisher .

Ще изпратим N елементи на Издателя - които ще получи нашият EndSubscriber :

@Test public void whenSubscribeToIt_thenShouldConsumeAll() throws InterruptedException { // given SubmissionPublisher publisher = new SubmissionPublisher(); EndSubscriber subscriber = new EndSubscriber(); publisher.subscribe(subscriber); List items = List.of("1", "x", "2", "x", "3", "x"); // when assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1); items.forEach(publisher::submit); publisher.close(); // then await().atMost(1000, TimeUnit.MILLISECONDS) .until( () -> assertThat(subscriber.consumedElements) .containsExactlyElementsOf(items) ); }

Имайте предвид, че извикваме метода close () в екземпляра на EndSubscriber. Той ще извика обратно извикване onComplete () отдолу за всеки абонат на дадения издател.

Изпълнението на тази програма ще даде следния резултат:

Got : 1 Got : x Got : 2 Got : x Got : 3 Got : x Done

4. Преобразуване на съобщения

Да кажем, че искаме да изградим подобна логика между издател и абонат , но също така да приложим някаква трансформация.

Ще създадем клас TransformProcessor, който реализира Processor и разширява SubmissionPublisher - тъй като това ще бъде както P ublisher, така и S ubscriber.

Ще предадем функция, която ще трансформира входовете в изходи:

public class TransformProcessor extends SubmissionPublisher implements Flow.Processor { private Function function; private Flow.Subscription subscription; public TransformProcessor(Function function) { super(); this.function = function; } @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; subscription.request(1); } @Override public void onNext(T item) { submit(function.apply(item)); subscription.request(1); } @Override public void onError(Throwable t) { t.printStackTrace(); } @Override public void onComplete() { close(); } }

Нека сега напишем бърз тест с процес на обработка, в който Издателят публикува String елементи.

Нашият TransformProcessor ще анализира String като Integer - което означава, че трябва да се случи преобразуване тук:

@Test public void whenSubscribeAndTransformElements_thenShouldConsumeAll() throws InterruptedException { // given SubmissionPublisher publisher = new SubmissionPublisher(); TransformProcessor transformProcessor = new TransformProcessor(Integer::parseInt); EndSubscriber subscriber = new EndSubscriber(); List items = List.of("1", "2", "3"); List expectedResult = List.of(1, 2, 3); // when publisher.subscribe(transformProcessor); transformProcessor.subscribe(subscriber); items.forEach(publisher::submit); publisher.close(); // then await().atMost(1000, TimeUnit.MILLISECONDS) .until(() -> assertThat(subscriber.consumedElements) .containsExactlyElementsOf(expectedResult) ); }

Имайте предвид, че извикването на метода close () в основния издател ще доведе до извикване на метода onComplete () на TransformProcessor .

Имайте предвид, че всички издатели във веригата за обработка трябва да бъдат затворени по този начин.

5. Контролиране на търсенето на съобщения с помощта на абонамента

Да кажем, че искаме да консумираме само първия елемент от Абонамента, да приложим някаква логика и да завършим обработката. Можем да използваме метода request () , за да постигнем това.

Нека модифицираме нашия EndSubscriber да консумира само N брой съобщения. Ще предадем това число като аргумент на конструктора howMuchMessagesConsume :

public class EndSubscriber implements Subscriber { private AtomicInteger howMuchMessagesConsume; private Subscription subscription; public List consumedElements = new LinkedList(); public EndSubscriber(Integer howMuchMessagesConsume) { this.howMuchMessagesConsume = new AtomicInteger(howMuchMessagesConsume); } @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; subscription.request(1); } @Override public void onNext(T item) { howMuchMessagesConsume.decrementAndGet(); System.out.println("Got : " + item); consumedElements.add(item); if (howMuchMessagesConsume.get() > 0) { subscription.request(1); } } //... }

Можем да искаме елементи, стига да искаме.

Нека напишем тест, в който искаме да консумираме само един елемент от дадения абонамент:

@Test public void whenRequestForOnlyOneElement_thenShouldConsumeOne() throws InterruptedException { // given SubmissionPublisher publisher = new SubmissionPublisher(); EndSubscriber subscriber = new EndSubscriber(1); publisher.subscribe(subscriber); List items = List.of("1", "x", "2", "x", "3", "x"); List expected = List.of("1"); // when assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1); items.forEach(publisher::submit); publisher.close(); // then await().atMost(1000, TimeUnit.MILLISECONDS) .until(() -> assertThat(subscriber.consumedElements) .containsExactlyElementsOf(expected) ); }

Although the publisher is publishing six elements, our EndSubscriber will consume only one element because it signals demand for processing only that single one.

By using the request() method on the Subscription, we can implement a more sophisticated back-pressure mechanism to control the speed of the message consumption.

6. Conclusion

In this article, we had a look at the Java 9 Reactive Streams.

We saw how to create a processing Flow consisting of a Publisher and a Subscriber. We created a more complex processing flow with the transformation of elements using Processors.

И накрая, ние използвахме Абонамента, за да контролираме търсенето на елементи от Абоната.

Изпълнението на всички тези примери и кодови фрагменти може да се намери в проекта GitHub - това е проект на Maven, така че трябва да е лесно да се импортира и да се изпълнява както е.