Apache RocketMQ с Spring Boot

1. Въведение

В този урок ще създадем производител и потребител на съобщения, използвайки Spring Boot и Apache RocketMQ, платформа за разпространение на съобщения и поточно предаване на данни с отворен код.

2. Зависимости

За проектите на Maven трябва да добавим зависимостта RocketMQ Spring Boot Starter:

 org.apache.rocketmq rocketmq-spring-boot-starter 2.0.4 

3. Създаване на съобщения

За нашия пример ще създадем основен производител на съобщения, който ще изпраща събития всеки път, когато потребителят добави или премахне елемент от пазарската кошница.

Първо, нека зададем местоположението на сървъра и името на групата в нашата application.properties :

rocketmq.name-server=127.0.0.1:9876 rocketmq.producer.group=cart-producer-group

Имайте предвид, че ако имахме повече от един сървър за имена, бихме могли да ги изброим като хост: порт; хост: порт .

Сега, за да бъде просто, ще създадем приложение CommandLineRunner и ще генерираме няколко събития по време на стартиране на приложението:

@SpringBootApplication public class CartEventProducer implements CommandLineRunner { @Autowired private RocketMQTemplate rocketMQTemplate; public static void main(String[] args) { SpringApplication.run(CartEventProducer.class, args); } public void run(String... args) throws Exception { rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("bike", 1)); rocketMQTemplate.convertAndSend("cart-item-add-topic", new CartItemEvent("computer", 2)); rocketMQTemplate.convertAndSend("cart-item-removed-topic", new CartItemEvent("bike", 1)); } }

В CartItemEvent се състои от само две свойства - идентификационният номер на елемента и количество:

class CartItemEvent { private String itemId; private int quantity; // constructor, getters and setters }

В горния пример използваме метода convertAndSend () , общ метод, дефиниран от абстрактния клас AbstractMessageSendingTemplate , за да изпратим нашите събития в количката. Отнема два параметъра: Дестинация, която в нашия случай е име на тема, и полезен товар на съобщението.

4. Потребител на съобщения

Консумирането на RocketMQ съобщения е толкова просто, колкото създаването на Spring компонент, коментиран с @RocketMQMessageListener и внедряването на интерфейса RocketMQListener :

@SpringBootApplication public class CartEventConsumer { public static void main(String[] args) { SpringApplication.run(CartEventConsumer.class, args); } @Service @RocketMQMessageListener( topic = "cart-item-add-topic", consumerGroup = "cart-consumer_cart-item-add-topic" ) public class CardItemAddConsumer implements RocketMQListener { public void onMessage(CartItemEvent addItemEvent) { log.info("Adding item: {}", addItemEvent); // additional logic } } @Service @RocketMQMessageListener( topic = "cart-item-removed-topic", consumerGroup = "cart-consumer_cart-item-removed-topic" ) public class CardItemRemoveConsumer implements RocketMQListener { public void onMessage(CartItemEvent removeItemEvent) { log.info("Removing item: {}", removeItemEvent); // additional logic } } }

Трябва да създадем отделен компонент за всяка тема на съобщението, която слушаме. Във всеки от тези слушатели дефинираме името на темата и името на потребителската група чрез анотацията @ RocketMQMessageListener .

5. Синхронно и асинхронно предаване

В предишните примери използвахме метода convertAndSend , за да изпращаме съобщенията си. Имаме обаче някои други възможности.

Можем например да извикаме syncSend, което е различно от convertAndSend, защото връща обект SendResult .

Може да се използва, например, за да провери дали нашето съобщение е изпратено успешно или да получи своя идентификатор:

public void run(String... args) throws Exception { SendResult addBikeResult = rocketMQTemplate.syncSend("cart-item-add-topic", new CartItemEvent("bike", 1)); SendResult addComputerResult = rocketMQTemplate.syncSend("cart-item-add-topic", new CartItemEvent("computer", 2)); SendResult removeBikeResult = rocketMQTemplate.syncSend("cart-item-removed-topic", new CartItemEvent("bike", 1)); }

Подобно на convertAndSend, този метод се връща само когато процедурата за изпращане завърши.

Трябва да използваме синхронно предаване в случаи, изискващи висока надеждност, като важни съобщения за известяване или SMS известия.

От друга страна, вместо това може да искаме да изпратим съобщението асинхронно и да бъдем уведомени, когато изпращането приключи.

Можем да направим това с asyncSend , който приема SendCallback като параметър и връща незабавно:

rocketMQTemplate.asyncSend("cart-item-add-topic", new CartItemEvent("bike", 1), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.error("Successfully sent cart item"); } @Override public void onException(Throwable throwable) { log.error("Exception during cart item sending", throwable); } });

Използваме асинхронно предаване в случаи, изискващи висока производителност.

И накрая, за сценарии, при които имаме много високи изисквания за производителност, можем да използваме sendOneWay вместо asyncSend . sendOneWay се различава от asyncSend, тъй като не гарантира, че съобщението ще бъде изпратено.

Еднопосочното предаване може да се използва и за обикновени случаи на надеждност, като събиране на трупи.

6. Изпращане на съобщения в транзакция

RocketMQ ни предоставя възможността да изпращаме съобщения в рамките на транзакция. Можем да го направим, като използваме метода sendInTransaction () :

MessageBuilder.withPayload(new CartItemEvent("bike", 1)).build(); rocketMQTemplate.sendMessageInTransaction("test-transaction", "topic-name", msg, null);

Също така трябва да внедрим интерфейс RocketMQLocalTransactionListener :

@RocketMQTransactionListener(txProducerGroup="test-transaction") class TransactionListenerImpl implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // ... local transaction process, return ROLLBACK, COMMIT or UNKNOWN return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // ... check transaction status and return ROLLBACK, COMMIT or UNKNOWN return RocketMQLocalTransactionState.COMMIT; } }

В sendMessageInTransaction () първият параметър е името на транзакцията. То трябва да е същото като полето за член на @RocketMQTransactionListener txProducerGroup.

7. Конфигурация на производителя на съобщения

Също така можем да конфигурираме аспекти на самия производител на съобщения:

  • rocketmq.producer.send-message-timeout : Времето за изчакване на изпращането на съобщението е в милисекунди - стойността по подразбиране е 3000
  • rocketmq.producer.compress-message-body-prag : Праг, над който RocketMQ ще компресира съобщения - стойността по подразбиране е 1024.
  • rocketmq.producer.max-message-size : Максималният размер на съобщението в байтове - стойността по подразбиране е 4096.
  • rocketmq.producer.retry-times-when-send-async-failed : Максималният брой опити за вътрешно изпълнение в асинхронен режим преди изпращане на неуспех - стойността по подразбиране е 2.
  • rocketmq.producer.retry-next-server : Показва дали да се опита нов брокер при вътрешно изпращане на грешка - стойността по подразбиране е false .
  • rocketmq.producer.retry-times-when-send-failed : Максималният брой опити за вътрешно изпълнение в асинхронен режим преди изпращане на неуспех - стойността по подразбиране е 2.

8. Заключение

В тази статия научихме как да изпращаме и консумираме съобщения с помощта на Apache RocketMQ и Spring Boot. Както винаги целият изходен код е достъпен в GitHub.