Въведение в Spring Cloud Stream

1. Общ преглед

Spring Cloud Stream е рамка, изградена върху Spring Boot и Spring Integration, която помага при създаването на управлявани от събития или управлявани от съобщения микроуслуги .

В тази статия ще ви представим концепции и конструкции на Spring Cloud Stream с няколко прости примера.

2. Зависимости на Maven

За да започнем, ще трябва да добавим Spring Cloud Starter Stream с зависимостта на брокера RabbitMQ Maven като междинен софтуер за съобщения към нашия pom.xml :

 org.springframework.cloud spring-cloud-starter-stream-rabbit 1.3.0.RELEASE 

И ще добавим модулната зависимост от Maven Central, за да активираме и поддръжката на JUnit:

 org.springframework.cloud spring-cloud-stream-test-support 1.3.0.RELEASE test 

3. Основни понятия

Архитектурата на Microservices следва принципа „интелигентни крайни точки и тъпи тръби“. Комуникацията между крайните точки се задвижва от страни за междинни съобщения като RabbitMQ или Apache Kafka. Услугите комуникират чрез публикуване на събития в домейна чрез тези крайни точки или канали .

Нека да преминем през концепциите, които изграждат рамката на Spring Cloud Stream, заедно с основните парадигми, с които трябва да сме наясно, за да изградим услуги, управлявани от съобщения.

3.1. Конструкции

Нека разгледаме проста услуга в Spring Cloud Stream, която слуша входно обвързване и изпраща отговор на изходното обвързване:

@SpringBootApplication @EnableBinding(Processor.class) public class MyLoggerServiceApplication { public static void main(String[] args) { SpringApplication.run(MyLoggerServiceApplication.class, args); } @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public LogMessage enrichLogMessage(LogMessage log) { return new LogMessage(String.format("[1]: %s", log.getMessage())); } }

Анотацията @EnableBinding конфигурира приложението да обвързва каналите INPUT и OUTPUT, дефинирани в интерфейсния процесор . И двата канала са свързващи елементи, които могат да бъдат конфигурирани да използват конкретен междинен софтуер за съобщения или свързващо вещество.

Нека да разгледаме дефиницията на всички тези понятия:

  • Обвързвания - колекция от интерфейси, които декларативно идентифицират входните и изходните канали
  • Binder - внедряване на междинен софтуер за съобщения като Kafka или RabbitMQ
  • Канал - представлява комуникационната тръба между междинния софтуер за съобщения и приложението
  • StreamListeners - методи за обработка на съобщения в компоненти, които автоматично ще бъдат извикани в съобщение от канала, след като MessageConverter извърши сериализация / десериализация между специфични за средната програма събития и типове обект на домейн / POJOs
  • Mes sage схеми - използвани за сериализация и десериализация на съобщения, тези схеми могат да се четат статично от дадено местоположение или да се зареждат динамично, поддържайки развитието на типовете обект на домейн

3.2. Модели за комуникация

Съобщенията, предназначени за дестинации, се доставят по модела за съобщения Публикуване-Абониране . Издателите категоризират съобщенията по теми, всяка от които се идентифицира с име. Абонатите изразяват интерес към една или повече теми. Средният софтуер филтрира съобщенията, като доставя тези от интересните теми на абонатите.

Сега абонатите могат да бъдат групирани. А група потребители е набор от абонати или потребители, определени от една група номер , в рамките на който на съобщения от дадена тема или дял тема са доставени в с балансиране на натоварването начин.

4. Модел за програмиране

Този раздел описва основите на изграждането на приложенията на Spring Cloud Stream.

4.1. Функционално тестване

Тестовата поддръжка е свързващо изпълнение, което позволява взаимодействие с каналите и проверка на съобщения.

Нека изпратим съобщение до горната услуга enrichLogMessage и проверим дали отговорът съдържа текста „[1]:“ в началото на съобщението:

@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = MyLoggerServiceApplication.class) @DirtiesContext public class MyLoggerApplicationTests { @Autowired private Processor pipe; @Autowired private MessageCollector messageCollector; @Test public void whenSendMessage_thenResponseShouldUpdateText() { pipe.input() .send(MessageBuilder.withPayload(new LogMessage("This is my message")) .build()); Object payload = messageCollector.forChannel(pipe.output()) .poll() .getPayload(); assertEquals("[1]: This is my message", payload.toString()); } }

4.2. Персонализирани канали

В горния пример използвахме процесорния интерфейс, предоставен от Spring Cloud, който има само един входен и един изходен канал.

Ако се нуждаем от нещо различно, като един входен и два изходни канала, можем да създадем персонализиран процесор:

public interface MyProcessor { String INPUT = "myInput"; @Input SubscribableChannel myInput(); @Output("myOutput") MessageChannel anOutput(); @Output MessageChannel anotherOutput(); }

Spring ще осигури правилното изпълнение на този интерфейс за нас. Имената на каналите могат да бъдат зададени с помощта на анотации като в @Output (“myOutput”) .

В противен случай Spring ще използва имената на методите като имена на канали. Следователно имаме три канала, наречени myInput , myOutput и anotherOutput .

Сега, нека си представим, че искаме да насочим съобщенията към един изход, ако стойността е по-малка от 10, а в друг изход стойността е по-голяма или равна на 10:

@Autowired private MyProcessor processor; @StreamListener(MyProcessor.INPUT) public void routeValues(Integer val) { if (val < 10) { processor.anOutput().send(message(val)); } else { processor.anotherOutput().send(message(val)); } } private static final  Message message(T val) { return MessageBuilder.withPayload(val).build(); }

