Изграждане на тръбопровод за данни с Kafka, Spark Streaming и Cassandra

1. Общ преглед

Apache Kafka е мащабируема платформа с висока производителност и ниска латентност, която позволява четене и запис на потоци от данни като система за съобщения . Можем да започнем с Kafka в Java доста лесно.

Spark Streaming е част от платформата Apache Spark, която позволява мащабируема, висока производителност, устойчива на грешки обработка на потоци от данни . Въпреки че е написана в Scala, Spark предлага API за Java, с които да работи.

Apache Cassandra е разпределено хранилище за данни NoSQL с широка колона . Повече подробности за Касандра можете да намерите в предишната ни статия.

В този урок ще ги комбинираме, за да създадем силно мащабируем и устойчив на грешки конвейер за данни за поток от данни в реално време .

2. Инсталации

За да започнем, ще се нуждаем от Kafka, Spark и Cassandra, инсталирани локално на нашата машина, за да стартираме приложението. Ще видим как да разработим конвейер за данни, използвайки тези платформи, докато вървим напред.

Ние обаче ще оставим всички конфигурации по подразбиране, включително портове за всички инсталации, които ще помогнат за безпроблемното провеждане на урока.

2.1. Кафка

Инсталирането на Kafka на нашата локална машина е доста лесно и може да бъде намерено като част от официалната документация. Ще използваме версията 2.1.0 на Kafka.

Освен това Kafka изисква Apache Zookeeper да стартира, но за целите на този урок ще използваме екземпляра Zookeeper с един възел, пакетиран с Kafka.

След като успеем да стартираме Zookeeper и Kafka локално, следвайки официалното ръководство, можем да пристъпим към създаването на нашата тема, наречена „съобщения“:

 $KAFKA_HOME$\bin\windows\kafka-topics.bat --create \ --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 \ --topic messages

Обърнете внимание, че горният скрипт е за платформа на Windows, но има подобни скриптове и за Unix-подобни платформи.

2.2. Искра

Spark използва клиентските библиотеки на Hadoop за HDFS и YARN. Следователно може да бъде много сложно да се съберат съвместимите версии на всички тях . Официалното изтегляне на Spark обаче е предварително пакетирано с популярни версии на Hadoop. За този урок ще използваме пакет 2.3.0 „предварително изграден за Apache Hadoop 2.7 и по-нови версии“.

След като се разопакова правилният пакет на Spark, наличните скриптове могат да се използват за подаване на заявления. Ще видим това по-късно, когато разработим приложението си в Spring Boot.

2.3. Касандра

DataStax предлага обществено издание на Cassandra за различни платформи, включително Windows. Можем да изтеглим и инсталираме това на нашата локална машина много лесно, следвайки официалната документация. Ще използваме версия 3.9.0.

След като успеем да инсталираме и стартираме Cassandra на нашата локална машина, можем да пристъпим към създаването на нашето пространство от клавиши и таблица. Това може да се направи с помощта на CQL Shell, която се доставя с нашата инсталация:

CREATE KEYSPACE vocabulary WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }; USE vocabulary; CREATE TABLE words (word text PRIMARY KEY, count int);

Имайте предвид, че създадохме пространство от имена, наречено речник и таблица в него, наречено думи с две колони, дума и брой .

3. Зависимости

Можем да интегрираме зависимости Kafka и Spark в нашето приложение чрез Maven. Ще изтеглим тези зависимости от Maven Central:

  • Core Spark
  • SQL Spark
  • Streaming Spark
  • Стрийминг на Kafka Spark
  • Касандра Искра
  • Касандра Java Spark

И ние можем да ги добавим към нашия pom съответно:

 org.apache.spark spark-core_2.11 2.3.0 provided   org.apache.spark spark-sql_2.11 2.3.0 provided   org.apache.spark spark-streaming_2.11 2.3.0 provided   org.apache.spark spark-streaming-kafka-0-10_2.11 2.3.0   com.datastax.spark spark-cassandra-connector_2.11 2.3.0   com.datastax.spark spark-cassandra-connector-java_2.11 1.5.2 

