Първи стъпки с поточна обработка с Spring Cloud Data Flow

1. Въведение

Spring Cloud Data Flow е облачен модел за програмиране и работа за микрослужби за данни, които могат да се композират.

С Spring Cloud Data Flow разработчиците могат да създават и организират конвейери за данни за често използвани случаи, като поглъщане на данни, анализ в реално време и импортиране / експортиране на данни.

Тези тръбопроводи за данни се предлагат в два вида, поточни и пакетни тръбопроводи за данни.

В първия случай неограничено количество данни се консумира или произвежда чрез междинен софтуер за съобщения. Докато във втория случай краткотрайната задача обработва краен набор от данни и след това се прекратява.

Тази статия ще се фокусира върху поточната обработка.

2. Архитектурен преглед

Ключовите компоненти на този тип архитектура са Приложения , Сървър за поток на данни и целевата среда на изпълнение.

Освен тези ключови компоненти, ние също обикновено имаме Shell на потока от данни и посредник за съобщения в рамките на архитектурата.

Нека разгледаме всички тези компоненти по-подробно.

2.1. Приложения

Обикновено конвейерът за поточно предаване на данни включва консумиращи събития от външни системи, обработка на данни и постоянство на полиглот. Тези фази обикновено се наричат терминология „ Източник“ , „ Процесор “ и „ Мивка“ в Spring Cloud :

  • Източник: е приложението, което консумира събития
  • Процесор: консумира данни от Източника , извършва някаква обработка върху него и излъчва обработените данни към следващото приложение в конвейера
  • Мивка: или консумира от източник или от процесор и записва данните в желания слой на постоянство

Тези приложения могат да бъдат пакетирани по два начина:

  • Uber-jar на Spring Boot, който се хоства в хранилище на maven, файл, http или друго изпълнение на ресурс Spring (този метод ще бъде използван в тази статия)
  • Докер

Много приложения, процесори и приемници за често срещани случаи (напр. Jdbc, hdfs, http, рутер) вече са предоставени и готови за използване от екипа на Spring Cloud Data Flow .

2.2. Времетраене

Също така е необходимо време за изпълнение, за да могат тези приложения да се изпълняват. Поддържаните времена за изпълнение са:

  • Облачна леярна
  • Прежда Apache
  • Кубернети
  • Apache Mesos
  • Локален сървър за разработка (който ще бъде използван в тази статия)

2.3. Сървър за поток на данни

Компонентът, който е отговорен за внедряването на приложения в изпълнение, е сървърът за поток на данни . Има данни Flow сървър изпълним буркан условие за всяка от целевите автономна работа.

Най- Flow сървър на данни е отговорен за тълкуване:

  • Поток DSL, който описва логическия поток от данни през множество приложения.
  • Манифест за внедряване, който описва картографирането на приложенията по време на изпълнение.

2.4. Черупка на потока от данни

Shell на потока от данни е клиент за сървъра за поток на данни. Черупката ни позволява да изпълним DSL командата, необходима за взаимодействие със сървъра.

Като пример DSL, за да опише потока от данни от http източник към jdbc мивка, ще бъде записан като „http | jdbc ”. Тези имена в DSL се регистрират в Data Flow Server и се отразяват върху артефакти на приложения, които могат да бъдат хоствани в хранилищата на Maven или Docker.

Spring предлага и графичен интерфейс, наречен Flo , за създаване и наблюдение на поточни потоци от данни. Използването му обаче е извън обсъждането на тази статия.

2.5. Брокер за съобщения

Както видяхме в примера от предишния раздел, ние използвахме символа на тръбата в дефиницията на потока от данни. Символът на тръбата представлява комуникацията между двете приложения чрез междинен софтуер за съобщения.

Това означава, че се нуждаем от посредник на съобщения, работещ и работещ в целевата среда.

Двата посредника за посреднически съобщения за съобщения, които се поддържат, са:

  • Apache Kafka
  • RabbitMQ

И така, сега, когато имаме преглед на архитектурните компоненти - време е да изградим първия си тръбопровод за обработка на потоци.

3. Инсталирайте посредник за съобщения

