Тестване на Kafka и Spring Boot

1. Общ преглед

Apache Kafka е мощна, разпределена, устойчива на грешки система за обработка на потоци. В предишен урок научихме как да работим с Spring и Kafka.

В този урок ще надградим предишния и ще се научим как да пишем надеждни, самостоятелни тестове за интеграция, които не разчитат на работещ външен сървър на Kafka .

Първо ще започнем, но ще разгледаме как да използваме и конфигурираме вграден екземпляр на Kafka. Тогава ще видим как можем да използваме популярните рамкови Testcontainers от нашите тестове.

2. Зависимости

Разбира се, ще трябва да добавим стандартната зависимост spring-kafka към нашия pom.xml :

 org.springframework.kafka spring-kafka 2.6.3.RELEASE 

Тогава ще са ни необходими още две зависимости, специално за нашите тестове . Първо ще добавим артефакт за пролетен тест kafka :

 org.springframework.kafka spring-kafka-test 2.6.3.RELEASE test 

И накрая, ще добавим зависимостта Testcontainers Kafka, която е достъпна и в Maven Central:

 org.testcontainers kafka 1.15.0 test 

Сега, след като сме конфигурирали всички необходими зависимости, можем да напишем просто приложение Spring Boot, използвайки Kafka.

3. Просто приложение на производителя и потребителя на Kafka

По време на този урок фокусът на нашите тестове ще бъде просто приложение на производител-потребител Spring Boot Kafka.

Нека започнем с дефиниране на нашата точка за влизане в приложението:

@SpringBootApplication @EnableAutoConfiguration public class KafkaProducerConsumerApplication { public static void main(String[] args) { SpringApplication.run(KafkaProducerConsumerApplication.class, args); } }

Както виждаме, това е стандартно приложение Spring Boot. Където е възможно, искаме да използваме стойностите на конфигурацията по подразбиране . Имайки предвид това, ние използваме анотацията @EnableAutoConfiguration за автоматично конфигуриране на нашето приложение.

3.1. Настройка на продуцента

След това нека помислим за производител на боб, който ще използваме за изпращане на съобщения до дадена тема на Kafka:

@Component public class KafkaProducer { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class); @Autowired private KafkaTemplate kafkaTemplate; public void send(String topic, String payload) { LOGGER.info("sending payload="{}" to topic="{}"", payload, topic); kafkaTemplate.send(topic, payload); } }

Нашият KafkaProducer боб, дефиниран по-горе, е просто обвивка около класа KafkaTemplate . Този клас осигурява операции на високо ниво, безопасни за нишки, като изпращане на данни към предоставената тема, което е точно това, което правим в нашия метод за изпращане .

3.2. Потребителска настройка

По същия начин сега ще дефинираме прост потребителски боб, който ще слуша тема на Kafka и получава съобщения:

@Component public class KafkaConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class); private CountDownLatch latch = new CountDownLatch(1); private String payload = null; @KafkaListener(topics = "${test.topic}") public void receive(ConsumerRecord consumerRecord) { LOGGER.info("received payload="{}"", consumerRecord.toString()); setPayload(consumerRecord.toString()); latch.countDown(); } public CountDownLatch getLatch() { return latch; } public String getPayload() { return payload; } }

Нашият прост потребител използва анотацията @KafkaListener за метода за получаване , за да слуша съобщения по дадена тема. Ще видим по-късно как конфигурираме test.topic от нашите тестове.

Освен това методът за получаване съхранява съдържанието на съобщението в нашия боб и намалява броя на променливата на резето . Тази променлива е просто поле за брояч, безопасно за нишки, което ще използваме по-късно от нашите тестове, за да гарантираме, че сме получили успешно съобщение .

Сега, когато имаме нашето просто приложение Kafka, използващо Spring Boot, нека да видим как можем да напишем тестове за интеграция.

4. Слово за тестване

Като цяло, когато пишем тестове за чиста интеграция, не бива да разчитаме на външни услуги, които може да не можем да контролираме или може внезапно да спрат да работят . Това може да има неблагоприятни ефекти върху резултатите от теста ни.

