Apache Spark和Hive有用的功能

19-01-08 banq
         

尝试Spark和Apache Hive的一些方法和功能。

1. Spark和countByValue函数

让我们遵循以下RDD值:

var rddVal = sc.parallelize(Array(1,2,2,3,4,4,5,5,5,6));

我们的任务是创建新的RDD,其中key将是rddVal中的唯一项值,value将是rddVal项出现的次数。

countByValue是很好的工具:

%spark

var rddVal = sc.parallelize(Array(1,2,2,3,4,4,4,5,5,5,6));

val countedRDD = sc.parallelize(rddVal.countByValue().toSeq);

countedRDD.collect();

zeppelin output:

rddVal: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:29
countedRDD: org.apache.spark.rdd.RDD[(Int, Long)] = ParallelCollectionRDD[7] at parallelize at <console>:31
res2: Array[(Int, Long)] = Array((5,3), (1,1), (6,1), (2,2), (3,1), (4,3))

2. Spark和countByKey函数

有时我们有(密钥,值)RDD,我们想要计算所有密钥的出现次数。countByKey动作函数是一个很好的工具!

%spark
var rddKeyValues = sc.parallelize(Array(("A", 99), ("A",88), ("B",22), ("C",33)));

val countedKeys = sc.parallelize(rddKeyValues.countByKey().toSeq);

countedKeys.collect();

zeppelin output:

rddKeyValues: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:29
countedKeys: org.apache.spark.rdd.RDD[(String, Long)] = ParallelCollectionRDD[13] at parallelize at <console>:31
res4: Array[(String, Long)] = Array((B,1), (A,2), (C,1))

3. Spark和keyBy函数

如果您有值的RDD,你想在应用一个函数到每一个元素的,函数结果应该是新RDD的item key,keyBy功能是你的朋友。

%spark

def multiply(num: Int):Int={
    return num*num;
}

val inputRDD = sc.parallelize(Array(1,2,3,4,5,6));

val resRDD = inputRDD.keyBy(multiply);

resRDD.collect();

zeppelin output:

multiply: (num: Int)Int
inputRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:29
resRDD: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[15] at keyBy at <console>:33
res5: Array[(Int, Int)] = Array((1,1), (4,2), (9,3), (16,4), (25,5), (36,6))

4. Apache Hive和切换到另一个数据库

%spark
import org.apache.spark.sql.hive.HiveContext;

val hc = new HiveContext(sc);

hc.sql("USE xademo");
hc.sql("SHOW TABLES").show();

zeppelin output:

import org.apache.spark.sql.hive.HiveContext
hc: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@235d1d35
res11: org.apache.spark.sql.DataFrame = [result: string]
+-------------------+-----------+
|          tableName|isTemporary|
+-------------------+-----------+
|call_detail_records|      false|
|   customer_details|      false|
|             genres|      false|
|            justone|      false|
|            mytable|      false|
|      mytablexademo|      false|
|   recharge_details|      false|
|          workersxa|      false|
+-------------------+-----------+

5.数据帧分区

有时您被要求以某种特定格式将数据帧保存到HDFS中,并且您被迫使用动态分区。让我们来说明如何做到这一点:

输入文件workers.txt:

[root@sandbox ~]# cat workers.txt 
1,Jerry,man,USA
2,Cathy,female,GBR
3,Teresa,female,GBR
4,Rut,female,USA
5,Roasie,female,AUS
6,Garry,man,GBR
7,Adam,man,GER
8,John,man,GBR
9,Jerremy,man,AUS
10,Angela,female,AUS
11,Ivanka,female,USA
12,Melania,female,USA

Spark code:

%spark

// Dynamic partitioning when saving from Dataframe to HDFS

case class worker(id: Int, name: String, sex: String, country: String);

val fileRDD = sc.textFile("/tests/workers.txt");

val workerDF = fileRDD.map(line=>new worker(line.split(",")(0).toInt, 
                                        line.split(",")(1), 
                                        line.split(",")(2), 
                                        line.split(",")(3))).toDF();

// save dataframe also into Hive for further use                                        
workerDF.saveAsTable("tableWorkers");

workerDF.write
        .mode("overwrite")
        .partitionBy("country")
        .json("/tests/partition/result");

zeppelin output:

defined class worker
fileRDD: org.apache.spark.rdd.RDD[String] = /tests/workers.txt MapPartitionsRDD[289] at textFile at <console>:114
workerDF: org.apache.spark.sql.DataFrame = [id: int, name: string, sex: string, country: string]
warning: there were 1 deprecation warning(s); re-run with -deprecation for details

HDFS 更有趣结果:

