1. Въведение
Този урок е ръководство за функционалността и случаите на използване на класа CompletableFuture, който е въведен като подобрение на API 8 за паралелна приложимост Java 8.
2. Асинхронно изчисление в Java
Асинхронното изчисление е трудно да се разсъждава. Обикновено искаме да разглеждаме всяко изчисление като поредица от стъпки, но в случай на асинхронно изчисление действията, представени като обратни извиквания, обикновено са разпръснати в кода или са дълбоко вложени един в друг . Нещата се влошават още повече, когато трябва да се справим с грешки, които могат да възникнат по време на една от стъпките.
Интерфейсът Future беше добавен в Java 5, за да служи като резултат от асинхронно изчисление, но не разполагаше с никакви методи за комбиниране на тези изчисления или обработка на възможни грешки.
Java 8 представи класа CompletableFuture . Заедно с интерфейса Future , той също така реализира интерфейса CompletionStage . Този интерфейс определя договора за асинхронна стъпка на изчисление, която можем да комбинираме с други стъпки.
CompletableFuture е едновременно градивен блок и рамка, с около 50 различни метода за съставяне, комбиниране и изпълнение на асинхронни изчислителни стъпки и обработка на грешки .
Такъв голям API може да бъде поразителен, но те най-вече попадат в няколко ясни и различни случаи на употреба.
3. Използване на CompletableFuture като просто бъдеще
На първо място, класът CompletableFuture реализира интерфейса Future , така че можем да го използваме като изпълнение на Future , но с допълнителна логика за завършване .
Например, можем да създадем екземпляр на този клас с конструктор no-arg, който да представя някакъв бъдещ резултат, да го раздаваме на потребителите и да го завършваме по някое време в бъдеще, използвайки пълния метод. Потребителите могат да използват метода get , за да блокират текущата нишка, докато се предостави този резултат.
В примера по-долу имаме метод, който създава екземпляр CompletableFuture , след това отделя някакво изчисление в друга нишка и веднага връща бъдещето .
Когато изчислението приключи, методът завършва бъдещето, като предоставя резултата на пълния метод:
public Future calculateAsync() throws InterruptedException { CompletableFuture completableFuture = new CompletableFuture(); Executors.newCachedThreadPool().submit(() -> { Thread.sleep(500); completableFuture.complete("Hello"); return null; }); return completableFuture; }
За да отделим изчислението, използваме Executor API. Този метод за създаване и завършване на CompletableFuture може да се използва заедно с всеки механизъм за съвпадение или API, включително сурови нишки.
Забележете, че на calculateAsync метод връща Future инстанция .
Ние просто извикваме метода, получаваме бъдещия екземпляр и извикваме метода get върху него, когато сме готови да блокираме резултата.
Също така обърнете внимание, че методът get хвърля някои проверени изключения, а именно ExecutionException (капсулиране на изключение, възникнало по време на изчисление) и InterruptException (изключение, което означава, че нишка, изпълняваща метод, е била прекъсната):
Future completableFuture = calculateAsync(); // ... String result = completableFuture.get(); assertEquals("Hello", result);
Ако вече знаем резултата от изчислението , можем да използваме статичния метод completeFuture с аргумент, който представлява резултат от това изчисление. Вследствие на това GET метод на бъдещето никога няма да блокира незабавно връщане на този резултат, вместо:
Future completableFuture = CompletableFuture.completedFuture("Hello"); // ... String result = completableFuture.get(); assertEquals("Hello", result);
Като алтернативен сценарий може да поискаме да отменим изпълнението на бъдещето .
4. CompletableFuture с капсулирана изчислителна логика
Кодът по-горе ни позволява да изберем всеки механизъм за едновременно изпълнение, но какво, ако искаме да пропуснем този шаблон и просто да изпълним асинхронно някакъв код?
Статичните методи runAsync и supplyAsync ни позволяват да създадем екземпляр CompletableFuture от функционалните типове Runnable и доставчик съответно.
Както Runnable, така и доставчикът са функционални интерфейси, които позволяват предаването на техните екземпляри като ламбда изрази благодарение на новата функция на Java 8.
Интерфейсът Runnable е същият стар интерфейс, който се използва в нишки и не позволява да се върне стойност.
Интерфейсът на доставчика е общ функционален интерфейс с един метод, който няма аргументи и връща стойност от параметризиран тип.
Това ни позволява да предоставим екземпляр на доставчика като ламбда израз, който прави изчислението и връща резултата . Това е толкова просто, колкото:
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello"); // ... assertEquals("Hello", future.get());
5. Обработка на резултати от асинхронни изчисления
Най-общият начин за обработка на резултата от изчислението е да се подаде към функция. Методът thenApply прави точно това; той приема екземпляр на функция , използва го за обработка на резултата и връща бъдеще, което съдържа стойност, върната от функция:
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenApply(s -> s + " World"); assertEquals("Hello World", future.get());
Ако не е необходимо да връщаме стойност по веригата на бъдещето , можем да използваме екземпляр на потребителския функционален интерфейс. Неговият единичен метод взема параметър и връща void .
Има метод за този случай на използване в CompletableFuture. Методът thenAccept получава потребител и му предава резултата от изчислението. Тогава последното извикване future.get () връща екземпляр от типа Void :
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenAccept(s -> System.out.println("Computation returned: " + s)); future.get();
И накрая, ако нито се нуждаем от стойността на изчислението, нито искаме да върнем някаква стойност в края на веригата, тогава можем да предадем Runnable ламбда на метода thenRun . В следващия пример просто отпечатваме ред в конзолата след извикване на future.get ():
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenRun(() -> System.out.println("Computation finished.")); future.get();
6. Комбиниране на фючърси
The best part of the CompletableFuture API is the ability to combine CompletableFuture instances in a chain of computation steps.
The result of this chaining is itself a CompletableFuture that allows further chaining and combining. This approach is ubiquitous in functional languages and is often referred to as a monadic design pattern.
In the following example we use the thenCompose method to chain two Futures sequentially.
Notice that this method takes a function that returns a CompletableFuture instance. The argument of this function is the result of the previous computation step. This allows us to use this value inside the next CompletableFuture‘s lambda:
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello") .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World")); assertEquals("Hello World", completableFuture.get());
The thenCompose method, together with thenApply, implement basic building blocks of the monadic pattern. They closely relate to the map and flatMap methods of Stream and Optional classes also available in Java 8.
Both methods receive a function and apply it to the computation result, but the thenCompose (flatMap) method receives a function that returns another object of the same type. This functional structure allows composing the instances of these classes as building blocks.
If we want to execute two independent Futures and do something with their results, we can use the thenCombine method that accepts a Future and a Function with two arguments to process both results:
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello") .thenCombine(CompletableFuture.supplyAsync( () -> " World"), (s1, s2) -> s1 + s2)); assertEquals("Hello World", completableFuture.get());
A simpler case is when we want to do something with two Futures‘ results, but don't need to pass any resulting value down a Future chain. The thenAcceptBoth method is there to help:
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello") .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> System.out.println(s1 + s2));
7. Difference Between thenApply() and thenCompose()
In our previous sections, we've shown examples regarding thenApply() and thenCompose(). Both APIs help chain different CompletableFuture calls, but the usage of these 2 functions is different.
7.1. thenApply()
We can use this method to work with a result of the previous call. However, a key point to remember is that the return type will be combined of all calls.
So this method is useful when we want to transform the result of a CompletableFuture call:
CompletableFuture finalResult = compute().thenApply(s-> s + 1);
7.2. thenCompose()
The thenCompose() method is similar to thenApply() in that both return a new Completion Stage. However, thenCompose() uses the previous stage as the argument. It will flatten and return a Future with the result directly, rather than a nested future as we observed in thenApply():
CompletableFuture computeAnother(Integer i){ return CompletableFuture.supplyAsync(() -> 10 + i); } CompletableFuture finalResult = compute().thenCompose(this::computeAnother);
So if the idea is to chain CompletableFuture methods then it’s better to use thenCompose().
Also, note that the difference between these two methods is analogous to the difference between map() and flatMap().
8. Running Multiple Futures in Parallel
When we need to execute multiple Futures in parallel, we usually want to wait for all of them to execute and then process their combined results.
The CompletableFuture.allOf static method allows to wait for completion of all of the Futures provided as a var-arg:
CompletableFuture future1 = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future2 = CompletableFuture.supplyAsync(() -> "Beautiful"); CompletableFuture future3 = CompletableFuture.supplyAsync(() -> "World"); CompletableFuture combinedFuture = CompletableFuture.allOf(future1, future2, future3); // ... combinedFuture.get(); assertTrue(future1.isDone()); assertTrue(future2.isDone()); assertTrue(future3.isDone());
Notice that the return type of the CompletableFuture.allOf() is a CompletableFuture. The limitation of this method is that it does not return the combined results of all Futures. Instead, we have to manually get results from Futures. Fortunately, CompletableFuture.join() method and Java 8 Streams API makes it simple:
String combined = Stream.of(future1, future2, future3) .map(CompletableFuture::join) .collect(Collectors.joining(" ")); assertEquals("Hello Beautiful World", combined);
The CompletableFuture.join() method is similar to the get method, but it throws an unchecked exception in case the Future does not complete normally. This makes it possible to use it as a method reference in the Stream.map() method.
9. Handling Errors
For error handling in a chain of asynchronous computation steps, we have to adapt the throw/catch idiom in a similar fashion.
Instead of catching an exception in a syntactic block, the CompletableFuture class allows us to handle it in a special handle method. This method receives two parameters: a result of a computation (if it finished successfully), and the exception thrown (if some computation step did not complete normally).
In the following example, we use the handle method to provide a default value when the asynchronous computation of a greeting was finished with an error because no name was provided:
String name = null; // ... CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> { if (name == null) { throw new RuntimeException("Computation error!"); } return "Hello, " + name; })}).handle((s, t) -> s != null ? s : "Hello, Stranger!"); assertEquals("Hello, Stranger!", completableFuture.get());
As an alternative scenario, suppose we want to manually complete the Future with a value, as in the first example, but also have the ability to complete it with an exception. The completeExceptionally method is intended for just that. The completableFuture.get() method in the following example throws an ExecutionException with a RuntimeException as its cause:
CompletableFuture completableFuture = new CompletableFuture(); // ... completableFuture.completeExceptionally( new RuntimeException("Calculation failed!")); // ... completableFuture.get(); // ExecutionException
In the example above, we could have handled the exception with the handle method asynchronously, but with the get method we can use the more typical approach of a synchronous exception processing.
10. Async Methods
Most methods of the fluent API in CompletableFuture class have two additional variants with the Async postfix. These methods are usually intended for running a corresponding step of execution in another thread.
The methods without the Async postfix run the next execution stage using a calling thread. In contrast, the Async method without the Executor argument runs a step using the common fork/join pool implementation of Executor that is accessed with the ForkJoinPool.commonPool() method. Finally, the Async method with an Executor argument runs a step using the passed Executor.
Here's a modified example that processes the result of a computation with a Function instance. The only visible difference is the thenApplyAsync method, but under the hood the application of a function is wrapped into a ForkJoinTask instance (for more information on the fork/join framework, see the article “Guide to the Fork/Join Framework in Java”). This allows us to parallelize our computation even more and use system resources more efficiently:
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenApplyAsync(s -> s + " World"); assertEquals("Hello World", future.get());
11. JDK 9 CompletableFuture API
Java 9 enhances the CompletableFuture API with the following changes:
- New factory methods added
- Support for delays and timeouts
- Improved support for subclassing
and new instance APIs:
- Executor defaultExecutor()
- CompletableFuture newIncompleteFuture()
- CompletableFuture copy()
- CompletionStage minimalCompletionStage()
- CompletableFuture completeAsync(Supplier supplier, Executor executor)
- CompletableFuture completeAsync(Supplier supplier)
- CompletableFuture orTimeout(long timeout, TimeUnit unit)
- CompletableFuture completeOnTimeout(T value, long timeout, TimeUnit unit)
We also now have a few static utility methods:
- Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
- Executor delayedExecutor(long delay, TimeUnit unit)
- CompletionStage completedStage(U value)
- CompletionStage failedStage(Throwable ex)
- CompletableFuture failedFuture(Throwable ex)
Finally, to address timeout, Java 9 has introduced two more new functions:
- orTimeout()
- completeOnTimeout()
Here's the detailed article for further reading: Java 9 CompletableFuture API Improvements.
12. Conclusion
В тази статия описахме методите и типичните случаи на използване на класа CompletableFuture .
Изходният код на статията е достъпен в GitHub.