Въведение в Java NIO Selector

1. Общ преглед

В тази статия ще разгледаме уводните части на компонента за избор на Java NIO .

Селекторът предоставя механизъм за наблюдение на един или повече NIO канали и разпознаване кога един или повече стават достъпни за трансфер на данни.

По този начин една нишка може да се използва за управление на множество канали и по този начин множество мрежови връзки.

2. Защо да използваме селектор?

Със селектора можем да използваме една нишка вместо няколко, за да управляваме множество канали. Превключването на контекст между нишки е скъпо за операционната система и освен това всяка нишка заема памет.

Следователно, колкото по-малко нишки използваме, толкова по-добре. Важно е обаче да запомните, че съвременните операционни системи и процесори продължават да се подобряват при многозадачност , така че режийните разходи за многопоточност продължават да намаляват с времето.

Тук ще се справим с това как можем да обработваме множество канали с една нишка с помощта на селектор.

Имайте предвид също, че селекторите не ви помагат само да четете данни; те също могат да слушат за входящи мрежови връзки и да записват данни през бавни канали.

3. Настройка

За да използваме селектора, не се нуждаем от специална настройка. Всички класове, от които се нуждаем, са основният пакет java.nio и ние просто трябва да импортираме това, от което се нуждаем.

След това можем да регистрираме множество канали с обект селектор. Когато I / O активност се случи на някой от каналите, селекторът ни уведомява. Ето как можем да четем от голям брой източници на данни от една нишка.

Всеки канал, който регистрираме със селектор, трябва да бъде подклас на SelectableChannel . Това са специален тип канали, които могат да бъдат поставени в неблокиращ режим.

4. Създаване на селектор

Селектор може да бъде създаден чрез извикване на статичния отворен метод на класа Selector , който ще използва доставчика на селектора по подразбиране на системата, за да създаде нов селектор:

Selector selector = Selector.open();

5. Регистриране на избираеми канали

За да може селекторът да наблюдава някакви канали, трябва да регистрираме тези канали в селектора. Правим това, като извикаме метода регистър на избираемия канал.

Но преди даден канал да бъде регистриран със селектор, той трябва да е в неблокиращ режим:

channel.configureBlocking(false); SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

Това означава, че не можем да използваме FileChannel s с селектор, тъй като те не могат да бъдат превключени в неблокиращ режим по начина, по който го правим с сокет канали.

Първият параметър е обектът Selector, който създадохме по-рано, вторият параметър определя набор от интереси , което означава какви събития се интересуваме да слушаме в наблюдавания канал чрез селектора.

Има четири различни събития, които можем да слушаме, всяко от тях е представено от константа в класа SelectionKey :

  • Свързване - когато клиент се опитва да се свърже със сървъра. Представено от SelectionKey.OP_CONNECT
  • Приемам - когато сървърът приема връзка от клиент. Представено от SelectionKey.OP_ACCEPT
  • Четене - когато сървърът е готов за четене от канала. Представено от SelectionKey.OP_READ
  • Пиши - когато сървърът е готов да пише в канала. Представено от SelectionKey.OP_WRITE

Върнатият обект SelectionKey представлява регистрацията на избираемия канал в селектора. Ще го разгледаме допълнително в следващия раздел.

6. SelectionKey обекта

Както видяхме в предишния раздел, когато регистрираме канал със селектор, получаваме обект SelectionKey . Този обект съдържа данни, представляващи регистрацията на канала.

Той съдържа някои важни свойства, които трябва да разберем добре, за да можем да използваме селектора на канала. Ще разгледаме тези свойства в следващите подраздели.

6.1. Лихвеният набор

Набор от лихви определя набора от събития, за които искаме селекторът да внимава в този канал. Това е цяла стойност; можем да получим тази информация по следния начин.

На първо място, ние имаме набор интерес върнат от SelectionKey е interestOps метод. Тогава имаме константата на събитието в SelectionKey, която разгледахме по-рано.

Когато ние И тези две стойности, получаваме логическа стойност, която ни казва дали събитието се наблюдава или не:

int interestSet = selectionKey.interestOps(); boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT; boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT; boolean isInterestedInRead = interestSet & SelectionKey.OP_READ; boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;

