Apache Spark是一个分布式计算平台,在当今非常流行,特别是因为与Hadoop mapreduce相比性能要好得多,Spark比基于磁盘的hadoop mapreduce 快了近100倍。让我们测试它并从头开始创建maven Apache Spark应用程序......
Apache Spark可以以三种模式运行:
我们将在这里测试第一个变体,其中包含经典的单词计数示例,用于保存在Hadoop HDFS文件系统中的文件
先决条件:
- 安装了Hadoop,我的Mac上有Hadoop 2.6.0。要进行安装,请按照此处的说明进行操作... 在Mac上安装Hadoop
- 将文本文件上传到HDFS。有关如何使用HDFS的命令,请参阅HDFS命令指南
- 安装了Apache Spark。Spark可以在预构建版本中下载,也可以手动构建。在Mac OS X中,安装Apache Spark的最简单方法是使用自制程序。我有Apache Spark 1.3.0,可以很好地与Hadoop 2.6.0配合使用。请参阅Spark独立模式
- git clone https://bitbucket.org/tomask79/apache-spark-maven.git并运行mvn clean install
准备和启动Apache Spark和Hadoop集群
在进行任何测试之前,我们需要以下内容:
- 我们需要运行Apache Spark master ...从文件夹/usr/local/Cellar/apache-spark/1.3.0/libexec/sbin运行以下命令:./start-master.sh
这将启动Apache Spark集群的主节点。Spark提供了出色的Web控制台来查看主节点属性...在浏览器中,点击以下URL:
http://localhost:8080/
为了能够在Apache Spark集群中运行任何我们的东西,我们需要在apache spark run以下命令的bin文件夹中添加至少一个worker来执行此操作:
spark-class org.apache.spark.deploy.worker.Worker spark://tomask79.local:7077
spark://tomask79.local:7077是我Mac上的主节点,你要根据localhost:8080中列出的主节点URL将其更改为你的主节点。
再次访问localhost:8080,您应该看到您的worker在employees表中列出...完美,Apache Spark集群已准备就绪!
我们要在HDFS文件系统上加载文件,所以我们需要启动Hadoop集群,从文件夹/usr/local/Cellar/hadoop/2.6.0/libexec/sbin 启动hadoop集群:./start-all.sh
将应用程序提交到Apache Cluster并检查代码
在将每个Apache Spark应用程序提交到Apache集群之前,需要对其进行正确配置,让我们检查一下配置代码:
SparkConf sparkConf = new SparkConf(); sparkConf.set("spark.cores.max", "1"); sparkConf.set("spark.executor.memory", "2048M"); sparkConf.set("spark.driver.memory", "1024M"); sparkConf.setAppName("JavaWordCount"); sparkConf.setMaster("spark://tomask79.local:7077"); |
要查看Apache Spark应用程序属性的完整列表,请参阅以下链接:Spark配置
我们只是说使用setMaster方法设置主节点URL是必需的。其他选项是可选的。但是控制Apache Spark节点应用程序的资源量是很好的。否则你可能会遇到新奇的Apache Spark问题:
Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory |
第二个非常重要的配置是以下代码:
JavaSparkContext ctx = new JavaSparkContext(sparkConf); ctx.addJar("/Users/tomask79/Documents/workspace/apache-spark/spark/target/spark-0.0.1-SNAPSHOT.jar"); |
使用此代码,您将告诉Apache Spark驱动程序,他应该将jar带到哪个节点以运行代码计算(banq注:计算代码送到数据附近,云计算是数据送到计算代码附近)。否则你最终得到:
15/11/22 12:03:55 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 192.168.1.112): java.lang.ClassNotFoundException: com.myspark.test.spark.JavaWordCount$1 at java.net.URLClassLoader$1.run(URLClassLoader.java:366) |
其余的代码是执行mapreduce字数统计逻辑的简单RDD命令,没什么大不了的,要理解它,请访问以下页面:Hadoop wordcount
现在您应该能够将JavaWordCount应用程序提交到Apache Spark集群,如果一切顺利,从IDE启动JavaWordCount类后,您应该看到输出。
关于maven依赖关系的建议
首先,请确保使用与安装的Apache Spark相同版本的spark_core依赖项。例如,我安装了Apache Spark 1.3.0,因此我的示例在pom.xml中使用
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.3.0</version> </dependency> |
我有与hadoop 2.6.0客户端的servlet API兼容性问题,所以我使用2.2.0 hadoop客户端,它可以与Apache Spark 1.3.0核心一起使用:
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.2.0</version> </dependency> |
用Apache Spark编程
我们将尝试使用JavaRDD API解决简单任务。
Apache Spark JavaRDD和任务解决:如果你不知道map-reduce概念那么你就无法理解Apache Spark。为了证明这一点,让我们解决简单的任务。假设我们有以下带有温度的城市文本文件:
Prague 35 Madrid 40 Berlin 20 Paris 15 Rome 25 |
位于Apache Hadoop HDFS文件系统,我们需要编写简单的JavaRDD Apache Spark程序来打印具有温度低于整个平均温度的城市。
JavaRDD API和MapReduce有区别吗?
要解决前面提到的任务,我们需要将问题分成以下几部分:
- 首先,我们需要编写JavaRDD程序来计算温度和平均值的总和。
- 然后我们要打印温度低于计算平均值的行。
MapReduce解决方案概念:
如果我们使用Spring Data for Hadoop或为map-reduce程序指定的简单Apache Hadoop API,那么我们的解决方案将是:
- Map函数将创建键[K,V] ='reducer',town.temperature
- Reduce功能将接收先前的键并将整个组的温度相加并计算平均温度。
- 链式map-reduce减少任务将打印温度低于平均值的城镇的结果。
Apache Spark JavaRDD解决方案:
- 首先,我们需要通过将map函数应用于输入RDD集来获取所有行的JavaRDD温度集:
JavaRDD<String> parsedTemperatures = lines.map(new Function<String, String>() { private static final long serialVersionUID = 1L; public String call(String v1) throws Exception { final String arr[] = SPACE.split(v1); System.out.println("Reading temperature ["+arr[1]+"] from "+v1); return arr[1]; } }); |
- 然后我们需要将此RDD集转换为CONSTANT.row.temperature表单以将数据准备到reducer中:
JavaPairRDD<String, Integer> forGroup = parsedTemperatures.mapToPair( new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; public Tuple2<String, Integer> call(String t) throws Exception { return new Tuple2<String, Integer>("reducer", Integer.parseInt(t)); } }); |
- 有了这个数据集,我们就为reducer准备了数据,它将聚合所有温度
JavaPairRDD<String, Integer> counts = forGroup.reduceByKey( new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; public Integer call(Integer v1, Integer v2) throws Exception { System.out.println("Agregatting "+v1+" plus "+v2); return v1 + v2; } }); |
(很像map-reduce概念)
要了解Spark减速器的工作原理,请查看日志:
Reading temperature [35] from Prague 35 Reading temperature [40] from Madrid 40 Agregatting 35 plus 40 Reading temperature [20] from Berlin 20 Agregatting 75 plus 20 Reading temperature [15] from Paris 15 Agregatting 95 plus 15 Reading temperature [25] from Rome 25 Agregatting 110 plus 25 |
Spark实际上并行运行前三个函数map,mapToPair和reduceByKey!DAG图形分析器组合Spark任务的好处之一!
解决方案的第二部分是打印温度低于平均温度的所有城镇:
Tuple2<String, Integer> sumTemperatures = counts.first(); final Integer sum = sumTemperatures._2; final long count = parsedTemperatures.count(); final double avg = (double) sum / count; System.out.println("Average temperature "+avg); JavaRDD<String> result = lines.filter(new Function<String, Boolean>() { private static final long serialVersionUID = 1L; public Boolean call(String v1) throws Exception { final String arr[] = SPACE.split(v1); long temperature = Long.parseLong(arr[1]); return temperature <= avg; } }); List<String> resultList = result.collect(); for (String item: resultList) { System.out.println("Result item: "+item); } |
让我们解释一下这段代码:
- 通过counts.first()我们从reducer中读取所有温度的总和
- 我们使用count函数来获取JavaRDD输入集中所有行的计数。
- 我们使用JavaRDD过滤功能来过滤掉温度高于平均值的城镇。
- 我们使用JavaRDD collect函数来打印结果。
如果你运行这个程序,你应该得到如下结果:
16/03/03 21:02:26 INFO DAGScheduler: Job 1 finished: count at AvgTemperatureAnalyzer.java:85, took 0,094561 s Average temperature 27.0 . . Result item: Berlin 20 Result item: Paris 15 Result item: Rome 25 16/03/03 21:02:26 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null} |
结论
从我的观点来看,Apache Spark比map-reduce编程更加友好,即使概念是相同的。我打赌你明白我们需要通过JavaRDD输入进行多次迭代,但是使用map-reduce你需要弄清楚如何将前一个map reduce任务的结果传递给下一个,Apache Spark一个输入迭代以新的RDD设置,您可以在其中应用其他功能,从主节点驱动的所有内容......这不是很酷吗?
代码
让我们看看如何使用Apache Spark Shared Variables使代码看起来更干净。
Apache Spark中的共享变量
通常在编写像map这样的Spark动作时,驱动程序中的任何传递的变量都会转换为远程工作人员的本地副本。并且远程工作人员对它们的任何更改都不会传播回驱动程序。为了克服这个事实,Apache Spark提供了共享变量的概念。我们有两种类型:
1. 广播变量
当您需要跨多个Spark操作的相同数据时,广播变量非常有用。通过调用SparkContext.broadcast(v),“v”数据然后仅以序列化形式发送给远程工作者,这加快了整个过程。
2.Acumulators累加器
变量只能是我们“添加”的东西,因此它们完全可用于并行处理。重要的是要记住,远程工作人员无法读取其值,只能添加它们。最终,只有驱动程序可以使用其值进行操作。
使用共享变量的演示
Acumulators累加器是解决我们所有行的所有温度总和问题的完美之选。没有必要使用map-reduce概念,就像我在之前的Spark编程部分中所做的那样,我们将简单地使用RDD forEach函数将解析后的温度发送到累加器。
final Accumulator<Integer> tempSum = ctx.accumulator(0); lines.foreach(new VoidFunction<String>() { private static final long serialVersionUID = 1L; public void call(String t) throws Exception { tempSum.add(Integer.valueOf(SPACE.split(t)[1])); } }); |
然后计算平均温度如下:
final Integer sum = tempSum.value(); final long count = lines.count(); final double avg = (double) sum / count; |
如果您以通常的方式编译并运行此repo,您应该看到以下结果,就像之前的情况一样:
Average temperature 27.0 . . Result item: Berlin 20 Result item: Paris 15 Result item: Rome 25 |
点击标题见原文
猜你喜欢