Apache Spark编程教程


Apache Spark是一个分布式计算平台,在当今非常流行,特别是因为与Hadoop mapreduce相比性能要好得多,Spark比基于磁盘的hadoop mapreduce 快了近100倍。让我们测试它并从头开始创建maven Apache Spark应用程序......
Apache Spark可以以三种模式运行:

  • Apache Spark应用程序访问文件系统(本地文件系统,HDFS)。
  • Apache Spark应用程序访问HBase等分布式系统。
  • Yarn上的 Apache Spark应用程序。

我们将在这里测试第一个变体,其中包含经典的单词计数示例,用于保存在Hadoop HDFS文件系统中的文件

先决条件:

  • 安装了Hadoop,我的Mac上有Hadoop 2.6.0。要进行安装,请按照此处的说明进行操作... 在Mac上安装Hadoop
  • 将文本文件上传到HDFS。有关如何使用HDFS的命令,请参阅HDFS命令指南
  • 安装了Apache Spark。Spark可以在预构建版本中下载,也可以手动构建。在Mac OS X中,安装Apache Spark的最简单方法是使用自制程序。我有Apache Spark 1.3.0,可以很好地与Hadoop 2.6.0配合使用。请参阅Spark独立模式
  • git clone https://bitbucket.org/tomask79/apache-spark-maven.git并运行mvn clean install

准备和启动Apache Spark和Hadoop集群

在进行任何测试之前,我们需要以下内容:

  • 我们需要运行Apache Spark master ...从文件夹/usr/local/Cellar/apache-spark/1.3.0/libexec/sbin运行以下命令:./start-master.sh

这将启动Apache Spark集群的主节点。Spark提供了出色的Web控制台来查看主节点属性...在浏览器中,点击以下URL:
http://localhost:8080/

为了能够在Apache Spark集群中运行任何我们的东西,我们需要在apache spark run以下命令的bin文件夹中添加至少一个worker来执行此操作:
spark-class org.apache.spark.deploy.worker.Worker spark://tomask79.local:7077

spark://tomask79.local:7077是我Mac上的主节点,你要根据localhost:8080中列出的主节点URL将其更改为你的主节点。
再次访问localhost:8080,您应该看到您的worker在employees表中列出...完美,Apache Spark集群已准备就绪!

我们要在HDFS文件系统上加载文件,所以我们需要启动Hadoop集群,从文件夹/usr/local/Cellar/hadoop/2.6.0/libexec/sbin 启动hadoop集群:./start-all.sh

将应用程序提交到Apache Cluster并检查代码
在将每个Apache Spark应用程序提交到Apache集群之前,需要对其进行正确配置,让我们检查一下配置代码:

SparkConf sparkConf = new SparkConf();
    sparkConf.set("spark.cores.max", "1");
    sparkConf.set(
"spark.executor.memory", "2048M");
    sparkConf.set(
"spark.driver.memory", "1024M");
    sparkConf.setAppName(
"JavaWordCount");
    sparkConf.setMaster(
"spark://tomask79.local:7077");

要查看Apache Spark应用程序属性的完整列表,请参阅以下链接:Spark配置
我们只是说使用setMaster方法设置主节点URL是必需的。其他选项是可选的。但是控制Apache Spark节点应用程序的资源量是很好的。否则你可能会遇到新奇的Apache Spark问题:

Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

第二个非常重要的配置是以下代码:

JavaSparkContext ctx = new JavaSparkContext(sparkConf);
ctx.addJar("/Users/tomask79/Documents/workspace/apache-spark/spark/target/spark-0.0.1-SNAPSHOT.jar");

使用此代码,您将告诉Apache Spark驱动程序,他应该将jar带到哪个节点以运行代码计算(banq注:计算代码送到数据附近,云计算是数据送到计算代码附近)。否则你最终得到:

15/11/22 12:03:55 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 192.168.1.112): java.lang.ClassNotFoundException: com.myspark.test.spark.JavaWordCount$1
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)


其余的代码是执行mapreduce字数统计逻辑的简单RDD命令,没什么大不了的,要理解它,请访问以下页面:Hadoop wordcount

现在您应该能够将JavaWordCount应用程序提交到Apache Spark集群,如果一切顺利,从IDE启动JavaWordCount类后,您应该看到输出。

关于maven依赖关系的建议
首先,请确保使用与安装的Apache Spark相同版本的spark_core依赖项。例如,我安装了Apache Spark 1.3.0,因此我的示例在pom.xml中使用

<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.3.0</version>
    </dependency>

我有与hadoop 2.6.0客户端的servlet API兼容性问题,所以我使用2.2.0 hadoop客户端,它可以与Apache Spark 1.3.0核心一起使用:

<dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.2.0</version>
    </dependency>

用Apache Spark编程​​​​​​​
我们将尝试使用JavaRDD API解决简单任务。

Apache Spark JavaRDD和任务解决:如果你不知道map-reduce概念那么你就无法理解Apache Spark。为了证明这一点,让我们解决简单的任务。假设我们有以下带有温度的城市文本文件:

Prague 35
Madrid 40
Berlin 20
Paris 15
Rome 25

位于Apache Hadoop HDFS文件系统,我们需要编写简单的JavaRDD Apache Spark程序来打印具有温度低于整个平均温度的城市。

JavaRDD API和MapReduce有区别吗?
要解决前面提到的任务,我们需要将问题分成以下几部分:

  • 首先,我们需要编写JavaRDD程序来计算温度和平均值的总和。
  • 然后我们要打印温度低于计算平均值的行。

MapReduce解决方案概念:
如果我们使用Spring Data for Hadoop或为map-reduce程序指定的简单Apache Hadoop API,那么我们的解决方案将是:

  • Map函数将创建键[K,V] ='reducer',town.temperature
  • Reduce功能将接收先前的键并将整个组的温度相加并计算平均温度。
  • 链式map-reduce减少任务将打印温度低于平均值的城镇的结果。

