Ръководство за DelayQueue

1. Общ преглед

В тази статия ще разгледаме конструкцията DelayQueue от пакета java.util.concurrent . Това е блокираща опашка, която може да се използва в програми за производители и потребители.

Той има много полезна характеристика - когато потребителят иска да вземе елемент от опашката, той може да го вземе само когато закъснението за този конкретен елемент е изтекло.

2. Прилагане на Delayed за елементи в DelayQueue

Всеки елемент, който искаме да вложим в DelayQueue, трябва да реализира интерфейса Delayed . Да кажем, че искаме да създадем клас DelayObject . Екземпляри от този клас ще бъдат поставени в DelayQueue.

Ще предадем данните String и delayInMilliseconds като и аргументи на неговия конструктор:

public class DelayObject implements Delayed { private String data; private long startTime; public DelayObject(String data, long delayInMilliseconds) { this.data = data; this.startTime = System.currentTimeMillis() + delayInMilliseconds; }

Определяме startTime - това е време, когато елементът трябва да се консумира от опашката. След това трябва да приложим метода getDelay () - той трябва да върне оставащото закъснение, свързано с този обект в дадената единица време.

Следователно трябва да използваме метода TimeUnit.convert () , за да върнем останалото закъснение в правилния TimeUnit:

@Override public long getDelay(TimeUnit unit) { long diff = startTime - System.currentTimeMillis(); return unit.convert(diff, TimeUnit.MILLISECONDS); }

Когато потребителят се опита да вземе елемент от опашката, DelayQueue ще изпълни getDelay (), за да разбере дали този елемент е разрешен да бъде върнат от опашката. Ако методът getDelay () ще върне нула или отрицателно число, това означава, че той може да бъде извлечен от опашката.

Също така трябва да приложим метода compareTo () , тъй като елементите в DelayQueue ще бъдат сортирани според времето на изтичане. Елементът, който ще изтече първи, се съхранява в началото на опашката, а елементът с най-голямо време на изтичане се държи в опашката на опашката:

@Override public int compareTo(Delayed o) { return Ints.saturatedCast( this.startTime - ((DelayObject) o).startTime); }

3. DelayQueue C onsumer и Producer

За да можем да тестваме нашата DelayQueue , трябва да внедрим логика на производител и потребител. Класът на производител приема опашката, броя на елементите, които трябва да произведе, и закъснението на всяко съобщение в милисекунди като аргументи.

След това, когато методът run () бъде извикан, той поставя елементи в опашката и спи за 500 милисекунди след всяко пускане:

public class DelayQueueProducer implements Runnable { private BlockingQueue queue; private Integer numberOfElementsToProduce; private Integer delayOfEachProducedMessageMilliseconds; // standard constructor @Override public void run() { for (int i = 0; i < numberOfElementsToProduce; i++) { DelayObject object = new DelayObject( UUID.randomUUID().toString(), delayOfEachProducedMessageMilliseconds); System.out.println("Put object: " + object); try { queue.put(object); Thread.sleep(500); } catch (InterruptedException ie) { ie.printStackTrace(); } } } }

Потребителското внедряване е много подобно, но също така отчита броя на съобщенията, които са били консумирани:

public class DelayQueueConsumer implements Runnable { private BlockingQueue queue; private Integer numberOfElementsToTake; public AtomicInteger numberOfConsumedElements = new AtomicInteger(); // standard constructors @Override public void run() { for (int i = 0; i < numberOfElementsToTake; i++) { try { DelayObject object = queue.take(); numberOfConsumedElements.incrementAndGet(); System.out.println("Consumer take: " + object); } catch (InterruptedException e) { e.printStackTrace(); } } } }

4. Тест за използване на DelayQueue

За да тестваме поведението на DelayQueue, ще създадем една нишка производител и една нишка потребител.

Производителят ще постави () два обекта в опашката със закъснение от 500 милисекунди. Тестът твърди, че потребителят е консумирал две съобщения:

@Test public void givenDelayQueue_whenProduceElement _thenShouldConsumeAfterGivenDelay() throws InterruptedException { // given ExecutorService executor = Executors.newFixedThreadPool(2); BlockingQueue queue = new DelayQueue(); int numberOfElementsToProduce = 2; int delayOfEachProducedMessageMilliseconds = 500; DelayQueueConsumer consumer = new DelayQueueConsumer( queue, numberOfElementsToProduce); DelayQueueProducer producer = new DelayQueueProducer( queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds); // when executor.submit(producer); executor.submit(consumer); // then executor.awaitTermination(5, TimeUnit.SECONDS); executor.shutdown(); assertEquals(consumer.numberOfConsumedElements.get(), numberOfElementsToProduce); }

Можем да забележим, че стартирането на тази програма ще даде следния резултат:

Put object: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007} Consumer take: {data='86046157-e8a0-49b2-9cbb-8326124bcab8', startTime=1494069868007} Put object: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512} Consumer take: {data='d47927ef-18c7-449b-b491-5ff30e6795ed', startTime=1494069868512}

Производителят поставя обекта и след известно време се консумира първият обект, за който изтече закъснението.

Същата ситуация се случи и за втория елемент.

5. Потребителят не може да консумира в дадения момент

Да кажем, че имаме производител, който произвежда елемент, който ще изтече след 10 секунди :

int numberOfElementsToProduce = 1; int delayOfEachProducedMessageMilliseconds = 10_000; DelayQueueConsumer consumer = new DelayQueueConsumer( queue, numberOfElementsToProduce); DelayQueueProducer producer = new DelayQueueProducer( queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);

Ще започнем нашия тест, но той ще приключи след 5 секунди. Поради характеристиките на DelayQueue, потребителят няма да може да използва съобщението от опашката, тъй като елементът все още не е изтекъл:

executor.submit(producer); executor.submit(consumer); executor.awaitTermination(5, TimeUnit.SECONDS); executor.shutdown(); assertEquals(consumer.numberOfConsumedElements.get(), 0);

Имайте предвид, че числото на потребителя numberOfConsumedElements има стойност, равна на нула.

6. Изработване на елемент с незабавно изтичане

Когато реализациите на метода за забавено съобщение getDelay () връщат отрицателно число, това означава, че даденият елемент вече е изтекъл. В тази ситуация производителят ще консумира този елемент незабавно.

Можем да тестваме ситуацията на създаване на елемент с отрицателно закъснение:

int numberOfElementsToProduce = 1; int delayOfEachProducedMessageMilliseconds = -10_000; DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce); DelayQueueProducer producer = new DelayQueueProducer( queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);

Когато стартираме тестовия случай, потребителят ще консумира елемента веднага, защото той вече е изтекъл:

executor.submit(producer); executor.submit(consumer); executor.awaitTermination(1, TimeUnit.SECONDS); executor.shutdown(); assertEquals(consumer.numberOfConsumedElements.get(), 1);

7. Заключение

В тази статия разглеждахме конструкцията DelayQueue от пакета java.util.concurrent .

Ние внедри Забавена елемент, който е бил произведен и използван от опашката.

Използвахме изпълнението на DelayQueue, за да консумираме елементи с изтекъл срок на годност.

Внедряването на всички тези примери и кодови фрагменти може да се намери в проекта GitHub - който е проект на Maven, така че трябва да е лесно да се импортира и да се изпълнява както е.