Справяне с обратното налягане с RxJava

1. Общ преглед

В тази статия ще разгледаме начина, по който библиотеката RxJava ни помага да се справим с обратно налягане.

Казано по-просто - RxJava използва концепция за реактивни потоци чрез въвеждане на Observables, за които един или много наблюдатели могат да се абонират. Справянето с евентуално безкрайни потоци е много предизвикателно, тъй като трябва да се изправим пред проблем с обратно налягане.

Не е трудно да влезете в ситуация, в която Observable излъчва елементи по-бързо, отколкото абонатът може да ги консумира. Ще разгледаме различните решения на проблема с нарастващия буфер на неконсумирани артикули.

2. Горещи наблюдения срещу студени наблюдения

Първо, нека създадем проста потребителска функция, която ще се използва като консуматор на елементи от Observables , които ще дефинираме по-късно:

public class ComputeFunction { public static void compute(Integer v) { try { System.out.println("compute integer v: " + v); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }

Нашата функция compute () просто отпечатва аргумента. Важното, което трябва да забележите тук, е извикване на метод Thread.sleep (1000) - ние го правим, за да подражаваме на някоя продължителна задача, която ще накара Observable да се запълни с елементи по-бързо, че Observer да ги консумира.

Имаме два вида наблюдаеми - горещи и студени - които се различават напълно, когато става въпрос за управление на обратното налягане.

2.1. Студени наблюдаеми

Студеният наблюдателен излъчва определена последователност от елементи, но може да започне да излъчва тази последователност, когато наблюдателят му намери, че е удобна и с каквато и скорост да пожелае наблюдателят , без да нарушава целостта на последователността. Cold Observable предоставя лениви предмети.

The Observer взема елементи само когато тя е готова да обработва тази позиция, както и елементи, които не е нужно да бъдат буферирани в наблюдаемата , защото те се изисква в разтегателен мода.

Например, ако създадете Observable въз основа на статичен диапазон от елементи от един до един милион, този Observable ще излъчва същата последователност от елементи, независимо колко често се наблюдават тези елементи:

Observable.range(1, 1_000_000) .observeOn(Schedulers.computation()) .subscribe(ComputeFunction::compute);

Когато стартираме програмата си, елементите ще бъдат изчислени от наблюдателя мързеливо и ще бъдат изискани по дърпащ начин. Методът Schedulers.computation () означава, че искаме да стартираме нашия Observer в рамките на пул от изчислителни нишки в RxJava.

Резултатът от програмата ще се състои от резултат от метод compute () , извикан за един по един елемент от Observable :

compute integer v: 1 compute integer v: 2 compute integer v: 3 compute integer v: 4 ...

Студените наблюдаеми не е необходимо да имат някаква форма на противоналягане, тъй като те работят по дърпащ начин. Примери за елементи, излъчени от студено Observable, могат да включват резултатите от заявка за база данни, извличане на файлове или уеб заявка.

2.2. Горещи наблюдения

Горещият наблюдателен започва да генерира елементи и ги излъчва незабавно, когато са създадени. Това е в противоречие с модел на обработка Cold Observables . Hot Observable излъчва артикули със свое собствено темпо и от неговите наблюдатели зависи да се справят.

Когато Наблюдателят не е в състояние да консумира елементи толкова бързо, колкото са произведени от Наблюдаем, те трябва да бъдат буферирани или обработени по някакъв друг начин, тъй като те ще запълнят паметта, накрая причинявайки OutOfMemoryException.

Нека разгледаме пример за горещо наблюдаемо, което произвежда 1 милион артикула на краен потребител, който обработва тези елементи. Когато методът за изчисляване () в Observer отнема известно време за обработка на всеки елемент, Observable започва да запълва памет с елементи, причинявайки неуспех на програмата:

PublishSubject source = PublishSubject.create(); source.observeOn(Schedulers.computation()) .subscribe(ComputeFunction::compute, Throwable::printStackTrace); IntStream.range(1, 1_000_000).forEach(source::onNext); 

Изпълнението на тази програма ще се провали с MissingBackpressureException, защото не сме дефинирали начин за обработка на свръхпроизводството Observable .

Примерите за елементи, излъчвани от гореща Observable, могат да включват събития с мишката и клавиатурата, системни събития или цени на акции.

3. Наблюдение на свръхпроизводство в буфери

Първият начин за справяне със свръхпроизводството на Observable е да се дефинира някакъв вид буфер за елементи, които не могат да бъдат обработени от Observer.

Можем да го направим, като извикаме метод на buffer () :

PublishSubject source = PublishSubject.create(); source.buffer(1024) .observeOn(Schedulers.computation()) .subscribe(ComputeFunction::compute, Throwable::printStackTrace); 

Определянето на буфер с размер 1024 ще даде на Наблюдателя известно време да навакса свръхпроизводствения източник. Буферът ще съхранява елементи, които все още не са били обработени.

Можем да увеличим размера на буфера, за да имаме достатъчно място за произведени стойности.

Имайте предвид обаче, че като цяло това може да е само временно решение, тъй като препълването все още може да се случи, ако източникът свръхпроизводи предвидения размер на буфера.

4. Партиране на емитираните артикули

Можем да пакетираме свръхпродуцирани елементи в прозорци с N елемента.

Когато Observable произвежда елементи по-бързо, отколкото Observer може да ги обработи, можем да смекчим това, като групираме произведените елементи заедно и изпратим партида от елементи на Observer, която е в състояние да обработи колекция от елементи вместо елемент един по един:

PublishSubject source = PublishSubject.create(); source.window(500) .observeOn(Schedulers.computation()) .subscribe(ComputeFunction::compute, Throwable::printStackTrace); 

Използването на метод window () с аргумент 500, ще каже на Observable да групира елементи в партидите с размер 500. Тази техника може да намали проблема с свръхпроизводството на Observable, когато Observer е в състояние да обработи по-бързо партида от елементи в сравнение с обработката на елементи един по един.

5. Пропускане на елементи

Ако някои от стойностите, произведени от Observable, могат безопасно да бъдат игнорирани, можем да използваме вземането на проби в рамките на определено време и операторите за регулиране.

Методите sample () и throttleFirst () вземат продължителност като параметър:

  • Методът s ample () периодично разглежда последователността на елементите и излъчва последния елемент, който е произведен в рамките на продължителността, посочена като параметър
  • Методът throttleFirst () излъчва първия елемент, който е произведен след продължителността, посочена като параметър

Продължителността е време, след което един определен елемент се избира от последователността на произведените елементи. Можем да определим стратегия за работа с противоналягане, като пропуснем елементи:

PublishSubject source = PublishSubject.create(); source.sample(100, TimeUnit.MILLISECONDS) .observeOn(Schedulers.computation()) .subscribe(ComputeFunction::compute, Throwable::printStackTrace);

Уточнихме, че стратегията за пропускане на елементи ще бъде примерен метод () . Искаме проба от последователност с продължителност от 100 милисекунди. Този елемент ще бъде излъчен за наблюдателя.

Не забравяйте обаче, че тези оператори само намаляват скоростта на приемане на стойността от наблюдателя надолу по веригата и по този начин те все още могат да доведат до MissingBackpressureException .

6. Работа с пълним наблюдаем буфер

В случай, че нашите стратегии за вземане на проби или дозиране на елементи не помагат за попълване на буфер , трябва да приложим стратегия за обработка на случаи, когато буфер се запълва.

Трябва да използваме метод onBackpressureBuffer () , за да предотвратим BufferOverflowException.

Методът onBackpressureBuffer () взема три аргумента: капацитет на наблюдаем буфер, метод, който се извиква, когато буферът се запълва, и стратегия за обработка на елементи, които трябва да бъдат изхвърлени от буфер. Стратегиите за преливане са в клас BackpressureOverflow .

Има 4 вида действия, които могат да бъдат изпълнени, когато буферът се запълни:

  • ON_OVERFLOW_ERROR - това е поведението по подразбиране, сигнализиращо за BufferOverflowException, когато буферът е пълен
  • ON_OVERFLOW_DEFAULT - в момента е същото като ON_OVERFLOW_ERROR
  • ON_OVERFLOW_DROP_LATEST - ако се случи преливане, текущата стойност просто ще бъде игнорирана и само старите стойности ще бъдат доставени, след като наблюдателят надолу по веригата поиска
  • ON_OVERFLOW_DROP_OLDEST - изпуска най-стария елемент в буфера и добавя текущата стойност към него

Нека да видим как да посочим тази стратегия:

Observable.range(1, 1_000_000) .onBackpressureBuffer(16, () -> {}, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST) .observeOn(Schedulers.computation()) .subscribe(e -> {}, Throwable::printStackTrace); 

Тук нашата стратегия за работа с препълващия се буфер е пускането на най-стария елемент в буфер и добавяне на най-новия елемент, създаден от Observable .

Обърнете внимание, че последните две стратегии причиняват прекъсване в потока, когато отпадат елементи. Освен това те няма да сигнализират за BufferOverflowException .

7. Отпадане на всички свръхпродуцирани елементи

Whenever the downstream Observer is not ready to receive an element, we can use an onBackpressureDrop() method to drop that element from the sequence.

We can think of that method as an onBackpressureBuffer() method with a capacity of a buffer set to zero with a strategy ON_OVERFLOW_DROP_LATEST.

This operator is useful when we can safely ignore values from a source Observable (such as mouse moves or current GPS location signals) as there will be more up-to-date values later on:

Observable.range(1, 1_000_000) .onBackpressureDrop() .observeOn(Schedulers.computation()) .doOnNext(ComputeFunction::compute) .subscribe(v -> {}, Throwable::printStackTrace);

The method onBackpressureDrop() is eliminating a problem of overproducing Observable but needs to be used with caution.

8. Conclusion

В тази статия разгледахме проблема за свръхпроизводството на Observable и начините за справяне с обратно налягане. Разгледахме стратегии за буфериране, групиране и пропускане на елементи, когато наблюдателят не е в състояние да консумира елементи толкова бързо, колкото са създадени от наблюдател .

Изпълнението на всички тези примери и кодови фрагменти може да се намери в проекта GitHub - това е проект на Maven, така че трябва да е лесно да се импортира и да се изпълнява както е.