Apache Spark JavaRDD解决方案:
  • 首先,我们需要通过将map函数应用于输入RDD集来获取所有行的JavaRDD温度集:

JavaRDD<String> parsedTemperatures = lines.map(new Function<String, String>() {
        private static final long serialVersionUID = 1L;

        public String call(String v1) throws Exception {
            final String arr[] = SPACE.split(v1);
            System.out.println("Reading temperature ["+arr[1]+"] from "+v1);
            return arr[1];
        }
    });

  • 然后我们需要将此RDD集转换为CONSTANT.row.temperature表单以将数据准备到reducer中:

JavaPairRDD<String, Integer> forGroup = parsedTemperatures.mapToPair(
            new PairFunction<String, 
            String, Integer>() {
        private static final long serialVersionUID = 1L;

        public Tuple2<String, Integer> call(String t) throws Exception {
            return new Tuple2<String, Integer>("reducer", Integer.parseInt(t));
        }
    });

  • 有了这个数据集,我们就为reducer准备了数据,它将聚合所有温度

JavaPairRDD<String, Integer> counts = forGroup.reduceByKey(
            new Function2<Integer, Integer, Integer>() {
        private static final long serialVersionUID = 1L;

        public Integer call(Integer v1, Integer v2) throws Exception {
              System.out.println("Agregatting "+v1+" plus "+v2);
              return v1 + v2;
        }
    });

(很像map-reduce概念)
要了解Spark减速器的工作原理,请查看日志:
Reading temperature [35] from Prague 35
Reading temperature [40] from Madrid 40
Agregatting 35 plus 40
Reading temperature [20] from Berlin 20
Agregatting 75 plus 20
Reading temperature [15] from Paris 15
Agregatting 95 plus 15
Reading temperature [25] from Rome 25
Agregatting 110 plus 25

Spark实际上并行运行前三个函数map,mapToPair和reduceByKey!DAG图形分析器组合Spark任务的好处之一!
解决方案的第二部分是打印温度低于平均温度的所有城镇:

Tuple2<String, Integer> sumTemperatures = counts.first();    
    final Integer sum = sumTemperatures._2;
    final long count = parsedTemperatures.count();
    final double avg = (double) sum / count;
    System.out.println("Average temperature "+avg);

    JavaRDD<String> result = lines.filter(new Function<String, Boolean>() {
        private static final long serialVersionUID = 1L;

        public Boolean call(String v1) throws Exception {
            final String arr[] = SPACE.split(v1);
            long temperature = Long.parseLong(arr[1]);
            return temperature <= avg;
        }
    });

    List<String> resultList = result.collect();
    for (String item: resultList) {
        System.out.println(
"Result item: "+item);
    }

让我们解释一下这段代码:
  • 通过counts.first()我们从reducer中读取所有温度的总和
  • 我们使用count函数来获取JavaRDD输入集中所有行的计数。
  • 我们使用JavaRDD过滤功能来过滤掉温度高于平均值的城镇。
  • 我们使用JavaRDD collect函数来打印结果。

如果你运行这个程序,你应该得到如下结果:
16/03/03 21:02:26 INFO DAGScheduler: Job 1 finished: count at AvgTemperatureAnalyzer.java:85, took 0,094561 s

Average temperature 27.0
.
.
Result item: Berlin 20
Result item: Paris 15
Result item: Rome 25

16/03/03 21:02:26 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}

结论
从我的观点来看,Apache Spark比map-reduce编程更加友好,即使概念是相同的。我打赌你明白我们需要通过JavaRDD输入进行多次迭代,但是使用map-reduce你需要弄清楚如何将前一个map reduce任务的结果传递给下一个,Apache Spark一个输入迭代以新的RDD设置,您可以在其中应用其他功能,从主节点驱动的所有内容......这不是很酷吗?

代码

让我们看看如何使用Apache Spark Shared Variables使代码看起来更干净。

Apache Spark中的共享变量
通常在编写像map这样的Spark动作时,驱动程序中的任何传递的变量都会转换为远程工作人员的本地副本。并且远程工作人员对它们的任何更改都不会传播回驱动程序。为了克服这个事实,Apache Spark提供了共享变量的概念。我们有两种类型:
1. 广播变量
当您需要跨多个Spark操作的相同数据时,广播变量非常有用。通过调用SparkContext.broadcast(v),“v”数据然后仅以序列化形式发送给远程工作者,这加快了整个过程。
2.Acumulators累加器
变量只能是我们“添加”的东西,因此它们完全可用于并行处理。重要的是要记住,远程工作人员无法读取其值,只能添加它们。最终,只有驱动程序可以使用其值进行操作。

使用共享变量的演示
Acumulators累加器是解决我们所有行的所有温度总和问题的完美之选。没有必要使用map-reduce概念,就像我在之前的Spark编程部分中所做的那样,我们将简单地使用RDD forEach函数将解析后的温度发送到累加器。
 

final Accumulator<Integer> tempSum = ctx.accumulator(0);

    lines.foreach(new VoidFunction<String>() {
        private static final long serialVersionUID = 1L;

        public void call(String t) throws Exception {
            tempSum.add(Integer.valueOf(SPACE.split(t)[1]));
        }
    });

然后计算平均温度如下:

final Integer sum = tempSum.value();
    final long count = lines.count();
    final double avg = (double) sum / count;


如果您以通常的方式编译并运行此repo,您​​应该看到以下结果,就像之前的情况一样:

Average temperature 27.0
.
.
Result item: Berlin 20
Result item: Paris 15
Result item: Rome 25

点击标题见原文