Филтриране на наблюдаеми в RxJava

1. Въведение

След въведението в RxJava, ще разгледаме операторите за филтриране.

По-специално ще се съсредоточим върху филтрирането, пропускането, филтрирането по време и някои по-усъвършенствани операции по филтриране.

2. Филтриране

Когато работите с Observable , понякога е полезно да изберете само подмножество излъчени елементи. За тази цел RxJava предлага различни възможности за филтриране .

Нека започнем да разглеждаме метода на филтъра .

2.1. Най- филтър оператор

Най-просто казано, операторът на филтъра филтрира Observable, като се уверява, че излъчените елементи отговарят на определено условие , което се предлага под формата на предикат .

Нека да видим как можем да филтрираме само нечетните стойности от излъчените:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable .filter(i -> i % 2 != 0); filteredObservable.subscribe(subscriber); subscriber.assertValues(1, 3, 5, 7, 9);

2.2. В приемане оператор

Когато се филтрира с take , логиката води до излъчване на първите n елементи, като се игнорират останалите елементи.

Нека да видим как можем да филтрираме източника Observable и да излъчим само първите два елемента:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.take(3); filteredObservable.subscribe(subscriber); subscriber.assertValues(1, 2, 3);

2.3. В takeWhile оператор

Когато се използва takeWhile, филтрираният Observable ще продължи да излъчва елементи, докато не срещне първи елемент, който не съответства на предиката.

Нека да видим как можем да използваме takeWhile - с филтриращ предикат:

Observable sourceObservable = Observable.just(1, 2, 3, 4, 3, 2, 1); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable .takeWhile(i -> i < 4); filteredObservable.subscribe(subscriber); subscriber.assertValues(1, 2, 3);

2.4. В takeFirst оператор

Винаги, когато искаме да излъчим само първия елемент, отговарящ на дадено условие, можем да използваме takeFirst ().

Нека да разгледаме набързо как можем да излъчим първия елемент, който е по-голям от 5:

Observable sourceObservable = Observable .just(1, 2, 3, 4, 5, 7, 6); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable .takeFirst(x -> x > 5); filteredObservable.subscribe(subscriber); subscriber.assertValue(7);

2.5. first и firstOrDefault Operators

Подобно поведение може да се постигне с помощта на първия API:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.first(); filteredObservable.subscribe(subscriber); subscriber.assertValue(1);

Ако обаче искаме да посочим стойност по подразбиране, ако не се излъчват елементи, можем да използваме f irstOrDefault :

Observable sourceObservable = Observable.empty(); Observable filteredObservable = sourceObservable.firstOrDefault(-1); filteredObservable.subscribe(subscriber); subscriber.assertValue(-1);

2.6. В takeLast оператор

След това, ако искаме да излъчим само последните n елемента, излъчени от Observable , можем да използваме takeLast .

Нека да видим как е възможно да се излъчват само последните три елемента:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.takeLast(3); filteredObservable.subscribe(subscriber); subscriber.assertValues(8, 9, 10);

Трябва да помним, че това забавя излъчването на който и да е елемент от източника Observable, докато не завърши.

2.7. last и lastOrDefault

Ако искаме да излъчим само последния елемент, а след това използвайки takeLast (1) , можем да използваме последен .

Това филтрира Observable , излъчвайки само последния елемент, който по избор проверява филтриращ предикат :

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable .last(i -> i % 2 != 0); filteredObservable.subscribe(subscriber); subscriber.assertValue(9);

В случай, че наблюдаваните е празна, ние можем да използваме lastOrDefault , който филтрира на наблюдаемата излъчваща стойността по подразбиране.

Стойността по подразбиране също се издава, ако се използва операторът lastOrDefault и няма елементи, които да проверяват условието за филтриране:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.lastOrDefault(-1, i -> i > 10); filteredObservable.subscribe(subscriber); subscriber.assertValue(-1);

