Ръководство за CountDownLatch в Java

1. Въведение

В тази статия ще дадем ръководство за класа CountDownLatch и ще демонстрираме как може да се използва в няколко практически примера.

По същество, използвайки CountDownLatch, можем да накараме нишка да блокира, докато другите нишки не изпълнят дадена задача.

2. Използване при едновременно програмиране

Просто казано, CountDownLatch има поле за брояч , което можете да намалите според изискванията ни. След това можем да го използваме, за да блокира извикваща нишка, докато не бъде отброена до нула.

Ако правехме паралелна обработка, бихме могли да създадем екземпляр на CountDownLatch със същата стойност за брояча като брой нишки, през които искаме да работим. След това бихме могли просто да извикаме обратно броене () след завършване на всяка нишка, гарантирайки, че извикване на зависима нишка await () ще блокира, докато работните нишки не приключат.

3. Изчакваме да завърши пул от нишки

Нека изпробваме този модел, като създадем Worker и използваме поле CountDownLatch, за да сигнализираме, когато завърши:

public class Worker implements Runnable { private List outputScraper; private CountDownLatch countDownLatch; public Worker(List outputScraper, CountDownLatch countDownLatch) { this.outputScraper = outputScraper; this.countDownLatch = countDownLatch; } @Override public void run() { doSomeWork(); outputScraper.add("Counted down"); countDownLatch.countDown(); } }

След това нека създадем тест, за да докажем, че можем да получим CountDownLatch, за да изчакаме завършването на екземплярите на Worker :

@Test public void whenParallelProcessing_thenMainThreadWillBlockUntilCompletion() throws InterruptedException { List outputScraper = Collections.synchronizedList(new ArrayList()); CountDownLatch countDownLatch = new CountDownLatch(5); List workers = Stream .generate(() -> new Thread(new Worker(outputScraper, countDownLatch))) .limit(5) .collect(toList()); workers.forEach(Thread::start); countDownLatch.await(); outputScraper.add("Latch released"); assertThat(outputScraper) .containsExactly( "Counted down", "Counted down", "Counted down", "Counted down", "Counted down", "Latch released" ); }

Естествено „Latch освободен“ винаги ще бъде последният изход - тъй като зависи от пускането на CountDownLatch .

Имайте предвид, че ако не извикаме await () , няма да можем да гарантираме подреждането на изпълнението на нишките, така че тестът ще се провали на случаен принцип.

4. Набор от нишки, които чакат да започнат

Ако взехме предишния пример, но този път стартирахме хиляди нишки вместо пет, вероятно много от по-ранните ще са завършили обработката, преди дори да сме извикали start () за по-късните. Това може да затрудни опита за възпроизвеждане на проблем със съвпадението, тъй като не бихме могли да накараме всички нишки да работят паралелно.

За да заобиколим това, нека караме CountdownLatch да работи по различен начин, отколкото в предишния пример. Вместо да блокираме родителска нишка, докато някои дъщерни нишки завършат, можем да блокираме всяка дъщерна нишка, докато всички останали стартират.

Нека модифицираме нашия метод run () , така че да блокира преди обработка:

public class WaitingWorker implements Runnable { private List outputScraper; private CountDownLatch readyThreadCounter; private CountDownLatch callingThreadBlocker; private CountDownLatch completedThreadCounter; public WaitingWorker( List outputScraper, CountDownLatch readyThreadCounter, CountDownLatch callingThreadBlocker, CountDownLatch completedThreadCounter) { this.outputScraper = outputScraper; this.readyThreadCounter = readyThreadCounter; this.callingThreadBlocker = callingThreadBlocker; this.completedThreadCounter = completedThreadCounter; } @Override public void run() { readyThreadCounter.countDown(); try { callingThreadBlocker.await(); doSomeWork(); outputScraper.add("Counted down"); } catch (InterruptedException e) { e.printStackTrace(); } finally { completedThreadCounter.countDown(); } } }

Сега, нека модифицираме нашия тест, така че той да блокира, докато всички работници стартират, да отблокира работниците и след това да блокира, докато работниците приключат:

@Test public void whenDoingLotsOfThreadsInParallel_thenStartThemAtTheSameTime() throws InterruptedException { List outputScraper = Collections.synchronizedList(new ArrayList()); CountDownLatch readyThreadCounter = new CountDownLatch(5); CountDownLatch callingThreadBlocker = new CountDownLatch(1); CountDownLatch completedThreadCounter = new CountDownLatch(5); List workers = Stream .generate(() -> new Thread(new WaitingWorker( outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter))) .limit(5) .collect(toList()); workers.forEach(Thread::start); readyThreadCounter.await(); outputScraper.add("Workers ready"); callingThreadBlocker.countDown(); completedThreadCounter.await(); outputScraper.add("Workers complete"); assertThat(outputScraper) .containsExactly( "Workers ready", "Counted down", "Counted down", "Counted down", "Counted down", "Counted down", "Workers complete" ); }

Този модел е наистина полезен за опити за възпроизвеждане на грешки при съвпадение, тъй като може да се използва за принуждаване на хиляди нишки да се опитват да изпълняват паралелно някаква логика.

5. Ранно прекратяване на CountdownLatch

Понякога може да изпаднем в ситуация, при която работниците прекратяват по погрешка, преди да отброят CountDownLatch. Това може да доведе до това, че никога няма да достигне нула и await () никога не завършва:

@Override public void run() { if (true) { throw new RuntimeException("Oh dear, I'm a BrokenWorker"); } countDownLatch.countDown(); outputScraper.add("Counted down"); }

Нека модифицираме нашия по-ранен тест, за да използваме BrokenWorker, за да покажем как await () ще блокира завинаги:

@Test public void whenFailingToParallelProcess_thenMainThreadShouldGetNotGetStuck() throws InterruptedException { List outputScraper = Collections.synchronizedList(new ArrayList()); CountDownLatch countDownLatch = new CountDownLatch(5); List workers = Stream .generate(() -> new Thread(new BrokenWorker(outputScraper, countDownLatch))) .limit(5) .collect(toList()); workers.forEach(Thread::start); countDownLatch.await(); }

Ясно е, че това не е поведението, което искаме - би било много по-добре приложението да продължи, отколкото безкрайно блокиране.

За да заобиколим това, нека добавим аргумент за изчакване към нашето обаждане към await ().

boolean completed = countDownLatch.await(3L, TimeUnit.SECONDS); assertThat(completed).isFalse();

Както виждаме, тестът в крайна сметка ще изтече и await () ще върне false .

6. Заключение

В това кратко ръководство демонстрирахме как можем да използваме CountDownLatch, за да блокираме нишка, докато другите нишки завършат някаква обработка.

Също така показахме как може да се използва за отстраняване на грешки при проблеми с едновременността, като се уверим, че нишките работят паралелно.

Изпълнението на тези примери може да бъде намерено в GitHub; това е проект, базиран на Maven, така че трябва да се изпълнява лесно, както е.