По същия начин, ако сме зависими от външна услуга, в този случай работещ брокер на Kafka, вероятно няма да можем да я настроим, контролираме и съборим по начина, по който искаме от нашите тестове.

4.1. Свойства на приложението

Ще използваме много лек набор от свойства за конфигуриране на приложения от нашите тестове. Ще определим тези свойства в нашия файл src / test / resources / application.yml :

spring: kafka: consumer: auto-offset-reset: earliest group-id: baeldung test: topic: embedded-test-topic

Това е минималният набор от свойства, от които се нуждаем, когато работим с вграден екземпляр на Kafka или местен брокер.

Повечето от тях са разбираеми, но това, което трябва да подчертаем от особено значение, е автоматичното компенсиране-нулиране на потребителското свойство : най-рано . Това свойство гарантира, че нашата група потребители получава съобщенията, които изпращаме, защото контейнерът може да стартира след завършване на изпращанията.

Освен това конфигурираме свойство на тема със стойността embedded-test-topic , която е темата, която ще използваме от нашите тестове.

5. Тестване с помощта на вграден Kafka

В този раздел ще разгледаме как да използваме екземпляр Kafka в паметта, за да стартираме нашите тестове. Това е известно и като Embedded Kafka.

Тестът за зависимост spring-kafka, който добавихме по-рано, съдържа някои полезни помощни програми, които помагат при тестването на нашето приложение. Най-забележителното е, че съдържа класа EmbeddedKafkaBroker .

Имайки това предвид, нека продължим и напишем първия си тест за интеграция:

@SpringBootTest @DirtiesContext @EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" }) class EmbeddedKafkaIntegrationTest { @Autowired private KafkaConsumer consumer; @Autowired private KafkaProducer producer; @Value("${test.topic}") private String topic; @Test public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived() throws Exception { producer.send(topic, "Sending with own simple KafkaProducer"); consumer.getLatch().await(10000, TimeUnit.MILLISECONDS); assertThat(consumer.getLatch().getCount(), equalTo(0L)); assertThat(consumer.getPayload(), containsString("embedded-test-topic")); } }

Нека да разгледаме ключовите части на нашия тест. Първо, започваме с декориране на нашия тестов клас с две доста стандартни пролетни анотации:

  • В @SpringBootTest пояснението ще се гарантира, че нашия тест за връзките на обувките контекста на пролетния приложение
  • Също така използваме анотацията @DirtiesContext , която ще гарантира, че този контекст е почистен и нулиран между различните тестове

Тук идва ключовата част, използваме анотацията @EmbeddedKafka, за да инжектираме екземпляр на EmbeddedKafkaBroker в нашите тестове . Освен това има няколко налични свойства, които можем да използваме за конфигуриране на вградения възел Kafka:

  • дялове - това е броят на дяловете, използвани за тема. За да запазим нещата приятни и прости, искаме да използваме само едно от нашите тестове
  • brokerProperties - допълнителни свойства за брокера Kafka. Отново улесняваме нещата и посочваме слушател на обикновен текст и номер на порт

След това автоматично свързваме класовете на потребителите и производителите и конфигурираме тема, за да използва стойността от нашата application.properties .

За последното парче от мозайката ние просто изпращаме съобщение до тестовата ни тема и проверяваме дали съобщението е получено и съдържа името на тестовата ни тема .

Когато стартираме нашия тест, ще видим сред многословния изход Spring:

... 12:45:35.099 [main] INFO c.b.kafka.embedded.KafkaProducer - sending payload="Sending with our own simple KafkaProducer" to topic="embedded-test-topic" ... 12:45:35.103 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.b.kafka.embedded.KafkaConsumer - received payload= 'ConsumerRecord(topic = embedded-test-topic, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1605267935099, serialized key size = -1, serialized value size = 41, headers = RecordHeaders(headers = [], isReadOnly = false),  key = null, value = Изпращане със собствения ни прост KafkaProducer) ' 

