Въведение в Netty

1. Въведение

В тази статия ще разгледаме Netty - асинхронна рамка за мрежови приложения, управлявана от събития.

Основната цел на Netty е изграждането на високопроизводителни сървъри за протоколи, базирани на NIO (или вероятно NIO.2) с разделяне и свободно свързване на мрежовите и бизнес логическите компоненти. Той може да внедри широко известен протокол, като HTTP, или вашия собствен специфичен протокол.

2. Основни понятия

Netty е неблокираща рамка. Това води до висока производителност в сравнение с блокиране на IO. Разбирането на неблокиращия IO е от решаващо значение за разбирането на основните компоненти на Netty и техните взаимоотношения.

2.1. Канал

Channel е основата на Java NIO. Представлява отворена връзка, която е способна на IO операции като четене и писане.

2.2. Бъдеще

Всяка IO операция на канал в Netty не блокира.

Това означава, че всяка операция се връща веднага след разговора. В стандартната библиотека на Java има интерфейс Future , но това не е удобно за целите на Netty - можем само да попитаме Future за завършването на операцията или да блокираме текущата нишка, докато операцията бъде завършена.

Ето защо Netty има собствен интерфейс ChannelFuture . Можем да предадем обратно обаждане на ChannelFuture, което ще бъде извикано след завършване на операцията.

2.3. Събития и обработчици

Netty използва парадигма на приложения, управлявана от събития, така че конвейерът за обработка на данни е верига от събития, преминаващи през манипулатори. Събитията и манипулаторите могат да бъдат свързани с входящия и изходящия поток от данни. Входящите събития могат да бъдат следните:

  • Активиране и деактивиране на канал
  • Прочетете операционни събития
  • Изключения
  • Потребителски събития

Изходящите събития са по-прости и като цяло са свързани с отваряне / затваряне на връзка и запис / измиване на данни.

Netty приложенията се състоят от няколко мрежови и логически събития на приложенията и техните манипулатори. Основните интерфейси за манипулаторите на събитията на канала са ChannelHandler и неговите предци ChannelOutboundHandler и ChannelInboundHandler .

Netty предоставя огромна йерархия на реализации на ChannelHandler. Струва си да се отбележат адаптерите, които са само празни реализации, например ChannelInboundHandlerAdapter и ChannelOutboundHandlerAdapter . Бихме могли да разширим тези адаптери, когато трябва да обработим само подмножество от всички събития.

Също така има много реализации на специфични протоколи като HTTP, например HttpRequestDecoder, HttpResponseEncoder, HttpObjectAggregator. Би било добре да се запознаете с тях в Javadoc на Netty.

2.4. Кодери и декодери

Докато работим с мрежовия протокол, трябва да извършим сериализация и десериализация на данни. За тази цел Netty въвежда специални разширения на ChannelInboundHandler за декодери, които могат да декодират входящи данни. Основният клас на повечето декодери е ByteToMessageDecoder.

За кодиране на изходящи данни, Netty има разширения на ChannelOutboundHandler, наречени енкодери. MessageToByteEncoder е основата за повечето внедрения на енкодери . Можем да преобразуваме съобщението от байтова последователност в Java обект и обратно с енкодери и декодери.

3. Примерно сървърно приложение

Нека създадем проект, представляващ прост протоколен сървър, който получава заявка, извършва изчисление и изпраща отговор.

3.1. Зависимости

На първо място, трябва да предоставим зависимостта на Netty в нашия pom.xml :

 io.netty netty-all 4.1.10.Final 

Можем да намерим най-новата версия на Maven Central.

3.2. Модел на данни

Класът на данните за заявките ще има следната структура:

