Въведение в RxKotlin

1. Общ преглед

В този урок ще разгледаме използването на реактивни разширения (Rx) в идиоматичния Kotlin, използвайки библиотеката RxKotlin.

RxKotlin не е изпълнение на реактивни разширения, само по себе си. Вместо това това е предимно колекция от методи за разширение. Това означава, че RxKotlin увеличава библиотеката RxJava с API, проектиран с мисълта за Kotlin.

Ето защо, ние ще използваме понятия от нашата статия, Въведение в RxJava, както и концепцията за Течните вещества , които сме представени в специална статия.

2. Настройка на RxKotlin

За да използваме RxKotlin в нашия проект Maven, ще трябва да добавим зависимостта rxkotlin към нашия pom.xml:

 io.reactivex.rxjava2 rxkotlin 2.3.0 

Или, за проект Gradle, към нашата build.gradle:

implementation 'io.reactivex.rxjava2:rxkotlin:2.3.0'

Тук използваме RxKotlin 2.x, който е насочен към RxJava 2. Проектите, използващи RxJava 1, трябва да използват RxKotlin 1.x. И за двете версии се прилагат едни и същи концепции.

Имайте предвид, че RxKotlin зависи от RxJava, но те не актуализират зависимостта често до последната версия. Затова препоръчваме изрично да включите конкретната версия на RxJava, от която ще разчитаме, както е подробно описано в нашата статия за RxJava.

3. Създаване на наблюдаеми s в RxKotlin

RxKotlin включва редица методи за разширение за създаване на наблюдаеми и течащи обекти от колекции.

По-специално, всеки тип масив има toObservable () метод и toFlowable () метод:

val observable = listOf(1, 1, 2, 3).toObservable() observable.test().assertValues(1, 1, 2, 3)
val flowable = listOf(1, 1, 2, 3).toFlowable() flowable.buffer(2).test().assertValues(listOf(1, 1), listOf(2, 3))

3.1. Изпълним s

RxKotlin също предоставя някои методи за създаване на изпълними екземпляри. По-конкретно, можем да конвертираме Action s, Callable s, Future s и нулеви arity функции в Completable с метод на разширение вCompletable:

var value = 0 val completable = { value = 3 }.toCompletable() assertFalse(completable.test().isCancelled()) assertEquals(3, value)

4. Наблюдаем и течащ на карта и мултимап

Когато имаме Observable или Flowable, който създава двойки , можем да ги трансформираме в Single Observable, който създава Map:

val list = listOf(Pair("a", 1), Pair("b", 2), Pair("c", 3), Pair("a", 4)) val observable = list.toObservable() val map = observable.toMap() assertEquals(mapOf(Pair("a", 4), Pair("b", 2), Pair("c", 3)), map.blockingGet())

Както виждаме в предишния пример, toMap замества стойности, излъчени по-рано, с по-късни стойности, ако имат същия ключ.

Ако искаме да натрупаме всички стойности, свързани с ключ, в колекция, вместо това използваме toMultimap :

val list = listOf(Pair("a", 1), Pair("b", 2), Pair("c", 3), Pair("a", 4)) val observable = list.toObservable() val map = observable.toMultimap() assertEquals( mapOf(Pair("a", listOf(1, 4)), Pair("b", listOf(2)), Pair("c", listOf(3))), map.blockingGet())

5. Комбиниране на наблюдаеми s и текущи s

Една от точките за продажба на Rx е възможността да се комбинират Observable s и Flowable s по различни начини. Всъщност RxJava предоставя редица оператори извън кутията.

В допълнение към това, RxKotlin включва още няколко метода за удължаване за комбиниране на Observable s и други подобни.

5.1. Комбиниране на наблюдаеми емисии

Когато имаме Observable, който излъчва други Observable s, можем да използваме един от методите за разширение в RxKotlin, за да комбинираме излъчените стойности.

По-специално, mergeAll комбинира наблюдаемите с flatMap:

val subject = PublishSubject.create
    
     () val observable = subject.mergeAll()
    

Което би било същото като:

val observable = subject.flatMap { it }

Полученият Observable ще излъчи всички стойности на източника Observable s в неопределен ред.

