Apache Spark是一个分布式计算平台,在当今非常流行,特别是因为与Hadoop mapreduce相比性能要好得多,Spark比基于磁盘的hadoop mapreduce 快了近100倍。让我们测试它并从头开始创建maven Apache Spark应用程序......
Apache Spark可以以三种模式运行:
我们将在这里测试第一个变体,其中包含经典的单词计数示例,用于保存在Hadoop HDFS文件系统中的文件
先决条件:
- 安装了Hadoop,我的Mac上有Hadoop 2.6.0。要进行安装,请按照此处的说明进行操作... 在Mac上安装Hadoop
- 将文本文件上传到HDFS。有关如何使用HDFS的命令,请参阅HDFS命令指南
- 安装了Apache Spark。Spark可以在预构建版本中下载,也可以手动构建。在Mac OS X中,安装Apache Spark的最简单方法是使用自制程序。我有Apache Spark 1.3.0,可以很好地与Hadoop 2.6.0配合使用。请参阅Spark独立模式
- git clone https://bitbucket.org/tomask79/apache-spark-maven.git并运行mvn clean install
准备和启动Apache Spark和Hadoop集群
在进行任何测试之前,我们需要以下内容:
- 我们需要运行Apache Spark master ...从文件夹/usr/local/Cellar/apache-spark/1.3.0/libexec/sbin运行以下命令:./start-master.sh
这将启动Apache Spark集群的主节点。Spark提供了出色的Web控制台来查看主节点属性...在浏览器中,点击以下URL:
http://localhost:8080/
为了能够在Apache Spark集群中运行任何我们的东西,我们需要在apache spark run以下命令的bin文件夹中添加至少一个worker来执行此操作:
spark-class org.apache.spark.deploy.worker.Worker spark://tomask79.local:7077
spark://tomask79.local:7077是我Mac上的主节点,你要根据localhost:8080中列出的主节点URL将其更改为你的主节点。
再次访问localhost:8080,您应该看到您的worker在employees表中列出...完美,Apache Spark集群已准备就绪!
我们要在HDFS文件系统上加载文件,所以我们需要启动Hadoop集群,从文件夹/usr/local/Cellar/hadoop/2.6.0/libexec/sbin 启动hadoop集群:./start-all.sh
将应用程序提交到Apache Cluster并检查代码
在将每个Apache Spark应用程序提交到Apache集群之前,需要对其进行正确配置,让我们检查一下配置代码:
SparkConf sparkConf = new SparkConf(); |
要查看Apache Spark应用程序属性的完整列表,请参阅以下链接:Spark配置
我们只是说使用setMaster方法设置主节点URL是必需的。其他选项是可选的。但是控制Apache Spark节点应用程序的资源量是很好的。否则你可能会遇到新奇的Apache Spark问题:
Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory |
第二个非常重要的配置是以下代码:
JavaSparkContext ctx = new JavaSparkContext(sparkConf); |
使用此代码,您将告诉Apache Spark驱动程序,他应该将jar带到哪个节点以运行代码计算(banq注:计算代码送到数据附近,云计算是数据送到计算代码附近)。否则你最终得到:
15/11/22 12:03:55 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 192.168.1.112): java.lang.ClassNotFoundException: com.myspark.test.spark.JavaWordCount$1 |
其余的代码是执行mapreduce字数统计逻辑的简单RDD命令,没什么大不了的,要理解它,请访问以下页面:Hadoop wordcount
现在您应该能够将JavaWordCount应用程序提交到Apache Spark集群,如果一切顺利,从IDE启动JavaWordCount类后,您应该看到输出。
关于maven依赖关系的建议
首先,请确保使用与安装的Apache Spark相同版本的spark_core依赖项。例如,我安装了Apache Spark 1.3.0,因此我的示例在pom.xml中使用
<dependency> |
我有与hadoop 2.6.0客户端的servlet API兼容性问题,所以我使用2.2.0 hadoop客户端,它可以与Apache Spark 1.3.0核心一起使用:
<dependency> |
用Apache Spark编程
我们将尝试使用JavaRDD API解决简单任务。
Apache Spark JavaRDD和任务解决:如果你不知道map-reduce概念那么你就无法理解Apache Spark。为了证明这一点,让我们解决简单的任务。假设我们有以下带有温度的城市文本文件:
Prague 35 |
位于Apache Hadoop HDFS文件系统,我们需要编写简单的JavaRDD Apache Spark程序来打印具有温度低于整个平均温度的城市。
JavaRDD API和MapReduce有区别吗?
要解决前面提到的任务,我们需要将问题分成以下几部分:
- 首先,我们需要编写JavaRDD程序来计算温度和平均值的总和。
- 然后我们要打印温度低于计算平均值的行。
MapReduce解决方案概念:
如果我们使用Spring Data for Hadoop或为map-reduce程序指定的简单Apache Hadoop API,那么我们的解决方案将是:
- Map函数将创建键[K,V] ='reducer',town.temperature
- Reduce功能将接收先前的键并将整个组的温度相加并计算平均温度。
- 链式map-reduce减少任务将打印温度低于平均值的城镇的结果。
Apache Spark JavaRDD解决方案:
- 首先,我们需要通过将map函数应用于输入RDD集来获取所有行的JavaRDD温度集:
JavaRDD<String> parsedTemperatures = lines.map(new Function<String, String>() { |
- 然后我们需要将此RDD集转换为CONSTANT.row.temperature表单以将数据准备到reducer中:
JavaPairRDD<String, Integer> forGroup = parsedTemperatures.mapToPair( |
- 有了这个数据集,我们就为reducer准备了数据,它将聚合所有温度
JavaPairRDD<String, Integer> counts = forGroup.reduceByKey( |
(很像map-reduce概念)
要了解Spark减速器的工作原理,请查看日志:
Reading temperature [35] from Prague 35 |
Spark实际上并行运行前三个函数map,mapToPair和reduceByKey!DAG图形分析器组合Spark任务的好处之一!
解决方案的第二部分是打印温度低于平均温度的所有城镇:
Tuple2<String, Integer> sumTemperatures = counts.first(); |
让我们解释一下这段代码:
- 通过counts.first()我们从reducer中读取所有温度的总和
- 我们使用count函数来获取JavaRDD输入集中所有行的计数。
- 我们使用JavaRDD过滤功能来过滤掉温度高于平均值的城镇。
- 我们使用JavaRDD collect函数来打印结果。
如果你运行这个程序,你应该得到如下结果:
16/03/03 21:02:26 INFO DAGScheduler: Job 1 finished: count at AvgTemperatureAnalyzer.java:85, took 0,094561 s |
结论
从我的观点来看,Apache Spark比map-reduce编程更加友好,即使概念是相同的。我打赌你明白我们需要通过JavaRDD输入进行多次迭代,但是使用map-reduce你需要弄清楚如何将前一个map reduce任务的结果传递给下一个,Apache Spark一个输入迭代以新的RDD设置,您可以在其中应用其他功能,从主节点驱动的所有内容......这不是很酷吗?
让我们看看如何使用Apache Spark Shared Variables使代码看起来更干净。
Apache Spark中的共享变量
通常在编写像map这样的Spark动作时,驱动程序中的任何传递的变量都会转换为远程工作人员的本地副本。并且远程工作人员对它们的任何更改都不会传播回驱动程序。为了克服这个事实,Apache Spark提供了共享变量的概念。我们有两种类型:
1. 广播变量
当您需要跨多个Spark操作的相同数据时,广播变量非常有用。通过调用SparkContext.broadcast(v),“v”数据然后仅以序列化形式发送给远程工作者,这加快了整个过程。
2.Acumulators累加器
变量只能是我们“添加”的东西,因此它们完全可用于并行处理。重要的是要记住,远程工作人员无法读取其值,只能添加它们。最终,只有驱动程序可以使用其值进行操作。
使用共享变量的演示
Acumulators累加器是解决我们所有行的所有温度总和问题的完美之选。没有必要使用map-reduce概念,就像我在之前的Spark编程部分中所做的那样,我们将简单地使用RDD forEach函数将解析后的温度发送到累加器。
final Accumulator<Integer> tempSum = ctx.accumulator(0); |
然后计算平均温度如下:
final Integer sum = tempSum.value(); |
如果您以通常的方式编译并运行此repo,您应该看到以下结果,就像之前的情况一样:
Average temperature 27.0 |
点击标题见原文