Клиент на MQTT в Java

1. Общ преглед

В този урок ще видим как можем да добавим MQTT съобщения в Java проект, използвайки библиотеките, предоставени от проекта Eclipse Paho.

2. MQTT грунд

MQTT (MQ Telemetry Transport) е протокол за съобщения, който е създаден, за да отговори на необходимостта от прост и лек метод за прехвърляне на данни към / от устройства с ниска мощност, като тези, използвани в индустриални приложения.

С нарастващата популярност на устройствата на IoT (Internet of Things), MQTT наблюдава все по-голямо използване, което води до стандартизацията му от OASIS и ISO.

Протоколът поддържа единичен шаблон за съобщения, а именно шаблон Публикуване-Абониране: всяко съобщение, изпратено от клиент, съдържа свързана „тема“, която се използва от брокера, за да го насочи към абонирани клиенти. Имената на темите могат да бъдат прости низове като „ oiltemp “ или подобен на път низ „ motor / 1 / rpm “.

За да получава съобщения, клиент се абонира за една или повече теми, използвайки точното си име или низ, съдържащ един от поддържаните заместващи символи („#“ за многостепенни теми и „+“ за едно ниво “).

3. Настройка на проекта

За да включим библиотеката Paho в проект на Maven, трябва да добавим следната зависимост:

 org.eclipse.paho org.eclipse.paho.client.mqttv3 1.2.0 

Най-новата версия на библиотечния модул Eclipse Paho Java може да бъде изтеглена от Maven Central.

4. Настройка на клиента

Когато използваме библиотеката на Paho, първото нещо, което трябва да направим, за да изпращаме и / или получаваме съобщения от брокер на MQTT, е да получим изпълнение на интерфейса IMqttClient . Този интерфейс съдържа всички методи, изисквани от приложението, за да се установи връзка със сървъра, да се изпращат и получават съобщения.

Paho излиза от кутията с две реализации на този интерфейс, асинхронен ( MqttAsyncClient ) и синхронен ( MqttClient ).В нашия случай ще се съсредоточим върху синхронната версия, която има по-проста семантика.

Самата настройка е процес от две стъпки: първо създаваме екземпляр на класа MqttClient и след това го свързваме с нашия сървър. Следният подраздел подробно описва тези стъпки.

4.1. Създаване на нов IMqttClient съд

Следният кодов фрагмент показва как да създадете нов синхронен екземпляр на IMqttClient :

String publisherId = UUID.randomUUID().toString(); IMqttClient publisher = new MqttClient("tcp://iot.eclipse.org:1883",publisherId);

В този случай използваме най-простия конструктор на разположение, който взема адреса на крайната точка на нашия MQTT брокер и клиентски идентификатор , който уникално идентифицира нашия клиент.

В нашия случай използвахме произволен UUID, така че всеки клиентски идентификатор ще се генерира при всяко изпълнение.

Paho също така предоставя допълнителни конструктори, които можем да използваме, за да персонализираме механизма за постоянство, използван за съхраняване на непотвърдени съобщения и / или ScheduledExecutorService, използван за изпълнение на фонови задачи, изисквани от изпълнението на механизма на протокола.

Крайната точка на сървъра, която използваме, е публичен брокер на MQTT , хостван от проекта Paho , който позволява на всеки с интернет връзка да тества клиенти, без да е необходимо никакво удостоверяване.

4.2. Свързване със сървъра

Нашият новосъздаден екземпляр MqttClient не е свързан със сървъра. Правим това, като извикаме неговия метод connect () , като по избор предаваме екземпляр MqttConnectOptions, който ни позволява да персонализираме някои аспекти на протокола.

По-специално, можем да използваме тези опции за предаване на допълнителна информация като идентификационни данни за сигурност, режим на възстановяване на сесия, режим на повторно свързване и така нататък.

Класът MqttConnectionOptions излага тези опции като прости свойства, които можем да зададем с помощта на нормални методи за настройка. Трябва само да зададем свойствата, необходими за нашия сценарий - останалите ще приемат стойности по подразбиране.

Кодът, използван за установяване на връзка със сървъра, обикновено изглежда така:

MqttConnectOptions options = new MqttConnectOptions(); options.setAutomaticReconnect(true); options.setCleanSession(true); options.setConnectionTimeout(10); publisher.connect(options);

Тук дефинираме нашите опции за връзка, така че:

  • Библиотеката автоматично ще се опита да се свърже отново със сървъра в случай на мрежова повреда
  • Той ще отхвърли неизпратени съобщения от предишно изпълнение
  • Времето за изчакване на връзката е настроено на 10 секунди

5. Изпращане на съобщения

Изпращането на съобщения с помощта на вече свързан MqttClient е много лесно. Използваме един от вариантите на метод objav () , за да изпратим полезния товар, който винаги е байтов масив, към дадена тема , като използваме една от следните опции за качество на услугата:

  • 0 - семантика „най-много веднъж“, известна още като „стреляй и забрави“. Използвайте тази опция, когато загубата на съобщение е приемлива, тъй като не изисква никакво потвърждение или постоянство
  • 1 - семантика „поне веднъж“. Използвайте тази опция, когато загубата на съобщения не е приемлива и вашите абонати могат да обработват дубликати
  • 2 - семантика „точно веднъж“. Използвайте тази опция, когато загубата на съобщения не е приемлива и вашите абонати не могат да обработват дубликати

В нашия примерен проект класът EngineTemperatureSensor играе ролята на фалшив сензор, който генерира ново отчитане на температурата всеки път, когато извикаме метода му call () .

Този клас реализира интерфейса Callable, за да можем лесно да го използваме с една от реализациите на ExecutorService, налични в пакета java.util.concurrent :

public class EngineTemperatureSensor implements Callable { // ... private members omitted public EngineTemperatureSensor(IMqttClient client) { this.client = client; } @Override public Void call() throws Exception { if ( !client.isConnected()) { return null; } MqttMessage msg = readEngineTemp(); msg.setQos(0); msg.setRetained(true); client.publish(TOPIC,msg); return null; } private MqttMessage readEngineTemp() { double temp = 80 + rnd.nextDouble() * 20.0; byte[] payload = String.format("T:%04.2f",temp) .getBytes(); return new MqttMessage(payload); } }

В MqttMessage капсулира самата полезния товар, искания за качество на услугата, а също и на задържани флаг на съобщението. Този флаг показва на брокера, че трябва да запази това съобщение, докато не бъде консумирано от абонат.

Можем да използваме тази функция, за да приложим „последно известно добро“ поведение, така че когато нов абонат се свърже със сървъра, той ще получи запазеното съобщение веднага.

6. Получаване на съобщения

За да получаваме съобщения от посредника на MQTT, трябва да използваме един от вариантите на метода subscribe () , който ни позволява да посочим:

  • Един или повече филтри на теми за съобщения, които искаме да получаваме
  • Свързаното QoS
  • Манипулаторът за обратно извикване за обработка на получени съобщения

В следващия пример показваме как да добавите слушател на съобщения към съществуващ екземпляр IMqttClient, за да получавате съобщения от дадена тема. Използваме CountDownLatch като механизъм за синхронизация между обратното ни обаждане и основната нишка за изпълнение, като го намаляваме всеки път, когато пристигне ново съобщение.

В примерния код използвахме различен екземпляр IMqttClient за получаване на съобщения. Направихме го само за да изясним кой клиент какво прави, но това не е ограничение на Paho - ако искате, можете да използвате същия клиент за публикуване и получаване на съобщения:

CountDownLatch receivedSignal = new CountDownLatch(10); subscriber.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> { byte[] payload = msg.getPayload(); // ... payload handling omitted receivedSignal.countDown(); }); receivedSignal.await(1, TimeUnit.MINUTES);

В абонират () вариант се използва по-горе заема IMqttMessageListener например като второто аргумент.

В нашия случай използваме проста ламбда функция, която обработва полезния товар и намалява брояч. Ако в посочения времеви прозорец (1 минута) не пристигнат достатъчно съобщения, методът await () ще изведе изключение.

When using Paho, we don't need to explicitly acknowledge message receipt. If the callback returns normally, Paho assumes it a successful consumption and sends an acknowledgment to the server.

If the callback throws an Exception, the client will be shut down. Please note that this will result in loss of any messages sent with QoS level of 0.

Messages sent with QoS level 1 or 2 will be resent by the server once the client is reconnected and subscribes to the topic again.

7. Conclusion

In this article, we demonstrated how we can add support for the MQTT protocol in our Java applications using the library provided by the Eclipse Paho project.

Тази библиотека обработва всички подробности за протокола на ниско ниво, което ни позволява да се съсредоточим върху други аспекти на нашето решение, като същевременно оставя достатъчно място за персонализиране на важни аспекти на неговите вътрешни функции, като постоянство на съобщенията.

Кодът, показан в тази статия, е достъпен в GitHub.