6.2. Готовият комплект

Готовият набор определя набора от събития, за които каналът е готов. Това е и цяла стойност; можем да получим тази информация по следния начин.

Имаме готов набор върнати от SelectionKey е readyOps метод. Когато ние И тази стойност с константи на събитията, както направихме в случая на зададен интерес, получаваме булева стойност, представяща дали каналът е готов за определена стойност или не.

Друг алтернативен и по-кратък начин да направите това е да използвате методите за удобство на SelectionKey за същата тази цел:

selectionKey.isAcceptable(); selectionKey.isConnectable(); selectionKey.isReadable(); selectionKey.isWriteable();

6.3. Каналът

Достъпът до гледания канал от обекта SelectionKey е много лесен. Просто извикваме метода на канала :

Channel channel = key.channel();

6.4. Селекторът

Подобно на получаването на канал, е много лесно да получите обекта Selector от обекта SelectionKey :

Selector selector = key.selector();

6.5. Прикачване на обекти

Можем да прикачим обект към SelectionKey. Понякога може да поискаме да дадем на канал персонализиран идентификатор или да прикачим някакъв вид Java обект, който може да искаме да следим.

Attaching objects is a handy way of doing it. Here is how you attach and get objects from a SelectionKey:

key.attach(Object); Object object = key.attachment();

Alternatively, we can choose to attach an object during channel registration. We add it as a third parameter to channel's register method, like so:

SelectionKey key = channel.register( selector, SelectionKey.OP_ACCEPT, object);

7. Channel Key Selection

So far, we have looked at how to create a selector, register channels to it and inspect the properties of the SelectionKey object which represents a channel's registration to a selector.

This is only half of the process, now we have to perform a continuous process of selecting the ready set which we looked at earlier. We do selection using selector's select method, like so:

int channels = selector.select();

This method blocks until at least one channel is ready for an operation. The integer returned represents the number of keys whose channels are ready for an operation.

Next, we usually retrieve the set of selected keys for processing:

Set selectedKeys = selector.selectedKeys();

The set we have obtained is of SelectionKey objects, each key represents a registered channel which is ready for an operation.

After this, we usually iterate over this set and for each key, we obtain the channel and perform any of the operations that appear in our interest set on it.

During the lifetime of a channel, it may be selected several times as its key appears in the ready set for different events. This is why we must have a continuous loop to capture and process channel events as and when they occur.

8. Complete Example

To cement the knowledge we have gained in the previous sections, we're going to build a complete client-server example.

For ease of testing out our code, we'll build an echo server and an echo client. In this kind of setup, the client connects to the server and starts sending messages to it. The server echoes back messages sent by each client.

When the server encounters a specific message, such as end, it interprets it as the end of the communication and closes the connection with the client.

8.1. The Server

Here is our code for EchoServer.java:

public class EchoServer { private static final String POISON_PILL = "POISON_PILL"; public static void main(String[] args) throws IOException { Selector selector = Selector.open(); ServerSocketChannel serverSocket = ServerSocketChannel.open(); serverSocket.bind(new InetSocketAddress("localhost", 5454)); serverSocket.configureBlocking(false); serverSocket.register(selector, SelectionKey.OP_ACCEPT); ByteBuffer buffer = ByteBuffer.allocate(256); while (true) { selector.select(); Set selectedKeys = selector.selectedKeys(); Iterator iter = selectedKeys.iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); if (key.isAcceptable()) { register(selector, serverSocket); } if (key.isReadable()) { answerWithEcho(buffer, key); } iter.remove(); } } } private static void answerWithEcho(ByteBuffer buffer, SelectionKey key) throws IOException { SocketChannel client = (SocketChannel) key.channel(); client.read(buffer); if (new String(buffer.array()).trim().equals(POISON_PILL)) { client.close(); System.out.println("Not accepting client messages anymore"); } else { buffer.flip(); client.write(buffer); buffer.clear(); } } private static void register(Selector selector, ServerSocketChannel serverSocket) throws IOException { SocketChannel client = serverSocket.accept(); client.configureBlocking(false); client.register(selector, SelectionKey.OP_READ); } public static Process start() throws IOException, InterruptedException { String javaHome = System.getProperty("java.home"); String javaBin = javaHome + File.separator + "bin" + File.separator + "java"; String classpath = System.getProperty("java.class.path"); String className = EchoServer.class.getCanonicalName(); ProcessBuilder builder = new ProcessBuilder(javaBin, "-cp", classpath, className); return builder.start(); } }

