1. Въведение
Тази статия е поглед върху пуловете от нишки в Java - като се започне с различните реализации в стандартната библиотека на Java и след това се разгледа библиотеката на Гуава на Google.
2. Басейнът с нишки
В Java нишките се съпоставят с нишки на системно ниво, които са ресурси на операционната система. Ако създавате нишки неконтролируемо, може бързо да изчерпите тези ресурси.
Превключването на контекста между нишките се извършва и от операционната система - за да се емулира паралелизъм. Опростеният изглед е, че - колкото повече нишки хвърляте хайвера си, толкова по-малко време отделя всяка нишка в реална работа.
Шаблонът Thread Pool помага да се спестят ресурси в многонишко приложение, а също така да се съдържа паралелизмът в определени предварително дефинирани граници.
Когато използвате пул от нишки, пишете паралелния си код под формата на паралелни задачи и ги изпращате за изпълнение на екземпляр на пул от нишки . Този екземпляр контролира няколко повторно използвани нишки за изпълнение на тези задачи.
Моделът ви позволява да контролирате броя нишки, които приложението създава , техния жизнен цикъл, както и да планирате изпълнението на задачите и да държите входящите задачи в опашка.
3. Пулове за нишки в Java
3.1. Изпълнители , Изпълнител и ИзпълнителСервиз
Най- изпълнители помощник клас съдържа няколко метода за създаване на предварително конфигурирани конец басейн случаи за вас. Тези класове са добро място за започване - използвайте го, ако не е необходимо да прилагате никакви персонализирани фини настройки.
Интерфейсите Executor и ExecutorService се използват за работа с различни реализации на пула от нишки в Java. Обикновено трябва да държите кода си отделен от действителното изпълнение на пула от нишки и да използвате тези интерфейси в цялото си приложение.
Интерфейсът на изпълнителя има един метод за изпълнение , за да изпрати изпълними екземпляри за изпълнение.
Ето един бърз пример за това как можете да използвате приложния програмен интерфейс (API) на Executors, за да придобиете екземпляр на Executor, подкрепен от пул от една нишка и неограничена опашка за последователно изпълнение на задачи. Тук изпълняваме една задача, която просто отпечатва “ Hello World ” на екрана. Задачата е представен като ламбда (функция Java 8), която се заключи, че Изпълнима .
Executor executor = Executors.newSingleThreadExecutor(); executor.execute(() -> System.out.println("Hello World"));
Интерфейсът ExecutorService съдържа голям брой методи за контрол на хода на задачите и управление на прекратяването на услугата . Използвайки този интерфейс, можете да изпратите задачите за изпълнение и също така да контролирате тяхното изпълнение с помощта на върнатия бъдещ екземпляр.
В следващия пример създаваме ExecutorService , изпращаме задача и след това използваме върнатия метод за получаване на Future 's , за да изчакаме, докато изпратената задача приключи и стойността се върне:
ExecutorService executorService = Executors.newFixedThreadPool(10); Future future = executorService.submit(() -> "Hello World"); // some operations String result = future.get();
Разбира се, в сценарий от реалния живот обикновено не искате да се обадите на future.get () веднага, но отлагате извикването му, докато всъщност не се нуждаете от стойността на изчислението.
Методът за изпращане е претоварен, за да вземе Runnable или Callable, които са функционални интерфейси и могат да бъдат предадени като lambdas (като се започне с Java 8).
Единственият метод на Runnable не създава изключение и не връща стойност. Интерфейсът Callable може да е по-удобен, тъй като ни позволява да хвърлим изключение и да върнем стойност.
И накрая - за да позволите на компилатора да изведе типа Callable , просто върнете стойност от ламбда.
За повече примери за използване на интерфейса ExecutorService и фючърси, погледнете „Ръководство за Java ExecutorService“.
3.2. ThreadPoolExecutor
В ThreadPoolExecutor е разтегателен изпълнение конец басейн с много параметри и куки за фина настройка.
Основните параметри на конфигурацията, които ще обсъдим тук, са: corePoolSize , maximumPoolSize и keepAliveTime .
Пулът се състои от фиксиран брой основни нишки, които се държат през цялото време, и някои прекомерни нишки, които могат да бъдат породени и след това прекратени, когато вече не са необходими. Параметърът corePoolSize е броят на основните нишки, които ще бъдат създадени и запазени в пула. Когато влезе нова задача, ако всички основни нишки са заети и вътрешната опашка е пълна, тогава на пула се разрешава да нарасне до maximumPoolSize .
Параметърът keepAliveTime е интервалът от време, за който е позволено да съществуват прекомерни нишки (с екземпляр, надвишаващ corePoolSize ) в неактивно състояние. По подразбиране ThreadPoolExecutor разглежда само неядрени нишки за премахване. За да приложим същата политика за премахване към основните нишки, можем да използваме метода allowCoreThreadTimeOut (true) .
Тези параметри обхващат широк спектър от случаи на употреба, но най-типичните конфигурации са предварително дефинирани в статичните методи на Изпълнителите .
Например , newFixedThreadPool метод създава ThreadPoolExecutor с равен corePoolSize и maximumPoolSize стойности на параметрите и нула KeepAliveTime. Това означава, че броят на нишките в този пул от нишки е винаги един и същ:
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); assertEquals(2, executor.getPoolSize()); assertEquals(1, executor.getQueue().size());
В горния пример създаваме екземпляр на ThreadPoolExecutor с фиксиран брой нишки от 2. Това означава, че ако броят на едновременно изпълняваните задачи е по-малък или равен на два през цялото време, тогава те се изпълняват веднага. В противен случай някои от тези задачи могат да бъдат поставени в опашка, за да изчакат своя ред .
Създадохме три Callable задачи, които имитират тежка работа, като спя за 1000 милисекунди. Първите две задачи ще бъдат изпълнени наведнъж, а третата ще трябва да изчака на опашката. Можем да го проверим, като извикаме методите getPoolSize () и getQueue (). Size () веднага след изпращане на задачите.
Друг предварително конфигуриран ThreadPoolExecutor може да бъде създаден с метода Executors.newCachedThreadPool () . Този метод изобщо не получава брой нишки. В corePoolSize всъщност трябва да е 0, а maximumPoolSize е настроен да Integer.MAX_VALUE за този случай. В KeepAliveTime е 60 секунди за това.
Тези стойности на параметри означават, че кешираният пул от нишки може да расте без граници, за да побере произволен брой подадени задачи . Но когато нишките вече не са необходими, те ще бъдат изхвърлени след 60 секунди бездействие. Типичен случай на употреба е, когато имате много краткотрайни задачи във вашето приложение.
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool(); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); executor.submit(() -> { Thread.sleep(1000); return null; }); assertEquals(3, executor.getPoolSize()); assertEquals(0, executor.getQueue().size());
Размерът на опашката в горния пример винаги ще бъде нула, тъй като вътрешно се използва екземпляр SynchronousQueue . В SynchronousQueue двойки операции за вмъкване и премахване винаги се извършват едновременно, така че опашката всъщност никога не съдържа нищо.
The Executors.newSingleThreadExecutor() API creates another typical form of ThreadPoolExecutor containing a single thread. The single thread executor is ideal for creating an event loop. The corePoolSize and maximumPoolSize parameters are equal to 1, and the keepAliveTime is zero.
Tasks in the above example will be executed sequentially, so the flag value will be 2 after the task's completion:
AtomicInteger counter = new AtomicInteger(); ExecutorService executor = Executors.newSingleThreadExecutor(); executor.submit(() -> { counter.set(1); }); executor.submit(() -> { counter.compareAndSet(1, 2); });
Additionally, this ThreadPoolExecutor is decorated with an immutable wrapper, so it cannot be reconfigured after creation. Note that also this is the reason we cannot cast it to a ThreadPoolExecutor.
3.3. ScheduledThreadPoolExecutor
The ScheduledThreadPoolExecutor extends the ThreadPoolExecutor class and also implements the ScheduledExecutorService interface with several additional methods:
- schedule method allows to execute a task once after a specified delay;
- scheduleAtFixedRate method allows to execute a task after a specified initial delay and then execute it repeatedly with a certain period; the period argument is the time measured between the starting times of the tasks, so the execution rate is fixed;
- scheduleWithFixedDelay method is similar to scheduleAtFixedRate in that it repeatedly executes the given task, but the specified delay is measured between the end of the previous task and the start of the next; the execution rate may vary depending on the time it takes to execute any given task.
The Executors.newScheduledThreadPool() method is typically used to create a ScheduledThreadPoolExecutor with a given corePoolSize, unbounded maximumPoolSize and zero keepAliveTime. Here's how to schedule a task for execution in 500 milliseconds:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5); executor.schedule(() -> { System.out.println("Hello World"); }, 500, TimeUnit.MILLISECONDS);
The following code shows how to execute a task after 500 milliseconds delay and then repeat it every 100 milliseconds. After scheduling the task, we wait until it fires three times using the CountDownLatch lock, then cancel it using the Future.cancel() method.
CountDownLatch lock = new CountDownLatch(3); ScheduledExecutorService executor = Executors.newScheduledThreadPool(5); ScheduledFuture future = executor.scheduleAtFixedRate(() -> { System.out.println("Hello World"); lock.countDown(); }, 500, 100, TimeUnit.MILLISECONDS); lock.await(1000, TimeUnit.MILLISECONDS); future.cancel(true);
3.4. ForkJoinPool
ForkJoinPool is the central part of the fork/join framework introduced in Java 7. It solves a common problem of spawning multiple tasks in recursive algorithms. Using a simple ThreadPoolExecutor, you will run out of threads quickly, as every task or subtask requires its own thread to run.
In a fork/join framework, any task can spawn (fork) a number of subtasks and wait for their completion using the join method. The benefit of the fork/join framework is that it does not create a new thread for each task or subtask, implementing the Work Stealing algorithm instead. This framework is thoroughly described in the article “Guide to the Fork/Join Framework in Java”
Let’s look at a simple example of using ForkJoinPool to traverse a tree of nodes and calculate the sum of all leaf values. Here’s a simple implementation of a tree consisting of a node, an int value and a set of child nodes:
static class TreeNode { int value; Set children; TreeNode(int value, TreeNode... children) { this.value = value; this.children = Sets.newHashSet(children); } }
Now if we want to sum all values in a tree in parallel, we need to implement a RecursiveTask interface. Each task receives its own node and adds its value to the sum of values of its children. To calculate the sum of children values, task implementation does the following:
- streams the children set,
- maps over this stream, creating a new CountingTask for each element,
- executes each subtask by forking it,
- collects the results by calling the join method on each forked task,
- sums the results using the Collectors.summingInt collector.
public static class CountingTask extends RecursiveTask { private final TreeNode node; public CountingTask(TreeNode node) { this.node = node; } @Override protected Integer compute() { return node.value + node.children.stream() .map(childNode -> new CountingTask(childNode).fork()) .collect(Collectors.summingInt(ForkJoinTask::join)); } }
The code to run the calculation on an actual tree is very simple:
TreeNode tree = new TreeNode(5, new TreeNode(3), new TreeNode(2, new TreeNode(2), new TreeNode(8))); ForkJoinPool forkJoinPool = ForkJoinPool.commonPool(); int sum = forkJoinPool.invoke(new CountingTask(tree));
4. Thread Pool's Implementation in Guava
Guava is a popular Google library of utilities. It has many useful concurrency classes, including several handy implementations of ExecutorService. The implementing classes are not accessible for direct instantiation or subclassing, so the only entry point for creating their instances is the MoreExecutors helper class.
4.1. Adding Guava as a Maven Dependency
Add the following dependency to your Maven pom file to include the Guava library to your project. You can find the latest version of Guava library in the Maven Central repository:
com.google.guava guava 19.0
4.2. Direct Executor and Direct Executor Service
Sometimes you want to execute the task either in the current thread or in a thread pool, depending on some conditions. You would prefer to use a single Executor interface and just switch the implementation. Although it is not so hard to come up with an implementation of Executor or ExecutorService that executes the tasks in the current thread, it still requires writing some boilerplate code.
Gladly, Guava provides predefined instances for us.
Here's an example that demonstrates the execution of a task in the same thread. Although the provided task sleeps for 500 milliseconds, it blocks the current thread, and the result is available immediately after the execute call is finished:
Executor executor = MoreExecutors.directExecutor(); AtomicBoolean executed = new AtomicBoolean(); executor.execute(() -> { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } executed.set(true); }); assertTrue(executed.get());
The instance returned by the directExecutor() method is actually a static singleton, so using this method does not provide any overhead on object creation at all.
You should prefer this method to the MoreExecutors.newDirectExecutorService() because that API creates a full-fledged executor service implementation on every call.
4.3. Exiting Executor Services
Another common problem is shutting down the virtual machine while a thread pool is still running its tasks. Even with a cancellation mechanism in place, there is no guarantee that the tasks will behave nicely and stop their work when the executor service shuts down. This may cause JVM to hang indefinitely while the tasks keep doing their work.
To solve this problem, Guava introduces a family of exiting executor services. They are based on daemon threads that terminate together with the JVM.
These services also add a shutdown hook with the Runtime.getRuntime().addShutdownHook() method and prevent the VM from terminating for a configured amount of time before giving up on hung tasks.
In the following example, we're submitting the task that contains an infinite loop, but we use an exiting executor service with a configured time of 100 milliseconds to wait for the tasks upon VM termination. Without the exitingExecutorService in place, this task would cause the VM to hang indefinitely:
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5); ExecutorService executorService = MoreExecutors.getExitingExecutorService(executor, 100, TimeUnit.MILLISECONDS); executorService.submit(() -> { while (true) { } });
4.4. Listening Decorators
Listening decorators allow you to wrap the ExecutorService and receive ListenableFuture instances upon task submission instead of simple Future instances. The ListenableFuture interface extends Future and has a single additional method addListener. This method allows adding a listener that is called upon future completion.
Рядко ще искате да използвате директно метода ListenableFuture.addListener () , но е от съществено значение за повечето помощни методи в класа на помощната програма Futures . Например с метода Futures.allAsList () можете да комбинирате няколко екземпляра ListenableFuture в един ListenableFuture, който завършва след успешното завършване на всички комбинирани фючърси:
ExecutorService executorService = Executors.newCachedThreadPool(); ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(executorService); ListenableFuture future1 = listeningExecutorService.submit(() -> "Hello"); ListenableFuture future2 = listeningExecutorService.submit(() -> "World"); String greeting = Futures.allAsList(future1, future2).get() .stream() .collect(Collectors.joining(" ")); assertEquals("Hello World", greeting);
5. Заключение
В тази статия обсъдихме модела Thread Pool и неговите внедрения в стандартната библиотека Java и в библиотеката Guava на Google.
Изходният код на статията е достъпен в GitHub.