Както видяхме, приложенията в конвейер се нуждаят от междинен софтуер за съобщения, за да комуникират. За целите на тази статия ще използваме RabbitMQ .

За пълните подробности за инсталацията можете да следвате инструкциите на официалния сайт.

4. Сървърът за локален поток от данни

За да ускорим процеса на генериране на нашите приложения, ще използваме Spring Initializr; с негова помощ можем да получим нашите приложения Spring Boot за няколко минути.

След като стигнете до уебсайта, просто изберете група и име на артефакт .

След като направите това, кликнете върху бутона Генериране на проект, за да започнете изтеглянето на артефакта на Maven.

След като изтеглянето приключи, разархивирайте проекта и го импортирайте като проект на Maven в избраната от вас IDE.

Нека добавим Maven зависимост към проекта. Тъй като ще ни трябват библиотеки от локален сървър на потока данни, нека добавим зависимостта spring-cloud-starter-dataflow-server-local:

 org.springframework.cloud spring-cloud-starter-dataflow-server-local 

Сега трябва да анотираме основния клас Spring Boot с анотация @EnableDataFlowServer :

@EnableDataFlowServer @SpringBootApplication public class SpringDataFlowServerApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowServerApplication.class, args); } } 

Това е всичко. Нашият сървър за локален поток от данни е готов за изпълнение:

mvn spring-boot:run

Приложението ще се стартира на порт 9393.

5. Черупката на потока от данни

Отново отидете на Spring Initializr и изберете име на група и артефакт .

След като изтеглим и импортираме проекта, нека добавим зависимост spring-cloud-dataflow-shell:

 org.springframework.cloud spring-cloud-dataflow-shell 

Сега трябва да добавим анотацията @EnableDataFlowShell към основния клас Spring Boot :

@EnableDataFlowShell @SpringBootApplication public class SpringDataFlowShellApplication { public static void main(String[] args) { SpringApplication.run(SpringDataFlowShellApplication.class, args); } } 

Сега можем да стартираме черупката:

mvn spring-boot:run

След като черупката се изпълни, можем да напишем командата за помощ в подканата, за да видим пълен списък с команди, които можем да изпълним.

6. Изходното приложение

По същия начин в Initializr сега ще създадем просто приложение и ще добавим зависимост на Stream Rabbit, наречена spring-cloud-starter-stream-rabbit:

 org.springframework.cloud spring-cloud-starter-stream-rabbit 

След това ще добавим анотацията @EnableBinding (Source.class) към основния клас Spring Boot :

@EnableBinding(Source.class) @SpringBootApplication public class SpringDataFlowTimeSourceApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowTimeSourceApplication.class, args); } }

Сега трябва да определим източника на данните, които трябва да бъдат обработени. Този източник може да бъде всяко потенциално безкрайно натоварване (данни от сензора за интернет, 24/7 обработка на събития, поглъщане на данни за онлайн транзакции).

В нашето примерно приложение ние създаваме по едно събитие (за простота нов клеймо за време) на всеки 10 секунди с Poller .

В @InboundChannelAdapter анотацията изпраща съобщение до изходния канал на източника, като се използва върнатата стойност като полезният товар на съобщението:

@Bean @InboundChannelAdapter( value = Source.OUTPUT, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1") ) public MessageSource timeMessageSource() { return () -> MessageBuilder.withPayload(new Date().getTime()).build(); } 

Нашият източник на данни е готов.

7. Приложението на процесора

След това ще създадем приложение и ще добавим зависимост от Stream Rabbit .

След това ще добавим анотацията @EnableBinding (Processor.class) към основния клас Spring Boot :

@EnableBinding(Processor.class) @SpringBootApplication public class SpringDataFlowTimeProcessorApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowTimeProcessorApplication.class, args); } }

След това трябва да дефинираме метод за обработка на данните, идващи от изходното приложение.

За да дефинираме трансформатор, трябва да анотираме този метод с анотация @Transformer :

