Apache Spark Dataframe Join语法教程

19-01-08 banq
    

始终考虑将RDD转换为Dataframe数据帧,执行请求的操作并保存结果......这是节省时间最多的方法。那么让我们深入研究Dataframe数据帧测试

数据workers:

1,Jerry,man,USA
2,Cathy,female,GBR
3,Teresa,female,GBR
4,Rut,female,USA
5,Roasie,female,AUS
6,Garry,man,GBR
7,Adam,man,GER
8,John,man,GBR
9,Jerremy,man,AUS
10,Angela,female,AUS
11,Ivanka,female,USA
12,Melania,female,USA

每个工人workers都有工作场所workplace :

1,developer
2,qa
3,product_manager
4,qa
5,qa
6,developer
7,team_leader
8,developer
9,team_leader
10,qa
11,developer
让我们准备
dataframes:

import org.apache.spark.sql.functions._;

case class worker(id: Int, name: String, sex: String, country: String);

case class workplace (id: Int, department: String);

val workers = sc.textFile("/tests/workers.txt");

val workplaces = sc.textFile("/tests/workplaces.txt");

val workerRDD = workers.map(line=>new worker(line.split(",")(0).toInt,
                                  line.split(",")(1),
                                  line.split(",")(2),
                                  line.split(",")(3)));
val workerDF = workerRDD.toDF();

val workplacesRDD = workplaces.map(line=>new workplace(line.split(",")(0).toInt,
                                                       line.split(",")(1)));

val workplacesDF = workplacesRDD.toDF();

任务1.内联接表达式

第一种可能的解决方案是使用join 表达式的join 函数: 

val allDF = workerDF.join(workplacesDF,workerDF(“id”)=== workplacesDF(“id”)); 
allDF.show();

输出将是:

+---+-------+------+-------+---+---------------+
| id|   name|   sex|country| id|     department|
+---+-------+------+-------+---+---------------+
|  1|  Jerry|   man|    USA|  1|      developer|
|  2|  Cathy|female|    GBR|  2|             qa|
|  3| Teresa|female|    GBR|  3|product_manager|
|  4|    Rut|female|    USA|  4|             qa|
|  5| Roasie|female|    AUS|  5|             qa|
|  6|  Garry|   man|    GBR|  6|      developer|
|  7|   Adam|   man|    GER|  7|    team_leader|
|  8|   John|   man|    GBR|  8|      developer|
|  9|Jerremy|   man|    AUS|  9|    team_leader|
| 10| Angela|female|    AUS| 10|             qa|
| 11| Ivanka|female|    USA| 11|      developer|
+---+-------+------+-------+---+---------------+

那么输出是完全正确的...... 但是有两次列出的join 列 ...... 让我们看看如何解决这个问题....

任务2.仅列出一次join 列的内部联接

以前的输出是正确的,但如何删除重复连接列?只是...

val resultDF = workerDF.join(workplacesDF, Seq("id"));
resultDF.show();

输出将是:

+---+-------+------+-------+---------------+
| id|   name|   sex|country|     department|
+---+-------+------+-------+---------------+
|  1|  Jerry|   man|    USA|      developer|
|  2|  Cathy|female|    GBR|             qa|
|  3| Teresa|female|    GBR|product_manager|
|  4|    Rut|female|    USA|             qa|
|  5| Roasie|female|    AUS|             qa|
|  6|  Garry|   man|    GBR|      developer|
|  7|   Adam|   man|    GER|    team_leader|
|  8|   John|   man|    GBR|      developer|
|  9|Jerremy|   man|    AUS|    team_leader|
| 10| Angela|female|    AUS|             qa|
| 11| Ivanka|female|    USA|      developer|
+---+-------+------+-------+---------------+

任务3.使用select projection的内部连接

另一种解决方案是使用连接表达式 并使用select projection确定输出列...

val resultDF = workerDF.join(workplacesDF, workerDF("id")===workplacesDF("id"), "inner").
    select(workerDF("id"), workerDF("name"), workerDF("country"), workplacesDF("department"));

resultDF.show(); 

输出应该是:

+---+-------+-------+---------------+
| id|   name|country|     department|
+---+-------+-------+---------------+
|  1|  Jerry|    USA|      developer|
|  2|  Cathy|    GBR|             qa|
|  3| Teresa|    GBR|product_manager|
|  4|    Rut|    USA|             qa|
|  5| Roasie|    AUS|             qa|
|  6|  Garry|    GBR|      developer|
|  7|   Adam|    GER|    team_leader|
|  8|   John|    GBR|      developer|
|  9|Jerremy|    AUS|    team_leader|
| 10| Angela|    AUS|             qa|
| 11| Ivanka|    USA|      developer|
+---+-------+-------+---------------+

任务4.使用SQL查询的内连接

如果使用函数不合适,可以使用SQL查询来获取数据: 

workerDF.registerTempTable("worker");
workplacesDF.registerTempTable("workplaces");

val resultDF = sqlContext.sql("select w.sex, w.id, w.name, w.country, wp.department from worker w LEFT JOIN workplaces wp ON w.id = wp.id");

