Въведение в Netflix Mantis

1. Общ преглед

В тази статия ще разгледаме платформата Mantis, разработена от Netflix.

Ще изследваме основните концепции на Mantis, като създадем, стартираме и проучим задача за обработка на потоци.

2. Какво е Mantis?

Mantis е платформа за изграждане на приложения за обработка на потоци (работни места). Той осигурява лесен начин за управление на разгръщането и жизнения цикъл на работните места. Освен това улеснява разпределението на ресурсите, откриването и комуникацията между тези работни места.

Следователно разработчиците могат да се съсредоточат върху действителната бизнес логика, като през цялото време имат поддръжката на стабилна и мащабируема платформа, за да изпълняват своите големи обеми, ниска латентност, неблокиращи приложения.

Работата на Mantis се състои от три отделни части:

  • на източника , който отговаря за извличане на данни от външен източник
  • един или повече етапи , отговорни за обработката на входящите потоци от събития
  • и мивка, която събира обработените данни

Нека сега изследваме всеки от тях.

3. Настройка и зависимости

Нека започнем с добавяне на зависимостите mantis-runtime и jackson-databind :

 io.mantisrx mantis-runtime   com.fasterxml.jackson.core jackson-databind 

Сега, за да настроим източника на данни за нашата работа, нека внедрим интерфейса Mantis Source :

public class RandomLogSource implements Source { @Override public Observable
    
