1. Общ преглед
Java 8 представи концепцията за S treams като ефективен начин за извършване на групови операции с данни. И паралелни потоци могат да бъдат получени в среди, които поддържат едновременност.
Тези потоци могат да се доставят с подобрена производителност - с цената на многопоточни режийни.
В този бърз урок ще разгледаме едно от най-големите ограничения на Stream API и ще видим как да накараме паралелен поток да работи с персонализиран екземпляр на ThreadPool , като алтернатива - има библиотека, която се справя с това.
2. Паралелен поток
Да започнем с един прост пример - наричайки parallelStream метода на някоя от Колекция видове - което ще върне възможно успоредно на живо :
@Test public void givenList_whenCallingParallelStream_shouldBeParallelStream(){ List aList = new ArrayList(); Stream parallelStream = aList.parallelStream(); assertTrue(parallelStream.isParallel()); }
Обработката по подразбиране, която се проявява по такъв поток използва ForkJoinPool.commonPool (), а темата басейн споделяна от цялото приложение.
3. Персонализиран пул от нишки
Всъщност можем да предадем персонализиран ThreadPool, когато обработваме потока .
Следващият пример позволява паралелен поток да използва персонализиран Thread Pool за изчисляване на сумата от дълги стойности от 1 до 1 000 000 включително:
@Test public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal() throws InterruptedException, ExecutionException { long firstNum = 1; long lastNum = 1_000_000; List aList = LongStream.rangeClosed(firstNum, lastNum).boxed() .collect(Collectors.toList()); ForkJoinPool customThreadPool = new ForkJoinPool(4); long actualTotal = customThreadPool.submit( () -> aList.parallelStream().reduce(0L, Long::sum)).get(); assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal); }
Използвахме конструктора ForkJoinPool с ниво на паралелизъм 4. Необходими са известни експерименти, за да се определи оптималната стойност за различните среди, но добро правило е просто да се избере числото въз основа на колко ядра има вашият процесор.
След това обработихме съдържанието на паралелния поток , обобщавайки ги в повикването за намаляване .
Този прост пример може да не демонстрира пълната полезност на използването на персонализиран Thread Pool , но предимствата стават очевидни в ситуации, когато не искаме да обвързваме общия Thread Pool с продължителни задачи (например обработка на данни от мрежов източник) , или общата група от нишки се използва от други компоненти в приложението.
4. Заключение
Накратко разгледахме как да стартираме паралелен поток, използвайки персонализиран Thread Pool . В подходящата среда и при правилното използване на нивото на паралелизъм може да се постигне повишаване на ефективността в определени ситуации.
Пълните примерни кодове, посочени в тази статия, могат да бъдат намерени в Github.