2.8. elementAt и elementAtOrDefault Оператори

С оператора elementAt можем да изберем единичен елемент, излъчен от източника Observable , като посочим неговия индекс:

Observable sourceObservable = Observable .just(1, 2, 3, 5, 7, 11); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.elementAt(4); filteredObservable.subscribe(subscriber); subscriber.assertValue(7);

Въпреки това, elementAt ще хвърли IndexOutOfBoundException , ако посоченият индекс надвишава броя на елементите, отделяни.

За да се избегне тази ситуация, е възможно да се използва elementAtOrDefault - който ще върне стойност по подразбиране в случай, че индексът е извън обхвата:

Observable sourceObservable = Observable .just(1, 2, 3, 5, 7, 11); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.elementAtOrDefault(7, -1); filteredObservable.subscribe(subscriber); subscriber.assertValue(-1);

2.9. В ofType оператор

Винаги, когато Observable излъчва обектни елементи, е възможно да ги филтрирате въз основа на техния тип.

Нека да видим как можем да филтрираме само излъчените елементи от типа String :

Observable sourceObservable = Observable.just(1, "two", 3, "five", 7, 11); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.ofType(String.class); filteredObservable.subscribe(subscriber); subscriber.assertValues("two", "five");

3. Пропускане

On the other hand, when we want to filter out or skip some of the items emitted by an Observable, RxJava offers a few operators as a counterpart of the filtering ones, that we've previously discussed.

Let's start looking at the skip operator, the counterpart of take.

3.1. The skip Operator

When an Observable emits a sequence of items, it's possible to filter out or skip some of the firsts emitted items using skip.

For example. let's see how it's possible to skip the first four elements:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.skip(4); filteredObservable.subscribe(subscriber); subscriber.assertValues(5, 6, 7, 8, 9, 10);

3.2. The skipWhile Operator

Whenever we want to filter out all the first values emitted by an Observable that fail a filtering predicate, we can use the skipWhile operator:

Observable sourceObservable = Observable .just(1, 2, 3, 4, 5, 4, 3, 2, 1); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable .skipWhile(i -> i < 4); filteredObservable.subscribe(subscriber); subscriber.assertValues(4, 5, 4, 3, 2, 1);

3.3. The skipLast Operator

The skipLast operator allows us to skip the final items emitted by the Observable accepting only those emitted before them.

With this, we can, for example, skip the last five items:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = sourceObservable.skipLast(5); filteredObservable.subscribe(subscriber); subscriber.assertValues(1, 2, 3, 4, 5);

3.4. distinct and distinctUntilChanged Operators

The distinct operator returns an Observable that emits all the items emitted by the sourceObservable that are distinct:

Observable sourceObservable = Observable .just(1, 1, 2, 2, 1, 3, 3, 1); TestSubscriber subscriber = new TestSubscriber(); Observable distinctObservable = sourceObservable.distinct(); distinctObservable.subscribe(subscriber); subscriber.assertValues(1, 2, 3);

However, if we want to obtain an Observable that emits all the items emitted by the sourceObservable that are distinct from their immediate predecessor, we can use the distinctUntilChanged operator:

Observable sourceObservable = Observable .just(1, 1, 2, 2, 1, 3, 3, 1); TestSubscriber subscriber = new TestSubscriber(); Observable distinctObservable = sourceObservable.distinctUntilChanged(); distinctObservable.subscribe(subscriber); subscriber.assertValues(1, 2, 1, 3, 1);

3.5. The ignoreElements Operator

Whenever we want to ignore all the elements emitted by the sourceObservable, we can simply use the ignoreElements:

Observable sourceObservable = Observable.range(1, 10); TestSubscriber subscriber = new TestSubscriber(); Observable ignoredObservable = sourceObservable.ignoreElements(); ignoredObservable.subscribe(subscriber); subscriber.assertNoValues();

4. Time Filtering Operators

