Въведение в RxJava

1. Общ преглед

В тази статия ще се съсредоточим върху използването на реактивни разширения (Rx) в Java за съставяне и консумиране на поредици от данни.

С един поглед API може да изглежда подобно на Java 8 Streams, но всъщност е много по-гъвкав и плавен, което го прави мощна парадигма за програмиране.

Ако искате да прочетете повече за RxJava, разгледайте това писане.

2. Настройка

За да използваме RxJava в нашия проект Maven, ще трябва да добавим следната зависимост към нашия pom.xml:

 io.reactivex rxjava ${rx.java.version} 

Или за проект на Gradle:

compile 'io.reactivex.rxjava:rxjava:x.y.z'

3. Функционални реактивни концепции

От една страна, функционалното програмиране е процес на изграждане на софтуер чрез съставяне на чисти функции, избягване на споделено състояние, променящи се данни и странични ефекти.

От друга страна, реактивното програмиране е асинхронна програмна парадигма, свързана с потоците от данни и разпространението на промяната.

Заедно функционалното реактивно програмиране формира комбинация от функционални и реактивни техники, които могат да представляват елегантен подход към програмирано от събития програми - със стойности, които се променят с течение на времето и когато потребителят реагира на данните, когато постъпят.

Тази технология обединява различни приложения на основните си принципи, някои автори излязоха с документ, който определя общия речник за описване на новия тип приложения.

3.1. Реактивен манифест

Reactive Manifesto е онлайн документ, който излага висок стандарт за приложения в индустрията за разработка на софтуер. Просто казано, реактивните системи са:

  • Отзивчиви - системите трябва да реагират своевременно
  • Съобщение задвижвано - системите трябва да използват асинхронно предаване на съобщения между компонентите, за да осигурят свободно свързване
  • Еластични - системите трябва да останат отзивчиви при голямо натоварване
  • Еластична - системите трябва да останат отзивчиви при отказ на някои компоненти

4. Наблюдаеми

Има два ключови типа, които трябва да се разберат при работа с Rx:

  • Observable представлява всеки обект, който може да получи данни от източник на данни и чието състояние може да представлява интерес по начин, по който други обекти могат да регистрират интерес
  • Един наблюдател е всеки обект, който желае да бъде уведомен, когато състоянието на още промени обект

Един наблюдател се присъединява към наблюдавани последователност. Последователността изпраща елементи на наблюдателя един по един.

На наблюдателя се справя всеки един преди обработката на следващия. Ако много събития влязат асинхронно, те трябва да се съхраняват в опашка или да се пуснат.

В Rx , наблюдател никога няма да бъде извикан с елемент, който не е в ред или извикан преди обратното обаждане да се върне за предишния елемент.

4.1. Видове наблюдаеми

Има два вида:

  • Неблокиращо - поддържа се асинхронно изпълнение и има право да се отпише във всяка точка от потока на събитията. В тази статия ще се съсредоточим най-вече върху този тип
  • Блокиране - всички обаждания на onNext наблюдател ще бъдат синхронни и не е възможно да се отпишете в средата на поток от събития. Винаги можем да преобразуваме Observable в Blocking Observable , използвайки метода toBlocking:
BlockingObservable blockingObservable = observable.toBlocking();

4.2. Оператори

Един оператор е функция, която отнема един О bservable (източник), както първия си аргумент и връща друг видими (дестинацията). След това за всеки елемент, който източникът на наблюдаемото излъчва, той ще приложи функция към този елемент и след това ще излъчи резултата от целевия наблюдаем .

Операторите могат да бъдат свързани заедно, за да създадат сложни потоци от данни, които филтрират събитие въз основа на определени критерии. Към един и същ наблюдаем могат да се приложат множество оператори .

Не е трудно да се окажете в ситуация, в която наблюдава се излъчват предметите по-бързо от един оператор или наблюдател може да ги консумират. Можете да прочетете повече за обратния натиск тук.

4.3. Създайте Observable

Основният оператор просто създава Observable, който излъчва един общ екземпляр, преди да завърши, String “Hello”. Когато искаме да извлечем информация от Observable , ние прилагаме интерфейс на наблюдател и след това извикваме абонамент за желания Observable:

Observable observable = Observable.just("Hello"); observable.subscribe(s -> result = s); assertTrue(result.equals("Hello"));

4.4. OnNext, OnError и OnCompleted

Има три метода на интерфейса на наблюдателя , за които искаме да знаем:

  1. OnNext се извиква от нашия наблюдател всеки път, когато се публикува ново събитие в приложеното Observable . Това е методът, при който ще извършим някои действия за всяко събитие
  2. OnCompleted се извиква, когато последователността от събития, свързани с Observable, е завършена, което показва, че не трябва да очакваме повече onNext повиквания на нашия наблюдател
  3. OnError се извиква, когато се обработва необработено изключение по време на рамковия код RxJava или нашия код за обработка на събития

Връщаната стойност за метода на Observables абонамент е интерфейс за абонамент :

