Hazelcast JET在Spring Boot上运行

19-01-10 banq
              

Hazelcast JET目前是分布式计算框架领域的新成员。根据Hazelcast团队的说法,他们甚至比Apache SparkApache Flink更快。查看基准。让我们看看如何使用Hazelcast JET和Spring Boot,当然我会像往常一样构建一个简单的演示。

Hazelcast JET和Spring Boot依赖项设置:

<dependency>
    <groupId>com.hazelcast.jet</groupId>
    <artifactId>hazelcast-jet</artifactId>
    <version>0.3.2-SNAPSHOT</version>
</dependency>

并且让我们尝试创建一个Hazelcast JET实例并最终将其关闭,以便验证我们是否能够正确启动Hazelcast JET:

try {
        JetInstance instance = Jet.newJetInstance();
 } finally {
        Jet.shutdownAll();
 }

不幸的是,Spring Boot应用程序内部的Hazelcast JET实例创建将崩溃:

java.lang.NoSuchFieldError: JET
    at com.hazelcast.jet.impl.config.XmlJetConfigBuilder.getXmlType(XmlJetConfigBuilder.java:128) ~[hazelcast-jet-0.3.2-SNAPSHOT.jar!/:0.3.2-SNAPSHOT]
    at com.hazelcast.config.AbstractXmlConfigHelper.getNamespaceType(AbstractXmlConfigHelper.java:136) ~[hazelcast-3.7.6.jar!/:3.7.6]
    at com.hazelcast.config.AbstractXmlConfigHelper.<init>(AbstractXmlConfigHelper.java:72) ~[hazelcast-3.7.6.jar!/:3.7.6]
    at com.hazelcast.config.AbstractConfigBuilder.<init>(AbstractConfigBuilder.java:59) ~[hazelcast-3.7.6.jar!/:3.7.6]
    at com.hazelcast.jet.impl.config.XmlJetConfigBuilder.<init>(XmlJetConfigBuilder.java:67) ~[hazelcast-jet-0.3.2-SNAPSHOT.jar!/:0.3.2-SNAPSHOT]
    at com.hazelcast.jet.impl.config.XmlJetConfigBuilder.getConfig(XmlJetConfigBuilder.java:80) ~[hazelcast-jet-0.3.2-SNAPSHOT.jar!/:0.3.2-SNAPSHOT]

好吧,不要责怪Hazelcast JET团队,只是Spring Boot已经内置了Hazelcast IMDG(Spring Boot 1.5.3版本为3.7.6),其中这个版本包含了hazelcast ConfigType枚举,但没有包含JET字段。就这样。解决此问题的方法是在您的pom中升级Hazelcast IMDG。对我有用的是将hazelcast.version属性设置为:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <hazelcast.version>3.8</hazelcast.version>
</properties>

在这个解决方法之后,一切都应该工作正常,你应该能够看到他们漂亮的ascii艺术:-)

Hazelcast JET架构

在进入JET演示之前,让我们描述一下它的架构。Hazelcast JET基于将计算工作组合到DAG图中。这不是革命性的东西,因为Apache Spark也基于DAG工作。但是让我们更深入地了解Hazelcast JET DAG术语::

  • Vertex 是JET作业的一个工作单元。把它作为计算工作的一步。一个Vertex 包含一个或多个您实现的处理器(类AbstractProcessor)来执行您想要的逻辑。每个Vertex 可以有更多的处理器。Number基于本地和全局并行设置。你可以有SINK Vertex (只有输入,不发射任何东西),INTERNAL Vertex (有输入并向其序数发射输出)和SOURCE Vertex (仅发射)。
  • DAG是Vertex 的边缘连接器。每次创建JET作业时,都必须将处理器实现与Edges连接。

为了向您展示一个示例,我们可以构建一个简单的JET Job,它将RabbitMQ代理中的数据注入到分布式Hazelcast Map中。我们的工作将包含两个Vertex ,第一个Vertex 将发出来自RabbitMQ的项目,第二个Vertex 将是简单的Sink进入Map。

private static Job createJetJob(JetInstance instance) {
        DAG dag = new DAG();
        Properties props = props(
                "server", "localhost",
                "user", "guest",
                "password", "guest");
        Vertex source = dag.newVertex("source", readRabbitMQ(props, "jetInputQueue"));
        Vertex sink = dag.newVertex("sink", writeMap("sink"));
        dag.edge(between(source, sink));
        return instance.newJob(dag);
    }

详细代码

