19-01-08
banq
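Trying out some methods and features of Spark and Apache Hive.
1. Spark and the countByValue function
Consider the following RDD of values:
    val rddVal = sc.parallelize(Array(1, 2, 2, 3, 4, 4, 4, 5, 5, 5, 6))
Our task is to create a new RDD in which each key is a distinct value from rddVal and each value is the number of times that item occurs in rddVal.
countByValue is a good tool for this. It is an action that returns a local Map to the driver, so the example parallelizes the result to get an RDD again: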
    %spark
    val rddVal = sc.parallelize(Array(1, 2, 2, 3, 4, 4, 4, 5, 5, 5, 6))
    // countByValue returns a local Map[Int, Long]; parallelize it to get an RDD back
    val countedRDD = sc.parallelize(rddVal.countByValue().toSeq)
    countedRDD.collect()
Zeppelin output:
    rddVal: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:29
    countedRDD: org.apache.spark.rdd.RDD[(Int, Long)] = ParallelCollectionRDD[7] at parallelize at <console>:31
    res2: Array[(Int, Long)] = Array((5,3), (1,1), (6,1), (2,2), (3,1), (4,3))
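To make it clearer what countByValue computes, here is a minimal plain-Scala sketch that models the same result on a local collection, without Spark. The object name CountByValueSketch and its helper are hypothetical names chosen for this illustration; the real action runs across the RDD's partitions and only the final Map reaches the driver.

```scala
object CountByValueSketch {
  // Local model of countByValue: map each distinct value to its number of occurrences.
  def countByValue[T](items: Seq[T]): Map[T, Long] =
    items.groupBy(identity).map { case (value, group) => value -> group.size.toLong }

  def main(args: Array[String]): Unit = {
    val counts = countByValue(Seq(1, 2, 2, 3, 4, 4, 4, 5, 5, 5, 6))
    // Sort by key only to get a stable print order; a Map has no defined order.
    counts.toSeq.sortBy(_._1).foreach { case (value, n) => println(s"$value -> $n") }
  }
}
```

Because the result is a Map, its iteration order is not defined, which is why the pairs in the Zeppelin output above do not appear sorted.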
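2. Spark and the countByKey function
Sometimes we have an RDD of (key, value) pairs and want to count how many times each key occurs. The countByKey action is a good tool for this!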
    %spark
    val rddKeyValues = sc.parallelize(Array(("A", 99), ("A", 88), ("B", 22), ("C", 33)))
    val countedKeys = sc.parallelize(rddKeyValues.countByKey().toSeq)
    countedKeys.collect()
Zeppelin output:
    rddKeyValues: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:29
    countedKeys: org.apache.spark.rdd.RDD[(String, Long)] = ParallelCollectionRDD[13] at parallelize at <console>:31
    res4: Array[(String, Long)] = Array((B,1), (A,2), (C,1))
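It is easy to misread countByKey as summing the values per key; it only counts how often each key appears and ignores the values. The following plain-Scala sketch models that behaviour on a local collection. CountByKeySketch and its helper are hypothetical names used only for this illustration, not part of Spark.

```scala
object CountByKeySketch {
  // Local model of countByKey: count the pairs per key, ignoring the values entirely.
  def countByKey[K, V](pairs: Seq[(K, V)]): Map[K, Long] =
    pairs.groupBy(_._1).map { case (key, group) => key -> group.size.toLong }

  def main(args: Array[String]): Unit = {
    val counts = countByKey(Seq(("A", 99), ("A", 88), ("B", 22), ("C", 33)))
    // "A" maps to 2 because it occurs twice, not to 187 (the sum of 99 and 88).
    counts.toSeq.sortBy(_._1).foreach { case (key, n) => println(s"$key -> $n") }
  }
}
```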
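3. Spark and the keyBy function
If you have an RDD of values and want to apply a function to each element so that the function's result becomes that item's key in a new RDD, keyBy is your friend.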
    %spark
    // despite its name, multiply squares its argument; the square becomes the key
    def multiply(num: Int): Int = num * num
    val inputRDD = sc.parallelize(Array(1, 2, 3, 4, 5, 6))
    val resRDD = inputRDD.keyBy(multiply)
    resRDD.collect()
Zeppelin output:
    multiply: (num: Int)Int
    inputRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:29
    resRDD: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[15] at keyBy at <console>:33
    res5: Array[(Int, Int)] = Array((1,1), (4,2), (9,3), (16,4), (25,5), (36,6))
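The output shows that keyBy keeps each original element as the value and puts the function result in front of it as the key. A minimal plain-Scala sketch of that pairing, assuming a local Seq instead of an RDD, could look like this. KeyBySketch, keyBy and square are hypothetical names for illustration only.

```scala
object KeyBySketch {
  // Local model of keyBy: pair every element with the key computed from it.
  def keyBy[K, T](items: Seq[T])(f: T => K): Seq[(K, T)] =
    items.map(item => (f(item), item))

  def square(num: Int): Int = num * num

  def main(args: Array[String]): Unit = {
    // Each element n becomes the pair (n * n, n).
    println(keyBy(Seq(1, 2, 3, 4, 5, 6))(n => square(n)))
  }
}
```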
4. Apache Hive and switching to another database
    %spark
    import org.apache.spark.sql.hive.HiveContext
    val hc = new HiveContext(sc)
    hc.sql("USE xademo")
    hc.sql("SHOW TABLES").show()
Zeppelin output:
    import org.apache.spark.sql.hive.HiveContext
    hc: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@235d1d35
    res11: org.apache.spark.sql.DataFrame = [result: string]
    +-------------------+-----------+
    |          tableName|isTemporary|
    +-------------------+-----------+
    |call_detail_records|      false|
    |   customer_details|      false|
    |             genres|      false|
    |            justone|      false|
    |            mytable|      false|
    |      mytablexademo|      false|
    |   recharge_details|      false|
    |          workersxa|      false|
    +-------------------+-----------+
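5. DataFrame partitioning
Sometimes you are asked to save a DataFrame to HDFS in a specific format, and you have to use dynamic partitioning. Let's illustrate how to do that.
Input file workers.txt: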
    [root@sandbox ~]# cat workers.txt
    1,Jerry,man,USA
    2,Cathy,female,GBR
    3,Teresa,female,GBR
    4,Rut,female,USA
    5,Roasie,female,AUS
    6,Garry,man,GBR
    7,Adam,man,GER
    8,John,man,GBR
    9,Jerremy,man,AUS
    10,Angela,female,AUS
    11,Ivanka,female,USA
    12,Melania,female,USA
Spark code:
    %spark
    // Dynamic partitioning when saving from a DataFrame to HDFS
    case class worker(id: Int, name: String, sex: String, country: String)
    val fileRDD = sc.textFile("/tests/workers.txt")
    // split each line once, then build a worker from its four fields
    val workerDF = fileRDD.map(_.split(",")).map(f => worker(f(0).toInt, f(1), f(2), f(3))).toDF()
    // save the DataFrame into Hive as well, for further use
    // (DataFrame.saveAsTable is deprecated, hence the warning in the output; workerDF.write.saveAsTable replaces it)
    workerDF.saveAsTable("tableWorkers")
    // write one sub-folder per distinct country value
    workerDF.write
      .mode("overwrite")
      .partitionBy("country")
      .json("/tests/partition/result")
Zeppelin output:
    defined class worker
    fileRDD: org.apache.spark.rdd.RDD[String] = /tests/workers.txt MapPartitionsRDD[289] at textFile at <console>:114
    workerDF: org.apache.spark.sql.DataFrame = [id: int, name: string, sex: string, country: string]
    warning: there were 1 deprecation warning(s); re-run with -deprecation for details
The result in HDFS is more interesting:
    [root@sandbox ~]# hdfs dfs -ls /tests/partition/result
    Found 5 items
    -rw-r--r-- 1 zeppelin hdfs 0 2018-08-13 23:35 /tests/partition/result/_SUCCESS
    drwxr-xr-x - zeppelin hdfs 0 2018-08-13 23:35 /tests/partition/result/country=AUS
    drwxr-xr-x - zeppelin hdfs 0 2018-08-13 23:35 /tests/partition/result/country=GBR
    drwxr-xr-x - zeppelin hdfs 0 2018-08-13 23:35 /tests/partition/result/country=GER
    drwxr-xr-x - zeppelin hdfs 0 2018-08-13 23:35 /tests/partition/result/country=USA
    [root@sandbox ~]#
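Spark created one folder per partition, and each folder holds the grouped data for that partition. The country value is encoded in the folder name, so it is not repeated inside the files (for example):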
    [root@sandbox ~]# hdfs dfs -cat /tests/partition/result/country=USA/part-r-00000-9adc651a-1260-466d-ba37-720a0395d450
    {"id":1,"name":"Jerry","sex":"man"}
    {"id":4,"name":"Rut","sex":"female"}
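The layout above can be modelled with a short plain-Scala sketch: group the records by the partition column, name each folder key=value, and leave the partition column out of the stored lines. This is only an illustration of the resulting layout, not Spark's writer; PartitionLayoutSketch and its members are hypothetical names, and the JSON is built by naive string formatting with no escaping.

```scala
object PartitionLayoutSketch {
  final case class Worker(id: Int, name: String, sex: String, country: String)

  // Folder name for one partition value, following the key=value layout seen in HDFS.
  def partitionDir(country: String): String = s"country=$country"

  // JSON line as stored inside a partition folder: the partition column is left out.
  def toJsonLine(w: Worker): String =
    s"""{"id":${w.id},"name":"${w.name}","sex":"${w.sex}"}"""

  // Group workers by country and return folder name -> JSON lines for that folder.
  def layout(workers: Seq[Worker]): Map[String, Seq[String]] =
    workers.groupBy(_.country).map { case (country, group) =>
      partitionDir(country) -> group.map(toJsonLine)
    }

  def main(args: Array[String]): Unit = {
    val workers = Seq(
      Worker(1, "Jerry", "man", "USA"),
      Worker(2, "Cathy", "female", "GBR"),
      Worker(4, "Rut", "female", "USA"))
    layout(workers).foreach { case (dir, lines) =>
      println(dir)
      lines.foreach(line => println("  " + line))
    }
  }
}
```

In the real output a partition folder can hold more than one part file: the file shown above contains only two of the four USA workers.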
6. Reading partitioned HDFS data back into a DataFrame
    %spark
    val backDFJson = sqlContext.read.json("/tests/partition/result")
    backDFJson.show()
Zeppelin output:
    backDFJson: org.apache.spark.sql.DataFrame = [id: bigint, name: string, sex: string, country: string]
    +---+-------+------+-------+
    | id|   name|   sex|country|
    +---+-------+------+-------+
    |  1|  Jerry|   man|    USA|
    |  4|    Rut|female|    USA|
    | 11| Ivanka|female|    USA|
    | 12|Melania|female|    USA|
    |  5| Roasie|female|    AUS|
    |  9|Jerremy|   man|    AUS|
    | 10| Angela|female|    AUS|
    |  7|   Adam|   man|    GER|
    |  2|  Cathy|female|    GBR|
    |  3| Teresa|female|    GBR|
    |  6|  Garry|   man|    GBR|
    |  8|   John|   man|    GBR|
    +---+-------+------+-------+
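7. Apache Hive and dynamic partitioning of ORC tables
Apache Hive supports two kinds of partitioning:
- static partitioning
- dynamic partitioning
For more information, I can recommend the following blog. The basic difference is that when you save data into a statically partitioned table, you have to name the partition column together with the value that identifies the partition. With dynamic partitioning, a partition is created if it does not exist yet; no value is needed, only the partition column.
Test task: let's create a workers ORC table that is dynamically partitioned by country, and save into it the data from the previously created table "tableWorkers".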
    %spark
    // dynamic partitioning on a Hive table
    import org.apache.spark.sql.hive.HiveContext
    val hc = new HiveContext(sc)
    hc.sql("DROP TABLE IF EXISTS WorkersPartitioned")
    hc.sql(
      "CREATE TABLE WorkersPartitioned(id INT, name STRING, sex STRING) " +
      "PARTITIONED BY (country STRING) " +
      "STORED AS ORC")
    // enable dynamic partitioning; nonstrict mode lets every partition column be dynamic
    hc.sql("SET hive.exec.dynamic.partition=true")
    hc.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
    // the partition column (country) comes last in the SELECT list
    hc.sql(
      "INSERT OVERWRITE TABLE WorkersPartitioned PARTITION(country) " +
      "SELECT id, name, sex, country FROM tableWorkers")
    hc.sql("SELECT * FROM WorkersPartitioned").show()
Note the clause "PARTITION(country)": we do not have to supply an exact country value, and that is what makes the partitioning dynamic.
Zeppelin output:
    res165: org.apache.spark.sql.DataFrame = [key: string, value: string]
    res166: org.apache.spark.sql.DataFrame = []
    +---+-------+------+-------+
    | id|   name|   sex|country|
    +---+-------+------+-------+
    |  5| Roasie|female|    AUS|
    |  9|Jerremy|   man|    AUS|
    | 10| Angela|female|    AUS|
    |  2|  Cathy|female|    GBR|
    |  3| Teresa|female|    GBR|
    |  6|  Garry|   man|    GBR|
    |  8|   John|   man|    GBR|
    |  7|   Adam|   man|    GER|
    |  1|  Jerry|   man|    USA|
    |  4|    Rut|female|    USA|
    | 11| Ivanka|female|    USA|
    | 12|Melania|female|    USA|
    +---+-------+------+-------+
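To contrast the two kinds of partitioning described in this section, the sketch below builds the static and the dynamic form of the INSERT statement for the same tables as plain strings. PartitionInsertSketch and its members are hypothetical names; the statements are only assembled here, not executed, and would still have to be passed to hc.sql. The string interpolation is for illustration and is not safe for untrusted input.

```scala
object PartitionInsertSketch {
  // Static partitioning: the partition value is spelled out in the PARTITION clause,
  // so the partition column is not part of the SELECT list.
  def staticInsert(country: String): String =
    s"INSERT OVERWRITE TABLE WorkersPartitioned PARTITION(country='$country') " +
      s"SELECT id, name, sex FROM tableWorkers WHERE country='$country'"

  // Dynamic partitioning: only the partition column is named; the value for each row
  // comes from the last column of the SELECT list.
  val dynamicInsert: String =
    "INSERT OVERWRITE TABLE WorkersPartitioned PARTITION(country) " +
      "SELECT id, name, sex, country FROM tableWorkers"

  def main(args: Array[String]): Unit = {
    println(staticInsert("USA"))
    println(dynamicInsert)
  }
}
```

With the static form, one statement is needed per country, whereas the dynamic form fills all four partitions in one pass.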
8. Apache Hive: listing the partitions of a table
    import org.apache.spark.sql.hive.HiveContext
    val hc = new HiveContext(sc)
    hc.sql("show partitions WorkersPartitioned").show()
Zeppelin output:
    import org.apache.spark.sql.hive.HiveContext
    hc: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@2a117cf4
    +-----------+
    |     result|
    +-----------+
    |country=AUS|
    |country=GBR|
    |country=GER|
    |country=USA|
    +-----------+