Some Apache Spark 2 Use Cases


1. registerTempTable vs. createOrReplaceTempView
Take the following JSON data, people.json, as an example:

{
    "name": "John",
    "age": "28",
    "country": "UK"
}
{
    "name": "Cathy",
    "age": "30",
    "country": "AUS"
}
{
    "name": "Mark",
    "age": "50",
    "country": "USA"
}

Input:

%spark2

val peopleJsonDF = sqlContext.read.format("json").load("/tests/people.json");

peopleJsonDF.createOrReplaceTempView("jsonPeople");

sqlContext.sql("select * from jsonPeople where age > 30").show();

Zeppelin output:

peopleJsonDF: org.apache.spark.sql.DataFrame = [age: string, country: string ... 1 more field]
+---+-------+----+
|age|country|name|
+---+-------+----+
| 50|    USA|Mark|
+---+-------+----+

Result: registerTempTable is the Spark 1.x syntax and createOrReplaceTempView is the Spark 2.x syntax. Both do the same thing.
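For reference, the two calls are interchangeable on a Spark 2 DataFrame (a minimal sketch; `registerTempTable` still compiles in 2.x but is marked deprecated):

```scala
// Spark 1.x style -- still works in 2.x, but emits a deprecation warning:
peopleJsonDF.registerTempTable("jsonPeople")

// Spark 2.x style -- identical effect, replaces the view if it already exists:
peopleJsonDF.createOrReplaceTempView("jsonPeople")

// Spark 2.x also adds a strict variant that fails if the name is taken:
peopleJsonDF.createTempView("jsonPeople2")
```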

2. Spark-csv in Apache Spark 2
Task: read data from a JSON file, do some sorting, and save the result as a CSV file. Apache Spark 2 is very convenient here!

%spark2

val peopleJsonDF = sqlContext.read.option("multiline", true).format("json").load("/tests/people.json");

peopleJsonDF.createOrReplaceTempView("jsonPeople");

val orderedJsonDF = sqlContext.sql("select * from jsonPeople ORDER BY age DESC");

orderedJsonDF.show();

orderedJsonDF.write.mode("overwrite").option("sep", "|").option("header", true).format("csv").save("/tests/csvresult");

Zeppelin output:

peopleJsonDF: org.apache.spark.sql.DataFrame = [age: string, country: string ... 1 more field]
orderedJsonDF: org.apache.spark.sql.DataFrame = [age: string, country: string ... 1 more field]
+---+-------+-----+
|age|country| name|
+---+-------+-----+
| 50|    USA| Mark|
| 30|    AUS|Cathy|
| 28|     UK| John|
+---+-------+-----+

HDFS output:

[root@sandbox ~]# hdfs dfs -ls /tests/csvresult
Found 5 items
-rw-r--r--   1 zeppelin hdfs          0 2018-09-04 17:14 /tests/csvresult/_SUCCESS
-rw-r--r--   1 zeppelin hdfs         29 2018-09-04 17:14 /tests/csvresult/part-00000-4f4204f7-636b-4f7c-a8b7-a9a8a4c395cf.csv
-rw-r--r--   1 zeppelin hdfs         30 2018-09-04 17:14 /tests/csvresult/part-00001-4f4204f7-636b-4f7c-a8b7-a9a8a4c395cf.csv
-rw-r--r--   1 zeppelin hdfs         28 2018-09-04 17:14 /tests/csvresult/part-00002-4f4204f7-636b-4f7c-a8b7-a9a8a4c395cf.csv
-rw-r--r--   1 zeppelin hdfs          0 2018-09-04 17:14 /tests/csvresult/part-00003-4f4204f7-636b-4f7c-a8b7-a9a8a4c395cf.csv
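Note that the write produced one part file per partition (including an empty one). If a single CSV file is preferred, the DataFrame can be coalesced to one partition before writing — a sketch, assuming a hypothetical output path `/tests/csvsingle`; funnelling everything through one partition is only sensible for small results:

```scala
orderedJsonDF
  .coalesce(1)               // collapse to a single partition => a single part file
  .write.mode("overwrite")
  .option("sep", "|")
  .option("header", true)
  .format("csv")
  .save("/tests/csvsingle")  // hypothetical path, not from the original example
```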

3. Inferring a schema from CSV, saving the result to ORC, and reading it back via Hive
Let's take the previous result '/tests/csvresult' as the input for this task.

%spark2

import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types._;

val hiveContext = new HiveContext(sc);

val csvDF = sqlContext.read
            .option("header", true)
            .option("sep", "|")
            .option("inferSchema", true)
            .format("csv")
            .load("/tests/csvresult");

csvDF.createOrReplaceTempView("csvTable");

val sortedDF = sqlContext.sql("select age, country, name from csvTable order by age desc");

sortedDF.write.mode("overwrite").format("orc").save("/tests/orcresult");

hiveContext.sql("DROP TABLE IF EXISTS people");
hiveContext.sql("CREATE EXTERNAL TABLE people (age INT, country String, name String) " +
                "STORED AS ORC " +
                "LOCATION '/tests/orcresult'");

hiveContext.sql("select * from people").show();

Zeppelin output:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
warning: there was one deprecation warning; re-run with -deprecation for details
hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@4fc938b8
csvDF: org.apache.spark.sql.DataFrame = [age: int, country: string ... 1 more field]
sortedDF: org.apache.spark.sql.DataFrame = [age: int, country: string ... 1 more field]
res135: org.apache.spark.sql.DataFrame = []
res136: org.apache.spark.sql.DataFrame = []

+---+-------+-----+
|age|country| name|
+---+-------+-----+
| 50|    USA| Mark|
| 30|    AUS|Cathy|
| 28|     UK| John|
+---+-------+-----+
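The deprecation warning in the output above comes from `new HiveContext(sc)`: in Spark 2.x both `SQLContext` and `HiveContext` are superseded by `SparkSession` with Hive support enabled. A sketch of the equivalent entry point (the builder settings here are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()   // replaces new HiveContext(sc)
  .getOrCreate()

// sql() on the session covers both sqlContext.sql and hiveContext.sql:
spark.sql("select * from people").show()
```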

4. Dropping malformed records

Suppose there is a file malformed.csv:

50|USA|Mark
30|AUS|Cathy
28|UK

and the task is to drop the malformed records and create a new DataFrame:

%spark2

import org.apache.spark.sql.types._;

val schema = StructType(Array(
  StructField("age", IntegerType, false),
  StructField("country", StringType, false),
  StructField("name", StringType, false)
));

val malformedDF = sqlContext.read.format("csv")
                        .schema(schema)
                        .option("mode", "DROPMALFORMED")
                        .option("header", false)
                        .option("sep", "|")
                        .load("/tests/malformed.csv");

malformedDF.show();

Zeppelin output:

schema: org.apache.spark.sql.types.StructType = StructType(StructField(age,IntegerType,false), StructField(country,StringType,false), StructField(name,StringType,false))
malformedDF: org.apache.spark.sql.DataFrame = [age: int, country: string ... 1 more field]
+---+-------+-----+
|age|country| name|
+---+-------+-----+
| 50|    USA| Mark|
| 30|    AUS|Cathy|
+---+-------+-----+
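`DROPMALFORMED` silently discards the bad rows. If they should be inspected rather than dropped, `PERMISSIVE` mode (the default) can capture each unparseable line in a corrupt-record column — a sketch, assuming a Spark 2.x version where the CSV reader supports `columnNameOfCorruptRecord`; the extra column and its name are illustrative and must be declared nullable:

```scala
val schemaWithCorrupt = StructType(Array(
  StructField("age", IntegerType, true),
  StructField("country", StringType, true),
  StructField("name", StringType, true),
  StructField("_corrupt_record", StringType, true)  // receives the raw text of bad rows
))

val permissiveDF = sqlContext.read.format("csv")
  .schema(schemaWithCorrupt)
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .option("sep", "|")
  .load("/tests/malformed.csv")

// Show only the rows that failed to parse:
permissiveDF.filter("_corrupt_record is not null").show()
```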