Въведение в Hazelcast Jet

1. Въведение

В този урок ще научим за Hazelcast Jet. Това е разпределена машина за обработка на данни, предоставена от Hazelcast, Inc. и е изградена върху Hazelcast IMDG.

Ако искате да научите за Hazelcast IMDG, ето статия за започване.

2. Какво е Hazelcast Jet?

Hazelcast Jet е разпределена машина за обработка на данни, която третира данните като потоци. Той може да обработва данни, които се съхраняват в база данни или файлове, както и данните, които се предават от сървър на Kafka.

Нещо повече, той може да изпълнява агрегирани функции върху безкрайни потоци от данни, като разделя потоците на подмножества и прилага агрегиране върху всяко подмножество. Тази концепция е известна като прозорец в терминологията Jet.

Можем да разположим Jet в клъстер от машини и след това да му изпратим нашите задачи за обработка на данни. Jet ще накара всички членове на клъстера автоматично да обработят данните. Всеки член на клъстера консумира част от данните и това улеснява мащабирането до всяко ниво на производителност.

Ето типичните случаи на използване на Hazelcast Jet:

  • Обработка на потоци в реално време
  • Бърза групова обработка
  • Обработка на Java 8 потоци по разпределен начин
  • Обработка на данни в Microservices

3. Настройка

За да настроим Hazelcast Jet в нашата среда, просто трябва да добавим една зависимост на Maven към нашия pom.xml .

Ето как го правим:

 com.hazelcast.jet hazelcast-jet 4.2 

Включването на тази зависимост ще изтегли файл от 10 Mb jar, който ни предоставя цялата инфраструктура, от която се нуждаем, за да изградим разпределен конвейер за обработка на данни.

Най-новата версия на Hazelcast Jet можете да намерите тук.

4. Примерно приложение

За да научим повече за Hazelcast Jet, ще създадем примерно приложение, което взема въведени изречения и дума, която да се намери в тези изречения и връща броя на посочената дума в тези изречения.

4.1. Тръбопроводът

Pipeline формира основната конструкция за Jet приложение. Обработката в конвейер следва следните стъпки:

  • чете данни от източник
  • трансформира данните
  • запис на данни в мивка

За нашето приложение конвейерът ще чете от разпределен списък , ще приложи трансформацията на групиране и агрегиране и накрая ще напише в разпределена карта .

Ето как пишем нашия конвейер:

private Pipeline createPipeLine() { Pipeline p = Pipeline.create(); p.readFrom(Sources.list(LIST_NAME)) .flatMap(word -> traverseArray(word.toLowerCase().split("\\W+"))) .filter(word -> !word.isEmpty()) .groupingKey(wholeItem()) .aggregate(counting()) .writeTo(Sinks.map(MAP_NAME)); return p; }

След като прочетем от източника, обхождаме данните и ги разделяме около пространството с помощта на регулярен израз. След това филтрираме празните места.

Накрая групираме думите, обобщаваме ги и записваме резултатите в Карта.

4.2. Работата

Сега, когато нашият конвейер е дефиниран, ние създаваме работа за изпълнение на конвейера.

Ето как пишем функцията countWord, която приема параметри и връща броя:

public Long countWord(List sentences, String word) { long count = 0; JetInstance jet = Jet.newJetInstance(); try { List textList = jet.getList(LIST_NAME); textList.addAll(sentences); Pipeline p = createPipeLine(); jet.newJob(p).join(); Map counts = jet.getMap(MAP_NAME); count = counts.get(word); } finally { Jet.shutdownAll(); } return count; }

Първо създаваме екземпляр Jet, за да създадем нашата работа и да използваме конвейера. След това копираме входния списък в разпределен списък, така че да е достъпен за всички инстанции.

След това изпращаме работа, използвайки тръбопровода, който сме изградили по-горе. Методът newJob () връща изпълнима задача, която се стартира от Jet асинхронно. Методът за присъединяване изчаква заданието да завърши и извежда изключение, ако заданието е завършено с грешка.

Когато задачата завърши, резултатите се извличат в разпределена карта, както дефинирахме в нашия конвейер. И така, получаваме картата от екземпляра на Jet и получаваме броя на думите срещу нея.

И накрая, изключихме екземпляра Jet. Важно е да го изключите, след като изпълнението ни приключи, тъй като екземпляр Jet стартира свои собствени нишки . В противен случай процесът ни в Java ще остане жив дори след излизане от метода ни.

Ето единичен тест, който тества кода, който сме написали за Jet:

@Test public void whenGivenSentencesAndWord_ThenReturnCountOfWord() { List sentences = new ArrayList(); sentences.add("The first second was alright, but the second second was tough."); WordCounter wordCounter = new WordCounter(); long countSecond = wordCounter.countWord(sentences, "second"); assertEquals(3, countSecond); }

5. Заключение

В тази статия научихме за Hazelcast Jet. За да научите повече за него и неговите характеристики, вижте ръководството.

Както обикновено, кодът за примерите, използвани в тази статия, може да бъде намерен в Github.