Имайте предвид, че някои от тези зависимости са маркирани като предоставени в обхвата. Това е така, защото те ще бъдат предоставени от инсталацията на Spark, където ще подадем заявлението за изпълнение с помощта на spark-submit.

4. Искрено поточно предаване - интеграционни стратегии на Kafka

На този етап си струва да поговорим накратко за интеграционните стратегии за Spark и Kafka.

Kafka представи нов потребителски API между версии 0.8 и 0.10. Следователно, съответните пакети Spark Streaming са налични и за двете версии на брокера. Важно е да изберете правилния пакет в зависимост от наличния брокер и желаните функции.

4.1. Spark Streaming Kafka 0.8

Версията 0.8 е API за стабилна интеграция с опции за използване на приемник или директен подход . Няма да навлизаме в подробности за тези подходи, които можем да намерим в официалната документация. Тук е важно да се отбележи, че този пакет е съвместим с Kafka Broker версии 0.8.2.1 или по-нова.

4.2. Spark Streaming Kafka 0.10

Понастоящем това е в експериментално състояние и е съвместимо само с Kafka Broker версии 0.10.0 или по-нова. Този пакет предлага само директен подход, като сега използва новия потребителски API на Kafka . Повече подробности за това можем да намерим в официалната документация. Важно е, че той не е обратно съвместим с по-старите версии на Kafka Broker .

Моля, имайте предвид, че за този урок ще използваме пакета 0.10. Зависимостта, спомената в предишния раздел, се отнася само до това.

5. Разработване на тръбопровод за данни

We'll create a simple application in Java using Spark which will integrate with the Kafka topic we created earlier. The application will read the messages as posted and count the frequency of words in every message. This will then be updated in the Cassandra table we created earlier.

Let's quickly visualize how the data will flow:

5.1. Getting JavaStreamingContext

Firstly, we'll begin by initializing the JavaStreamingContext which is the entry point for all Spark Streaming applications:

SparkConf sparkConf = new SparkConf(); sparkConf.setAppName("WordCountingApp"); sparkConf.set("spark.cassandra.connection.host", "127.0.0.1"); JavaStreamingContext streamingContext = new JavaStreamingContext( sparkConf, Durations.seconds(1));

5.2. Getting DStream from Kafka

Now, we can connect to the Kafka topic from the JavaStreamingContext:

Map kafkaParams = new HashMap(); kafkaParams.put("bootstrap.servers", "localhost:9092"); kafkaParams.put("key.deserializer", StringDeserializer.class); kafkaParams.put("value.deserializer", StringDeserializer.class); kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream"); kafkaParams.put("auto.offset.reset", "latest"); kafkaParams.put("enable.auto.commit", false); Collection topics = Arrays.asList("messages"); JavaInputDStream
    
      messages = KafkaUtils.createDirectStream( streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies. Subscribe(topics, kafkaParams));
    

Please note that we've to provide deserializers for key and value here. For common data types like String, the deserializer is available by default. However, if we wish to retrieve custom data types, we'll have to provide custom deserializers.

Here, we've obtained JavaInputDStream which is an implementation of Discretized Streams or DStreams, the basic abstraction provided by Spark Streaming. Internally DStreams is nothing but a continuous series of RDDs.

5.3. Processing Obtained DStream

We'll now perform a series of operations on the JavaInputDStream to obtain word frequencies in the messages:

JavaPairDStream results = messages .mapToPair( record -> new Tuple2(record.key(), record.value()) ); JavaDStream lines = results .map( tuple2 -> tuple2._2() ); JavaDStream words = lines .flatMap( x -> Arrays.asList(x.split("\\s+")).iterator() ); JavaPairDStream wordCounts = words .mapToPair( s -> new Tuple2(s, 1) ).reduceByKey( (i1, i2) -> i1 + i2 );

5.4. Persisting Processed DStream into Cassandra

