1. registerTempTable vs. createOrReplaceTempView
Take the following JSON data, people.json, as an example:
{ "name": "John", "age": "28", "country": "UK" }
{ "name": "Cathy", "age": "30", "country": "AUS" }
{ "name": "Mark", "age": "50", "country": "USA" }
Input:
%spark2
val peopleJsonDF = sqlContext.read.format("json").load("/tests/people.json");
peopleJsonDF.createOrReplaceTempView("jsonPeople");
sqlContext.sql("select * from jsonPeople where age > 30").show();
Zeppelin output:
peopleJsonDF: org.apache.spark.sql.DataFrame = [age: string, country: string ... 1 more field]
+---+-------+----+
|age|country|name|
+---+-------+----+
| 50|    USA|Mark|
+---+-------+----+
Result: registerTempTable is the Spark 1.x API; createOrReplaceTempView is the Spark 2.x API. Both do the same thing.
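In Spark 2.x, registerTempTable still exists but is deprecated and simply delegates to createOrReplaceTempView. A minimal self-contained sketch of the equivalence (assumes a local Spark 2.x install; in Zeppelin the %spark2 session already exists, so only the two view calls matter):

```scala
import org.apache.spark.sql.SparkSession

// Local session just for the sketch; Zeppelin's %spark2 provides one already.
val spark = SparkSession.builder().master("local[*]").appName("tempViewDemo").getOrCreate()
import spark.implicits._

val df = Seq(("John", 28), ("Cathy", 30), ("Mark", 50)).toDF("name", "age")

df.registerTempTable("people1x")        // Spark 1.x API, deprecated in 2.x
df.createOrReplaceTempView("people2x")  // Spark 2.x replacement

// Both names are session-scoped views backed by the same DataFrame:
spark.sql("select name from people1x where age > 30").show()
spark.sql("select name from people2x where age > 30").show()
```

Neither call persists anything to the metastore; the views disappear when the session ends.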
2. Spark-csv in Apache Spark 2
Task: read the data from a JSON file, sort it, and save the result as a CSV file. Apache Spark 2 makes this very convenient!
%spark2
val peopleJsonDF = sqlContext.read.option("multiline", true).format("json").load("/tests/people.json");
peopleJsonDF.createOrReplaceTempView("jsonPeople");
val orderedJsonDF = sqlContext.sql(" select * from jsonPeople ORDER BY age DESC ");
orderedJsonDF.show();
orderedJsonDF.write.mode("overwrite").option("sep", "|").option("header", true).format("csv").save("/tests/csvresult");
Zeppelin output:
peopleJsonDF: org.apache.spark.sql.DataFrame = [age: string, country: string ... 1 more field]
orderedJsonDF: org.apache.spark.sql.DataFrame = [age: string, country: string ... 1 more field]
+---+-------+-----+
|age|country| name|
+---+-------+-----+
| 50|    USA| Mark|
| 30|    AUS|Cathy|
| 28|     UK| John|
+---+-------+-----+
HDFS output:
[root@sandbox ~]# hdfs dfs -ls /tests/csvresult
Found 5 items
-rw-r--r--  1 zeppelin hdfs   0 2018-09-04 17:14 /tests/csvresult/_SUCCESS
-rw-r--r--  1 zeppelin hdfs  29 2018-09-04 17:14 /tests/csvresult/part-00000-4f4204f7-636b-4f7c-a8b7-a9a8a4c395cf.csv
-rw-r--r--  1 zeppelin hdfs  30 2018-09-04 17:14 /tests/csvresult/part-00001-4f4204f7-636b-4f7c-a8b7-a9a8a4c395cf.csv
-rw-r--r--  1 zeppelin hdfs  28 2018-09-04 17:14 /tests/csvresult/part-00002-4f4204f7-636b-4f7c-a8b7-a9a8a4c395cf.csv
-rw-r--r--  1 zeppelin hdfs   0 2018-09-04 17:14 /tests/csvresult/part-00003-4f4204f7-636b-4f7c-a8b7-a9a8a4c395cf.csv
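The several part files above come straight from the DataFrame's partitions: each partition writes its own file. When the result is small enough to fit on one executor, coalescing to a single partition before the write yields a single CSV part file. A self-contained sketch (the local session and the /tmp output path are assumptions for illustration):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("singleCsv").getOrCreate()
import spark.implicits._

val df = Seq((50, "USA", "Mark"), (30, "AUS", "Cathy"), (28, "UK", "John"))
  .toDF("age", "country", "name")

// coalesce(1) moves all rows into one partition, so the writer emits one part file.
// Only do this for small results; it removes write parallelism.
df.coalesce(1)
  .write.mode("overwrite")
  .option("sep", "|")
  .option("header", true)
  .csv("/tmp/csvresult_single")
```

The output directory still contains a _SUCCESS marker, but only one part-*.csv file.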
3. Inferring a schema from CSV, saving the result as ORC, and reading it back via Hive
Let's use the previous result, '/tests/csvresult', as the input for this task.
%spark2
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types._;
var hiveContext = new HiveContext(sc);
var csvDF = sqlContext.read
  .option("header", true)
  .option("sep", "|")
  .option("inferSchema", true)
  .format("csv").load("/tests/csvresult");
csvDF.createOrReplaceTempView("csvTable");
var sortedDF = sqlContext.sql("select age,country,name from csvTable order by age desc");
sortedDF.write.mode("overwrite").format("orc").save("/tests/orcresult");
hiveContext.sql(" DROP TABLE IF EXISTS people ");
hiveContext.sql("CREATE EXTERNAL TABLE people (age INT, country String, name String) "+
  " STORED AS ORC "+
  " LOCATION '/tests/orcresult'" );
hiveContext.sql("select * from people").show();
Zeppelin output:
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
warning: there was one deprecation warning; re-run with -deprecation for details
hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@4fc938b8
csvDF: org.apache.spark.sql.DataFrame = [age: int, country: string ... 1 more field]
sortedDF: org.apache.spark.sql.DataFrame = [age: int, country: string ... 1 more field]
res135: org.apache.spark.sql.DataFrame = []
res136: org.apache.spark.sql.DataFrame = []
+---+-------+-----+
|age|country| name|
+---+-------+-----+
| 50|    USA| Mark|
| 30|    AUS|Cathy|
| 28|     UK| John|
+---+-------+-----+
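The Hive external table is only one route back to the data. Since ORC files carry their schema in the file footer, the DataFrame reader can load them directly, with no Hive table at all. A self-contained sketch (the local session and /tmp path are assumptions; in Zeppelin you would point at /tests/orcresult):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("orcRoundtrip").getOrCreate()
import spark.implicits._

// Write a small ORC result, then read it back without any Hive table.
val df = Seq((50, "USA", "Mark"), (30, "AUS", "Cathy")).toDF("age", "country", "name")
df.write.mode("overwrite").orc("/tmp/orcresult")

val orcDF = spark.read.orc("/tmp/orcresult")
orcDF.printSchema()  // column names and types come from the ORC footer
orcDF.show()
```

The Hive route is still the right choice when other tools (Hive CLI, JDBC clients) need to query the same files.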
4. Dropping malformed records
Suppose we have a file malformed.csv:
50|USA|Mark
30|AUS|Cathy
28|UK
and the task is to drop the malformed records and create a new DataFrame:
%spark2
import org.apache.spark.sql.types._;
val schema = StructType(Array(
  StructField("age", IntegerType, false),
  StructField("country", StringType, false),
  StructField("name", StringType, false)
));
val malformedDF = sqlContext.read.format("csv")
  .schema(schema)
  .option("mode", "DROPMALFORMED")
  .option("header", false)
  .option("sep", "|")
  .load("/tests/malformed.csv");
malformedDF.show();
Zeppelin output:
schema: org.apache.spark.sql.types.StructType = StructType(StructField(age,IntegerType,false), StructField(country,StringType,false), StructField(name,StringType,false))
malformedDF: org.apache.spark.sql.DataFrame = [age: int, country: string ... 1 more field]
+---+-------+-----+
|age|country| name|
+---+-------+-----+
| 50|    USA| Mark|
| 30|    AUS|Cathy|
+---+-------+-----+
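Note that DROPMALFORMED discards bad rows silently. When you need to see what was dropped, a common alternative is PERMISSIVE mode with a corrupt-record column. A self-contained sketch under assumptions: the local session and /tmp path are for illustration, and exact corrupt-row handling for CSV varies slightly across Spark 2.x versions:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[*]").appName("permissiveDemo").getOrCreate()

// Recreate malformed.csv locally for the sketch.
val path = "/tmp/malformed.csv"
new java.io.PrintWriter(path) { write("50|USA|Mark\n30|AUS|Cathy\n28|UK\n"); close() }

// The extra string column captures the raw text of rows that fail to parse.
val schemaWithCorrupt = StructType(Array(
  StructField("age", IntegerType, true),
  StructField("country", StringType, true),
  StructField("name", StringType, true),
  StructField("_corrupt_record", StringType, true)
))

val permissiveDF = spark.read.format("csv")
  .schema(schemaWithCorrupt)
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .option("sep", "|")
  .load(path)
  .cache()  // caching avoids restrictions on querying only the corrupt-record column

permissiveDF.filter("_corrupt_record is not null").show()  // inspect the bad rows
```

Unlike DROPMALFORMED, every input row survives: well-formed rows have a null `_corrupt_record`, and bad rows keep their raw text there for inspection.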