输出:

+------+---+-------+-------+---------------+
|   sex| id|   name|country|     department|
+------+---+-------+-------+---------------+
|   man|  1|  Jerry|    USA|      developer|
|female|  2|  Cathy|    GBR|             qa|
|female|  3| Teresa|    GBR|product_manager|
|female|  4|    Rut|    USA|             qa|
|female|  5| Roasie|    AUS|             qa|
|   man|  6|  Garry|    GBR|      developer|
|   man|  7|   Adam|    GER|    team_leader|
|   man|  8|   John|    GBR|      developer|
|   man|  9|Jerremy|    AUS|    team_leader|
|female| 10| Angela|    AUS|             qa|
|female| 11| Ivanka|    USA|      developer|
|female| 12|Melania|    USA|           null|
+------+---+-------+-------+---------------+

正如你所看到的,我们真的执行了LEFT JOIN,因为Melania没有部门。

任务5.内部联接和过滤功能(Scala表达式)

让我们联系工人和他们的部门,并列出只有“QA”的人......

val resultDF = workerDF.join(workplacesDF,Seq(“id”))。filter(workplacesDF(“department”)===“qa”); 
resultDF.show();

输出:

+---+------+------+-------+----------+
| id|  name|   sex|country|department|
+---+------+------+-------+----------+
|  2| Cathy|female|    GBR|        qa|
|  4|   Rut|female|    USA|        qa|
|  5|Roasie|female|    AUS|        qa|
| 10|Angela|female|    AUS|        qa|
+---+------+------+-------+----------+

任务6.内部联接和过滤功能(SQL表达式)

同样,如果您对Scala语法的过滤函数不满意,可以使用类似SQL的语法:

val resultDF = workerDF.join(workplacesDF, Seq("id")).filter("department = 'qa'");
resultDF.show();

不知何故,我更喜欢scala语法,但输出再次相同:

+---+------+------+-------+----------+
| id|  name|   sex|country|department|
+---+------+------+-------+----------+
|  2| Cathy|female|    GBR|        qa|
|  4|   Rut|female|    USA|        qa|
|  5|Roasie|female|    AUS|        qa|
| 10|Angela|female|    AUS|        qa|
+---+------+------+-------+----------+

任务7.显示没有部门的工人

为了获得没有部门的工人,我们需要执行LEFT JOIN并仅过滤具有NULL部门的行 :

val resultDF = workerDF.join(workplacesDF, Seq("id"), "left").filter(workplacesDF("department").isNull);
resultDF.show();

输出:

+---+-------+------+-------+----------+
| id|   name|   sex|country|department|
+---+-------+------+-------+----------+
| 12|Melania|female|    USA|      null|
+---+-------+------+-------+----------+

任务8.如果worker是“qa”,添加名为“isQA”的列为“1”

要解决这个任务,我们将使用带有when子句的withColumn函数:

val resultDF = workerDF.join(workplacesDF, Seq("id")).withColumn("isQA", 
when(workplacesDF("department")==="qa", lit("1")).otherwise(lit("0")));

resultDF.show();

输出:

+---+-------+------+-------+---------------+----+
| id|   name|   sex|country|     department|isQA|
+---+-------+------+-------+---------------+----+
|  1|  Jerry|   man|    USA|      developer|   0|
|  2|  Cathy|female|    GBR|             qa|   1|
|  3| Teresa|female|    GBR|product_manager|   0|
|  4|    Rut|female|    USA|             qa|   1|
|  5| Roasie|female|    AUS|             qa|   1|
|  6|  Garry|   man|    GBR|      developer|   0|
|  7|   Adam|   man|    GER|    team_leader|   0|
|  8|   John|   man|    GBR|      developer|   0|
|  9|Jerremy|   man|    AUS|    team_leader|   0|
| 10| Angela|female|    AUS|             qa|   1|
| 11| Ivanka|female|    USA|      developer|   0|
+---+-------+------+-------+---------------+----+

任务9.哪个部门的工作人员最多(scala语法)

这是经典的简单SQL任务,Spark解决方案看起来很相似。我们需要按“部门”对行进行分组,然后对该组执行计数,然后进行排序。

val resultDF = workerDF.join(workplacesDF, Seq("id")).
groupBy("department").agg(count("*").as("dep_workers")).orderBy(desc("dep_workers"));

resultDF.limit(1).show();

现在输出应该是:

+----------+-----------+
|department|dep_workers|
+----------+-----------+
| developer|          4|
+----------+-----------+

任务10.哪个部门拥有最多的工作人员(SQL语法)

如果你是SQL熟练的开发人员,那么就没有必要评论这个解决方案......

val resultDF = sqlContext.sql("select wp.department, count(*) as dep_workers from 
worker w INNER JOIN workplaces wp ON w.id = wp.id GROUP BY wp.department");

resultDF.limit(1).show();

输出:

+----------+-----------+
|department|dep_workers|
+----------+-----------+
| developer|       4|
+----------+-----------+