Finally, we can iterate over the processed JavaPairDStream to insert them into our Cassandra table:

wordCounts.foreachRDD( javaRdd -> { Map wordCountMap = javaRdd.collectAsMap(); for (String key : wordCountMap.keySet()) { List wordList = Arrays.asList(new Word(key, wordCountMap.get(key))); JavaRDD rdd = streamingContext.sparkContext().parallelize(wordList); javaFunctions(rdd).writerBuilder( "vocabulary", "words", mapToRow(Word.class)).saveToCassandra(); } } );

5.5. Running the Application

As this is a stream processing application, we would want to keep this running:

streamingContext.start(); streamingContext.awaitTermination();

6. Leveraging Checkpoints

In a stream processing application, it's often useful to retain state between batches of data being processed.

For example, in our previous attempt, we are only able to store the current frequency of the words. What if we want to store the cumulative frequency instead? Spark Streaming makes it possible through a concept called checkpoints.

We'll now modify the pipeline we created earlier to leverage checkpoints:

Please note that we'll be using checkpoints only for the session of data processing. This does not provide fault-tolerance. However, checkpointing can be used for fault tolerance as well.

There are a few changes we'll have to make in our application to leverage checkpoints. This includes providing the JavaStreamingContext with a checkpoint location:

streamingContext.checkpoint("./.checkpoint");

Here, we are using the local filesystem to store checkpoints. However, for robustness, this should be stored in a location like HDFS, S3 or Kafka. More on this is available in the official documentation.

Next, we'll have to fetch the checkpoint and create a cumulative count of words while processing every partition using a mapping function:

JavaMapWithStateDStream
    
      cumulativeWordCounts = wordCounts .mapWithState( StateSpec.function( (word, one, state) -> { int sum = one.orElse(0) + (state.exists() ? state.get() : 0); Tuple2 output = new Tuple2(word, sum); state.update(sum); return output; } ) );
    

Once we get the cumulative word counts, we can proceed to iterate and save them in Cassandra as before.

Please note that while data checkpointing is useful for stateful processing, it comes with a latency cost. Hence, it's necessary to use this wisely along with an optimal checkpointing interval.

7. Understanding Offsets

If we recall some of the Kafka parameters we set earlier:

kafkaParams.put("auto.offset.reset", "latest"); kafkaParams.put("enable.auto.commit", false);

These basically mean that we don't want to auto-commit for the offset and would like to pick the latest offset every time a consumer group is initialized. Consequently, our application will only be able to consume messages posted during the period it is running.

If we want to consume all messages posted irrespective of whether the application was running or not and also want to keep track of the messages already posted, we'll have to configure the offset appropriately along with saving the offset state, though this is a bit out of scope for this tutorial.

This is also a way in which Spark Streaming offers a particular level of guarantee like “exactly once”. This basically means that each message posted on Kafka topic will only be processed exactly once by Spark Streaming.

8. Deploying Application

We can deploy our application using the Spark-submit script which comes pre-packed with the Spark installation:

$SPARK_HOME$\bin\spark-submit \ --class com.baeldung.data.pipeline.WordCountingAppWithCheckpoint \ --master local[2] \target\spark-streaming-app-0.0.1-SNAPSHOT-jar-with-dependencies.jar

Моля, обърнете внимание, че бурканът, който създаваме с помощта на Maven, трябва да съдържа зависимостите, които не са маркирани, както е предвидено в обхвата.

След като изпратим това заявление и публикуваме някои съобщения в темата за Kafka, която създадохме по-рано, трябва да видим кумулативния брой думи, публикуван в таблицата Cassandra, която създадохме по-рано.

9. Заключение

В обобщение, в този урок научихме как да създадем прост конвейер за данни, използвайки Kafka, Spark Streaming и Cassandra. Също така научихме как да използваме контролно-пропускателните пунктове в Spark Streaming, за да поддържаме състоянието между партидите.

Както винаги, кодът за примерите е достъпен в GitHub.