尝试Spark和Apache Hive的一些方法和功能。
1. Spark和countByValue函数
让我们遵循以下RDD值:
var rddVal = sc.parallelize(Array(1,2,2,3,4,4,5,5,5,6)); |
我们的任务是创建新的RDD,其中key将是rddVal中的唯一项值,value将是rddVal项出现的次数。
countByValue是很好的工具:
%spark |
zeppelin output:
rddVal: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:29 |
2. Spark和countByKey函数
有时我们有(密钥,值)RDD,我们想要计算所有密钥的出现次数。countByKey动作函数是一个很好的工具!
%spark |
zeppelin output:
rddKeyValues: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:29 |
3. Spark和keyBy函数
如果您有值的RDD,你想在应用一个函数到每一个元素的,函数结果应该是新RDD的item key,keyBy功能是你的朋友。
%spark |
zeppelin output:
multiply: (num: Int)Int |
4. Apache Hive和切换到另一个数据库
%spark |
zeppelin output:
import org.apache.spark.sql.hive.HiveContext |
5.数据帧分区
有时您被要求以某种特定格式将数据帧保存到HDFS中,并且您被迫使用动态分区。让我们来说明如何做到这一点:
输入文件workers.txt:
[root@sandbox ~]# cat workers.txt |
Spark code:
%spark |
zeppelin output:
defined class worker |
HDFS 更有趣结果:
[root@sandbox ~]# hdfs dfs -ls /tests/partition/result |
Spark为每个分区包含分组数据的每个分区创建了文件夹(例如):
[root@sandbox ~]# hdfs dfs -cat /tests/partition/result/country=USA/part-r-00000-9adc651a-1260-466d-ba37-720a0395d450 |
6.将分区的HDFS数据读回数据帧
%spark |
zeppelin output:
backDFJson: org.apache.spark.sql.DataFrame = [id: bigint, name: string, sex: string, country: string] |
7. Apache Hive和ORC表的动态分区
Apache Hive支持两种分区:
- 静态分区
- 动态分区
有关更多信息,我可以推荐以下博客。基本区别在于,当您将数据保存到静态分区表时,您必须使用区分分区的值来命名分区列。我是动态分区的情况,如果不存在则创建分区。不需要任何值,只需分区列。
测试任务:让我们创建按country动态分区的工作人员ORC表。并从之前创建的表“tableWorkers”中将数据保存到其中。
%spark |
注意代码“PARTITION(country)”,我们不需要输入确切的country,即动态分区。
Zeppelin output:
res165: org.apache.spark.sql.DataFrame = [key: string, value: string] |
8. Apache Hive在表格中描述分区
import org.apache.spark.sql.hive.HiveContext; |
Zeppelin output:
import org.apache.spark.sql.hive.HiveContext |
点击标题见原文