Apache Spark:数据框,数据集和RDD之间的区别 - Baeldung


Apache Spark是一个快速的分布式数据处理系统。它执行内存中的数据处理,并使用内存中的缓存和优化的执行,从而实现快速性能。它为流行的编程语言(例如Scala,Python,Java和R)提供了高级API。
在本快速教程中,我们将介绍Spark的三个基本概念:数据帧,数据集和RDD。
 
数据框DataFrame
从Spark 1.3开始,Spark SQL引入了表格形式的数据抽象,称为DataFrame。从那时起,它已成为Spark中最重要的功能之一。当我们要处理结构化和半结构化的分布式数据时,此API很有用。
在第3节中,我们将讨论弹性分布式数据集(RDD)。 DataFrame以比RDD更有效的方式存储数据,这是因为它们使用RDD的不变,内存中,弹性,分布式和并行功能,但它们也将架构应用于数据。DataFrames还可以将SQL代码转换为优化的低级RDD操作。
我们可以通过三种方式创建DataFrame:

  • 转换现有的RDD
  • 运行SQL查询
  • 加载外部数据

Spark团队在2.0版中引入了SparkSession,它统一了所有不同的上下文,从而确保开发人员无需担心创建不同的上下文:
SparkSession session = SparkSession.builder()
  .appName("TouristDataFrameExample")
  .master("local<li>")
  .getOrCreate();
 
DataFrameReader dataFrameReader = session.read();

我们将分析Tourist.csv文件:

Dataset<Row> data = dataFrameReader.option("header", "true")
  .csv("data/Tourist.csv");

由于Spark 2.0 DataFrame成为Row类型的数据集,因此我们可以将DataFrame用作Dataset <Row>的别名。
我们可以选择感兴趣的特定列。我们还可以过滤和分组给定的列:

data.select(col("country"), col("year"), col("value"))
  .show();
 
data.filter(col("country").equalTo("Mexico"))
  .show();
 
data.groupBy(col("country"))
  .count()
  .show();

 

数据集Datasets
数据集是一组强类型的结构化数据。它们提供了熟悉的面向对象编程风格以及类型安全性的好处,因为数据集可以在编译时检查语法并捕获错误。
数据集是DataFrame的扩展,因此我们可以将DataFrame视为数据集的无类型视图。
Spark团队在Spark 1.6中发布了Dataset API,正如他们提到的那样:“ Spark Datasets的目标是提供一个API,使用户可以轻松地表达对象域上的转换,同时还提供Spark SQL执行的性能和鲁棒性优势。发动机”。
首先,我们需要创建一个TouristData类型的类:

public class TouristData {
    private String region;
    private String country;
    private String year;
    private String series;
    private Double value;
    private String footnotes;
    private String source;
    // ... getters and setters
}

要将每个记录映射到指定的类型,我们将需要使用编码器。编码器在Java对象和Spark的内部二进制格式之间进行转换:
// SparkSession initialization and data load
Dataset<Row> responseWithSelectedColumns = data.select(col("region"), 
  col("country"), col("year"), col("series"), col("value").cast("double"), 
  col("footnotes"), col("source"));
 
Dataset<TouristData> typedDataset = responseWithSelectedColumns
  .as(Encoders.bean(TouristData.class));

与DataFrame一样,我们可以按特定的列进行过滤和分组:

typedDataset.filter((FilterFunction) record -> record.getCountry()
  .equals("Norway"))
  .show();
 
typedDataset.groupBy(typedDataset.col("country"))
  .count()
  .show();

我们还可以进行操作,例如按列匹配特定范围进行过滤或计算特定列的总和,以获取其总值:
typedDataset.filter((FilterFunction) record -> record.getYear() != null 
  && (Long.valueOf(record.getYear()) > 2010 
  && Long.valueOf(record.getYear()) < 2017)).show();
 
typedDataset.filter((FilterFunction) record -> record.getValue() != null 
  && record.getSeries()
    .contains("expenditure"))
    .groupBy("country")
    .agg(sum("value"))
    .show();

 
RDD
弹性分布式数据集或RDD是Spark的主要编程抽象。它代表了不可变,有弹性和分布式的元素的集合。
一个RDD封装了一个大型数据集,Spark将自动在整个集群中分布RDD中包含的数据,并并行化我们对其执行的操作。
我们只能通过稳定存储中的数据操作或其他RDD上的操作来创建RDD。
当我们处理大量数据并且数据分布在群集计算机上时,容错能力至关重要。RDD由于Spark内置的故障恢复机制而具有弹性。Spark依赖于以下事实:RDD会记住它们的创建方式,以便我们可以轻松地追溯沿袭来恢复分区。
我们可以对RDD执行两种类型的操作:Transformations和Actions。
  • 转变

我们可以将转换应用于RDD以操纵其数据。执行完此操作后,我们将得到一个全新的RDD,因为RDD是不可变的对象。
我们将检查如何实现Map和Filter,这是两种最常见的转换。
首先,我们需要创建一个JavaSparkContext并从Tourist.csv文件中将数据作为RDD加载:
SparkConf conf = new SparkConf().setAppName("uppercaseCountries")
  .setMaster("local<li>");
JavaSparkContext sc = new JavaSparkContext(conf);
 
JavaRDD<String> tourists = sc.textFile("data/Tourist.csv");

接下来,让我们应用map函数从每个记录中获取国家的名称,并将名称转换为大写。我们可以将此新生成的数据集保存为磁盘上的文本文件:
JavaRDD<String> upperCaseCountries = tourists.map(line -> {
    String[] columns = line.split(COMMA_DELIMITER);
    return columns[1].toUpperCase();
}).distinct();
 
upperCaseCountries.saveAsTextFile("data/output/uppercase.txt");

如果只想选择一个特定国家/地区,则可以对原始游客RDD应用过滤功能:
JavaRDD<String> touristsInMexico = tourists
  .filter(line -> line.split(COMMA_DELIMITER)[1].equals("Mexico"));
 
touristsInMexico.saveAsTextFile("data/output/touristInMexico.txt");

  • Action

在对数据进行一些计算之后,动作将返回最终值或将结果保存到磁盘。
Spark中经常使用的两个动作是Count和Reduce。
让我们在CSV文件中计算国家总数:
// Spark Context initialization and data load
JavaRDD<String> countries = tourists.map(line -> {
    String[] columns = line.split(COMMA_DELIMITER);
    return columns[1];
}).distinct();
 
Long numberOfCountries = countries.count();

现在,我们将按国家/地区计算总支出。我们需要过滤描述中包含支出的记录。
代替使用JavaRDD,我们将使用JavaPairRDD。一对RDD是一种RDD,可以存储键值对。接下来让我们检查一下:

JavaRDD<String> touristsExpenditure = tourists
  .filter(line -> line.split(COMMA_DELIMITER)[3].contains("expenditure"));
 
JavaPairRDD<String, Double> expenditurePairRdd = touristsExpenditure
  .mapToPair(line -> {
      String[] columns = line.split(COMMA_DELIMITER);
      return new Tuple2<>(columns[1], Double.valueOf(columns[6]));
});
 
List<Tuple2<String, Double>> totalByCountry = expenditurePairRdd
  .reduceByKey((x, y) -> x + y)
  .collect();


 综上所述,当我们需要特定于域的API,需要聚合,求和或SQL查询等高级表达式时,应使用DataFrames或Datasets。或者当我们想要在编译时进行类型安全时。
另一方面,当数据是非结构化的并且不需要实现特定的架构时,或者在需要低级的转换和操作时,我们应该使用RDD。
与往常一样,所有代码示例都可以在GitHub上获得