方法readRabbitMQ返回一个JET Processor实现,用于从RabbitMQ读取消息。该实现的灵感来自官方的ReadKafkaP处理器。我刚刚以轮询方式重写了连接器以与RabbitMQ代理进行通信。读取部分的消息在AbstractProcessor.complete方法中。要了解这种方法,您需要了解两件事:

  • 重复调用方法,直到返回true。由于我的作业中的第一个Vertex 是Source,因此我的ReadRabbitMQP.complete方法始终返回false。
  • 来自RabbitMQ代理的轮询消息需要通过Traverser接口返回。另请参阅Traversers util类以获得方便的方法。

轮询RabbitMQ消息的完整方法:

@Override
    public boolean complete() {
        System.out.println("....Invoking RabbitMQ vertex processor complete...");
        if (emitCooperatively(traverser)) {
            final Message message =
                    this.rabbitTemplate.receive(this.queueNames[0]);
            if (message != null) {
                System.out.println("Message payload: " + new String(message.getBody()));
                final List<Message> list = new ArrayList<>();
                list.add(message);
                Random rn = new Random();
                traverser = traverseStream(list.stream()).map(r ->
                        entry(String.valueOf(rn.nextInt()), new String(r.getBody()))
                );
            }
        }
        return false;
    }

Hazelcast JET和工作分配到集群

将来自RabbitMQ代理的流式传输写入Hazelcast JET时,接收器Vertex 中的每个处理器都将成为消费者,从而完成相同的工作。

来自AMPQ队列的有效负载将在JET处理器之间进行负载平衡。无论如何,为特定Vertex 创建处理器的方式是ProcessorMetaSupplierProcessorSupplier实现的工作。

简而言之,JET使用ProcessorMetaSupplier实现为DAG图中的每个Vertex 获取ProcessorSupplier。然后将ProcessorSupplier发送到Vertex ,根据并行度设置创建处理器。我强烈建议您阅读JET文档中的NumberGenerator示例,这有助于了解JET如何创建处理器。

在RabbitMQ流媒体的情况下,我再说一遍,每个处理器都将做同样的工作,因此:

private static final class MetaSupplier<K, V> implements ProcessorMetaSupplier {

        static final long serialVersionUID = 1L;
        private final String[] queueNames;
        private Properties properties;

        private MetaSupplier(String[] queueNames, Properties properties) {
            this.queueNames = queueNames;
            this.properties = properties;
        }

        @Override
        public Function<Address, ProcessorSupplier> get(List<Address> addresses) {
            return address -> new Supplier<>(queueNames, properties);
        }
    }

    private static class Supplier<K, V> implements ProcessorSupplier {

        static final long serialVersionUID = 1L;

        private final String[] queueNames;
        private final Properties properties;
        private transient List<Processor> processors;

        Supplier(String[] topicIds, Properties properties) {
            this.properties = properties;
            this.queueNames = topicIds;
        }

        @Override
        public List<Processor> get(int count) {
            return processors = range(0, count)
                    .mapToObj(i -> new ReadRabbitMQP<>(queueNames, properties))
                    .collect(toList());
        }

        @Override
        public void complete(Throwable error) {
            processors.stream()
                    .filter(p -> p instanceof ReadRabbitMQP)
                    .map(p -> (ReadRabbitMQP) p)
                    .forEach(p -> Util.uncheckRun(p::close));
        }
    }

测试此演示

  • git clone https://bitbucket.org/tomask79/spring-hazelcast-jet-streaming.git
  • mvn clean install
  • 在RabbitMQ中创建一个名为“jetInputQueue”的队列
  • java -jar target / demo-0.0.1-SNAPSHOT.jar

现在将一些消息发送到“jetInputQueue”输入4条以后输出:

Received 4 entries in 78080 milliseconds.
....Invoking RabbitMQ vertex processor complete...
....Invoking RabbitMQ vertex processor complete...
Received 4 entries in 78182 milliseconds.
....Invoking RabbitMQ vertex processor complete...
....Invoking RabbitMQ vertex processor complete...
Received 4 entries in 78283 milliseconds.
....Invoking RabbitMQ vertex processor complete...
....Invoking RabbitMQ vertex processor complete...
Received 4 entries in 78385 milliseconds.
....Invoking RabbitMQ vertex processor complete...
....Invoking RabbitMQ vertex processor complete...
Received 4 entries in 78486 milliseconds.
....Invoking RabbitMQ vertex processor complete..

我对Hazelcast JET的看法

优点:

  • 令人难以置信的性能。与Apache Spark相比,甚至更好一点。
  • 非常编码友好的DAG API。仅仅两天后,我就能够编写JET DAG作业。

缺点:

  • 更好的容错能力。目前,如果我的RabbitMQ JET消费者死亡,那么整个工作将因数据丢失而中止.. :(
  • 我想看看Apache Hive对Hazelcast JET的支持。就像编写SQL样式语句一样,Hive会为我生成JET DAG,就像它与Apache Spark一样。

无论如何JET引起了我的注意,我期待着下一个版本。

点击标题看原文