@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) public Object transform(Long timestamp) { DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd hh:mm:yy"); String date = dateFormat.format(timestamp); return date; }

Той преобразува клеймо за време от „входния“ канал до форматирана дата, която ще бъде изпратена до „изходния“ канал.

8. Приложението за мивка

Последното приложение, което е създадено, е приложението Sink.

Отново отидете на Spring Initializr и изберете група , име на артефакт . След изтеглянето на проекта нека добавим зависимост от Stream Rabbit .

След това добавете анотацията @EnableBinding (Sink.class) към основния клас Spring Boot :

@EnableBinding(Sink.class) @SpringBootApplication public class SpringDataFlowLoggingSinkApplication { public static void main(String[] args) { SpringApplication.run( SpringDataFlowLoggingSinkApplication.class, args); } }

Сега се нуждаем от метод за прихващане на съобщенията, идващи от приложението на процесора.

За да направите това, трябва да добавим анотацията @StreamListener (Sink.INPUT) към нашия метод:

@StreamListener(Sink.INPUT) public void loggerSink(String date) { logger.info("Received: " + date); }

Методът просто отпечатва клеймото, преобразувано във форматирана дата, в дневник.

9. Register a Stream App

The Spring Cloud Data Flow Shell allow us to Register a Stream App with the App Registry using the app register command.

We must provide a unique name, application type, and a URI that can be resolved to the app artifact. For the type, specify “source“, “processor“, or “sink“.

When providing a URI with the maven scheme, the format should conform to the following:

maven://:[:[:]]:

To register the Source, Processor and Sink applications previously created , go to the Spring Cloud Data Flow Shell and issue the following commands from the prompt:

app register --name time-source --type source --uri maven://com.baeldung.spring.cloud:spring-data-flow-time-source:jar:0.0.1-SNAPSHOT app register --name time-processor --type processor --uri maven://com.baeldung.spring.cloud:spring-data-flow-time-processor:jar:0.0.1-SNAPSHOT app register --name logging-sink --type sink --uri maven://com.baeldung.spring.cloud:spring-data-flow-logging-sink:jar:0.0.1-SNAPSHOT 

10. Create and Deploy the Stream

To create a new stream definition go to the Spring Cloud Data Flow Shell and execute the following shell command:

stream create --name time-to-log --definition 'time-source | time-processor | logging-sink'

This defines a stream named time-to-log based on the DSL expression ‘time-source | time-processor | logging-sink'.

Then to deploy the stream execute the following shell command:

stream deploy --name time-to-log

The Data Flow Server resolves time-source, time-processor, and logging-sink to maven coordinates and uses those to launch the time-source, time-processor and logging-sink applications of the stream.

If the stream is correctly deployed you’ll see in the Data Flow Server logs that the modules have been started and tied together:

2016-08-24 12:29:10.516 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer: deploying app time-to-log.logging-sink instance 0 Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink 2016-08-24 12:29:17.600 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer : deploying app time-to-log.time-processor instance 0 Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034556862/time-to-log.time-processor 2016-08-24 12:29:23.280 INFO 8096 --- [io-9393-exec-10] o.s.c.d.spi.local.LocalAppDeployer : deploying app time-to-log.time-source instance 0 Logs will be in PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034562861/time-to-log.time-source

11. Reviewing the Result

In this example, the source simply sends the current timestamp as a message each second, the processor format it and the log sink outputs the formatted timestamp using the logging framework.

The log files are located within the directory displayed in the Data Flow Server’s log output, as shown above. To see the result, we can tail the log:

tail -f PATH_TO_LOG/spring-cloud-dataflow-1276836171391672089/time-to-log-1472034549734/time-to-log.logging-sink/stdout_0.log 2016-08-24 12:40:42.029 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:01 2016-08-24 12:40:52.035 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:11 2016-08-24 12:41:02.030 INFO 9488 --- [r.time-to-log-1] s.c.SpringDataFlowLoggingSinkApplication : Received: 2016/08/24 11:40:21

12. Conclusion

In this article, we have seen how to build a data pipeline for stream processing through the use of Spring Cloud Data Flow.

Също така видяхме ролята на приложенията Source , Processor и Sink вътре в потока и как да включим и свържем този модул вътре в Data Flow Server чрез използването на Shell на потока от данни .

Примерният код може да бъде намерен в проекта GitHub.