Планировчици в RxJava

1. Общ преглед

В тази статия ще се съсредоточим върху различни типове Планировчици , които ще използваме при писането на многонишкови програми, базирани на RxJava Observable методите subscribeOn и obserOn .

Планировъците дават възможност да определят къде и вероятно кога да изпълняват задачи, свързани с работата на наблюдаема верига.

Можем да получим Scheduler от фабричните методи, описани в класа Schedulers.

2. Поведение на резби по подразбиране

По подразбиране Rx е с една нишка, което предполага, че Observable и веригата от оператори, които можем да приложим към него, ще уведомят своите наблюдатели в същата нишка, на която е извикан неговият метод абонамент () .

Методите obserOn и subscribeOn приемат като аргумент Планировчик, който, както подсказва името, е инструмент, който можем да използваме за планиране на отделни действия.

Ще създадем нашата реализация на Планировчик, като използваме метода create Worker , който връща Scheduler.Worker. А работник приема действия и да ги изпълнява последователно на един конец.

По някакъв начин работникът е самият S cheduler, но няма да го наричаме Scheduler, за да избегнем объркване.

2.1. Планиране на действие

Можем да планираме работа на всеки планировчик, като създадем нов работник и планираме някои действия:

Scheduler scheduler = Schedulers.immediate(); Scheduler.Worker worker = scheduler.createWorker(); worker.schedule(() -> result += "action"); Assert.assertTrue(result.equals("action"));

След това действието се поставя на опашка в нишката, към която е назначен работникът.

2.2. Отмяна на действие

Scheduler.Worker разширява абонамента . Извикването на метода за отписване на работник ще доведе до изпразване на опашката и отмяна на всички чакащи задачи. Можем да видим това чрез пример:

Scheduler scheduler = Schedulers.newThread(); Scheduler.Worker worker = scheduler.createWorker(); worker.schedule(() -> { result += "First_Action"; worker.unsubscribe(); }); worker.schedule(() -> result += "Second_Action"); Assert.assertTrue(result.equals("First_Action"));

Втората задача никога не се изпълнява, защото тази преди нея е анулирала цялата операция. Действията, които са били в процес на изпълнение, ще бъдат прекъснати.

3. Планеристи.newThread

Този планировчик просто стартира нова нишка всеки път, когато е поискана чрез subscribeOn () или obserOn () .

Едва ли е добър избор, не само поради латентността, свързана с стартирането на нишка, но и защото тази нишка не се използва повторно:

Observable.just("Hello") .observeOn(Schedulers.newThread()) .doOnNext(s -> result2 += Thread.currentThread().getName() ) .observeOn(Schedulers.newThread()) .subscribe(s -> result1 += Thread.currentThread().getName() ); Thread.sleep(500); Assert.assertTrue(result1.equals("RxNewThreadScheduler-1")); Assert.assertTrue(result2.equals("RxNewThreadScheduler-2"));

Когато Worker приключи, нишката просто се прекратява. Този планировчик може да се използва само когато задачите са грубо зърнести: отнема много време за изпълнение, но има много малко от тях, така че е малко вероятно нишките да бъдат използвани повторно.

Scheduler scheduler = Schedulers.newThread(); Scheduler.Worker worker = scheduler.createWorker(); worker.schedule(() -> { result += Thread.currentThread().getName() + "_Start"; worker.schedule(() -> result += "_worker_"); result += "_End"; }); Thread.sleep(3000); Assert.assertTrue(result.equals( "RxNewThreadScheduler-1_Start_End_worker_"));

Когато насрочихме работника на NewThreadScheduler, видяхме, че работникът е обвързан с определена нишка.

4. Планировчици.незабавни

Schedulers.immediate е специален планировчик, който извиква задача в клиентската нишка по блокиращ начин, а не асинхронно и се връща при завършване на действието:

Scheduler scheduler = Schedulers.immediate(); Scheduler.Worker worker = scheduler.createWorker(); worker.schedule(() -> { result += Thread.currentThread().getName() + "_Start"; worker.schedule(() -> result += "_worker_"); result += "_End"; }); Thread.sleep(500); Assert.assertTrue(result.equals( "main_Start_worker__End"));