      call(Context context, Index index) { return Observable.just( Observable .interval(250, TimeUnit.MILLISECONDS) .map(this::createRandomLogEvent)); } private String createRandomLogEvent(Long tick) { // generate a random log entry string ... } }
    

Както виждаме, той просто генерира случайни записи в дневника няколко пъти в секунда.

4. Първата ни работа

Нека сега създадем работа на Mantis, която просто събира регистрационни събития от нашия RandomLogSource . По-късно ще добавим трансформации на групи и агрегации за по-сложен и интересен резултат.

За начало нека създадем обект LogEvent :

public class LogEvent implements JsonType { private Long index; private String level; private String message; // ... }

След това нека добавим нашата TransformLogStage.

Това е прост етап, който прилага интерфейса ScalarComputation и разделя запис в дневника, за да изгради LogEvent . Също така, той филтрира всички грешно форматирани низове:

public class TransformLogStage implements ScalarComputation { @Override public Observable call(Context context, Observable logEntry) { return logEntry .map(log -> log.split("#")) .filter(parts -> parts.length == 3) .map(LogEvent::new); } }

4.1. Изпълнение на заданието

Към този момент имаме достатъчно градивни елементи, за да съберем нашата работа с Mantis:

public class LogCollectingJob extends MantisJobProvider { @Override public Job getJobInstance() { return MantisJob .source(new RandomLogSource()) .stage(new TransformLogStage(), new ScalarToScalar.Config()) .sink(Sinks.eagerSubscribe(Sinks.sse(LogEvent::toJsonString))) .metadata(new Metadata.Builder().build()) .create(); } }

Нека разгледаме по-отблизо нашата работа.

Както виждаме, той разширява MantisJobProvider. Първоначално той извлича данни от нашия RandomLogSource и прилага TransformLogStage към извлечените данни. И накрая, той изпраща обработените данни до вградената мивка, която с нетърпение се абонира и доставя данни през SSE.

Сега, нека конфигурираме нашата работа да се изпълнява локално при стартиране:

@SpringBootApplication public class MantisApplication implements CommandLineRunner { // ... @Override public void run(String... args) { LocalJobExecutorNetworked.execute(new LogCollectingJob().getJobInstance()); } }

Нека да стартираме приложението. Ще видим дневник съобщение като:

... Serving modern HTTP SSE server sink on port: 86XX

Нека сега се свържем с мивката с помощта на къдрене :

$ curl localhost:86XX data: {"index":86,"level":"WARN","message":"login attempt"} data: {"index":87,"level":"ERROR","message":"user created"} data: {"index":88,"level":"INFO","message":"user created"} data: {"index":89,"level":"INFO","message":"login attempt"} data: {"index":90,"level":"INFO","message":"user created"} data: {"index":91,"level":"ERROR","message":"user created"} data: {"index":92,"level":"WARN","message":"login attempt"} data: {"index":93,"level":"INFO","message":"user created"} ...

4.2. Конфигуриране на мивката

Досега използвахме вградената мивка за събиране на обработените ни данни. Нека видим дали можем да добавим повече гъвкавост към нашия сценарий, като предоставим персонализирана мивка.

Ами ако например искаме да филтрираме дневниците по съобщение ?

Нека създадем LogSink, който реализира интерфейса Sink :

public class LogSink implements Sink { @Override public void call(Context context, PortRequest portRequest, Observable logEventObservable) { SelfDocumentingSink sink = new ServerSentEventsSink.Builder() .withEncoder(LogEvent::toJsonString) .withPredicate(filterByLogMessage()) .build(); logEventObservable.subscribe(); sink.call(context, portRequest, logEventObservable); } private Predicate filterByLogMessage() { return new Predicate("filter by message", parameters -> { if (parameters != null && parameters.containsKey("filter")) { return logEvent -> logEvent.getMessage().contains(parameters.get("filter").get(0)); } return logEvent -> true; }); } }

В тази реализация на мивка конфигурирахме предикат, който използва параметъра на филтъра само за извличане на регистрационни файлове, които съдържат текста, зададен в параметъра на филтъра :

$ curl localhost:8874?filter=login data: {"index":93,"level":"ERROR","message":"login attempt"} data: {"index":95,"level":"INFO","message":"login attempt"} data: {"index":97,"level":"ERROR","message":"login attempt"} ...

Забележка Mantis предлага и мощен език за заявки, MQL, който може да се използва за заявки, трансформиране и анализ на поточни данни по SQL начин.

5. Сценична верига

Нека сега предположим, че се интересуваме от това колко грешки , ПРЕДУПРЕЖДЕНИЕ или ИНФОРМАЦИОННИ записи имаме в даден интервал от време. За това ще добавим още два етапа към нашата работа и ще ги свържем заедно.

5.1. Групиране

Първо, нека създадем GroupLogStage.

This stage is a ToGroupComputation implementation that receives a LogEvent stream data from the existing TransformLogStage. After that, it groups entries by logging level and sends them to the next stage:

public class GroupLogStage implements ToGroupComputation { @Override public Observable
    
      call(Context context, Observable logEvent) { return logEvent.map(log -> new MantisGroup(log.getLevel(), log)); } public static ScalarToGroup.Config config(){ return new ScalarToGroup.Config() .description("Group event data by level") .codec(JacksonCodecs.pojo(LogEvent.class)) .concurrentInput(); } }
    

We've also created a custom stage config by providing a description, the codec to use for serializing the output, and allowed this stage's call method to run concurrently by using concurrentInput().

One thing to note is that this stage is horizontally scalable. Meaning we can run as many instances of this stage as needed. Also worth mentioning, when deployed in a Mantis cluster, this stage sends data to the next stage so that all events belonging to a particular group will land on the same worker of the next stage.

5.2. Aggregating

Before we move on and create the next stage, let's first add a LogAggregate entity:

public class LogAggregate implements JsonType { private final Integer count; private final String level; }

Now, let's create the last stage in the chain.

This stage implements GroupToScalarComputation and transforms a stream of log groups to a scalar LogAggregate. It does this by counting how many times each type of log appears in the stream. In addition, it also has a LogAggregationDuration parameter, which can be used to control the size of the aggregation window:

public class CountLogStage implements GroupToScalarComputation { private int duration; @Override public void init(Context context) { duration = (int)context.getParameters().get("LogAggregationDuration", 1000); } @Override public Observable call(Context context, Observable
    
      mantisGroup) { return mantisGroup .window(duration, TimeUnit.MILLISECONDS) .flatMap(o -> o.groupBy(MantisGroup::getKeyValue) .flatMap(group -> group.reduce(0, (count, value) -> count = count + 1) .map((count) -> new LogAggregate(count, group.getKey())) )); } public static GroupToScalar.Config config(){ return new GroupToScalar.Config() .description("sum events for a log level") .codec(JacksonCodecs.pojo(LogAggregate.class)) .withParameters(getParameters()); } public static List
     
       getParameters() { List
      
        params = new ArrayList(); params.add(new IntParameter() .name("LogAggregationDuration") .description("window size for aggregation in milliseconds") .validator(Validators.range(100, 10000)) .defaultValue(5000) .build()); return params; } }
      
     
    

5.3. Configure and Run the Job

The only thing left to do now is to configure our job:

public class LogAggregationJob extends MantisJobProvider { @Override public Job getJobInstance() { return MantisJob .source(new RandomLogSource()) .stage(new TransformLogStage(), TransformLogStage.stageConfig()) .stage(new GroupLogStage(), GroupLogStage.config()) .stage(new CountLogStage(), CountLogStage.config()) .sink(Sinks.eagerSubscribe(Sinks.sse(LogAggregate::toJsonString))) .metadata(new Metadata.Builder().build()) .create(); } }

As soon as we run the application and execute our new job, we can see the log counts being retrieved every few seconds:

$ curl localhost:8133 data: {"count":3,"level":"ERROR"} data: {"count":13,"level":"INFO"} data: {"count":4,"level":"WARN"} data: {"count":8,"level":"ERROR"} data: {"count":5,"level":"INFO"} data: {"count":7,"level":"WARN"} ...

6. Conclusion

To sum up, in this article, we've seen what Netflix Mantis is and what it can be used for. Furthermore, we looked at the main concepts, used them to build jobs, and explored custom configurations for different scenarios.

Както винаги, пълният код е достъпен в GitHub.