Ръководство за рамката Fork / Join в Java

1. Общ преглед

Рамката fork / join беше представена в Java 7. Тя предоставя инструменти, които да спомогнат за ускоряване на паралелната обработка, като се опитва да използва всички налични процесорни ядра - което се постига чрез подход разделяне и завладяване .

На практика това означава, че рамката първо се „разклонява“ , рекурсивно разбивайки задачата на по-малки независими подзадачи, докато те са достатъчно прости, за да бъдат изпълнени асинхронно.

След това започва частта „присъединяване“ , при която резултатите от всички подзадачи се свързват рекурсивно в един резултат или в случай на задача, която се връща невалидна, програмата просто изчаква, докато се изпълни всяка подзадача.

За да осигури ефективно паралелно изпълнение, рамката fork / join използва пул от нишки, наречен ForkJoinPool , който управлява работни нишки от тип ForkJoinWorkerThread .

2. ForkJoinPool

В ForkJoinPool е сърцето на рамката. Това е изпълнение на ExecutorService, което управлява работни нишки и ни предоставя инструменти за получаване на информация за състоянието и производителността на пула от нишки.

Работните нишки могат да изпълняват само една задача наведнъж, но ForkJoinPool не създава отделна нишка за всяка отделна подзадача. Вместо това, всяка нишка в пула има своя собствена опашка с двоен край (или deque, изразена колода ), която съхранява задачи.

Тази архитектура е жизненоважна за балансиране на натоварването на нишката с помощта на алгоритъма за кражба на работа.

2.1. Алгоритъм за кражба на работа

Най-просто казано - безплатните нишки се опитват да „откраднат“ работа от deques на заети нишки.

По подразбиране работната нишка получава задачи от главата на собствения си deque. Когато е празен, нишката поема задача от опашката на deque на друга заета нишка или от глобалната опашка за влизане, тъй като тук е възможно да се намират най-големите парчета работа.

Този подход свежда до минимум възможността нишките да се конкурират за задачи. Също така намалява броя пъти, в които нишката ще трябва да търси работа, тъй като работи първо на най-големите налични парчета работа.

2.2. ForkJoinPool примерна

В Java 8 най-удобният начин за получаване на достъп до екземпляра на ForkJoinPool е използването на статичния му метод commonPool (). Както подсказва името му, това ще предостави препратка към общия пул, който е пул от нишки по подразбиране за всеки ForkJoinTask .

Според документацията на Oracle, използването на предварително дефинирания общ пул намалява консумацията на ресурси, тъй като това обезкуражава създаването на отделен пул от нишки за задача.

ForkJoinPool commonPool = ForkJoinPool.commonPool();

Същото поведение може да се постигне и в Java 7 чрез създаване на ForkJoinPool и присвояването му на публично статично поле на клас помощна програма:

public static ForkJoinPool forkJoinPool = new ForkJoinPool(2);

Сега той може лесно да бъде достъпен:

ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;

С конструкторите на ForkJoinPool е възможно да се създаде персонализиран пул от нишки със специфично ниво на паралелизъм, фабрика на нишки и манипулатор на изключения. В горния пример пулът има ниво на паралелизъм 2. Това означава, че пулът ще използва 2 процесорни ядра.

3. ForkJoinTask

ForkJoinTask е основният тип за задачи, изпълнявани в ForkJoinPool. На практика трябва да се разшири един от двата му подкласа: RecursiveAction за невалидни задачи и RecursiveTask за задачи, които връщат стойност.И двамата имат абстрактен метод compute (), в който е дефинирана логиката на задачата.

3.1. RecursiveAction - пример

В примера по-долу единицата работа, която трябва да бъде обработена, е представена от низ, наречен работно натоварване . За демонстрационни цели задачата е безсмислена: тя просто главни букви вписва и регистрира.

За да демонстрира поведението на разклонението на рамката, примерът разделя задачата, ако натоварването .length () е по-голямо от определен прагизползвайки метода createSubtask () .

Низът е разделен рекурсивно на поднизове, създавайки екземпляри CustomRecursiveTask, които се основават на тези поднизове.

В резултат на това методът връща Списък.

Списъкът се изпраща във ForkJoinPool, използвайки метода invokeAll () :