Всъщност абонирането за Observable чрез незабавен планировчик обикновено има същия ефект като изобщо да не се абонирате за някакъв конкретен S cheduler:

Observable.just("Hello") .subscribeOn(Schedulers.immediate()) .subscribe(s -> result += Thread.currentThread().getName() ); Thread.sleep(500); Assert.assertTrue(result.equals("main"));

5. Планеристи.трамплин

Най- батут Scheduler е много подобен на незабавно , защото тя също графици задачи в една и съща нишка, ефективно блокиране.

Предстоящата задача обаче се изпълнява, когато всички предварително планирани задачи приключат:

Observable.just(2, 4, 6, 8) .subscribeOn(Schedulers.trampoline()) .subscribe(i -> result += "" + i); Observable.just(1, 3, 5, 7, 9) .subscribeOn(Schedulers.trampoline()) .subscribe(i -> result += "" + i); Thread.sleep(500); Assert.assertTrue(result.equals("246813579"));

Незабавно извиква дадена задача веднага, докато батутът чака текущата задача да завърши.

Най- батут е работник изпълнява всяка задача на нишката, която насрочена първата задача. Първото извикване на график се блокира, докато опашката не се изпразни:

Scheduler scheduler = Schedulers.trampoline(); Scheduler.Worker worker = scheduler.createWorker(); worker.schedule(() -> { result += Thread.currentThread().getName() + "Start"; worker.schedule(() -> { result += "_middleStart"; worker.schedule(() -> result += "_worker_" ); result += "_middleEnd"; }); result += "_mainEnd"; }); Thread.sleep(500); Assert.assertTrue(result .equals("mainStart_mainEnd_middleStart_middleEnd_worker_"));

6. Планеристи.от

Планировъците са вътрешно по-сложни от изпълнителите от java.util.concurrent - така че беше необходима отделна абстракция.

Но тъй като те концептуално са доста сходни, не е изненадващо, че има обвивка, която може да превърне Executor в Scheduler, използвайки метода от фабриката:

private ThreadFactory threadFactory(String pattern) { return new ThreadFactoryBuilder() .setNameFormat(pattern) .build(); } @Test public void givenExecutors_whenSchedulerFrom_thenReturnElements() throws InterruptedException { ExecutorService poolA = newFixedThreadPool( 10, threadFactory("Sched-A-%d")); Scheduler schedulerA = Schedulers.from(poolA); ExecutorService poolB = newFixedThreadPool( 10, threadFactory("Sched-B-%d")); Scheduler schedulerB = Schedulers.from(poolB); Observable observable = Observable.create(subscriber -> { subscriber.onNext("Alfa"); subscriber.onNext("Beta"); subscriber.onCompleted(); });; observable .subscribeOn(schedulerA) .subscribeOn(schedulerB) .subscribe( x -> result += Thread.currentThread().getName() + x + "_", Throwable::printStackTrace, () -> result += "_Completed" ); Thread.sleep(2000); Assert.assertTrue(result.equals( "Sched-A-0Alfa_Sched-A-0Beta__Completed")); }

SchedulerB се използва за кратък период от време, но той едва планира ново действие върху SchedulerA , който върши цялата работа. По този начин многобройните методи за SubscriOn не само се игнорират, но и въвеждат малки режийни разходи.

7. Планировчици.io

This Scheduler is similar to the newThread except for the fact that already started threads are recycled and can possibly handle future requests.

This implementation works similarly to ThreadPoolExecutor from java.util.concurrent with an unbounded pool of threads. Every time a new worker is requested, either a new thread is started (and later kept idle for some time) or the idle one is reused:

Observable.just("io") .subscribeOn(Schedulers.io()) .subscribe(i -> result += Thread.currentThread().getName()); Assert.assertTrue(result.equals("RxIoScheduler-2"));

