Въведение в KafkaStreams в Java

1. Общ преглед

В тази статия ще разгледаме библиотеката KafkaStreams .

KafkaStreams е проектиран от създателите на Apache Kafka . Основната цел на тази част от софтуера е да позволи на програмистите да създават ефективни поточни приложения в реално време, които биха могли да работят като Microservices.

KafkaStreams ни позволява да използваме от темите на Kafka, да анализираме или трансформираме данни и евентуално да ги изпращаме към друга тема на Kafka.

За да демонстрираме KafkaStreams, ще създадем просто приложение, което чете изречения от дадена тема, преброява появите на думи и отпечатва броя на думите.

Важно е да се отбележи, че библиотеката KafkaStreams не реагира и няма поддръжка за асинхронни операции и обработка на обратно налягане.

2. Зависимост на Maven

За да започнем да пишем логика за обработка на потоци, използвайки KafkaStreams, трябва да добавим зависимост към kafka-потоци и kafka-клиенти :

 org.apache.kafka kafka-streams 1.0.0   org.apache.kafka kafka-clients 1.0.0  

Също така трябва да инсталираме и стартираме Apache Kafka, защото ще използваме тема за Kafka. Тази тема ще бъде източникът на данни за нашата поточна работа.

Можем да изтеглим Kafka и други необходими зависимости от официалния уебсайт.

3. Конфигуриране на входа на KafkaStreams

Първото нещо, което ще направим, е дефиницията на входната тема Kafka.

Можем да използваме инструмента Confluent , който изтеглихме - той съдържа сървър Kafka. Той също така съдържа производителя на конзола kafka, който можем да използваме, за да публикуваме съобщения до Kafka.

За да започнем, нека стартираме нашия клъстер Kafka:

./confluent start

След като Kafka стартира, можем да дефинираме нашия източник на данни и името на нашето приложение, използвайки APPLICATION_ID_CONFIG :

String inputTopic = "inputTopic";
Properties streamsConfiguration = new Properties(); streamsConfiguration.put( StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");

Важен параметър за конфигурация е BOOTSTRAP_SERVER_CONFIG. Това е URL адресът на нашия локален екземпляр на Kafka, който току-що започнахме:

private String bootstrapServers = "localhost:9092"; streamsConfiguration.put( StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

След това трябва да предадем вида на ключа и стойността на съобщенията, които ще се консумират от inputTopic:

streamsConfiguration.put( StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); streamsConfiguration.put( StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

Обработката на потоци често е с държавно състояние. Когато искаме да запазим междинни резултати, трябва да посочим параметъра STATE_DIR_CONFIG .

В нашия тест използваме локална файлова система:

streamsConfiguration.put( StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); 

4. Изграждане на поточна топология

След като дефинирахме нашата тема за въвеждане, можем да създадем поточна топология - това е дефиниция за това как трябва да се обработват и трансформират събитията.

В нашия пример бихме искали да приложим брояч на думи. За всяко изречение, изпратено до inputTopic, искаме да го разделим на думи и да изчислим появата на всяка дума.

Можем да използваме екземпляр на класа KStreamsBuilder, за да започнем да изграждаме нашата топология:

KStreamBuilder builder = new KStreamBuilder(); KStream textLines = builder.stream(inputTopic); Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS); KTable wordCounts = textLines .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase()))) .groupBy((key, word) -> word) .count();

За да приложим броя на думите, първо, трябва да разделим стойностите, използвайки регулярния израз.

Методът на разделяне връща масив. Използваме flatMapValues ​​(), за да го изравним . В противен случай бихме получили списък с масиви и би било неудобно да пишем код, използвайки такава структура.

И накрая, обобщаваме стойностите за всяка дума и извикваме count (), който ще изчисли появата на конкретна дума.

5. Обработка на резултатите

Вече изчислихме броя на думите на нашите входни съобщения. Сега нека отпечатаме резултатите на стандартния изход, използвайки метода foreach () :

wordCounts .foreach((w, c) -> System.out.println("word: " + w + " -> " + c));

При производството често такава работа за стрийминг може да публикува резултата в друга тема на Kafka.

Можем да направим това, използвайки метода to ():

String outputTopic = "outputTopic"; Serde stringSerde = Serdes.String(); Serde longSerde = Serdes.Long(); wordCounts.to(stringSerde, longSerde, outputTopic);

Класът Serde ни дава предварително конфигурирани сериализатори за типове Java, които ще се използват за сериализиране на обекти до масив от байтове. След това масивът от байтове ще бъде изпратен към темата Kafka.

Използваме String като ключ към нашата тема и Long като стойност за действителното броене. Методът to () ще запази получените данни в outputTopic .

6. Стартиране на KafkaStream Job

До този момент ние изградихме топология, която може да бъде изпълнена. Работата обаче все още не е започнала.

Трябва да започнем нашата работа изрично, като извикаме метода start () в екземпляра KafkaStreams :

KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); streams.start(); Thread.sleep(30000); streams.close();

Note that we are waiting 30 seconds for the job to finish. In a real-world scenario, that job would be running all the time, processing events from Kafka as they arrive.

We can test our job by publishing some events to our Kafka topic.

Let's start a kafka-console-producer and manually send some events to our inputTopic:

./kafka-console-producer --topic inputTopic --broker-list localhost:9092 >"this is a pony" >"this is a horse and pony" 

This way, we published two events to Kafka. Our application will consume those events and will print the following output:

word: -> 1 word: this -> 1 word: is -> 1 word: a -> 1 word: pony -> 1 word: -> 2 word: this -> 2 word: is -> 2 word: a -> 2 word: horse -> 1 word: and -> 1 word: pony -> 2

We can see that when the first message arrived, the word pony occurred only once. But when we sent the second message, the word pony happened for the second time printing: “word: pony -> 2″.

6. Conclusion

Тази статия обсъжда как да създадете приложение за обработка на първичен поток, използвайки Apache Kafka като източник на данни и библиотеката KafkaStreams като библиотека за обработка на потоци.

Всички тези примери и кодови фрагменти могат да бъдат намерени в проекта GitHub - това е проект на Maven, така че трябва да е лесно да се импортира и да се изпълнява както е.