4.3. Условно изпращане

Използвайки анотацията @StreamListener , ние също можем да филтрираме съобщенията, които очакваме в потребителя, използвайки всяко условие, което дефинираме с изрази SpEL.

Като пример бихме могли да използваме условно изпращане като друг подход за насочване на съобщенията към различни изходи:

@Autowired private MyProcessor processor; @StreamListener( target = MyProcessor.INPUT, condition = "payload = 10") public void routeValuesToAnotherOutput(Integer val) { processor.anotherOutput().send(message(val)); }

Единственото ограничение на този подход е, че тези методи не трябва да връщат стойност.

5. Настройка

Let's set up the application that will process the message from the RabbitMQ broker.

5.1. Binder Configuration

We can configure our application to use the default binder implementation via META-INF/spring.binders:

rabbit:\ org.springframework.cloud.stream.binder.rabbit.config.RabbitMessageChannelBinderConfiguration

Or we can add the binder library for RabbitMQ to the classpath by including this dependency:

 org.springframework.cloud spring-cloud-stream-binder-rabbit 1.3.0.RELEASE 

If no binder implementation is provided, Spring will use direct message communication between the channels.

5.2. RabbitMQ Configuration

To configure the example in section 3.1 to use the RabbitMQ binder, we need to update the application.yml located at src/main/resources:

spring: cloud: stream: bindings: input: destination: queue.log.messages binder: local_rabbit output: destination: queue.pretty.log.messages binder: local_rabbit binders: local_rabbit: type: rabbit environment: spring: rabbitmq: host:  port: 5672 username:  password:  virtual-host: /

The input binding will use the exchange called queue.log.messages, and the output binding will use the exchange queue.pretty.log.messages. Both bindings will use the binder called local_rabbit.

Note that we don't need to create the RabbitMQ exchanges or queues in advance. When running the application, both exchanges are automatically created.

To test the application, we can use the RabbitMQ management site to publish a message. In the Publish Message panel of the exchange queue.log.messages, we need to enter the request in JSON format.

5.3. Customizing Message Conversion

Spring Cloud Stream allows us to apply message conversion for specific content types. In the above example, instead of using JSON format, we want to provide plain text.

To do this, we'll to apply a custom transformation to LogMessage using a MessageConverter:

@SpringBootApplication @EnableBinding(Processor.class) public class MyLoggerServiceApplication { //... @Bean public MessageConverter providesTextPlainMessageConverter() { return new TextPlainMessageConverter(); } //... }
public class TextPlainMessageConverter extends AbstractMessageConverter { public TextPlainMessageConverter() { super(new MimeType("text", "plain")); } @Override protected boolean supports(Class clazz) { return (LogMessage.class == clazz); } @Override protected Object convertFromInternal(Message message, Class targetClass, Object conversionHint) { Object payload = message.getPayload(); String text = payload instanceof String ? (String) payload : new String((byte[]) payload); return new LogMessage(text); } }

After applying these changes, going back to the Publish Message panel, if we set the header “contentTypes” to “text/plain” and the payload to “Hello World“, it should work as before.

5.4. Consumer Groups

When running multiple instances of our application, every time there is a new message in an input channel, all subscribers will be notified.

Most of the time, we need the message to be processed only once. Spring Cloud Stream implements this behavior via consumer groups.

To enable this behavior, each consumer binding can use the spring.cloud.stream.bindings..group property to specify a group name:

spring: cloud: stream: bindings: input: destination: queue.log.messages binder: local_rabbit group: logMessageConsumers ...

6. Message-Driven Microservices

In this section, we introduce all the required features for running our Spring Cloud Stream applications in a microservices context.

6.1. Scaling Up

When multiple applications are running, it's important to ensure the data is split properly across consumers. To do so, Spring Cloud Stream provides two properties:

  • spring.cloud.stream.instanceCount — number of running applications
  • spring.cloud.stream.instanceIndex — index of the current application

For example, if we've deployed two instances of the above MyLoggerServiceApplication application, the property spring.cloud.stream.instanceCount should be 2 for both applications, and the property spring.cloud.stream.instanceIndex should be 0 and 1 respectively.

These properties are automatically set if we deploy the Spring Cloud Stream applications using Spring Data Flow as described in this article.

6.2. Partitioning

The domain events could be Partitioned messages. This helps when we are scaling up the storage and improving application performance.

The domain event usually has a partition key so that it ends up in the same partition with related messages.

Let's say that we want the log messages to be partitioned by the first letter in the message, which would be the partition key, and grouped into two partitions.

There would be one partition for the log messages that start with A-M and another partition for N-Z. This can be configured using two properties:

  • spring.cloud.stream.bindings.output.producer.partitionKeyExpression — the expression to partition the payloads
  • spring.cloud.stream.bindings.output.producer.partitionCount — the number of groups

Sometimes the expression to partition is too complex to write it in only one line. For these cases, we can write our custom partition strategy using the property spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass.

6.3. Health Indicator

In a microservices context, we also need to detect when a service is down or starts failing. Spring Cloud Stream provides the property management.health.binders.enabled to enable the health indicators for binders.

When running the application, we can query the health status at //:/health.

7. Conclusion

В този урок представихме основните концепции на Spring Cloud Stream и показахме как да го използваме чрез някои прости примери над RabbitMQ. Повече информация за Spring Cloud Stream можете да намерите тук.

Изходният код за тази статия може да бъде намерен в GitHub.