Hazelcast IMDG和Spark 2实现大数据项目 — tomask79


将HBase中的数据放入Hazelcast IMDG,从Spark以RDD方式访问,这是一个非常广泛使用的解决方案。
先决条件

  • Spring Boot演示应用程序(入门版1.5.9),其数据存储在Hazelcast IMap(hazelcast-app文件夹)中
  • 好用且老式的Hortonworks Sandbox 2.6.5(因为您安装了它并可以使用)
  • 打包成胖jar的Spark 2任务以RDD方式访问Hazelcast IMap(spark-hazelcast文件夹中的sbt项目)
  • 作为从Spark 2到Hazelcast的连接器,我将使用Greg Luck的用于Hazelcast的Spark连接器

Spring Boot演示应用程序
hazelcast-app的核心是LifecycleListener,它会在Hazelcast群集启动后立即使用数据初始化IMap。

package com.example;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.LifecycleEvent;
import com.hazelcast.core.LifecycleListener;

/**
 * @author tomask79
 */

public class NodeLifecycleListener implements LifecycleListener {

    private String hazelcastInstanceName;

   
/**
     * @param instanceName
     */

    public NodeLifecycleListener(final String instanceName) {
        this.hazelcastInstanceName = instanceName;
    }

    @Override
    public void stateChanged(LifecycleEvent event) {

        switch(event.getState()) {
            case STARTED: {
                System.out.println(
"Cluster is started, putting test data into distributed map!");
                preloadHZMapOnClusterStart();
                break;
            }
            default: {
                System.out.println(event.toString());
            }
        }
    }

    private void preloadHZMapOnClusterStart() {
        getHazelcastInstance(hazelcastInstanceName).getMap(HZArtifactAPI.PAYMENTS_MAP).
                put(
"1234HZC", 100.0);
        getHazelcastInstance(hazelcastInstanceName).getMap(HZArtifactAPI.PAYMENTS_MAP).
                put(
"5344HZC", 1500.0);
        getHazelcastInstance(hazelcastInstanceName).getMap(HZArtifactAPI.PAYMENTS_MAP).
                put(
"7662HZC", 1300.0);
        getHazelcastInstance(hazelcastInstanceName).getMap(HZArtifactAPI.PAYMENTS_MAP).
                put(
"8626HZC", 1400.0);
        getHazelcastInstance(hazelcastInstanceName).getMap(HZArtifactAPI.PAYMENTS_MAP).
                put(
"7277HZC", 1500.0);
        getHazelcastInstance(hazelcastInstanceName).getMap(HZArtifactAPI.PAYMENTS_MAP).
                put(
"6636HZC", 1500.0);
    }

    private HazelcastInstance getHazelcastInstance(final String name) {
        return Hazelcast.getHazelcastInstanceByName(name);
    }
}

不要错过Hazelcast的STARTED事件直接在注册监听器配置

 @Bean
    public Config config() {
        Config config = new Config();

        config.setInstanceName(HZArtifactAPI.HAZELCAST_INSTANCE);
        config.setProperty("hazelcast.wait.seconds.before.join","10");

        config.getGroupConfig().setName(
"mygroup");
        config.getGroupConfig().setPassword(
"mypassword");

        config.getNetworkConfig().setPortAutoIncrement(true);
        config.getNetworkConfig().setPort(10555);
        config.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(true);

        config.addListenerConfig(
                new ListenerConfig( new NodeLifecycleListener(HZArtifactAPI.HAZELCAST_INSTANCE) ));

        SSLConfig sslConfig = new SSLConfig();
        sslConfig.setEnabled(false);
        config.getNetworkConfig().setSSLConfig(sslConfig);

        return config;
    }

好吧,现在让我们编译并运行Spring boot应用程序并测试Hazelcast IMap:

tomask79:hazelcast-app tomask79$ pwd
/Users/tomask79/workspace/spark-hazelcast-integration/hazelcast-app
tomask79:hazelcast-app tomask79$ mvn clean install
tomask79:hazelcast-app tomask79$ java -jar spring-microservice-service1/target/service1-0.0.1-SNAPSHOT.war

为了测试IMap是否正常工作以及是否公开了数据,我添加了简单的REST控制器,以便在另一个终端中运行:

tomask79:hazelcast-app tomask79$ curl http://localhost:8082/payments/7662HZC
1300.0
tomask79:hazelcast-app tomask79$ curl http:
//localhost:8082/payments/1234HZC
100.0

Spring Boot演示应用程序准备就绪!

Spark 2任务以RDD访问Hazelcast IMap
在将Spark Connector用于Hazelcast时,请确保按照先决条件中的说明使用Hazelcast 3.7.x或更高版本。由于使用3.6.4,有一段时间我无法将Spark 2任务与Hazelcast连接。作为演示,我将在Hazelcast IMap上获取RDD并在其上运行RDD collect

package com.example

import org.apache.spark.{SparkContext, SparkConf}

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.{Row, SparkSession}

import com.hazelcast.spark.connector.{toSparkContextFunctions}

object HazelcastConnectorTest {

  def runCode() = {
      val conf = new SparkConf()
          .set("hazelcast.server.addresses", "127.0.0.1:10555")
          .set(
"hazelcast.server.groupName", "mygroup")
          .set(
"hazelcast.server.groupPass", "mypassword")
          .set(
"hazelcast.spark.valueBatchingEnabled", "true")
          .set(
"hazelcast.spark.readBatchSize", "5000")
          .set(
"hazelcast.spark.writeBatchSize", "5000")

      val sc = new SparkContext(conf)

      val rddFromMap = sc.fromHazelcastMap(
"payments_map")

      rddFromMap.collect().foreach(println)
  }
}

IMap名称“ payments_map”等于已经提到的NodeLifecycleListener中的HZArtifactAPI.PAYMENTS_MAP常量。

测试一切
仓库有两个文件夹,一个是hazelcast-app,一个是maven项目,另一个是spark scala sbt项目:
构建hazelcast-app:

tomask79:hazelcast-app tomask79$ pwd
/Users/tomask79/workspace/spark-hazelcast-integration/hazelcast-app
tomask79:hazelcast-app tomask79$ mvn clean install


构建Spark sbt项目:

tomask79:spark-hazelcast tomask79$ pwd
/Users/tomask79/workspace/spark-hazelcast-integration/spark-hazelcast
tomask79:spark-hazelcast tomask79$ sbt assembly

现在,让我们启动Hortonworks沙箱,然后将Spark胖jar上传到其中:

tomask79:scala-2.11 tomask79$ pwd
/Users/tomask79/workspace/spark-hazelcast-integration/spark-hazelcast/target/scala-2.11
tomask79:scala-2.11 tomask79$ scp -P 2222 apache-spark-2-scala-starter-template-assembly-1.0.jar root@localhost:/root
root@localhost's password: 
apache-spark-2-scala-starter-template-assembly-1.0.jar        

现在,如果将VirtualBox用于Sandbox,并且要访问在外部运行的Hazelcast应用程序,则必须设置端口转发。我不想弄乱它,所以我也将hazelcast-app WAR文件上传到了沙箱中...由您决定。

启动hazelcast-app:

[root@sandbox-hdp ~]# java -jar service1-0.0.1-SNAPSHOT.war 

再次让我们在另一个终端中对其进行测试:

[root@sandbox-hdp ~]# curl http://localhost:8082/payments/7662HZC
1300.0 

看起来不错...

启动Spark 2任务以访问Hazelcast IMDG

[root@sandbox-hdp ~]# spark-submit --class com.example.Main --master yarn-client apache-spark-2-scala-starter-template-assembly-1.0.jar

并且在输出中,您应该看到在Spark 2中可见的IMap“ payments_map”的内容:

19/09/01 20:10:16 INFO YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
19/09/01 20:10:16 INFO DAGScheduler: ResultStage 0 (collect at HazelcastConnectorTest.scala:25) finished in 12.410 s
19/09/01 20:10:16 INFO DAGScheduler: Job 0 finished: collect at HazelcastConnectorTest.scala:25, took 12.682620 s
(7277HZC,1500.0)
(8626HZC,1400.0)
(5344HZC,1500.0)
(1234HZC,100.0)
(7662HZC,1300.0)
(6636HZC,1500.0)

Hazelcast的Spark连接器似乎正在工作。太好了,如果您在Hazelcast IMDG中有一些数据,并且需要在大数据Hadoop Warhouse处理中使用它们,那么我认为此连接器非常有用。

点击标题获取源码