Това потвърждава, че нашият тест работи правилно. Страхотно! Сега имаме начин да напишем самостоятелни, независими тестове за интеграция, използвайки брокер на Kafka в паметта .

6. Тестване на Kafka с TestContainers

Понякога можем да видим малки разлики между реална външна услуга и вграден екземпляр на памет на услуга, която е специално предоставена за целите на тестването. Макар и малко вероятно, може да се окаже, че портът, използван от нашия тест, може да бъде зает, причинявайки неизправност .

Имайки това предвид, в този раздел ще видим вариация на предишния ни подход към тестване с помощта на рамката Testcontainers. Ще видим как да създадем екземпляр и да управляваме външен брокер на Apache Kafka, хостван в контейнер на Docker от нашия тест за интеграция.

Нека дефинираме друг тест за интеграция, който ще бъде доста подобен на този, който видяхме в предишния раздел:

@RunWith(SpringRunner.class) @Import(com.baeldung.kafka.testcontainers.KafkaTestContainersLiveTest.KafkaTestContainersConfiguration.class) @SpringBootTest(classes = KafkaProducerConsumerApplication.class) @DirtiesContext public class KafkaTestContainersLiveTest { @ClassRule public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3")); @Autowired private KafkaConsumer consumer; @Autowired private KafkaProducer producer; @Value("${test.topic}") private String topic; @Test public void givenKafkaDockerContainer_whenSendingtoSimpleProducer_thenMessageReceived() throws Exception { producer.send(topic, "Sending with own controller"); consumer.getLatch().await(10000, TimeUnit.MILLISECONDS); assertThat(consumer.getLatch().getCount(), equalTo(0L)); assertThat(consumer.getPayload(), containsString("embedded-test-topic")); } }

Нека да разгледаме разликите този път. Декларираме полето kafka , което е стандартно JUnit @ClassRule . Това поле е екземпляр на класа KafkaContainer, който ще подготви и управлява жизнения цикъл на нашия контейнер, работещ с Kafka.

За да се избегнат сблъсъци на портове, Testcontainers разпределя динамично номера на порта, когато стартира нашият контейнер за докер. Поради тази причина предлагаме персонализирана потребителска и производствена конфигурация на производителя, използвайки класа KafkaTestContainersConfiguration :

@Bean public Map consumerConfigs() { Map props = new HashMap(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung"); // more standard configuration return props; } @Bean public ProducerFactory producerFactory() { Map configProps = new HashMap(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); // more standard configuration return new DefaultKafkaProducerFactory(configProps); }

След това препращаме към тази конфигурация чрез анотацията @Import в началото на нашия тест.

The reason for this is that we need a way to inject the server address into our application, which as previously mentioned, is generated dynamically. We achieve this by calling the getBootstrapServers() method, which will return the bootstrap server location:

bootstrap.servers = [PLAINTEXT://localhost:32789]

Now when we run our test, we should see that Testcontainers does several things:

  • Checks our local Docker setup.
  • Pulls the confluentinc/cp-kafka:5.4.3 docker image if necessary
  • Starts a new container and waits for it to be ready
  • Finally, shuts down and deletes the container after our test finishes

Again, this is confirmed by inspecting the test output:

13:33:10.396 [main] INFO ? [confluentinc/cp-kafka:5.4.3] - Creating container for image: confluentinc/cp-kafka:5.4.3 13:33:10.454 [main] INFO ? [confluentinc/cp-kafka:5.4.3] - Starting container with ID: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3 13:33:10.785 [main] INFO ? [confluentinc/cp-kafka:5.4.3] - Container confluentinc/cp-kafka:5.4.3 is starting: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3

Presto! A working integration test using a Kafka docker container.

7. Conclusion

В тази статия научихме за няколко подхода за тестване на приложенията на Kafka с Spring Boot. При първия подход видяхме как да конфигурираме и използваме локален брокер Kafka в паметта.

След това видяхме как да използваме Testcontainers, за да настроим външен брокер на Kafka, работещ в контейнер за докер от нашите тестове.

Както винаги, пълният изходен код на статията е достъпен в GitHub.