Семафори в Java

1. Общ преглед

В този бърз урок ще изследваме основите на семафорите и мутексите в Java.

2. Семафор

Ще започнем с java.util.concurrent.Semaphore. Можем да използваме семафори, за да ограничим броя на едновременните нишки с достъп до определен ресурс.

В следващия пример ще приложим проста опашка за влизане, за да ограничим броя на потребителите в системата:

class LoginQueueUsingSemaphore { private Semaphore semaphore; public LoginQueueUsingSemaphore(int slotLimit) { semaphore = new Semaphore(slotLimit); } boolean tryLogin() { return semaphore.tryAcquire(); } void logout() { semaphore.release(); } int availableSlots() { return semaphore.availablePermits(); } }

Забележете как използвахме следните методи:

  • tryAcquire () - връща true, ако разрешението е налично незабавно и го придобива, иначе връща false, но придобива () придобива разрешение и блокира, докато такова е налично
  • release () - освобождаване на разрешение
  • availablePermissions () - връща броя на наличните текущи разрешения

За да тестваме нашата опашка за вход, първо ще се опитаме да достигнем лимита и да проверим дали следващият опит за влизане ще бъде блокиран:

@Test public void givenLoginQueue_whenReachLimit_thenBlocked() { int slots = 10; ExecutorService executorService = Executors.newFixedThreadPool(slots); LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots); IntStream.range(0, slots) .forEach(user -> executorService.execute(loginQueue::tryLogin)); executorService.shutdown(); assertEquals(0, loginQueue.availableSlots()); assertFalse(loginQueue.tryLogin()); }

След това ще видим дали има свободни слотове след излизане:

@Test public void givenLoginQueue_whenLogout_thenSlotsAvailable() { int slots = 10; ExecutorService executorService = Executors.newFixedThreadPool(slots); LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots); IntStream.range(0, slots) .forEach(user -> executorService.execute(loginQueue::tryLogin)); executorService.shutdown(); assertEquals(0, loginQueue.availableSlots()); loginQueue.logout(); assertTrue(loginQueue.availableSlots() > 0); assertTrue(loginQueue.tryLogin()); }

3. Времеви семафор

След това ще обсъдим Apache Commons TimedSemaphore. TimedSemaphore позволява редица разрешения като обикновен Semaphore, но за даден период от време, след този период нулирането на времето и всички разрешения се освобождават.

Можем да използваме TimedSemaphore, за да изградим проста опашка за забавяне, както следва:

class DelayQueueUsingTimedSemaphore { private TimedSemaphore semaphore; DelayQueueUsingTimedSemaphore(long period, int slotLimit) { semaphore = new TimedSemaphore(period, TimeUnit.SECONDS, slotLimit); } boolean tryAdd() { return semaphore.tryAcquire(); } int availableSlots() { return semaphore.getAvailablePermits(); } }

Когато използваме опашка със забавяне с една секунда като период от време и след като използваме всички слотове в рамките на една секунда, не трябва да има налични:

public void givenDelayQueue_whenReachLimit_thenBlocked() { int slots = 50; ExecutorService executorService = Executors.newFixedThreadPool(slots); DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots); IntStream.range(0, slots) .forEach(user -> executorService.execute(delayQueue::tryAdd)); executorService.shutdown(); assertEquals(0, delayQueue.availableSlots()); assertFalse(delayQueue.tryAdd()); }

Но след като спи известно време, семафорът трябва да нулира и освободи разрешенията :

@Test public void givenDelayQueue_whenTimePass_thenSlotsAvailable() throws InterruptedException { int slots = 50; ExecutorService executorService = Executors.newFixedThreadPool(slots); DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots); IntStream.range(0, slots) .forEach(user -> executorService.execute(delayQueue::tryAdd)); executorService.shutdown(); assertEquals(0, delayQueue.availableSlots()); Thread.sleep(1000); assertTrue(delayQueue.availableSlots() > 0); assertTrue(delayQueue.tryAdd()); }

4. Семафор срещу Mutex

Mutex действа подобно на двоичен семафор, ние можем да го използваме за осъществяване на взаимно изключване.

В следващия пример ще използваме прост двоичен семафор, за да изградим брояч:

class CounterUsingMutex { private Semaphore mutex; private int count; CounterUsingMutex() { mutex = new Semaphore(1); count = 0; } void increase() throws InterruptedException { mutex.acquire(); this.count = this.count + 1; Thread.sleep(1000); mutex.release(); } int getCount() { return this.count; } boolean hasQueuedThreads() { return mutex.hasQueuedThreads(); } }

Когато много нишки се опитват да получат достъп до брояча наведнъж, те просто ще бъдат блокирани на опашка :

@Test public void whenMutexAndMultipleThreads_thenBlocked() throws InterruptedException { int count = 5; ExecutorService executorService = Executors.newFixedThreadPool(count); CounterUsingMutex counter = new CounterUsingMutex(); IntStream.range(0, count) .forEach(user -> executorService.execute(() -> { try { counter.increase(); } catch (InterruptedException e) { e.printStackTrace(); } })); executorService.shutdown(); assertTrue(counter.hasQueuedThreads()); }

Когато изчакаме, всички нишки ще имат достъп до брояча и няма останали нишки в опашката:

@Test public void givenMutexAndMultipleThreads_ThenDelay_thenCorrectCount() throws InterruptedException { int count = 5; ExecutorService executorService = Executors.newFixedThreadPool(count); CounterUsingMutex counter = new CounterUsingMutex(); IntStream.range(0, count) .forEach(user -> executorService.execute(() -> { try { counter.increase(); } catch (InterruptedException e) { e.printStackTrace(); } })); executorService.shutdown(); assertTrue(counter.hasQueuedThreads()); Thread.sleep(5000); assertFalse(counter.hasQueuedThreads()); assertEquals(count, counter.getCount()); }

5. Заключение

В тази статия разгледахме основите на семафорите в Java.

Както винаги, пълният изходен код е достъпен в GitHub.