По подобен начин concatAll използва concatMap (стойностите се излъчват в същия ред като източниците), докато switchLatest използва switchMap (стойностите се излъчват от последното излъчено Observable ).

Както видяхме досега, всички горепосочени методи са предвидени и за течащи източници, със същата семантика.

5.2. Комбиниране на изпълними s , може би s и единични s

Когато имаме Observable, който излъчва екземпляри на Completable , Maybe или Single , можем да комбинираме тези с подходящия метод mergeAllXs като например mergeAllMaybes :

val subject = PublishSubject.create
    
     () val observable = subject.mergeAllMaybes() subject.onNext(Maybe.just(1)) subject.onNext(Maybe.just(2)) subject.onNext(Maybe.empty()) subject.onNext(Maybe.error(Exception("error"))) subject.onNext(Maybe.just(3)) observable.test().assertValues(1, 2).assertError(Exception::class.java)
    

5.3. Комбинирането Iterable е на Реални ите

Вместо това за колекции от наблюдаеми или текущи екземпляри RxKotlin има няколко други оператори, merge и mergeDelayError . И двамата имат ефект на комбиниране на всички наблюдаеми s или текущи s в едно, което ще излъчва всички стойности последователно:

val observables = mutableListOf(Observable.just("first", "second")) val observable = observables.merge() observables.add(Observable.just("third", "fourth")) observable.test().assertValues("first", "second", "third", "fourth")

The difference between the two operators — which are directly derived from the same-named operators in RxJava — is their treatment of errors.

The merge method emits errors as soon as they're emitted by the source:

// ... observables.add(Observable.error(Exception("e"))) observables.add(Observable.just("fifth")) // ... observable.test().assertValues("first", "second", "third", "fourth")

Whereas mergeDelayError emits them at the end of the stream:

// ... observables.add(Observable.error(Exception("e"))) observables.add(Observable.just("fifth")) // ... observable.test().assertValues("first", "second", "third", "fourth", "fifth")

6. Handling Values of Different Types

Let's now look at the extension methods in RxKotlin for dealing with values of different types.

These are variants of RxJava methods, that make use of Kotlin's reified generics. In particular, we can:

  • cast emitted values from one type to another, or
  • filter out values that are not of a certain type

So, we could, for example, cast an Observable of Numbers to one of Ints:

val observable = Observable.just(1, 1, 2, 3) observable.cast().test().assertValues(1, 1, 2, 3)

Here, the cast is unnecessary. However, when combining different observables together, we might need it.

With ofType, instead, we can filter out values that aren't of the type we expect:

val observable = Observable.just(1, "and", 2, "and") observable.ofType().test().assertValues(1, 2)

As always, cast and ofType are applicable to both Observables and Flowables.

Furthermore, Maybe supports these methods as well. The Single class, instead, only supports cast.

7. Other Helper Methods

Finally, RxKotlin includes several helper methods. Let's have a quick look.

We can use subscribeBy instead of subscribe – it allows named parameters:

Observable.just(1).subscribeBy(onNext = { println(it) })

Similarly, for blocking subscriptions we can use blockingSubscribeBy.

Additionally, RxKotlin includes some methods that mimic those in RxJava but work around a limitation of Kotlin's type inference.

For example, when using Observable#zip, specifying the zipper doesn't look so great:

Observable.zip(Observable.just(1), Observable.just(2), BiFunction { a, b -> a + b })

So, RxKotlin adds Observables#zip for more idiomatic usage:

Observables.zip(Observable.just(1), Observable.just(2)) { a, b -> a + b }

Notice the final “s” in Observables. Similarly, we have Flowables, Singles, and Maybes.

8. Conclusions

В тази статия разгледахме подробно библиотеката RxKotlin, която увеличава RxJava, за да направи нейния API по-скоро като идиоматичен Kotlin.

За повече информация, моля, вижте страницата RxKotlin GitHub. За повече примери препоръчваме тестове RxKotlin.

Изпълнението на всички тези примери и кодови фрагменти може да бъде намерено в проекта GitHub като проект Maven и Gradle, така че трябва да е лесно да се импортира и да се изпълнява както е.