Advanced Join Techniques in Apache Spark SQL

19-01-08 banq
    

Let's start with the following sample input data:

people.txt -> saved at HDFS under /tests/people.txt

1,Jerry,man,USA
2,Cathy,female,GBR
3,Teresa,female,GBR
4,Rut,female,USA
5,Roasie,female,AUS
6,Tomas,man,CZE

people's orders -> saved at HDFS under /tests/orders.txt

1,pizza,2500
1,beer,50
1,food,3000
2,pizza,3000
2,drink,24000
2,vine,40000
3,pizza,50
3,white-wine,4000
3,bread,20
4,pizza,4000
4,food,3000
4,vodka,30
5,cola,4000
5,bread,50000
5,sangria,300

Let's exercise Apache Spark SQL a little.

1. Take the maximum row from each group.

Task: for every row in people.txt, get that person's most expensive order. Result format: name, country, price.

case class people(id: Int, name: String, gender: String, country: String)

case class order(id: Int, item: String, price: Long)

// parse the CSV lines into case classes (splitting each line only once)
val peopleDF = sc.textFile("/tests/people.txt").map { line =>
  val f = line.split(",")
  people(f(0).toInt, f(1), f(2), f(3))
}.toDF()

val orderDF = sc.textFile("/tests/orders.txt").map { line =>
  val f = line.split(",")
  order(f(0).toInt, f(1), f(2).toLong)
}.toDF()

peopleDF.registerTempTable("people");
orderDF.registerTempTable("order");

val resultDF = sqlContext.sql(" select p.name, p.country, gr.max_p FROM people p INNER JOIN "+
                          " (select o.id, max(o.price) as max_p FROM order o GROUP BY o.id) gr ON gr.id = p.id "    
);

resultDF.show();

Zeppelin output:

defined class people
defined class order
peopleDF: org.apache.spark.sql.DataFrame = [id: int, name: string, gender: string, country: string]
orderDF: org.apache.spark.sql.DataFrame = [id: int, item: string, price: bigint]
resultDF: org.apache.spark.sql.DataFrame = [name: string, country: string, max_p: bigint]
+------+-------+-----+
|  name|country|max_p|
+------+-------+-----+
| Jerry|    USA| 3000|
| Cathy|    GBR|40000|
|Teresa|    GBR| 4000|
|   Rut|    USA| 4000|
|Roasie|    AUS|50000|
+------+-------+-----+

This one was easy, because we did not need any additional columns from the order rows to find the maximum price.
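To make the "max row per group" join easier to follow, here is the same logic sketched in plain Scala collections, with the sample data hard-coded and no Spark involved (illustration only; column names match the query above):

```scala
// Plain-Scala sketch of: GROUP BY id with max(price), then INNER JOIN back to people.
case class Person(id: Int, name: String, country: String)
case class Order(id: Int, item: String, price: Long)

val people = Seq(
  Person(1, "Jerry", "USA"), Person(2, "Cathy", "GBR"),
  Person(3, "Teresa", "GBR"), Person(4, "Rut", "USA"),
  Person(5, "Roasie", "AUS"), Person(6, "Tomas", "CZE"))

val orders = Seq(
  Order(1, "pizza", 2500L), Order(1, "beer", 50L), Order(1, "food", 3000L),
  Order(2, "pizza", 3000L), Order(2, "drink", 24000L), Order(2, "vine", 40000L),
  Order(3, "pizza", 50L), Order(3, "white-wine", 4000L), Order(3, "bread", 20L),
  Order(4, "pizza", 4000L), Order(4, "food", 3000L), Order(4, "vodka", 30L),
  Order(5, "cola", 4000L), Order(5, "bread", 50000L), Order(5, "sangria", 300L))

// the subquery: one max price per id
val maxPerId: Map[Int, Long] =
  orders.groupBy(_.id).map { case (id, os) => id -> os.map(_.price).max }

// the INNER JOIN: people with no orders (Tomas) drop out, because
// maxPerId.get returns None for their id
val result = for {
  p    <- people
  maxP <- maxPerId.get(p.id)
} yield (p.name, p.country, maxP)
```

The `result` sequence reproduces the table from the Zeppelin output: five rows, Tomas missing.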

2. A LEFT JOIN test

Task: get all rows from people.txt that have no orders.

case class people(id: Int, name: String, gendre: String, country: String)

case class order(id: Int, item: String, price: Long)

// parse the CSV lines into case classes (splitting each line only once)
val peopleDF = sc.textFile("/tests/people.txt").map { line =>
  val f = line.split(",")
  people(f(0).toInt, f(1), f(2), f(3))
}.toDF()

val orderDF = sc.textFile("/tests/orders.txt").map { line =>
  val f = line.split(",")
  order(f(0).toInt, f(1), f(2).toLong)
}.toDF()

peopleDF.registerTempTable("people");
orderDF.registerTempTable("order");

val resultDF = sqlContext.sql(" select p.name, p.country from people p LEFT JOIN order o ON p.id = o.id WHERE o.id is null ");

resultDF.show();

Zeppelin output:

defined class people
defined class order
peopleDF: org.apache.spark.sql.DataFrame = [id: int, name: string, gendre: string, country: string]
orderDF: org.apache.spark.sql.DataFrame = [id: int, item: string, price: bigint]
resultDF: org.apache.spark.sql.DataFrame = [name: string, country: string]
+-----+-------+
| name|country|
+-----+-------+
|Tomas|    CZE|
+-----+-------+

Classic LEFT JOIN stuff: take every row from the left table that has no matching row in the right table.
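The `WHERE o.id IS NULL` filter after a LEFT JOIN is the classic anti-join pattern (newer Spark versions also expose it directly as the "left_anti" join type on DataFrames). The semantics can be sketched in plain Scala on the sample ids, illustration only:

```scala
// Anti-join sketch: keep left rows whose key never appears on the right.
// The ids below are hard-coded from the sample files.
val people = Seq((1, "Jerry"), (2, "Cathy"), (3, "Teresa"),
                 (4, "Rut"), (5, "Roasie"), (6, "Tomas"))
val orderIds = Set(1, 2, 3, 4, 5)   // ids present in orders.txt

// LEFT JOIN ... WHERE right.id IS NULL == filter out keys found on the right
val noOrders = people.filterNot { case (id, _) => orderIds(id) }
// only (6, "Tomas") survives, matching the Zeppelin output
```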

3. Get the top N rows from each group

Now the fun begins!

Task: for every row in people.txt, return that person's two most expensive orders.

import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.functions._;

case class people(id: Int, name: String, gendre: String, country: String)

case class order(id: Int, item: String, price: Long)

// parse the CSV lines into case classes (splitting each line only once)
val peopleDF = sc.textFile("/tests/people.txt").map { line =>
  val f = line.split(",")
  people(f(0).toInt, f(1), f(2), f(3))
}.toDF()

val orderDF = sc.textFile("/tests/orders.txt").map { line =>
  val f = line.split(",")
  order(f(0).toInt, f(1), f(2).toLong)
}.toDF()

peopleDF.registerTempTable("people");
orderDF.registerTempTable("order");

val window = Window.partitionBy("id").orderBy(col("price").desc);

val indexedGroupDF = orderDF.withColumn("r", row_number().over(window)).where(col("r") <= 2);

indexedGroupDF.registerTempTable("grouped");

val resultDF = sqlContext.sql("select p.name, r.item, r.price from people p INNER JOIN grouped r ON p.id = r.id");

resultDF.show();

Zeppelin output:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
defined class people
defined class order
peopleDF: org.apache.spark.sql.DataFrame = [id: int, name: string, gendre: string, country: string]
orderDF: org.apache.spark.sql.DataFrame = [id: int, item: string, price: bigint]
window: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@5d181d1e
indexedGroupDF: org.apache.spark.sql.DataFrame = [id: int, item: string, price: bigint, r: int]
resultDF: org.apache.spark.sql.DataFrame = [name: string, item: string, price: bigint]
+------+----------+-----+
|  name|      item|price|
+------+----------+-----+
| Jerry|      food| 3000|
| Jerry|     pizza| 2500|
| Cathy|      vine|40000|
| Cathy|     drink|24000|
|Teresa|white-wine| 4000|
|Teresa|     pizza|   50|
|   Rut|     pizza| 4000|
|   Rut|      food| 3000|
|Roasie|     bread|50000|
|Roasie|      cola| 4000|
+------+----------+-----+

  • The window function partitions the orders by person id, which creates the groups; within each group the rows are ordered by price, descending.
  • With the row_number() function we index every row inside its group.
  • In the where clause we restrict the DataFrame to the first two rows of each group.
  • The final result DF is then simply printed in the requested format.
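The partition / order / index steps above can be sketched with plain Scala collections (a hard-coded subset of the sample orders; the index here is 0-based, unlike row_number()'s 1-based one; illustration only):

```scala
case class Order(id: Int, item: String, price: Long)

// subset of the sample orders, hard-coded for illustration
val orders = Seq(
  Order(1, "pizza", 2500L), Order(1, "beer", 50L), Order(1, "food", 3000L),
  Order(2, "pizza", 3000L), Order(2, "drink", 24000L), Order(2, "vine", 40000L))

val top2 = orders
  .groupBy(_.id)                              // Window.partitionBy("id")
  .toSeq.sortBy(_._1)                         // stable ordering for display
  .flatMap { case (_, group) =>
    group.sortBy(-_.price)                    // orderBy(col("price").desc)
      .zipWithIndex                           // row_number(), but 0-based
      .collect { case (o, r) if r < 2 => o }  // where(col("r") <= 2)
  }
```

Note that row_number() breaks price ties arbitrarily; when tied rows should share a position, rank() or dense_rank() are the usual alternatives.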
