Mantis是一个用于构建流处理应用程序(作业)的平台。它提供了一种简便的方法来管理作业的部署和生命周期。此外,它有助于这些作业之间的资源分配,发现和通信。
因此,开发人员可以始终专注于实际的业务逻辑,同时始终获得强大且可扩展的平台的支持,以运行其高容量,低延迟,无阻塞的应用程序。
螳螂的工作包括三个不同的部分:
- source,负责从外部源检索所述数据
- 一个或多个阶段stages,负责处理传入的事件流
- sink收集处理过的数据
现在让我们探索它们。
加入 mantis-runtime jackson-databind 依赖:
<dependency> <groupId>io.mantisrx</groupId> <artifactId>mantis-runtime</artifactId> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>
|
现在,为了设置工作的数据源,让我们实现Mantis Source接口:
public class RandomLogSource implements Source<String> { @Override public Observable<Observable<String>> call(Context context, Index index) { return Observable.just( Observable .interval(250, TimeUnit.MILLISECONDS) .map(this::createRandomLogEvent)); } private String createRandomLogEvent(Long tick) { // generate a random log entry string ... } }
|
如我们所见,它只是每秒多次生成随机日志条目。
创建一个Mantis作业
该作业只是从我们的RandomLogSource收集日志事件。稍后,我们将添加组和聚合转换以获得更复杂和有趣的结果。
首先,让我们创建一个LogEvent实体:
public class LogEvent implements JsonType { private Long index; private String level; private String message; // ... }
|
然后,让我们添加我们的TransformLogStage。可实现ScalarComputation接口并拆分日志条目以构建LogEvent。此外,它还会过滤出任何错误的格式化字符串:
public class TransformLogStage implements ScalarComputation<String, LogEvent> { @Override public Observable<LogEvent> call(Context context, Observable<String> logEntry) { return logEntry .map(log -> log.split("#")) .filter(parts -> parts.length == 3) .map(LogEvent::new); } }
|
运行作业
public class LogCollectingJob extends MantisJobProvider<LogEvent> { @Override public Job<LogEvent> getJobInstance() { return MantisJob .source(new RandomLogSource()) .stage(new TransformLogStage(), new ScalarToScalar.Config<>()) .sink(Sinks.eagerSubscribe(Sinks.sse(LogEvent::toJsonString))) .metadata(new Metadata.Builder().build()) .create(); } }
|
它扩展了MantisJobProvider。首先,它从我们的RandomLogSource获取数据,并将TransformLogStage应用于获取的数据。最后,它将处理后的数据发送到内置的接收器,该接收器热切地通过SSE订阅和传递数据。
现在,让我们将作业配置为在启动时本地执行:@SpringBootApplication public class MantisApplication implements CommandLineRunner { // ... @Override public void run(String... args) { LocalJobExecutorNetworked.execute(new LogCollectingJob().getJobInstance()); } }
|
让我们运行该应用程序。我们将看到类似以下的日志消息:Serving modern HTTP SSE server sink on port: 86XX
|
现在让我们使用curl连接到接收器:
$ curl localhost:86XX data: {"index":86,"level":"WARN","message":"login attempt"} data: {"index":87,"level":"ERROR","message":"user created"} data: {"index":88,"level":"INFO","message":"user created"} data: {"index":89,"level":"INFO","message":"login attempt"} data: {"index":90,"level":"INFO","message":"user created"} data: {"index":91,"level":"ERROR","message":"user created"} data: {"index":92,"level":"WARN","message":"login attempt"} data: {"index":93,"level":"INFO","message":"user created"} ...
|
配置接收器sink
到目前为止,我们已经使用内置接收器来收集处理过的数据。让我们看看是否可以通过提供自定义接收器为我们的场景增加更多的灵活性。
例如,如果我们想按消息过滤日志怎么办?
让我们创建一个实现Sink <LogEvent>接口的LogSink:
public class LogSink implements Sink<LogEvent> { @Override public void call(Context context, PortRequest portRequest, Observable<LogEvent> logEventObservable) { SelfDocumentingSink<LogEvent> sink = new ServerSentEventsSink.Builder<LogEvent>() .withEncoder(LogEvent::toJsonString) .withPredicate(filterByLogMessage()) .build(); logEventObservable.subscribe(); sink.call(context, portRequest, logEventObservable); } private Predicate<LogEvent> filterByLogMessage() { return new Predicate<>("filter by message", parameters -> { if (parameters != null && parameters.containsKey("filter")) { return logEvent -> logEvent.getMessage().contains(parameters.get("filter").get(0)); } return logEvent -> true; }); } }
|
在此接收器实现中,我们配置了一个谓词,该谓词使用filter参数仅检索包含filter参数中设置的文本的日志:
$ curl localhost:8874?filter=login data: {"index":93,"level":"ERROR","message":"login attempt"} data: {"index":95,"level":"INFO","message":"login attempt"} data: {"index":97,"level":"ERROR","message":"login attempt"} ...
|
注意Mantis还提供了一种强大的查询语言MQL,可用于以SQL方式查询,转换和分析流数据。
阶段Stage链接
现在,让我们假设有兴趣知道在给定的时间间隔内有多少个ERROR,WARN或INFO日志条目。为此,我们将在工作中再增加两个阶段并将它们链接在一起。
首先,让我们创建一个GroupLogStage。
此阶段是ToGroupComputation实现,该实现从现有TransformLogStage接收LogEvent流数据。之后,它将按日志级别对条目进行分组并将其发送到下一个阶段:
public class GroupLogStage implements ToGroupComputation<LogEvent, String, LogEvent> { @Override public Observable<MantisGroup<String, LogEvent>> call(Context context, Observable<LogEvent> logEvent) { return logEvent.map(log -> new MantisGroup<>(log.getLevel(), log)); } public static ScalarToGroup.Config<LogEvent, String, LogEvent> config(){ return new ScalarToGroup.Config<LogEvent, String, LogEvent>() .description("Group event data by level") .codec(JacksonCodecs.pojo(LogEvent.class)) .concurrentInput(); } }
|
我们还通过提供描述,用于序列化输出的编解码器,创建了一个自定义阶段配置,并允许通过使用 parallelInput()同时运行此阶段的调用方法。
需要注意的一件事是,此阶段是水平可伸缩的。意味着我们可以根据需要运行该阶段的多个实例。还值得一提的是,当在Mantis集群中部署时,此阶段会将数据发送到下一阶段,以便属于特定组的所有事件将落入下一阶段的同一工作人员。
在继续进行下一步之前,我们首先添加一个LogAggregate实体:
public class LogAggregate implements JsonType { private final Integer count; private final String level; }
|
现在,让我们创建链中的最后一个阶段。
此阶段实现GroupToScalarComputation并将日志组流转换为标量LogAggregate。通过计算每种类型的日志在流中出现多少次来完成此操作。此外,它还有一个LogAggregationDuration参数,可用于控制聚合窗口的大小:
public class CountLogStage implements GroupToScalarComputation<String, LogEvent, LogAggregate> { private int duration; @Override public void init(Context context) { duration = (int)context.getParameters().get("LogAggregationDuration", 1000); } @Override public Observable<LogAggregate> call(Context context, Observable<MantisGroup<String, LogEvent>> mantisGroup) { return mantisGroup .window(duration, TimeUnit.MILLISECONDS) .flatMap(o -> o.groupBy(MantisGroup::getKeyValue) .flatMap(group -> group.reduce(0, (count, value) -> count = count + 1) .map((count) -> new LogAggregate(count, group.getKey())) )); } public static GroupToScalar.Config<String, LogEvent, LogAggregate> config(){ return new GroupToScalar.Config<String, LogEvent, LogAggregate>() .description("sum events for a log level") .codec(JacksonCodecs.pojo(LogAggregate.class)) .withParameters(getParameters()); } public static List<ParameterDefinition<?>> getParameters() { List<ParameterDefinition<?>> params = new ArrayList<>(); params.add(new IntParameter() .name("LogAggregationDuration") .description("window size for aggregation in milliseconds") .validator(Validators.range(100, 10000)) .defaultValue(5000) .build()); return params; } }
|
public class LogAggregationJob extends MantisJobProvider<LogAggregate> { @Override public Job<LogAggregate> getJobInstance() { return MantisJob .source(new RandomLogSource()) .stage(new TransformLogStage(), TransformLogStage.stageConfig()) .stage(new GroupLogStage(), GroupLogStage.config()) .stage(new CountLogStage(), CountLogStage.config()) .sink(Sinks.eagerSubscribe(Sinks.sse(LogAggregate::toJsonString))) .metadata(new Metadata.Builder().build()) .create(); } }
|
一旦运行该应用程序并执行我们的新作业,就可以看到每隔几秒钟检索一次的日志计数:
$ curl localhost:8133 data: {"count":3,"level":"ERROR"} data: {"count":13,"level":"INFO"} data: {"count":4,"level":"WARN"} data: {"count":8,"level":"ERROR"} data: {"count":5,"level":"INFO"} data: {"count":7,"level":"WARN"} ...
|
综上所述,在本文中,我们已经了解了Netflix Mantis是什么以及它的用途。此外,我们研究了主要概念,使用它们来构建作业,并探索了针对不同场景的自定义配置。
与往常一样,完整的代码可以在GitHub上找到