Въведение в Project Reactor Bus

1. Общ преглед

В тази кратка статия ще представим реакторната шина, като създадем реален сценарий за реактивно, управлявано от събития приложение.

2. Основите на Project Reactor

2.1. Защо реактор?

Съвременните приложения трябва да се справят с огромен брой едновременни заявки и да обработват значително количество данни. Стандартният код за блокиране вече не е достатъчен за изпълнение на тези изисквания.

Моделът на реактивния дизайн е базиран на събития архитектурен подход за асинхронна обработка на голям обем едновременни заявки за услуги, идващи от единични или множество манипулатори на услуги.

Project Reactor се основава на този модел и има ясна и амбициозна цел да изгради неблокиращи, реактивни приложения на JVM .

2.2. Примерни сценарии

Преди да започнем, ето няколко интересни сценария, при които използването на реактивния архитектурен стил би имало смисъл, само за да получим представа къде можем да го приложим:

  • Нотификационни услуги за голяма платформа за онлайн пазаруване като Amazon
  • Огромни услуги за обработка на транзакции за банковия сектор
  • Акции, търгуващи предприятия, при които цените на акциите се променят едновременно

3. Зависимости на Maven

Нека започнем да използваме Project Reactor Bus, като добавим следната зависимост в нашия pom.xml:

 io.projectreactor reactor-bus 2.0.8.RELEASE 

Можем да проверим най-новата версия на реакторната шина в Maven Central.

4. Изграждане на демо приложение

За да разберем по-добре ползите от реакторния подход, нека разгледаме практически пример.

Ще създадем просто приложение, отговорно за изпращане на известия до потребителите на платформа за онлайн пазаруване. Например, ако потребителят направи нова поръчка, тогава приложението изпраща потвърждение на поръчката по имейл или SMS.

Типично синхронно изпълнение естествено би било ограничено от производителността на услугата за електронна поща или SMS. Следователно скоковете на трафика, като празници, като цяло биха били проблематични.

С реактивен подход можем да проектираме нашата система да бъде по-гъвкава и да се адаптира по-добре към грешки или изчаквания, които могат да възникнат във външните системи, като сървъри за шлюз.

Нека да разгледаме приложението - като започнем с по-традиционните аспекти и преминем към по-реактивните конструкции.

4.1. Обикновено POJO

Първо, нека създадем POJO клас, който да представя данните за уведомяване:

public class NotificationData { private long id; private String name; private String email; private String mobile; // getter and setter methods }

4.2. Сервизният слой

Нека сега дефинираме прост сервизен слой:

public interface NotificationService { void initiateNotification(NotificationData notificationData) throws InterruptedException; }

И изпълнението, симулиращо продължителна операция:

@Service public class NotificationServiceimpl implements NotificationService { @Override public void initiateNotification(NotificationData notificationData) throws InterruptedException { System.out.println("Notification service started for " + "Notification ID: " + notificationData.getId()); Thread.sleep(5000); System.out.println("Notification service ended for " + "Notification ID: " + notificationData.getId()); } }

Забележете, че за да се илюстрира с реалния живот сценарий на изпращане на съобщения чрез SMS или имейл шлюз, ние умишлено въвеждане на пет секунди забавяне в initiateNotification метод с Thread.sleep (5000).

Следователно, когато нишка удари услугата, тя ще бъде блокирана за пет секунди.

4.3. Потребителят

Нека сега да преминем към по-реактивните аспекти на нашето приложение и да внедрим потребител - който след това ще картографираме към шината на събитието на реактора:

@Service public class NotificationConsumer implements Consumer
    
