Отправяне на съобщения от RabbitMQ с пролетен AMQP

1. Въведение

В този урок ще разгледаме понятието fanout и темата обмен с Пролет AMQP и RabbitMQ.

На високо ниво обменните борси ще излъчват едно и също съобщение до всички обвързани опашки , докато обменните теми използват маршрутизиращ ключ за предаване на съобщения до определена обвързана опашка или опашки .

Предварително четене на Messaging With Spring AMQP се препоръчва за този урок.

2. Настройване на обмен на Fanout

Нека създадем една обменна фен с две опашки, свързани с нея. Когато изпратим съобщение до тази борса, и двете опашки ще получат съобщението. Нашият обмен на фанати игнорира всеки маршрутизиращ ключ, включен в съобщението.

Spring AMQP ни позволява да обобщаваме всички декларации за опашки, обмени и обвързвания в обект Declarables :

@Bean public Declarables fanoutBindings() { Queue fanoutQueue1 = new Queue("fanout.queue1", false); Queue fanoutQueue2 = new Queue("fanout.queue2", false); FanoutExchange fanoutExchange = new FanoutExchange("fanout.exchange"); return new Declarables( fanoutQueue1, fanoutQueue2, fanoutExchange, bind(fanoutQueue1).to(fanoutExchange), BindingBuilder.bind(fanoutQueue2).to(fanoutExchange)); }

3. Настройване на обмен на теми

Сега ще настроим и обмен на теми с две опашки, всяка с различен модел на обвързване:

@Bean public Declarables topicBindings() { Queue topicQueue1 = new Queue(topicQueue1Name, false); Queue topicQueue2 = new Queue(topicQueue2Name, false); TopicExchange topicExchange = new TopicExchange(topicExchangeName); return new Declarables( topicQueue1, topicQueue2, topicExchange, BindingBuilder .bind(topicQueue1) .to(topicExchange).with("*.important.*"), BindingBuilder .bind(topicQueue2) .to(topicExchange).with("#.error")); }

Обменът на теми ни позволява да свързваме опашки с него с различни ключови модели. Това е много гъвкаво и ни позволява да свързваме множество опашки с един и същ шаблон или дори множество модели към една и съща опашка.

Когато ключът за маршрутизиране на съобщението съвпада с модела, той ще бъде поставен в опашката. Ако опашката има множество обвързвания, които съответстват на маршрутизиращия ключ на съобщението, на опашката се поставя само едно копие на съобщението.

Нашите модели за свързване могат да използват звездичка („*“), за да съвпадат с дума в определена позиция или знак за паунд („#“), за да съответстват на нула или повече думи.

И така, нашата topicQueue1 ще получава съобщения, които имат маршрутизиращи ключове с три думи, като средната дума е „важна“ - например: „user.important.error“ или „blog.important.notification“.

И нашият topicQueue2 ще получава съобщения, които имат ключове за маршрутизация, завършващи с думата грешка; съответстващите примери са „грешка“ , „потребител.значителна грешка“ или „блог.пост.запис.погрешка“.

4. Настройване на продуцент

Ще използваме метода convertAndSend на RabbitTemplate, за да изпратим нашите примерни съобщения:

 String message = " payload is broadcast"; return args -> { rabbitTemplate.convertAndSend(FANOUT_EXCHANGE_NAME, "", "fanout" + message); rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_WARN, "topic important warn" + message); rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_ERROR, "topic important error" + message); };

В RabbitTemplate осигурява много претоварен convertAndSend () методи за различни типове обмен.

Когато изпращаме съобщение до обменна централа, ключът за маршрутизация се игнорира и съобщението се предава на всички обвързани опашки.

Когато изпращаме съобщение до обмена на теми, трябва да предадем ключ за маршрутизация. Въз основа на този ключ за маршрутизиране съобщението ще бъде доставено до определени опашки.

5. Конфигуриране на потребителите

И накрая, нека настроим четирима потребители - по един за всяка опашка - да вземат произведените съобщения:

 @RabbitListener(queues = {FANOUT_QUEUE_1_NAME}) public void receiveMessageFromFanout1(String message) { System.out.println("Received fanout 1 message: " + message); } @RabbitListener(queues = {FANOUT_QUEUE_2_NAME}) public void receiveMessageFromFanout2(String message) { System.out.println("Received fanout 2 message: " + message); } @RabbitListener(queues = {TOPIC_QUEUE_1_NAME}) public void receiveMessageFromTopic1(String message) { System.out.println("Received topic 1 (" + BINDING_PATTERN_IMPORTANT + ") message: " + message); } @RabbitListener(queues = {TOPIC_QUEUE_2_NAME}) public void receiveMessageFromTopic2(String message) { System.out.println("Received topic 2 (" + BINDING_PATTERN_ERROR + ") message: " + message); }

Конфигурираме потребителите, като използваме анотацията @RabbitListener . Единственият аргумент, предаден тук, е името на опашките. Потребителите не са наясно тук за обмен или ключове за маршрутизиране.

6. Стартиране на Примера

Нашият примерен проект е приложение Spring Spring и затова ще инициализира приложението заедно с връзка с RabbitMQ и ще настрои всички опашки, обмени и обвързвания.

По подразбиране нашето приложение очаква екземпляр RabbitMQ, работещ на localhost на порт 5672. Можем да модифицираме това и други настройки по подразбиране в application.yaml .

Нашият проект разкрива HTTP крайна точка на URI - / излъчване - която приема POST с съобщение в тялото на заявката.

Когато изпращаме заявка до този URI с тяло „Тест“, трябва да видим нещо подобно на това в изхода:

Received fanout 1 message: fanout payload is broadcast Received topic 1 (*.important.*) message: topic important warn payload is broadcast Received topic 2 (#.error) message: topic important error payload is broadcast Received fanout 2 message: fanout payload is broadcast Received topic 1 (*.important.*) message: topic important error payload is broadcast

Редът, в който ще виждаме тези съобщения, разбира се, не е гарантиран.

7. Заключение

В този бърз урок разгледахме обмен на фен и теми с Spring AMQP и RabbitMQ.

Пълният изходен код и всички кодови фрагменти за този урок са достъпни в хранилището на GitHub.