Ръководство за NIO2 асинхронен сокет канал

1. Общ преглед

В тази статия ще демонстрираме как да изградим прост сървър и неговия клиент с помощта на API на Java 7 NIO.2 за канал.

Ще разгледаме класовете AsynchronousServerSocketChannel и AsynchronousSocketChannel , които са ключовите класове, използвани съответно за реализиране на сървъра и клиента.

Ако не сте запознати с приложните програмни интерфейси на NIO.2, имаме уводна статия на този сайт. Можете да го прочетете, като следвате тази връзка.

Всички класове, които са необходими за използване на API на NIO.2 за канал, са групирани в пакета java.nio.channels :

import java.nio.channels.*;

2. Сървърът с бъдеще

Екземпляр на AsynchronousServerSocketChannel се създава чрез извикване на статичния отворен API на неговия клас:

AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();

Новосъздаденият канал за асинхронен сървър е отворен, но все още не е обвързан, така че трябва да го обвържем с локален адрес и по избор да изберем порт:

server.bind(new InetSocketAddress("127.0.0.1", 4555));

Можем също така да предадем нула, така че да използва локален адрес и да се свърже с произволен порт:

server.bind(null);

Веднъж обвързан, API за приемане се използва за иницииране на приемането на връзки към сокета на канала:

Future acceptFuture = server.accept();

Както при операциите с асинхронни канали, горното повикване се връща веднага и изпълнението продължава.

След това можем да използваме API на get за заявка за отговор от обекта Future :

AsynchronousSocketChannel worker = future.get();

Това обаждане ще блокира, ако е необходимо, за да се изчака заявка за връзка от клиент. По желание можем да посочим време за изчакване, ако не искаме да чакаме вечно:

AsynchronousSocketChannel worker = acceptFuture.get(10, TimeUnit.SECONDS);

След като горното повикване се върне и операцията беше успешна, можем да създадем цикъл, в който да слушаме входящите съобщения и да ги връщаме обратно на клиента.

Нека създадем метод, наречен runServer, в който ще изчакаме и обработим всички входящи съобщения:

