Ръководство за Java Phaser

1. Общ преглед

В тази статия ще разгледаме конструкцията Phaser от пакета java.util.concurrent . Това е много подобна конструкция на CountDownLatch , която ни позволява да координираме изпълнението на нишки. В сравнение с CountDownLatch , той има някои допълнителни функционалности.

В Phaser е бариера, на която динамично брой нишки трябва да изчакате, преди да продължи изпълнението. В CountDownLatch този номер не може да бъде конфигуриран динамично и трябва да бъде предоставен, когато създаваме екземпляра.

2. API на Phaser

В Phaser ни позволява да се изгради логика, в която конци трябва да чакам на бариерата преди да отидете на следващата стъпка на изпълнение .

Ние можем да координира няколко фази на изпълнение, да се използва повторно Phaser инстанция за всяка фаза на програмата. Всяка фаза може да има различен брой нишки, които чакат за преминаване към друга фаза. По-късно ще разгледаме пример за използване на фази.

За да участва в координацията, нишката трябва да се регистрира () в екземпляра Phaser . Имайте предвид, че това само увеличава броя на регистрираните страни и не можем да проверим дали текущата нишка е регистрирана - ще трябва да подкласираме изпълнението, за да поддържа това.

Нишката сигнализира, че е пристигнала до бариерата, като извика awaAndAwaitAdvance () , което е метод за блокиране. Когато броят на пристигналите страни е равен на броя на регистрираните страни, изпълнението на програмата ще продължи и броят на фазите ще се увеличи. Можем да получим номера на текущата фаза, като извикаме метода getPhase () .

Когато нишката приключи работата си, трябва да извикаме метода acceptAndDeregister () , за да сигнализираме, че текущата нишка вече не трябва да се отчита в тази конкретна фаза.

3. Внедряване на логика с помощта на Phaser API

Да кажем, че искаме да координираме множество фази на действия. Три нишки ще обработят първата фаза, а две нишки ще обработят втората фаза.

Ще създадем клас LongRunningAction, който реализира интерфейса Runnable :

class LongRunningAction implements Runnable { private String threadName; private Phaser ph; LongRunningAction(String threadName, Phaser ph) { this.threadName = threadName; this.ph = ph; ph.register(); } @Override public void run() { ph.arriveAndAwaitAdvance(); try { Thread.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } ph.arriveAndDeregister(); } }

Когато нашият клас на действие е създаден, ние се регистрираме в екземпляра на Phaser, използвайки метода register () . Това ще увеличи броя на нишките, използвайки конкретния Phaser.

Извикването на arriAndAwaitAdvance () ще накара текущата нишка да изчака на бариерата. Както вече споменахме, когато броят на пристигналите страни стане същият като броя на регистрираните страни, изпълнението ще продължи.

След като обработката приключи, текущата нишка се отписва от себе си, като извиква метода acceptAndDeregister () .

Нека създадем тестов случай, в който ще стартираме три нишки LongRunningAction и ще блокираме върху бариерата. След това, след като действието приключи, ще създадем две допълнителни нишки LongRunningAction, които ще извършат обработка на следващата фаза.

Когато създаваме екземпляр на Phaser от основната нишка, ние предаваме 1 като аргумент. Това е еквивалентно на извикване на метод register () от текущата нишка. Правим това, защото когато създаваме три работни нишки, основната нишка е координатор и следователно Phaser трябва да има регистрирани четири нишки:

ExecutorService executorService = Executors.newCachedThreadPool(); Phaser ph = new Phaser(1); assertEquals(0, ph.getPhase());

Фазата след инициализацията е равна на нула.

Класът Phaser има конструктор, в който можем да му предадем родителски екземпляр. Полезно е в случаите, когато имаме голям брой страни, които биха претърпели огромни разходи за съхранение на синхронизацията. В такива ситуации могат да бъдат настроени екземпляри на Phasers , така че групи от подфазери да споделят общ родител.

След това, нека започнем три нишки на действие LongRunningAction , които ще чакат на бариерата, докато не извикаме метода acceptAndAwaitAdvance () от основната нишка.

Имайте предвид, че инициализирахме Phaser с 1 и извикахме register () още три пъти. Сега три нишки за действие обявиха, че са стигнали до бариерата, така че е необходимо още едно обаждане на acceptAndAwaitAdvance () - тази от основната нишка:

executorService.submit(new LongRunningAction("thread-1", ph)); executorService.submit(new LongRunningAction("thread-2", ph)); executorService.submit(new LongRunningAction("thread-3", ph)); ph.arriveAndAwaitAdvance(); assertEquals(1, ph.getPhase());

След приключване на тази фаза методът getPhase () ще върне такъв, защото програмата завърши обработката на първата стъпка на изпълнение.

Да кажем, че две нишки трябва да провеждат следващата фаза на обработка. Можем да използваме Phaser, за да постигнем това, защото ни позволява да конфигурираме динамично броя нишки, които трябва да чакат на бариерата. Стартираме две нови нишки, но те няма да продължат да се изпълняват, докато извикването на arriAndAwaitAdvance () от основната нишка (същото като в предишния случай):

executorService.submit(new LongRunningAction("thread-4", ph)); executorService.submit(new LongRunningAction("thread-5", ph)); ph.arriveAndAwaitAdvance(); assertEquals(2, ph.getPhase()); ph.arriveAndDeregister();

След това методът getPhase () ще върне фазово число, равно на две. Когато искаме да завършим нашата програма, трябва да извикаме метода acceptAndDeregister () , тъй като основната нишка все още е регистрирана във Phaser. Когато премахването на регистрацията доведе до нулев брой регистрирани страни, Phaser се прекратява. Всички извиквания към методи за синхронизация вече няма да се блокират и ще се върнат веднага.

Стартирането на програмата ще даде следния изход (пълният изходен код с отпечатъците на реда за печат може да бъде намерен в хранилището на кода):

This is phase 0 This is phase 0 This is phase 0 Thread thread-2 before long running action Thread thread-1 before long running action Thread thread-3 before long running action This is phase 1 This is phase 1 Thread thread-4 before long running action Thread thread-5 before long running action

Виждаме, че всички нишки чакат изпълнение, докато бариерата се отвори. Следващата фаза на изпълнението се извършва само когато предишната завърши успешно.

4. Заключение

В този урок разгледахме конструкцията Phaser от java.util.concurrent и внедрихме координационната логика с множество фази, използвайки клас Phaser .

Изпълнението на всички тези примери и кодови фрагменти може да се намери в проекта GitHub - това е проект на Maven, така че трябва да е лесно да се импортира и да се изпълнява както е.