This is what is happening; we create a Selector object by calling the static open method. We then create a channel also by calling its static open method, specifically a ServerSocketChannel instance.

This is because ServerSocketChannel is selectable and good for a stream-oriented listening socket.

We then bind it to a port of our choice. Remember we said earlier that before registering a selectable channel to a selector, we must first set it to non-blocking mode. So next we do this and then register the channel to the selector.

We don't need the SelectionKey instance of this channel at this stage, so we will not remember it.

Java NIO uses a buffer-oriented model other than a stream-oriented model. So socket communication usually takes place by writing to and reading from a buffer.

We, therefore, create a new ByteBuffer which the server will be writing to and reading from. We initialize it to 256 bytes, it's just an arbitrary value, depending on how much data we plan to transfer to and fro.

Finally, we perform the selection process. We select the ready channels, retrieve their selection keys, iterate over the keys and perform the operations for which each channel is ready.

We do this in an infinite loop since servers usually need to keep running whether there is an activity or not.

The only operation a ServerSocketChannel can handle is an ACCEPT operation. When we accept the connection from a client, we obtain a SocketChannel object on which we can do read and writes. We set it to non-blocking mode and register it for a READ operation to the selector.

During one of the subsequent selections, this new channel will become read-ready. We retrieve it and read it contents into the buffer. True to it's as an echo server, we must write this content back to the client.

When we desire to write to a buffer from which we have been reading, we must call the flip() method.

We finally set the buffer to write mode by calling the flip method and simply write to it.

The start() method is defined so that the echo server can be started as a separate process during unit testing.

8.2. The Client

Here is our code for EchoClient.java:

public class EchoClient { private static SocketChannel client; private static ByteBuffer buffer; private static EchoClient instance; public static EchoClient start() { if (instance == null) instance = new EchoClient(); return instance; } public static void stop() throws IOException { client.close(); buffer = null; } private EchoClient() { try { client = SocketChannel.open(new InetSocketAddress("localhost", 5454)); buffer = ByteBuffer.allocate(256); } catch (IOException e) { e.printStackTrace(); } } public String sendMessage(String msg) { buffer = ByteBuffer.wrap(msg.getBytes()); String response = null; try { client.write(buffer); buffer.clear(); client.read(buffer); response = new String(buffer.array()).trim(); System.out.println("response=" + response); buffer.clear(); } catch (IOException e) { e.printStackTrace(); } return response; } }

The client is simpler than the server.

We use a singleton pattern to instantiate it inside the start static method. We call the private constructor from this method.

In the private constructor, we open a connection on the same port on which the server channel was bound and still on the same host.

We then create a buffer to which we can write and from which we can read.

Finally, we have a sendMessage method which reads wraps any string we pass to it into a byte buffer which is transmitted over the channel to the server.

След това четем от клиентския канал, за да получим съобщението, изпратено от сървъра. Връщаме това като ехо на нашето послание.

8.3. Тестване

Вътре в клас, наречен EchoTest.java , ще създадем тестов случай, който стартира сървъра, изпраща съобщения до сървъра и преминава само когато същите съобщения са получени обратно от сървъра. Като последна стъпка тестовият случай спира сървъра преди завършване.

Вече можем да стартираме теста:

public class EchoTest { Process server; EchoClient client; @Before public void setup() throws IOException, InterruptedException { server = EchoServer.start(); client = EchoClient.start(); } @Test public void givenServerClient_whenServerEchosMessage_thenCorrect() { String resp1 = client.sendMessage("hello"); String resp2 = client.sendMessage("world"); assertEquals("hello", resp1); assertEquals("world", resp2); } @After public void teardown() throws IOException { server.destroy(); EchoClient.stop(); } }

9. Заключение

В тази статия разгледахме основното използване на компонента Java NIO Selector.

Пълният изходен код и всички кодови фрагменти за тази статия са налични в моя проект GitHub.