public class RequestData { private int intValue; private String stringValue; // standard getters and setters }

Да приемем, че сървърът получава заявката и връща intValue, умножено по 2. Отговорът ще има единичната стойност int:

public class ResponseData { private int intValue; // standard getters and setters }

3.3. Заявете декодер

Сега трябва да създадем енкодери и декодери за нашите протоколни съобщения.

Трябва да се отбележи, че Netty работи с буфер за получаване на сокети , който е представен не като опашка, а просто като куп байтове. Това означава, че нашият входящ манипулатор може да бъде извикан, когато пълното съобщение не е получено от сървър.

Трябва да се уверим, че сме получили пълното съобщение преди обработката и има много начини да го направим.

На първо място, можем да създадем временен ByteBuf и да добавим към него всички входящи байтове, докато не получим необходимото количество байтове:

public class SimpleProcessingHandler extends ChannelInboundHandlerAdapter { private ByteBuf tmp; @Override public void handlerAdded(ChannelHandlerContext ctx) { System.out.println("Handler added"); tmp = ctx.alloc().buffer(4); } @Override public void handlerRemoved(ChannelHandlerContext ctx) { System.out.println("Handler removed"); tmp.release(); tmp = null; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf m = (ByteBuf) msg; tmp.writeBytes(m); m.release(); if (tmp.readableBytes() >= 4) { // request processing RequestData requestData = new RequestData(); requestData.setIntValue(tmp.readInt()); ResponseData responseData = new ResponseData(); responseData.setIntValue(requestData.getIntValue() * 2); ChannelFuture future = ctx.writeAndFlush(responseData); future.addListener(ChannelFutureListener.CLOSE); } } }

Показаният по-горе пример изглежда малко странно, но ни помага да разберем как работи Netty. Всеки метод на нашия манипулатор се извиква, когато се появи съответното събитие. Така че ние инициализираме буфера, когато манипулаторът е добавен, попълваме го с данни за получаване на нови байтове и започваме да го обработваме, когато получим достатъчно данни.

We deliberately did not use a stringValue — decoding in such a manner would be unnecessarily complex. That's why Netty provides useful decoder classes which are implementations of ChannelInboundHandler: ByteToMessageDecoder and ReplayingDecoder.

As we noted above we can create a channel processing pipeline with Netty. So we can put our decoder as the first handler and the processing logic handler can come after it.

The decoder for RequestData is shown next:

public class RequestDecoder extends ReplayingDecoder { private final Charset charset = Charset.forName("UTF-8"); @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { RequestData data = new RequestData(); data.setIntValue(in.readInt()); int strLen = in.readInt(); data.setStringValue( in.readCharSequence(strLen, charset).toString()); out.add(data); } }

An idea of this decoder is pretty simple. It uses an implementation of ByteBuf which throws an exception when there is not enough data in the buffer for the reading operation.

When the exception is caught the buffer is rewound to the beginning and the decoder waits for a new portion of data. Decoding stops when the out list is not empty after decode execution.

3.4. Response Encoder

Besides decoding the RequestData we need to encode the message. This operation is simpler because we have the full message data when the write operation occurs.

We can write data to Channel in our main handler or we can separate the logic and create a handler extending MessageToByteEncoder which will catch the write ResponseData operation:

public class ResponseDataEncoder extends MessageToByteEncoder { @Override protected void encode(ChannelHandlerContext ctx, ResponseData msg, ByteBuf out) throws Exception { out.writeInt(msg.getIntValue()); } }

3.5. Request Processing

Since we carried out the decoding and encoding in separate handlers we need to change our ProcessingHandler:

public class ProcessingHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { RequestData requestData = (RequestData) msg; ResponseData responseData = new ResponseData(); responseData.setIntValue(requestData.getIntValue() * 2); ChannelFuture future = ctx.writeAndFlush(responseData); future.addListener(ChannelFutureListener.CLOSE); System.out.println(requestData); } }

3.6. Server Bootstrap

Now let's put it all together and run our server:

public class NettyServer { private int port; // constructor public static void main(String[] args) throws Exception { int port = args.length > 0 ? Integer.parseInt(args[0]); : 8080; new NettyServer(port).run(); } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new RequestDecoder(), new ResponseDataEncoder(), new ProcessingHandler()); } }).option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture f = b.bind(port).sync(); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }

The details of the classes used in the above server bootstrap example can be found in their Javadoc. The most interesting part is this line:

ch.pipeline().addLast( new RequestDecoder(), new ResponseDataEncoder(), new ProcessingHandler());

Here we define inbound and outbound handlers that will process requests and output in the correct order.

4. Client Application

The client should perform reverse encoding and decoding, so we need to have a RequestDataEncoder and ResponseDataDecoder:

public class RequestDataEncoder extends MessageToByteEncoder { private final Charset charset = Charset.forName("UTF-8"); @Override protected void encode(ChannelHandlerContext ctx, RequestData msg, ByteBuf out) throws Exception { out.writeInt(msg.getIntValue()); out.writeInt(msg.getStringValue().length()); out.writeCharSequence(msg.getStringValue(), charset); } }
public class ResponseDataDecoder extends ReplayingDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { ResponseData data = new ResponseData(); data.setIntValue(in.readInt()); out.add(data); } }

Also, we need to define a ClientHandler which will send the request and receive the response from server:

public class ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { RequestData msg = new RequestData(); msg.setIntValue(123); msg.setStringValue( "all work and no play makes jack a dull boy"); ChannelFuture future = ctx.writeAndFlush(msg); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println((ResponseData)msg); ctx.close(); } }

Now let's bootstrap the client:

public class NettyClient { public static void main(String[] args) throws Exception { String host = "localhost"; int port = 8080; EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(workerGroup); b.channel(NioSocketChannel.class); b.option(ChannelOption.SO_KEEPALIVE, true); b.handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new RequestDataEncoder(), new ResponseDataDecoder(), new ClientHandler()); } }); ChannelFuture f = b.connect(host, port).sync(); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } }

Както можем да видим, има много общи детайли при зареждането на сървъра.

Сега можем да стартираме основния метод на клиента и да разгледаме изхода на конзолата. Както се очакваше, получихме ResponseData с intValue, равна на 246.

5. Заключение

В тази статия имахме кратко въведение в Netty. Показахме основните му компоненти като Channel и ChannelHandler . Също така направихме прост неблокиращ сървър за протоколи и клиент за него.

Както винаги, всички примерни кодове са достъпни в GitHub.