String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; Observable observable = Observable.from(letters); observable.subscribe( i -> result += i, //OnNext Throwable::printStackTrace, //OnError () -> result += "_Completed" //OnCompleted ); assertTrue(result.equals("abcdefg_Completed"));

5. Наблюдаеми трансформации и условни оператори

5.1. Карта

Операторът m ap преобразува елементи, излъчени от Observable, като прилага функция към всеки елемент.

Да приемем, че има деклариран масив от низове, който съдържа някои букви от азбуката и искаме да ги отпечатаме в главен режим:

Observable.from(letters) .map(String::toUpperCase) .subscribe(letter -> result += letter); assertTrue(result.equals("ABCDEFG"));

FlatMap може да се използва за изравняване на Observables винаги, когато се окажем с вложени Observables.

Повече подробности за разликата между map и flatMap можете да намерите тук.

Ако приемем, че имаме метод, който връща Observable от списък с низове. Сега ще печатаме за всеки низ от нов наблюдаем списък със заглавия въз основа на това, което вижда Абонатът :

Observable getTitle() { return Observable.from(titleList); } Observable.just("book1", "book2") .flatMap(s -> getTitle()) .subscribe(l -> result += l); assertTrue(result.equals("titletitle"));

5.2. Сканиране

The scan operator applies a function to each item emitted by an Observable sequentially and emits each successive value.

It allows us to carry forward state from event to event:

String[] letters = {"a", "b", "c"}; Observable.from(letters) .scan(new StringBuilder(), StringBuilder::append) .subscribe(total -> result += total.toString()); assertTrue(result.equals("aababc"));

5.3. GroupBy

Group by operator allows us to classify the events in the input Observable into output categories.

Let's assume that we created an array of integers from 0 to 10, then apply group by that will divide them into the categories even and odd:

Observable.from(numbers) .groupBy(i -> 0 == (i % 2) ? "EVEN" : "ODD") .subscribe(group -> group.subscribe((number) -> { if (group.getKey().toString().equals("EVEN")) { EVEN[0] += number; } else { ODD[0] += number; } }) ); assertTrue(EVEN[0].equals("0246810")); assertTrue(ODD[0].equals("13579"));

5.4. Filter

The operator filter emits only those items from an observable that pass a predicate test.

So let's filter in an integer array for the odd numbers:

Observable.from(numbers) .filter(i -> (i % 2 == 1)) .subscribe(i -> result += i); assertTrue(result.equals("13579"));

5.5. Conditional Operators

DefaultIfEmpty emits item from the source Observable, or a default item if the source Observable is empty:

Observable.empty() .defaultIfEmpty("Observable is empty") .subscribe(s -> result += s); assertTrue(result.equals("Observable is empty"));

The following code emits the first letter of the alphabet ‘a' because the array letters is not empty and this is what it contains in the first position:

Observable.from(letters) .defaultIfEmpty("Observable is empty") .first() .subscribe(s -> result += s); assertTrue(result.equals("a"));

TakeWhile operator discards items emitted by an Observable after a specified condition becomes false:

Observable.from(numbers) .takeWhile(i -> i  sum[0] += s); assertTrue(sum[0] == 10);

Of course, there more others operators that could cover our needs like Contain, SkipWhile, SkipUntil, TakeUntil, etc.

6. Connectable Observables

A ConnectableObservable resembles an ordinary Observable, except that it doesn't begin emitting items when it is subscribed to, but only when the connect operator is applied to it.

In this way, we can wait for all intended observers to subscribe to the Observable before the Observable begins emitting items:

String[] result = {""}; ConnectableObservable connectable = Observable.interval(200, TimeUnit.MILLISECONDS).publish(); connectable.subscribe(i -> result[0] += i); assertFalse(result[0].equals("01")); connectable.connect(); Thread.sleep(500); assertTrue(result[0].equals("01"));

7. Single

Single is like an Observable who, instead of emitting a series of values, emits one value or an error notification.

With this source of data, we can only use two methods to subscribe:

  • OnSuccess returns a Single that also calls a method we specify
  • OnError also returns a Single that immediately notifies subscribers of an error
String[] result = {""}; Single single = Observable.just("Hello") .toSingle() .doOnSuccess(i -> result[0] += i) .doOnError(error -> { throw new RuntimeException(error.getMessage()); }); single.subscribe(); assertTrue(result[0].equals("Hello"));

8. Subjects

A Subject is simultaneously two elements, a subscriber and an observable. As a subscriber, a subject can be used to publish the events coming from more than one observable.

And because it's also observable, the events from multiple subscribers can be reemitted as its events to anyone observing it.

In the next example, we'll look at how the observers will be able to see the events that occur after they subscribe:

Integer subscriber1 = 0; Integer subscriber2 = 0; Observer getFirstObserver() { return new Observer() { @Override public void onNext(Integer value) { subscriber1 += value; } @Override public void onError(Throwable e) { System.out.println("error"); } @Override public void onCompleted() { System.out.println("Subscriber1 completed"); } }; } Observer getSecondObserver() { return new Observer() { @Override public void onNext(Integer value) { subscriber2 += value; } @Override public void onError(Throwable e) { System.out.println("error"); } @Override public void onCompleted() { System.out.println("Subscriber2 completed"); } }; } PublishSubject subject = PublishSubject.create(); subject.subscribe(getFirstObserver()); subject.onNext(1); subject.onNext(2); subject.onNext(3); subject.subscribe(getSecondObserver()); subject.onNext(4); subject.onCompleted(); assertTrue(subscriber1 + subscriber2 == 14)

9. Resource Management

Using operation allows us to associate resources, such as a JDBC database connection, a network connection, or open files to our observables.

Тук представяме в коментари стъпките, които трябва да направим, за да постигнем тази цел, а също и пример за изпълнение:

String[] result = {""}; Observable values = Observable.using( () -> "MyResource", r -> { return Observable.create(o -> { for (Character c : r.toCharArray()) { o.onNext(c); } o.onCompleted(); }); }, r -> System.out.println("Disposed: " + r) ); values.subscribe( v -> result[0] += v, e -> result[0] += e ); assertTrue(result[0].equals("MyResource"));

10. Заключение

В тази статия говорихме как да използваме библиотеката RxJava, както и как да изследваме най-важните й характеристики.

Пълният изходен код за проекта, включително всички използвани тук кодови проби, може да бъде намерен в Github.