[root@sandbox ~]# hdfs dfs -ls /tests/partition/result
Found 5 items
-rw-r--r--   1 zeppelin hdfs          0 2018-08-13 23:35 /tests/partition/result/_SUCCESS
drwxr-xr-x   - zeppelin hdfs          0 2018-08-13 23:35 /tests/partition/result/country=AUS
drwxr-xr-x   - zeppelin hdfs          0 2018-08-13 23:35 /tests/partition/result/country=GBR
drwxr-xr-x   - zeppelin hdfs          0 2018-08-13 23:35 /tests/partition/result/country=GER
drwxr-xr-x   - zeppelin hdfs          0 2018-08-13 23:35 /tests/partition/result/country=USA
<p>[root@sandbox ~]#

Spark为每个分区包含分组数据的每个分区创建了文件夹(例如):

[root@sandbox ~]# hdfs dfs -cat /tests/partition/result/country=USA/part-r-00000-9adc651a-1260-466d-ba37-720a0395d450
{"id":1,"name":"Jerry","sex":"man"}
{"id":4,"name":"Rut","sex":"female"}

6.将分区的HDFS数据读回数据帧

%spark
val backDFJson = sqlContext.read.json("/tests/partition/result");

backDFJson.show();

zeppelin output:

backDFJson: org.apache.spark.sql.DataFrame = [id: bigint, name: string, sex: string, country: string]
+---+-------+------+-------+
| id|   name|   sex|country|
+---+-------+------+-------+
|  1|  Jerry|   man|    USA|
|  4|    Rut|female|    USA|
| 11| Ivanka|female|    USA|
| 12|Melania|female|    USA|
|  5| Roasie|female|    AUS|
|  9|Jerremy|   man|    AUS|
| 10| Angela|female|    AUS|
|  7|   Adam|   man|    GER|
|  2|  Cathy|female|    GBR|
|  3| Teresa|female|    GBR|
|  6|  Garry|   man|    GBR|
|  8|   John|   man|    GBR|
+---+-------+------+-------+

7. Apache Hive和ORC表的动态分区

Apache Hive支持两种分区:

  • 静态分区
  • 动态分区

有关更多信息,我可以推荐以下博客。基本区别在于,当您将数据保存到静态分区表时,您必须使用区分分区的值来命名分区列。我是动态分区的情况,如果不存在则创建分区。不需要任何值,只需分区列。

测试任务:让我们创建按country动态分区的工作人员ORC表。并从之前创建的表“tableWorkers”中将数据保存到其中。

%spark

// dynamic partitioning on Hive Table...
import org.apache.spark.sql.hive.HiveContext;

var hc = new HiveContext(sc);

hc.sql(" DROP TABLE IF EXISTS WorkersPartitioned ");
hc.sql(" CREATE TABLE WorkersPartitioned(id INT, name String, sex String) "+
   " PARTITIONED BY (country STRING) "+
   " STORED AS ORC "
);

hc.sql(" SET set hive.exec.dynamic.partition=true ");
hc.sql(" SET hive.exec.dynamic.partition.mode=nonstric ");

hc.sql(" INSERT OVERWRITE TABLE WorkersPartitioned PARTITION(country) SELECT id, name, sex, country FROM tableWorkers ");

hc.sql(" SELECT * FROM WorkersPartitioned ").show();

注意代码“PARTITION(country)”,我们不需要输入确切的country,即动态分区。

Zeppelin output:

res165: org.apache.spark.sql.DataFrame = [key: string, value: string]
res166: org.apache.spark.sql.DataFrame = []
+---+-------+------+-------+
| id|   name|   sex|country|
+---+-------+------+-------+
|  5| Roasie|female|    AUS|
|  9|Jerremy|   man|    AUS|
| 10| Angela|female|    AUS|
|  2|  Cathy|female|    GBR|
|  3| Teresa|female|    GBR|
|  6|  Garry|   man|    GBR|
|  8|   John|   man|    GBR|
|  7|   Adam|   man|    GER|
|  1|  Jerry|   man|    USA|
|  4|    Rut|female|    USA|
| 11| Ivanka|female|    USA|
| 12|Melania|female|    USA|
+---+-------+------+-------+

8. Apache Hive在表格中描述分区

import org.apache.spark.sql.hive.HiveContext;

val hc = new HiveContext(sc);

hc.sql("show partitions WorkersPartitioned").show();

Zeppelin output:

import org.apache.spark.sql.hive.HiveContext
hc: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@2a117cf4
+-----------+
|     result|
+-----------+
|country=AUS|
|country=GBR|
|country=GER|
|country=USA|
+-----------+

点击标题见原文