public class CustomRecursiveAction extends RecursiveAction { private String workload = ""; private static final int THRESHOLD = 4; private static Logger logger = Logger.getAnonymousLogger(); public CustomRecursiveAction(String workload) { this.workload = workload; } @Override protected void compute() { if (workload.length() > THRESHOLD) { ForkJoinTask.invokeAll(createSubtasks()); } else { processing(workload); } } private List createSubtasks() { List subtasks = new ArrayList(); String partOne = workload.substring(0, workload.length() / 2); String partTwo = workload.substring(workload.length() / 2, workload.length()); subtasks.add(new CustomRecursiveAction(partOne)); subtasks.add(new CustomRecursiveAction(partTwo)); return subtasks; } private void processing(String work) { String result = work.toUpperCase(); logger.info("This result - (" + result + ") - was processed by " + Thread.currentThread().getName()); } }

Този модел може да се използва за разработване на собствени класове RecursiveAction . За да направите това, създайте обект, който представлява общото количество работа, изберете подходящ праг, дефинирайте метод за разделяне на работата и дефинирайте метод за извършване на работата.

3.2. Рекурсивна задача

За задачи, които връщат стойност, логиката тук е подобна, с изключение на това, че резултатът за всяка подзадача е обединен в един резултат:

public class CustomRecursiveTask extends RecursiveTask { private int[] arr; private static final int THRESHOLD = 20; public CustomRecursiveTask(int[] arr) { this.arr = arr; } @Override protected Integer compute() { if (arr.length > THRESHOLD) { return ForkJoinTask.invokeAll(createSubtasks()) .stream() .mapToInt(ForkJoinTask::join) .sum(); } else { return processing(arr); } } private Collection createSubtasks() { List dividedTasks = new ArrayList(); dividedTasks.add(new CustomRecursiveTask( Arrays.copyOfRange(arr, 0, arr.length / 2))); dividedTasks.add(new CustomRecursiveTask( Arrays.copyOfRange(arr, arr.length / 2, arr.length))); return dividedTasks; } private Integer processing(int[] arr) { return Arrays.stream(arr) .filter(a -> a > 10 && a  a * 10) .sum(); } }

В този пример работата е представена от масив, съхраняван в полето arr на класа CustomRecursiveTask . Методът createSubtasks () рекурсивно разделя задачата на по-малки части от работата, докато всяка част е по-малка от прага . След това методът invokeAll () изпраща подзадачите в общия пул и връща списък с бъдеще .

За да задейства изпълнението, методът join () се извиква за всяка подзадача.

В този пример това се постига с помощта на Java 8 Stream API; на сума () метод се използва като представяне на комбиниране под резултати в крайния резултат.

4. Submitting Tasks to the ForkJoinPool

To submit tasks to the thread pool, few approaches can be used.

The submit() or execute()method (their use cases are the same):

forkJoinPool.execute(customRecursiveTask); int result = customRecursiveTask.join();

The invoke()method forks the task and waits for the result, and doesn’t need any manual joining:

int result = forkJoinPool.invoke(customRecursiveTask);

The invokeAll() method is the most convenient way to submit a sequence of ForkJoinTasks to the ForkJoinPool. It takes tasks as parameters (two tasks, var args, or a collection), forks then returns a collection of Future objects in the order in which they were produced.

Alternatively, you can use separate fork() and join() methods. The fork() method submits a task to a pool, but it doesn't trigger its execution. The join() method must be used for this purpose. In the case of RecursiveAction, the join() returns nothing but null; for RecursiveTask, it returns the result of the task's execution:

customRecursiveTaskFirst.fork(); result = customRecursiveTaskLast.join();

In our RecursiveTask example we used the invokeAll() method to submit a sequence of subtasks to the pool. The same job can be done with fork() and join(), though this has consequences for the ordering of the results.

За да се избегне объркване, обикновено е добра идея да се използва метод invokeAll () , за да се изпрати повече от една задача в ForkJoinPool.

5. Заключения

Използването на рамката fork / join може да ускори обработката на големи задачи, но за да се постигне този резултат, трябва да се следват някои насоки:

  • Използвайте възможно най-малко пулове от нишки - в повечето случаи най-доброто решение е да използвате един пул от нишки за приложение или система
  • Използвайте стандартния пул с общи нишки, ако не е необходима конкретна настройка
  • Използвайте разумен праг за разделяне на ForkJoinTask на подзадачи
  • Избягвайте блокиране във вашите ForkJoinTasks

Примерите, използвани в тази статия, са налични в свързаното хранилище на GitHub.