1. Общ преглед
В тази статия ще разгледаме една от най-полезните конструкции java.util.concurrent за решаване на едновременния проблем производител-потребител. Ще разгледаме API на интерфейса BlockingQueue и как методите от този интерфейс улесняват писането на едновременни програми.
По-нататък в статията ще покажем пример за проста програма, която има множество нишки на производител и множество нишки на потребителя.
2. Типове BlockingQueue
Можем да различим два вида BlockingQueue :
- неограничена опашка - може да расте почти безкрайно
- ограничена опашка - с дефиниран максимален капацитет
2.1. Неограничена опашка
Създаването на неограничени опашки е просто:
BlockingQueue blockingQueue = new LinkedBlockingDeque();
Капацитетът на blockingQueue ще бъде зададен на Integer.MAX_VALUE. Всички операции, които добавят елемент към неограничената опашка, никога няма да блокират, поради което може да нарасне до много голям размер.
Най-важното при проектирането на програма производител-потребител, използваща неограничена BlockingQueue, е, че потребителите трябва да могат да консумират съобщения толкова бързо, колкото производителите добавят съобщения към опашката. В противен случай паметта може да се запълни и ще получим изключение OutOfMemory .
2.2. Ограничена опашка
Вторият тип опашки е ограничената опашка. Можем да създадем такива опашки, като предадем капацитета като аргумент на конструктор:
BlockingQueue blockingQueue = new LinkedBlockingDeque(10);
Тук имаме blockingQueue с капацитет равен на 10. Това означава, че когато производителят се опитва да добави елемент към вече пълна опашка, в зависимост от метод, който е бил използван за добавянето му ( offer () , add () или put () ), той ще блокира, докато не стане налично място за вмъкване на обект. В противен случай операциите ще се провалят.
Използването на ограничена опашка е добър начин за проектиране на едновременни програми, защото когато вмъкваме елемент във вече пълна опашка, тези операции трябва да изчакат, докато потребителите наваксат и предоставят малко място в опашката. Дава ни дроселиране без никакви усилия от наша страна.
3. API на BlockingQueue
В интерфейса BlockingQueue има два типа методи - методи, отговорни за добавяне на елементи към опашката и методи, които извличат тези елементи. Всеки метод от тези две групи се държи по различен начин, в случай че опашката е пълна / празна.
3.1. Добавяне на елементи
- add () - връща true, ако вмъкването е било успешно, в противен случай изхвърля IllegalStateException
- put () - вмъква посочения елемент в опашка в очакване на свободен слот, ако е необходимо
- offer () - връща true, ако вмъкването е било успешно, в противен случай е false
- оферта (E e, long timeout, TimeUnit unit) - опитва се да вмъкне елемент в опашка и изчаква наличен слот в рамките на определен таймаут
3.2. Извличане на елементи
- take () - изчаква head елемент от опашката и го премахва. Ако опашката е празна, тя блокира и изчаква даден елемент да стане достъпен
- анкета (дълго изчакване, единица TimeUnit) - извлича и премахва главата на опашката, изчаква до определеното време за изчакване, ако е необходимо елементът да стане достъпен. Връща нула след изчакване
Тези методи са най-важните градивни елементи от интерфейса BlockingQueue при изграждането на програми производител-потребител.
4. Пример за многонишков производител-потребител
Нека създадем програма, която се състои от две части - производител и потребител.
Производителят ще произведе произволно число от 0 до 100 и ще постави това число в BlockingQueue . Ще имаме 4 нишки на производител и ще използваме метода put () , за да блокираме, докато в опашката няма място.
Важното нещо, което трябва да запомните, е, че трябва да спрем потребителските нишки да чакат елемент да се появява в опашка за неопределено време.
Добра техника за сигнализиране от производителя на потребителя, че няма повече съобщения за обработка, е изпращането на специално съобщение, наречено отровно хапче. Трябва да изпратим толкова отровни хапчета, колкото имаме потребители. Тогава, когато потребителят вземе от опашката съобщението за специални отровни хапчета, изпълнението ще завърши изящно.
Нека разгледаме клас на производител:
public class NumbersProducer implements Runnable { private BlockingQueue numbersQueue; private final int poisonPill; private final int poisonPillPerProducer; public NumbersProducer(BlockingQueue numbersQueue, int poisonPill, int poisonPillPerProducer) { this.numbersQueue = numbersQueue; this.poisonPill = poisonPill; this.poisonPillPerProducer = poisonPillPerProducer; } public void run() { try { generateNumbers(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private void generateNumbers() throws InterruptedException { for (int i = 0; i < 100; i++) { numbersQueue.put(ThreadLocalRandom.current().nextInt(100)); } for (int j = 0; j < poisonPillPerProducer; j++) { numbersQueue.put(poisonPill); } } }
Нашият конструктор на производител приема като аргумент BlockingQueue, който се използва за координиране на обработката между производителя и потребителя. Виждаме, че методът createNumbers () ще постави 100 елемента в опашка. Необходимо е също съобщение за отровно хапче, за да се знае какъв тип съобщение трябва да се постави в опашка, когато изпълнението ще приключи. Това съобщение трябва да бъде поставено в опашка за отрови за отроваPillPerProducer .
Всеки потребител ще вземе елемент от BlockingQueue, използвайки метода take () , така че ще блокира, докато има елемент в опашката. След като вземе цяло число от опашката, той проверява дали съобщението е отровно хапче, ако да, изпълнението на нишка е завършено. В противен случай ще отпечата резултата на стандартен изход заедно с името на текущата нишка.
Това ще ни даде представа за вътрешната работа на нашите потребители:
public class NumbersConsumer implements Runnable { private BlockingQueue queue; private final int poisonPill; public NumbersConsumer(BlockingQueue queue, int poisonPill) { this.queue = queue; this.poisonPill = poisonPill; } public void run() { try { while (true) { Integer number = queue.take(); if (number.equals(poisonPill)) { return; } System.out.println(Thread.currentThread().getName() + " result: " + number); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
Важното нещо, което трябва да забележите, е използването на опашка. Същото като в конструктора на производител, опашката се предава като аргумент. Можем да го направим, защото BlockingQueue може да се споделя между нишки без изрична синхронизация.
Now that we have our producer and consumer, we can start our program. We need to define the queue's capacity, and we set it to 100 elements.
We want to have 4 producer threads and a number of consumers threads will be equal to the number of available processors:
int BOUND = 10; int N_PRODUCERS = 4; int N_CONSUMERS = Runtime.getRuntime().availableProcessors(); int poisonPill = Integer.MAX_VALUE; int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS; int mod = N_CONSUMERS % N_PRODUCERS; BlockingQueue queue = new LinkedBlockingQueue(BOUND); for (int i = 1; i < N_PRODUCERS; i++) { new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start(); } for (int j = 0; j < N_CONSUMERS; j++) { new Thread(new NumbersConsumer(queue, poisonPill)).start(); } new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start();
BlockingQueue is created using construct with a capacity. We're creating 4 producers and N consumers. We specify our poison pill message to be an Integer.MAX_VALUE because such value will never be sent by our producer under normal working conditions. The most important thing to notice here is that BlockingQueue is used to coordinate work between them.
Когато стартираме програмата, 4 нишки на производител ще поставят произволни цели числа в BlockingQueue и потребителите ще вземат тези елементи от опашката. Всяка нишка ще отпечата на стандартен изход името на нишката заедно с резултат.
5. Заключение
Тази статия показва практическо използване на BlockingQueue и обяснява методите, които се използват за добавяне и извличане на елементи от него. Също така показахме как да изградим многонишкова програма производител-потребител, използвайки BlockingQueue за координиране на работата между производители и потребители.
Внедряването на всички тези примери и кодови фрагменти може да бъде намерено в проекта GitHub - това е проект, базиран на Maven, така че трябва да е лесно да се импортира и да се изпълнява както е.