Асинхронно HTTP програмиране с Play Framework

Java Top

Току що обявих новия курс Learn Spring , фокусиран върху основите на Spring 5 и Spring Boot 2:

>> ПРЕГЛЕД НА КУРСА

1. Общ преглед

Често нашите уеб услуги трябва да използват други уеб услуги, за да си вършат работата. Може да бъде трудно да се обслужват потребителски заявки, като същевременно се запазва ниско време за реакция. Бавната външна услуга може да увеличи времето за отговор и да накара системата ни да трупа заявки, използвайки повече ресурси. Тук неблокиращият подход може да бъде много полезен

В този урок ще задействаме множество асинхронни заявки към услуга от приложение на Play Framework. Използвайки неблокиращата HTTP способност на Java, ще можем безпроблемно да заявяваме външни ресурси, без да засягаме собствената си основна логика.

В нашия пример ще изследваме библиотеката на Play WebService.

2. Библиотеката Play WebService (WS)

WS е мощна библиотека, осигуряваща асинхронни HTTP повиквания, използващи Java Action .

Използвайки тази библиотека, нашият код изпраща тези заявки и продължава, без да блокира. За да обработим резултата от заявката, ние предоставяме консумираща функция, т.е. реализация на потребителския интерфейс.

Този модел споделя някои прилики с изпълнението на JavaScript на обратно извикване, обещания и модела async / await .

Нека създадем прост потребител, който регистрира някои от данните за отговора:

ws.url(url) .thenAccept(r -> log.debug("Thread#" + Thread.currentThread().getId() + " Request complete: Response code = " + r.getStatus() + " | Response: " + r.getBody() + " | Current Time:" + System.currentTimeMillis()))

Нашият потребител просто влиза в този пример. Потребителят може да направи всичко, което трябва да направим с резултата, като съхраняваме резултата в база данни.

Ако разгледаме по-задълбочено изпълнението на библиотеката, можем да забележим, че WS обвива и конфигурира AsyncHttpClient на Java , който е част от стандартния JDK и не зависи от Play.

3. Подгответе примерен проект

За да експериментираме с рамката, нека създадем някои модулни тестове за стартиране на заявки. Ще създадем скелетно уеб приложение, за да отговорим на тях и ще използваме WS рамката, за да отправяме HTTP заявки.

3.1. Уеб приложението скелет

На първо място, ние създаваме първоначалния проект, като използваме командата sbt new :

sbt new playframework/play-java-seed.g8

След това в новата папка редактираме файла build.sbt и добавяме зависимостта на WS библиотеката:

libraryDependencies += javaWs

Сега можем да стартираме сървъра с командата sbt run :

$ sbt run ... --- (Running the application, auto-reloading is enabled) --- [info] p.c.s.AkkaHttpServer - Listening for HTTP on /0:0:0:0:0:0:0:0:9000

След като приложението стартира, можем да проверим дали всичко е наред, като разгледаме // localhost: 9000 , което ще отвори началната страница на Play.

3.2. Тестова среда

За да тестваме нашето приложение, ще използваме клас на единичен тест HomeControllerTest .

Първо, трябва да разширим WithServer, който ще осигури жизнения цикъл на сървъра:

public class HomeControllerTest extends WithServer { 

Благодарение на своя родител, този клас сега стартира нашия скелетен уеб сървър в тестов режим и на произволен порт , преди да стартира тестовете. Класът WithServer също спира приложението, когато тестът приключи.

След това трябва да предоставим приложение за стартиране.

Ние можем да го създадем с Guice е GuiceApplicationBuilder :

@Override protected Application provideApplication() { return new GuiceApplicationBuilder().build(); } 

И накрая, настроихме URL адреса на сървъра да се използва в нашите тестове, като използвахме номера на порта, предоставен от тестовия сървър:

@Override @Before public void setup() { OptionalInt optHttpsPort = testServer.getRunningHttpsPort(); if (optHttpsPort.isPresent()) { port = optHttpsPort.getAsInt(); url = "//localhost:" + port; } else { port = testServer.getRunningHttpPort() .getAsInt(); url = "//localhost:" + port; } }

Сега сме готови да напишем тестове. Цялостната рамка за тестове ни позволява да се концентрираме върху кодирането на нашите заявки за тест.

4. Подгответе WSRequest

Нека да видим как можем да задействаме основни типове заявки, като GET или POST, и заявки от няколко части за качване на файлове.

4.1. Инициализирайте обекта WSRequest

На първо място, трябва да получим екземпляр WSClient, за да конфигурираме и инициализираме нашите заявки.

В реално приложение можем да получим клиент, автоматично конфигуриран с настройки по подразбиране, чрез инжектиране на зависимост:

@Autowired WSClient ws;

В нашия тестов клас обаче използваме WSTestClient , достъпен от Play Test framework:

WSClient ws = play.test.WSTestClient.newClient(port);

След като имаме наш клиент, можем да инициализираме обект WSRequest, като извикаме метода url :

ws.url(url)

Методът url прави достатъчно, за да ни позволи да задействаме заявка. Въпреки това можем да го персонализираме допълнително, като добавим някои персонализирани настройки:

ws.url(url) .addHeader("key", "value") .addQueryParameter("num", "" + num);

As we can see, it's pretty easy to add headers and query parameters.

After we've fully configured our request, we can call the method to initiate it.

4.2. Generic GET Request

To trigger a GET request we just have to call the get method on our WSRequest object:

ws.url(url) ... .get();

As this is a non-blocking code, it starts the request and then continues execution at the next line of our function.

The object returned by get is a CompletionStage instance, which is part of the CompletableFuture API.

Once the HTTP call has completed, this stage executes just a few instructions. It wraps the response in a WSResponse object.

Normally, this result would be passed on to the next stage of the execution chain. In this example, we have not provided any consuming function, so the result is lost.

For this reason, this request is of type “fire-and-forget”.

4.3. Submit a Form

Submitting a form is not very different from the get example.

To trigger the request we just call the post method:

ws.url(url) ... .setContentType("application/x-www-form-urlencoded") .post("key1=value1&key2=value2");

In this scenario, we need to pass a body as a parameter. This can be a simple string like a file, a json or xml document, a BodyWritable or a Source.

4.4. Submit a Multipart/Form Data

A multipart form requires us to send both input fields and data from an attached file or stream.

To implement this in the framework, we use the post method with a Source.

Inside the source, we can wrap all the different data types needed by our form:

Source file = FileIO.fromPath(Paths.get("hello.txt")); FilePart file = new FilePart("fileParam", "myfile.txt", "text/plain", file); DataPart data = new DataPart("key", "value"); ws.url(url) ... .post(Source.from(Arrays.asList(file, data)));

Though this approach adds some more configuration, it is still very similar to the other types of requests.

5. Process the Async Response

Up to this point, we have only triggered fire-and-forget requests, where our code doesn't do anything with the response data.

Let's now explore two techniques for processing an asynchronous response.

We can either block the main thread, waiting for a CompletableFuture, or consume asynchronously with a Consumer.

5.1. Process Response by Blocking With CompletableFuture

Even when using an asynchronous framework, we may choose to block our code's execution and wait for the response.

Using the CompletableFuture API, we just need a few changes in our code to implement this scenario:

WSResponse response = ws.url(url) .get() .toCompletableFuture() .get();

This could be useful, for example, to provide a strong data consistency that we cannot achieve in other ways.

5.2. Process Response Asynchronously

To process an asynchronous response without blocking, we provide a Consumer or Function that is run by the asynchronous framework when the response is available.

For example, let's add a Consumer to our previous example to log the response:

ws.url(url) .addHeader("key", "value") .addQueryParameter("num", "" + 1) .get() .thenAccept(r -> log.debug("Thread#" + Thread.currentThread().getId() + " Request complete: Response code = " + r.getStatus() + " | Response: " + r.getBody() + " | Current Time:" + System.currentTimeMillis()));

We then see the response in the logs:

[debug] c.HomeControllerTest - Thread#30 Request complete: Response code = 200 | Response: { "Result" : "ok", "Params" : { "num" : [ "1" ] }, "Headers" : { "accept" : [ "*/*" ], "host" : [ "localhost:19001" ], "key" : [ "value" ], "user-agent" : [ "AHC/2.1" ] } } | Current Time:1579303109613

It's worth noting that we used thenAccept, which requires a Consumer function since we don't need to return anything after logging.

When we want the current stage to return something, so that we can use it in the next stage, we need thenApply instead, which takes a Function.

These use the conventions of the standard Java Functional Interfaces.

5.3. Large Response Body

The code we've implemented so far is a good solution for small responses and most use cases. However, if we need to process a few hundreds of megabytes of data, we'll need a better strategy.

We should note: Request methods like get and post load the entire response in memory.

To avoid a possible OutOfMemoryError, we can use Akka Streams to process the response without letting it fill our memory.

For example, we can write its body in a file:

ws.url(url) .stream() .thenAccept( response -> { try { OutputStream outputStream = Files.newOutputStream(path); Sink
    
      outputWriter = Sink.foreach(bytes -> outputStream.write(bytes.toArray())); response.getBodyAsSource().runWith(outputWriter, materializer); } catch (IOException e) { log.error("An error happened while opening the output stream", e); } });
    

The stream method returns a CompletionStage where the WSResponse has a getBodyAsStream method that provides a Source.

We can tell the code how to process this type of body by using Akka's Sink, which in our example will simply write any data passing through in the OutputStream.

5.4. Timeouts

When building a request, we can also set a specific timeout, so the request is interrupted if we don't receive the complete response in time.

This is a particularly useful feature when we see that a service we're querying is particularly slow and could cause a pile-up of open connections stuck waiting for the response.

We can set a global timeout for all our requests using tuning parameters. For a request-specific timeout, we can add to a request using setRequestTimeout:

ws.url(url) .setRequestTimeout(Duration.of(1, SECONDS));

There's still one case to handle, though: We may have received all the data, but our Consumer may be very slow processing it. This might happen if there is lots of data crunching, database calls, etc.

In low throughput systems, we can simply let the code run until it completes. However, we may wish to abort long-running activities.

To achieve that, we have to wrap our code with some futures handling.

Let's simulate a very long process in our code:

ws.url(url) .get() .thenApply( result -> { try { Thread.sleep(10000L); return Results.ok(); } catch (InterruptedException e) { return Results.status(SERVICE_UNAVAILABLE); } });

This will return an OK response after 10 seconds, but we don't want to wait that long.

Instead, with the timeout wrapper, we instruct our code to wait for no more than 1 second:

CompletionStage f = futures.timeout( ws.url(url) .get() .thenApply(result -> { try { Thread.sleep(10000L); return Results.ok(); } catch (InterruptedException e) { return Results.status(SERVICE_UNAVAILABLE); } }), 1L, TimeUnit.SECONDS); 

Now our future will return a result either way: the computation result if the Consumer finished in time, or the exception due to the futures timeout.

5.5. Handling Exceptions

In the previous example, we created a function that either returns a result or fails with an exception. So, now we need to handle both scenarios.

We can handle both success and failure scenarios with the handleAsync method.

Let's say that we want to return the result, if we've got it, or log the error and return the exception for further handling:

CompletionStage res = f.handleAsync((result, e) -> { if (e != null) { log.error("Exception thrown", e); return e.getCause(); } else { return result; } }); 

The code should now return a CompletionStage containing the TimeoutException thrown.

We can verify it by simply calling an assertEquals on the class of the exception object returned:

Class clazz = res.toCompletableFuture().get().getClass(); assertEquals(TimeoutException.class, clazz);

When running the test, it will also log the exception we received:

[error] c.HomeControllerTest - Exception thrown java.util.concurrent.TimeoutException: Timeout after 1 second ...

6. Request Filters

Sometimes, we need to run some logic before a request is triggered.

We could manipulate the WSRequest object once initialized, but a more elegant technique is to set a WSRequestFilter.

A filter can be set during initialization, before calling the triggering method, and is attached to the request logic.

We can define our own filter by implementing the WSRequestFilter interface, or we can add a ready-made one.

A common scenario is logging what the request looks like before executing it.

In this case, we just need to set the AhcCurlRequestLogger:

ws.url(url) ... .setRequestFilter(new AhcCurlRequestLogger()) ... .get();

The resulting log has a curl-like format:

[info] p.l.w.a.AhcCurlRequestLogger - curl \ --verbose \ --request GET \ --header 'key: value' \ '//localhost:19001'

We can set the desired log level, by changing our logback.xml configuration.

7. Caching Responses

WSClient also supports the caching of responses.

This feature is particularly useful when the same request is triggered multiple times and we don't need the freshest data every time.

It also helps when the service we're calling is temporarily down.

7.1. Add Caching Dependencies

To configure caching we need first to add the dependency in our build.sbt:

libraryDependencies += ehcache

This configures Ehcache as our caching layer.

If we don't want Ehcache specifically, we can use any other JSR-107 cache implementation.

7.2. Force Caching Heuristic

By default, Play WS won't cache HTTP responses if the server doesn't return any caching configuration.

To circumvent this, we can force the heuristic caching by adding a setting to our application.conf:

play.ws.cache.heuristics.enabled=true

This will configure the system to decide when it's useful to cache an HTTP response, regardless of the remote service's advertised caching.

8. Additional Tuning

Making requests to an external service may require some client configuration. We may need to handle redirects, a slow server, or some filtering depending on the user-agent header.

To address that, we can tune our WS client, using properties in our application.conf:

play.ws.followRedirects=false play.ws.useragent=MyPlayApplication play.ws.compressionEnabled=true # time to wait for the connection to be established play.ws.timeout.connection=30 # time to wait for data after the connection is open play.ws.timeout.idle=30 # max time available to complete the request play.ws.timeout.request=300

It's also possible to configure the underlying AsyncHttpClient directly.

The full list of available properties can be checked in the source code of AhcConfig.

9. Conclusion

In this article, we explored the Play WS library and its main features. We configured our project, learned how to fire common requests and to process their response, both synchronously and asynchronously.

We worked with large data downloads and saw how to cut short long-running activities.

Накрая разгледахме кеширането, за да подобрим производителността, и как да настроим клиента.

Както винаги, изходният код за този урок е достъпен в GitHub.

Дъно на Java

Току що обявих новия курс Learn Spring , фокусиран върху основите на Spring 5 и Spring Boot 2:

>> ПРЕГЛЕД НА КУРСА