      { @Autowired private NotificationService notificationService; @Override public void accept(Event notificationDataEvent) { NotificationData notificationData = notificationDataEvent.getData(); try { notificationService.initiateNotification(notificationData); } catch (InterruptedException e) { // ignore } } }
    

Както виждаме, потребителят, който създадохме, изпълнява потребителския интерфейс. Основната логика се намира в метода accept .

Това е подобен подход, който можем да срещнем в типично изпълнение на слушателя на Spring.

4.4. Контролерът

И накрая, сега, когато сме в състояние да консумираме събитията, нека ги генерираме и ние.

Ще направим това в обикновен контролер:

@Controller public class NotificationController { @Autowired private EventBus eventBus; @GetMapping("/startNotification/{param}") public void startNotification(@PathVariable Integer param) { for (int i = 0; i < param; i++) { NotificationData data = new NotificationData(); data.setId(i); eventBus.notify("notificationConsumer", Event.wrap(data)); System.out.println( "Notification " + i + ": notification task submitted successfully"); } } }

Това е доста обяснимо - излъчваме събития чрез EventBus тук.

Например, ако клиент удари URL адреса със стойност на параметър десет, тогава десет събития ще бъдат изпратени през шината на събитията.

4.5. Java Config

Нека сега съберем всичко и да създадем просто приложение Spring Boot.

Първо, трябва да конфигурираме компоненти EventBus и Environment :

@Configuration public class Config { @Bean public Environment env() { return Environment.initializeIfEmpty().assignErrorJournal(); } @Bean public EventBus createEventBus(Environment env) { return EventBus.create(env, Environment.THREAD_POOL); } }

В нашия случай създаваме екземпляр на EventBus с пул от нишки по подразбиране, наличен в средата .

Като алтернатива можем да използваме персонализиран екземпляр на Dispatcher :

EventBus evBus = EventBus.create( env, Environment.newDispatcher( REACTOR_CAPACITY,REACTOR_CONSUMERS_COUNT, DispatcherType.THREAD_POOL_EXECUTOR));

Сега сме готови да създадем основен код на приложението:

import static reactor.bus.selector.Selectors.$; @SpringBootApplication public class NotificationApplication implements CommandLineRunner { @Autowired private EventBus eventBus; @Autowired private NotificationConsumer notificationConsumer; @Override public void run(String... args) throws Exception { eventBus.on($("notificationConsumer"), notificationConsumer); } public static void main(String[] args) { SpringApplication.run(NotificationApplication.class, args); } }

В нашия план метод сме регистриране на notificationConsumer да се задейства, когато уведомлението съвпадне с определен селектор .

Забележете как използваме статичното импортиране на атрибута $ за създаване на обект Selector .

5. Тествайте приложението

Нека сега създадем тест, за да видим нашето приложение NotificationApplication в действие:

@RunWith(SpringRunner.class) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) public class NotificationApplicationIntegrationTest { @LocalServerPort private int port; @Test public void givenAppStarted_whenNotificationTasksSubmitted_thenProcessed() { RestTemplate restTemplate = new RestTemplate(); restTemplate.getForObject("//localhost:" + port + "/startNotification/10", String.class); } }

Както виждаме, веднага щом заявката бъде изпълнена, всичките десет задачи се подават незабавно, без да се създава блокиране . И след като бъдат изпратени, събитията за уведомяване се обработват паралелно.

Notification 0: notification task submitted successfully Notification 1: notification task submitted successfully Notification 2: notification task submitted successfully Notification 3: notification task submitted successfully Notification 4: notification task submitted successfully Notification 5: notification task submitted successfully Notification 6: notification task submitted successfully Notification 7: notification task submitted successfully Notification 8: notification task submitted successfully Notification 9: notification task submitted successfully Notification service started for Notification ID: 1 Notification service started for Notification ID: 2 Notification service started for Notification ID: 3 Notification service started for Notification ID: 0 Notification service ended for Notification ID: 1 Notification service ended for Notification ID: 0 Notification service started for Notification ID: 4 Notification service ended for Notification ID: 3 Notification service ended for Notification ID: 2 Notification service started for Notification ID: 6 Notification service started for Notification ID: 5 Notification service started for Notification ID: 7 Notification service ended for Notification ID: 4 Notification service started for Notification ID: 8 Notification service ended for Notification ID: 6 Notification service ended for Notification ID: 5 Notification service started for Notification ID: 9 Notification service ended for Notification ID: 7 Notification service ended for Notification ID: 8 Notification service ended for Notification ID: 9

Важно е да имате предвид, че в нашия сценарий няма нужда да обработваме тези събития в някакъв определен ред.

6. Заключение

В този бърз урок създадохме просто приложение, управлявано от събития . Видяхме също как да започнем да пишем по-реактивен и неблокиращ код.

Въпреки това, този сценарий само докосва повърхността на обекта и представлява просто една добра основа, за да започнете да експериментирате с реактивен парадигма .

Както винаги, изходният код е достъпен в GitHub.