Комбиниране на наблюдаеми в RxJava

1. Въведение

В този кратък урок ще разгледаме различни начини за комбиниране на видимост в RxJava.

Ако сте нов в RxJava, определено първо разгледайте този уводен урок.

Сега, нека скочим направо.

2. Наблюдаеми

Наблюдаемите последователности или просто Observables са представяне на асинхронни потоци от данни.

Те се основават на модела Observer, при който обект, наречен Observer , се абонира за елементи, излъчени от Observable .

Абонаментът не блокира, тъй като Наблюдателят реагира на каквото и да излъчва Observable в бъдеще. Това от своя страна улеснява едновременността.

Ето една проста демонстрация в RxJava:

Observable .from(new String[] { "John", "Doe" }) .subscribe(name -> System.out.println("Hello " + name))

3. Комбиниране на наблюдаеми

Когато програмирате с помощта на реактивна рамка, това е често срещан случай за комбиниране на различни наблюдаеми .

Например в уеб приложение може да се наложи да получим два набора асинхронни потоци от данни, които са независими един от друг.

Вместо да чакаме предишния поток да завърши, преди да поискаме следващия поток, можем да се обадим едновременно и на двете и да се абонираме за комбинираните потоци.

В този раздел ще разгледаме някои от най-различни начини, по които могат да се комбинират няколко наблюдаеми в RxJava и различните случаи, когато използването на които се прилага всеки метод.

3.1. Обединяване

Можем да използваме оператора за сливане , за да комбинираме изхода на множество наблюдаеми, така че те да действат като един:

@Test public void givenTwoObservables_whenMerged_shouldEmitCombinedResults() { TestSubscriber testSubscriber = new TestSubscriber(); Observable.merge( Observable.from(new String[] {"Hello", "World"}), Observable.from(new String[] {"I love", "RxJava"}) ).subscribe(testSubscriber); testSubscriber.assertValues("Hello", "World", "I love", "RxJava"); }

3.2. MergeDelayError

Методът mergeDelayError е същият като сливането, тъй като комбинира множество наблюдаеми в едно, но ако възникнат грешки по време на сливането, той позволява да продължат елементите без грешки, преди да разпространят грешките :

@Test public void givenMutipleObservablesOneThrows_whenMerged_thenCombineBeforePropagatingError() { TestSubscriber testSubscriber = new TestSubscriber(); Observable.mergeDelayError( Observable.from(new String[] { "hello", "world" }), Observable.error(new RuntimeException("Some exception")), Observable.from(new String[] { "rxjava" }) ).subscribe(testSubscriber); testSubscriber.assertValues("hello", "world", "rxjava"); testSubscriber.assertError(RuntimeException.class); }

Горният пример издава всички стойности без грешки :

hello world rxjava

Имайте предвид, че ако използваме сливане вместо mergeDelayError , на String " rxjava" няма да бъде излъчвана защото сливането веднага спира потока на данни от наблюдаеми , когато възникне грешка.

3.3. Цип

Методът на zip разширяване обединява две последователности от стойности като двойки :

@Test public void givenTwoObservables_whenZipped_thenReturnCombinedResults() { List zippedStrings = new ArrayList(); Observable.zip( Observable.from(new String[] { "Simple", "Moderate", "Complex" }), Observable.from(new String[] { "Solutions", "Success", "Hierarchy"}), (str1, str2) -> str1 + " " + str2).subscribe(zippedStrings::add); assertThat(zippedStrings).isNotEmpty(); assertThat(zippedStrings.size()).isEqualTo(3); assertThat(zippedStrings).contains("Simple Solutions", "Moderate Success", "Complex Hierarchy"); }

3.4. Цип с интервал

В този пример ще архивираме поток с интервал, който на практика ще забави излъчването на елементи от първия поток:

@Test public void givenAStream_whenZippedWithInterval_shouldDelayStreamEmmission() { TestSubscriber testSubscriber = new TestSubscriber(); Observable data = Observable.just("one", "two", "three", "four", "five"); Observable interval = Observable.interval(1L, TimeUnit.SECONDS); Observable .zip(data, interval, (strData, tick) -> String.format("[%d]=%s", tick, strData)) .toBlocking().subscribe(testSubscriber); testSubscriber.assertCompleted(); testSubscriber.assertValueCount(5); testSubscriber.assertValues("[0]=one", "[1]=two", "[2]=three", "[3]=four", "[4]=five"); }

4. Обобщение

В тази статия, сме виждали някои от методите за комбиниране на видимост, с RxJava. Можете да научите за други методи като combLatest , join , groupJoin , switchOnNext , в официалната документация на RxJava.

Както винаги, изходният код за тази статия е достъпен в нашето репо GitHub.