When working with observable sequence, the time axis is unknown but sometimes getting timely data from a sequence could be useful.

With this purpose, RxJava offers a few methods that allow us to work with Observable using also the time axis.

Before moving on to the first one, let's define a timed Observable that will emit an item every second:

TestScheduler testScheduler = new TestScheduler(); Observable timedObservable = Observable .just(1, 2, 3, 4, 5, 6) .zipWith(Observable.interval( 0, 1, TimeUnit.SECONDS, testScheduler), (item, time) -> item);

The TestScheduler is a special scheduler that allows advancing the clock manually at whatever pace we prefer.

4.1. sample and throttleLast Operators

The sample operator filters the timedObservable, returning an Observable that emits the most recent items emitted by this API within period time intervals.

Let's see how we can sample the timedObservable, filtering only the last emitted item every 2.5 seconds:

TestSubscriber subscriber = new TestSubscriber(); Observable sampledObservable = timedObservable .sample(2500L, TimeUnit.MILLISECONDS, testScheduler); sampledObservable.subscribe(subscriber); testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertValues(3, 5, 6);

This kind of behavior can be achieved also using the throttleLast operator.

4.2. The throttleFirst Operator

The throttleFirst operator differs from throttleLast/sample since it emits the first item emitted by the timedObservable in each sampling period instead of the most recently emitted one.

Let's see how we can emit the first items, using a sampling period of 4 seconds:

TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = timedObservable .throttleFirst(4100L, TimeUnit.SECONDS, testScheduler); filteredObservable.subscribe(subscriber); testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertValues(1, 6);

4.3. debounce and throttleWithTimeout Operators

With the debounce operator, it's possible to emit only an item if a particular timespan has passed without emitting another item.

Therefore, if we select a timespan that is greater than the time interval between the emitted items of the timedObservable, it will only emit the last one. On the other hand, if it's smaller, it will emit all the items emitted by the timedObservable.

Let's see what happens in the first scenario:

TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = timedObservable .debounce(2000L, TimeUnit.MILLISECONDS, testScheduler); filteredObservable.subscribe(subscriber); testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertValue(6);

This kind of behavior can also be achieved using throttleWithTimeout.

4.4. The timeout Operator

The timeout operator mirrors the source Observable, but issue a notification error, aborting the emission of items, if the source Observable fails to emit any items during a specified time interval.

Let's see what happens if we specify a timeout of 500 milliseconds to our timedObservable:

TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = timedObservable .timeout(500L, TimeUnit.MILLISECONDS, testScheduler); filteredObservable.subscribe(subscriber); testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertError(TimeoutException.class); subscriber.assertValues(1);

5. Multiple Observable Filtering

When working with Observable, it's definitely possible to decide if filtering or skipping items based on a second Observable.

Before moving on, let's define a delayedObservable, that will emit only 1 item after 3 seconds:

Observable delayedObservable = Observable.just(1) .delay(3, TimeUnit.SECONDS, testScheduler);

Let's start with takeUntil operator.

5.1. The takeUntil Operator

The takeUntil operator discards any item emitted by the source Observable (timedObservable) after a second Observable (delayedObservable) emits an item or terminates:

TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = timedObservable .skipUntil(delayedObservable); filteredObservable.subscribe(subscriber); testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertValues(4, 5, 6);

5.2. The skipUntil Operator

On the other hand, skipUntil discards any item emitted by the source Observable (timedObservable) until a second Observable (delayedObservable) emits an item:

TestSubscriber subscriber = new TestSubscriber(); Observable filteredObservable = timedObservable .takeUntil(delayedObservable); filteredObservable.subscribe(subscriber); testScheduler.advanceTimeBy(7, TimeUnit.SECONDS); subscriber.assertValues(1, 2, 3);

6. Conclusion

In this extensive tutorial, we explored the different filtering operators available within RxJava, providing a simple example of each one.

As always, all the code examples in this article can be found over on GitHub.