We need to be careful with unbounded resources of any kind – in case of slow or unresponsive external dependencies like web services, ioscheduler might start an enormous number of threads, leading to our very own application becoming unresponsive.

In practice, following Schedulers.io is almost always a better choice.

8. Schedulers.computation

Computation Scheduler by default limits the number of threads running in parallel to the value of availableProcessors(), as found in the Runtime.getRuntime() utility class.

So we should use a computation scheduler when tasks are entirely CPU-bound; that is, they require computational power and have no blocking code.

It uses an unbounded queue in front of every thread, so if the task is scheduled, but all cores are occupied, it will be queued. However, the queue just before each thread will keep growing:

Observable.just("computation") .subscribeOn(Schedulers.computation()) .subscribe(i -> result += Thread.currentThread().getName()); Assert.assertTrue(result.equals("RxComputationScheduler-1"));

If for some reason, we need a different number of threads than the default, we can always use the rx.scheduler.max-computation-threads system property.

By taking fewer threads we can ensure that there is always one or more CPU cores idle, and even under heavy load, computation thread pool does not saturate the server. It's simply not possible to have more computation threads than cores.

9. Schedulers.test

This Scheduler is used only for testing purposes, and we'll never see it in production code. Its main advantage is the ability to advance the clock, simulating time passing by arbitrarily:

List letters = Arrays.asList("A", "B", "C"); TestScheduler scheduler = Schedulers.test(); TestSubscriber subscriber = new TestSubscriber(); Observable tick = Observable .interval(1, TimeUnit.SECONDS, scheduler); Observable.from(letters) .zipWith(tick, (string, index) -> index + "-" + string) .subscribeOn(scheduler) .subscribe(subscriber); subscriber.assertNoValues(); subscriber.assertNotCompleted(); scheduler.advanceTimeBy(1, TimeUnit.SECONDS); subscriber.assertNoErrors(); subscriber.assertValueCount(1); subscriber.assertValues("0-A"); scheduler.advanceTimeTo(3, TimeUnit.SECONDS); subscriber.assertCompleted(); subscriber.assertNoErrors(); subscriber.assertValueCount(3); assertThat( subscriber.getOnNextEvents(), hasItems("0-A", "1-B", "2-C"));

10. Default Schedulers

Some Observable operators in RxJava have alternate forms that allow us to set which Scheduler the operator will use for its operation. Others don't operate on any particular Scheduler or operate on a particular default Scheduler.

For example, the delay operator takes upstream events and pushes them downstream after a given time. Obviously, it cannot hold the original thread during that period, so it must use a different Scheduler:

ExecutorService poolA = newFixedThreadPool( 10, threadFactory("Sched1-")); Scheduler schedulerA = Schedulers.from(poolA); Observable.just('A', 'B') .delay(1, TimeUnit.SECONDS, schedulerA) .subscribe(i -> result+= Thread.currentThread().getName() + i + " "); Thread.sleep(2000); Assert.assertTrue(result.equals("Sched1-A Sched1-B "));

Without supplying a custom schedulerA, all operators below delay would use the computation Scheduler.

Other important operators that support custom Schedulers are buffer, interval, range, timer, skip, take, timeout, and several others. If we don't provide a Scheduler to such operators, computation scheduler is utilized, which is a safe default in most cases.

11. Conclusion

In truly reactive applications, for which all long-running operations are asynchronous, very few threads and thus Schedulers are needed.

Овладяването на планировчиците е от съществено значение за писането на мащабируем и безопасен код с помощта на RxJava. Разликата между абонамент и наблюдение е особено важна при голямо натоварване, където всяка задача трябва да бъде изпълнена точно когато очакваме.

Не на последно място, трябва да сме сигурни, че Schedulers използвани надолу по веригата може да се справи с рекламата ето, генерирани от Schedulers upstrea м. За повече информация има тази статия за обратното налягане.

Изпълнението на всички тези примери и кодови фрагменти може да се намери в проекта GitHub - това е проект на Maven, така че трябва да е лесно да се импортира и да се изпълнява както е.