RxJava 2 - Течаща

1. Въведение

RxJava е Java реализация на Reactive Extensions, която ни позволява да пишем управлявани от събития и асинхронни приложения. Повече информация за това как да използвате RxJava можете да намерите в нашата встъпителна статия тук.

RxJava 2 е пренаписана от нулата, което донесе множество нови функции; някои от които са създадени като отговор на проблеми, съществували в предишната версия на рамката.

Една от тези функции е io.reactivex.Flowable .

2. Наблюдаем срещу . Течаща

В предишната версия на RxJava имаше само един основен клас за работа с източници, знаещи за обратното налягане и несъобразени с обратното налягане - Observable.

RxJava 2 въведе ясно разграничение между тези два вида източници - източниците, знаещи обратното налягане, сега са представени с помощта на специален клас - Flowable.

Наблюдаемите източници не поддържат противоналягане. Поради това трябва да го използваме за източници, които просто консумираме и не можем да повлияем.

Също така, ако имаме работа с голям брой елементи, могат да възникнат два възможни сценария, свързани с обратно налягане, в зависимост от типа на наблюдаемия .

В случай на използване на така наречения студен наблюдаем “, събитията се излъчват мързеливо, така че сме в безопасност от препълване на наблюдател.

Когато използвате горещо наблюдаемо обаче, това ще продължи да излъчва събития, дори ако потребителят не може да се справи.

3. Създаване на течаща

Има различни начини за създаване на Flowable . Удобно за нас, тези методи изглеждат подобни на методите в Observable в първата версия на RxJava.

3.1. Прост течащ

Можем да създадем Flowable, използвайки метода just () , по същия начин, както бихме могли с Observable:

Flowable integerFlowable = Flowable.just(1, 2, 3, 4);

Въпреки че използването на just () е доста просто, не е много често да се създава Flowable от статични данни и се използва за целите на тестването.

3.2. Течливо от Наблюдаем

Когато имаме видими ние можем лесно да го превърне в Течен помощта на toFlowable () метода :

Observable integerObservable = Observable.just(1, 2, 3); Flowable integerFlowable = integerObservable .toFlowable(BackpressureStrategy.BUFFER);

Забележете, че за да можем да извършим преобразуването, трябва да обогатим Observable с BackpressureStrategy. Ще опишем наличните стратегии в следващия раздел.

3.3. Тече от FlowableOnSubscribe

RxJava 2 представи функционален интерфейс FlowableOnSubscribe , който представлява Flowable, който започва да излъчва събития, след като потребителят се абонира за него.

Поради това всички клиенти ще получат един и същ набор от събития, което прави FlowableOnSubscribe безопасно за обратно налягане.

Когато имаме FlowableOnSubscribe, можем да го използваме за създаване на Flowable :

FlowableOnSubscribe flowableOnSubscribe = flowable -> flowable.onNext(1); Flowable integerFlowable = Flowable .create(flowableOnSubscribe, BackpressureStrategy.BUFFER);

Документацията описва много повече методи за създаване на Flowable.

4. Стратегия за течащ обратен натиск

Някои методи като toFlowable () или create () вземат BackpressureStrategy като аргумент.

В BackpressureStrategy е изброяване, които определят поведението противоналягане, че ние ще се прилага за нашия Течен.

Той може да кешира или да пусне събития или изобщо да не приложи никакво поведение, в последния случай ние ще бъдем отговорни за дефинирането му, използвайки оператори за обратно налягане

BackpressureStrategy е подобен на BackpressureMode, присъстващ в предишната версия на RxJava.

В RxJava 2 има пет различни стратегии.

4.1. Буфер

Ако използваме BackpressureStrategy.BUFFER , източникът ще буферира всички събития, докато абонатът не може да ги консумира :

public void thenAllValuesAreBufferedAndReceived() { List testList = IntStream.range(0, 100000) .boxed() .collect(Collectors.toList()); Observable observable = Observable.fromIterable(testList); TestSubscriber testSubscriber = observable .toFlowable(BackpressureStrategy.BUFFER) .observeOn(Schedulers.computation()).test(); testSubscriber.awaitTerminalEvent(); List receivedInts = testSubscriber.getEvents() .get(0) .stream() .mapToInt(object -> (int) object) .boxed() .collect(Collectors.toList()); assertEquals(testList, receivedInts); }