public void runServer() { clientChannel = acceptResult.get(); if ((clientChannel != null) && (clientChannel.isOpen())) { while (true) { ByteBuffer buffer = ByteBuffer.allocate(32); Future readResult = clientChannel.read(buffer); // perform other computations readResult.get(); buffer.flip(); Future writeResult = clientChannel.write(buffer); // perform other computations writeResult.get(); buffer.clear(); } clientChannel.close(); serverChannel.close(); } }

Вътре в цикъла всичко, което правим, е да създадем буфер, от който да четем и пишем, в зависимост от операцията.

След това, всеки път, когато правим четене или запис, можем да продължим да изпълняваме всеки друг код и когато сме готови да обработим резултата, извикваме API get () на обекта Future .

За да стартираме сървъра, извикваме неговия конструктор и след това метода runServer вътре в main :

public static void main(String[] args) { AsyncEchoServer server = new AsyncEchoServer(); server.runServer(); }

3. Сървърът с CompletionHandler

В този раздел ще видим как да приложим същия сървър, използвайки подхода CompletionHandler , а не подхода Future .

Вътре в конструктора създаваме AsynchronousServerSocketChannel и го свързваме с локален адрес по същия начин, както преди:

serverChannel = AsynchronousServerSocketChannel.open(); InetSocketAddress hostAddress = new InetSocketAddress("localhost", 4999); serverChannel.bind(hostAddress);

След това, все още в конструктора, ние създаваме цикъл while, в който приемаме всяка входяща връзка от клиент. Този цикъл while се използва стриктно, за да се предотврати излизането на сървъра, преди да се установи връзка с клиент .

За да предотвратим безкрайното изпълнение на цикъла , извикваме System.in.read () в края му, за да блокира изпълнението, докато входящата връзка не бъде прочетена от стандартния входен поток:

while (true) { serverChannel.accept( null, new CompletionHandler() { @Override public void completed( AsynchronousSocketChannel result, Object attachment) { if (serverChannel.isOpen()){ serverChannel.accept(null, this); } clientChannel = result; if ((clientChannel != null) && (clientChannel.isOpen())) { ReadWriteHandler handler = new ReadWriteHandler(); ByteBuffer buffer = ByteBuffer.allocate(32); Map readInfo = new HashMap(); readInfo.put("action", "read"); readInfo.put("buffer", buffer); clientChannel.read(buffer, readInfo, handler); } } @Override public void failed(Throwable exc, Object attachment) { // process error } }); System.in.read(); }

Когато се установи връзка, се извиква завършеният метод за обратно извикване в CompletionHandler на приемащата операция.

Типът му на връщане е екземпляр на AsynchronousSocketChannel . Ако каналът на сокета на сървъра е все още отворен, ние отново извикваме API за приемане , за да се подготвим за друга входяща връзка, докато използваме отново същия манипулатор.

След това присвояваме върнатия канал на сокет на глобален екземпляр. След това проверяваме дали той не е нулев и дали е отворен, преди да извършим операции върху него.

Точката, в която можем да започнем операции за четене и запис, е вътре в завършения API за обратно извикване на манипулатора на операцията accept . Тази стъпка замества предишния подход, при който анкетирахме канала с API на get .

Забележете, че сървърът вече няма да излиза след установяване на връзка, освен ако изрично не го затворим.

Забележете също така, че създадохме отделен вътрешен клас за обработка на операции за четене и запис; ReadWriteHandler . Ще видим как обектът за закрепване е полезен в този момент.

Първо, нека разгледаме класа ReadWriteHandler :

class ReadWriteHandler implements CompletionHandler
    
      { @Override public void completed( Integer result, Map attachment) { Map actionInfo = attachment; String action = (String) actionInfo.get("action"); if ("read".equals(action)) { ByteBuffer buffer = (ByteBuffer) actionInfo.get("buffer"); buffer.flip(); actionInfo.put("action", "write"); clientChannel.write(buffer, actionInfo, this); buffer.clear(); } else if ("write".equals(action)) { ByteBuffer buffer = ByteBuffer.allocate(32); actionInfo.put("action", "read"); actionInfo.put("buffer", buffer); clientChannel.read(buffer, actionInfo, this); } } @Override public void failed(Throwable exc, Map attachment) { // } }
    

Общият тип на прикачения файл в класа ReadWriteHandler е карта. По-специално трябва да предадем два важни параметъра през него - вида на операцията (действието) и буфера.

След това ще видим как се използват тези параметри.

Първата операция, която извършваме, е четене, тъй като това е ехо сървър, който реагира само на клиентски съобщения. Вътре в ReadWriteHandler е завършен метод за обратно повикване, ние възвръщаме приложените данни и да реши какво да прави това.

Ако това е операция за четене , която е завършила, ние извличаме буфера, променяме параметъра за действие на прикачения файл и извършваме операция за запис веднага, за да ехо на съобщението до клиента.

If it's a write operation which has just completed, we call the read API again to prepare the server to receive another incoming message.

4. The Client

After setting up the server, we can now set up the client by calling the open API on the AsyncronousSocketChannel class. This call creates a new instance of the client socket channel which we then use to make a connection to the server:

AsynchronousSocketChannel client = AsynchronousSocketChannel.open(); InetSocketAddress hostAddress = new InetSocketAddress("localhost", 4999) Future future = client.connect(hostAddress);

The connect operation returns nothing on success. However, we can still use the Future object to monitor the state of the asynchronous operation.

Let's call the get API to await connection:

future.get()

След тази стъпка можем да започнем да изпращаме съобщения до сървъра и да получаваме ехо за същото. Методът sendMessage изглежда така:

public String sendMessage(String message) { byte[] byteMsg = new String(message).getBytes(); ByteBuffer buffer = ByteBuffer.wrap(byteMsg); Future writeResult = client.write(buffer); // do some computation writeResult.get(); buffer.flip(); Future readResult = client.read(buffer); // do some computation readResult.get(); String echo = new String(buffer.array()).trim(); buffer.clear(); return echo; }

5. Тестът

За да потвърдим, че нашите сървърни и клиентски приложения се представят според очакванията, можем да използваме тест:

@Test public void givenServerClient_whenServerEchosMessage_thenCorrect() { String resp1 = client.sendMessage("hello"); String resp2 = client.sendMessage("world"); assertEquals("hello", resp1); assertEquals("world", resp2); }

6. Заключение

В тази статия разгледахме API за асинхронни сокет канали на Java NIO.2. Успяхме да преминем през процеса на изграждане на сървър и клиент с тези нови API.

Можете да получите достъп до пълния изходен код за тази статия от проекта Github.