Putting data from HBase into Hazelcast IMDG and accessing it from Spark as an RDD is a widely used solution.
Prerequisites
- A Spring Boot demo application (starter parent 1.5.9) with its data stored in a Hazelcast IMap (the hazelcast-app folder)
- The good old Hortonworks Sandbox 2.6.5 (because you have it installed and ready to use)
- A Spark 2 job, packaged as a fat JAR, accessing the Hazelcast IMap as an RDD (the sbt project in the spark-hazelcast folder)
- As the connector from Spark 2 to Hazelcast, I will use Greg Luck's Spark Connector for Hazelcast (a sketch of the sbt wiring follows this list)
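The post doesn't show how the connector is wired into the sbt build, so here is a rough sketch of the relevant build.sbt line. The coordinates and version are my assumption from memory of the connector's README, not something stated in this post, so verify them against the connector's repository:

// build.sbt (sketch; artifact coordinates and version are assumptions, verify them yourself)
libraryDependencies += "com.hazelcast" % "hazelcast-spark" % "0.1"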
Spring Boot Demo Application
The heart of hazelcast-app is a LifecycleListener, which initializes the IMap with data as soon as the Hazelcast cluster starts.
package com.example;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.LifecycleEvent;
import com.hazelcast.core.LifecycleListener;

/**
 * @author tomask79
 */
public class NodeLifecycleListener implements LifecycleListener {

    private String hazelcastInstanceName;

    /**
     * @param instanceName
     */
    public NodeLifecycleListener(final String instanceName) {
        this.hazelcastInstanceName = instanceName;
    }

    @Override
    public void stateChanged(LifecycleEvent event) {
        switch (event.getState()) {
            case STARTED: {
                System.out.println("Cluster is started, putting test data into distributed map!");
                preloadHZMapOnClusterStart();
                break;
            }
            default: {
                System.out.println(event.toString());
            }
        }
    }

    private void preloadHZMapOnClusterStart() {
        getHazelcastInstance(hazelcastInstanceName).getMap(HZArtifactAPI.PAYMENTS_MAP).put("1234HZC", 100.0);
        getHazelcastInstance(hazelcastInstanceName).getMap(HZArtifactAPI.PAYMENTS_MAP).put("5344HZC", 1500.0);
        getHazelcastInstance(hazelcastInstanceName).getMap(HZArtifactAPI.PAYMENTS_MAP).put("7662HZC", 1300.0);
        getHazelcastInstance(hazelcastInstanceName).getMap(HZArtifactAPI.PAYMENTS_MAP).put("8626HZC", 1400.0);
        getHazelcastInstance(hazelcastInstanceName).getMap(HZArtifactAPI.PAYMENTS_MAP).put("7277HZC", 1500.0);
        getHazelcastInstance(hazelcastInstanceName).getMap(HZArtifactAPI.PAYMENTS_MAP).put("6636HZC", 1500.0);
    }

    private HazelcastInstance getHazelcastInstance(final String name) {
        return Hazelcast.getHazelcastInstanceByName(name);
    }
}
To avoid missing Hazelcast's STARTED event, register the listener directly in the configuration:
@Bean
public Config config() {
    Config config = new Config();

    config.setInstanceName(HZArtifactAPI.HAZELCAST_INSTANCE);
    config.setProperty("hazelcast.wait.seconds.before.join", "10");

    // Group credentials must match the Spark connector settings later on.
    config.getGroupConfig().setName("mygroup");
    config.getGroupConfig().setPassword("mypassword");

    // Multicast join on port 10555, the address the Spark job will connect to.
    config.getNetworkConfig().setPortAutoIncrement(true);
    config.getNetworkConfig().setPort(10555);
    config.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(true);

    // Register the lifecycle listener before the instance starts,
    // so the STARTED event is not missed.
    config.addListenerConfig(
        new ListenerConfig(
            new NodeLifecycleListener(HZArtifactAPI.HAZELCAST_INSTANCE)
        ));

    SSLConfig sslConfig = new SSLConfig();
    sslConfig.setEnabled(false);
    config.getNetworkConfig().setSSLConfig(sslConfig);

    return config;
}
OK, now let's build and run the Spring Boot application and test the Hazelcast IMap:
tomask79:hazelcast-app tomask79$ pwd
/Users/tomask79/workspace/spark-hazelcast-integration/hazelcast-app
tomask79:hazelcast-app tomask79$ mvn clean install
tomask79:hazelcast-app tomask79$ java -jar spring-microservice-service1/target/service1-0.0.1-SNAPSHOT.war
To test whether the IMap works and exposes the data, I added a simple REST controller, so in another terminal:
tomask79:hazelcast-app tomask79$ curl http://localhost:8082/payments/7662HZC
1300.0
tomask79:hazelcast-app tomask79$ curl http://localhost:8082/payments/1234HZC
100.0
The Spring Boot demo application is ready!
Spark 2 Job Accessing the Hazelcast IMap as an RDD
When using the Spark Connector for Hazelcast, make sure to use Hazelcast 3.7.x or newer, as its prerequisites state. For a while I couldn't connect the Spark 2 job to Hazelcast because I was using 3.6.4. As a demo, I will obtain an RDD over the Hazelcast IMap and run an RDD collect on it:
package com.example

import org.apache.spark.{SparkConf, SparkContext}
import com.hazelcast.spark.connector.toSparkContextFunctions

object HazelcastConnectorTest {

  def runCode() = {
    // Point the connector at the Hazelcast cluster started by hazelcast-app.
    val conf = new SparkConf()
      .set("hazelcast.server.addresses", "127.0.0.1:10555")
      .set("hazelcast.server.groupName", "mygroup")
      .set("hazelcast.server.groupPass", "mypassword")
      .set("hazelcast.spark.valueBatchingEnabled", "true")
      .set("hazelcast.spark.readBatchSize", "5000")
      .set("hazelcast.spark.writeBatchSize", "5000")

    val sc = new SparkContext(conf)

    // Expose the distributed IMap as an RDD of (key, value) pairs.
    val rddFromMap = sc.fromHazelcastMap("payments_map")

    rddFromMap.collect().foreach(println)
  }
}
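The spark-submit command later in this post uses com.example.Main as the entry class, which isn't shown here. A minimal sketch of what it could look like, assuming it simply delegates to HazelcastConnectorTest.runCode():

package com.example

// Hypothetical sketch: the entry point referenced by spark-submit below.
// Assumes it only delegates to the demo object above.
object Main {
  def main(args: Array[String]): Unit = {
    HazelcastConnectorTest.runCode()
  }
}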
The IMap name "payments_map" equals the HZArtifactAPI.PAYMENTS_MAP constant in the already mentioned NodeLifecycleListener (HZArtifactAPI being a small constants holder in the demo project).
Testing Everything
The repository contains two folders: hazelcast-app, a Maven project, and spark-hazelcast, a Spark Scala sbt project.
Building hazelcast-app:
tomask79:hazelcast-app tomask79$ pwd
/Users/tomask79/workspace/spark-hazelcast-integration/hazelcast-app
tomask79:hazelcast-app tomask79$ mvn clean install
Building the Spark sbt project:
tomask79:spark-hazelcast tomask79$ pwd
/Users/tomask79/workspace/spark-hazelcast-integration/spark-hazelcast
tomask79:spark-hazelcast tomask79$ sbt assembly
Now let's start the Hortonworks Sandbox and upload the Spark fat JAR to it:
tomask79:scala-2.11 tomask79$ pwd
/Users/tomask79/workspace/spark-hazelcast-integration/spark-hazelcast/target/scala-2.11
tomask79:scala-2.11 tomask79$ scp -P 2222 apache-spark-2-scala-starter-template-assembly-1.0.jar root@localhost:/root
root@localhost's password:
apache-spark-2-scala-starter-template-assembly-1.0.jar
Now, if you run the Sandbox in VirtualBox and want to access a Hazelcast application running outside the VM, you have to set up port forwarding. I didn't want to mess with that, so I uploaded the hazelcast-app WAR file into the Sandbox as well... it's up to you.
Starting hazelcast-app:
[root@sandbox-hdp ~]# java -jar service1-0.0.1-SNAPSHOT.war
Let's test it again from another terminal:
[root@sandbox-hdp ~]# curl http://localhost:8082/payments/7662HZC
1300.0
Looks good...
Launching the Spark 2 Job to Access Hazelcast IMDG
[root@sandbox-hdp ~]# spark-submit --class com.example.Main --master yarn-client apache-spark-2-scala-starter-template-assembly-1.0.jar
And in the output you should see the content of the IMap "payments_map" as seen from Spark 2:
19/09/01 20:10:16 INFO YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
19/09/01 20:10:16 INFO DAGScheduler: ResultStage 0 (collect at HazelcastConnectorTest.scala:25) finished in 12.410 s
19/09/01 20:10:16 INFO DAGScheduler: Job 0 finished: collect at HazelcastConnectorTest.scala:25, took 12.682620 s
(7277HZC,1500.0)
(8626HZC,1400.0)
(5344HZC,1500.0)
(1234HZC,100.0)
(7662HZC,1300.0)
(6636HZC,1500.0)
The Spark Connector for Hazelcast seems to be working. Great! If you have data in Hazelcast IMDG and need it in your big data Hadoop warehouse processing, I find this connector very useful.
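For completeness: based on my reading of the connector's README (not shown in this post, so treat the exact API as an assumption), the connector can also write a key-value RDD back into an IMap, which is handy when a Spark job produces results you want to serve from Hazelcast:

package com.example

import org.apache.spark.SparkContext
import com.hazelcast.spark.connector.toHazelcastRDDFunctions

// Hypothetical sketch: persist a key-value RDD as a distributed map.
// saveToHazelcastMap is the connector API as I recall it from its README;
// the map name "payments_result_map" is made up for this example.
object HazelcastWriteBackTest {
  def runCode(sc: SparkContext): Unit = {
    val results = sc.parallelize(Seq(("9999HZC", 250.0), ("8888HZC", 75.0)))
    results.saveToHazelcastMap("payments_result_map")
  }
}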