Подобно е на извикването на метод onBackpressureBuffer () на Flowable, но не позволява да се дефинира изрично размера на буфера или действието onOverflow.

4.2. Изпускайте

Можем да използваме BackpressureStrategy.DROP, за да отхвърлим събитията, които не могат да бъдат консумирани, вместо да ги буферираме.

Отново това е подобно на използването на onBackpressureDrop () на Flowable :

public void whenDropStrategyUsed_thenOnBackpressureDropped() { Observable observable = Observable.fromIterable(testList); TestSubscriber testSubscriber = observable .toFlowable(BackpressureStrategy.DROP) .observeOn(Schedulers.computation()) .test(); testSubscriber.awaitTerminalEvent(); List receivedInts = testSubscriber.getEvents() .get(0) .stream() .mapToInt(object -> (int) object) .boxed() .collect(Collectors.toList()); assertThat(receivedInts.size() < testList.size()); assertThat(!receivedInts.contains(100000)); }

4.3. Последен

Използването на BackpressureStrategy.LATEST ще принуди източника да запазва само последните събития, като по този начин ще замени всички предишни стойности, ако потребителят не може да се справи:

public void whenLatestStrategyUsed_thenTheLastElementReceived() { Observable observable = Observable.fromIterable(testList); TestSubscriber testSubscriber = observable .toFlowable(BackpressureStrategy.LATEST) .observeOn(Schedulers.computation()) .test(); testSubscriber.awaitTerminalEvent(); List receivedInts = testSubscriber.getEvents() .get(0) .stream() .mapToInt(object -> (int) object) .boxed() .collect(Collectors.toList()); assertThat(receivedInts.size() < testList.size()); assertThat(receivedInts.contains(100000)); }

BackpressureStrategy.LATEST и BackpressureStrategy.DROP изглеждат много сходни, когато разглеждаме кода.

Въпреки това, BackpressureStrategy.LATEST ще презапише елементи, които ни абонат не могат да се справят и да поддържат само най-новите такива, откъдето идва и името.

BackpressureStrategy.DROP, on the other hand, will discard elements that can't be handled. This means that newest elements won't necessarily be emitted.

4.4. Error

When we're using the BackpressureStrategy.ERROR, we're simply saying that we don't expect backpressure to occur. Consequently, a MissingBackpressureException should be thrown if the consumer can't keep up with the source:

public void whenErrorStrategyUsed_thenExceptionIsThrown() { Observable observable = Observable.range(1, 100000); TestSubscriber subscriber = observable .toFlowable(BackpressureStrategy.ERROR) .observeOn(Schedulers.computation()) .test(); subscriber.awaitTerminalEvent(); subscriber.assertError(MissingBackpressureException.class); }

4.5. Missing

If we use the BackpressureStrategy.MISSING, the source will push elements without discarding or buffering.

The downstream will have to deal with overflows in this case:

public void whenMissingStrategyUsed_thenException() { Observable observable = Observable.range(1, 100000); TestSubscriber subscriber = observable .toFlowable(BackpressureStrategy.MISSING) .observeOn(Schedulers.computation()) .test(); subscriber.awaitTerminalEvent(); subscriber.assertError(MissingBackpressureException.class); }

In our tests, we're excepting MissingbackpressureException for both ERROR and MISSING strategies. As both of them will throw such exception when the source's internal buffer is overflown.

However, it's worth to note that both of them have a different purpose.

We should use the former one when we don't expect backpressure at all, and we want the source to throw an exception in case if it occurs.

The latter one could be used if we don't want to specify a default behavior on the creation of the Flowable. And we're going to use backpressure operators to define it later on.

5. Summary

In this tutorial, we've presented the new class introduced in RxJava 2 called Flowable.

За да намерим повече информация за самия Flowable и неговия API, можем да се обърнем към документацията.

Както винаги всички мостри на кода могат да бъдат намерени в GitHub.