始终考虑将RDD转换为Dataframe数据帧,执行请求的操作并保存结果......这是节省时间最多的方法。那么让我们深入研究Dataframe数据帧测试
数据workers:
1,Jerry,man,USA |
每个工人workers都有工作场所workplace :
1,developer |
import org.apache.spark.sql.functions._; |
任务1.内联接表达式
第一种可能的解决方案是使用join 表达式的join 函数:
val allDF = workerDF.join(workplacesDF,workerDF(“id”)=== workplacesDF(“id”)); |
输出将是:
+---+-------+------+-------+---+---------------+ |
那么输出是完全正确的...... 但是有两次列出的join 列 ...... 让我们看看如何解决这个问题....
任务2.仅列出一次join 列的内部联接
以前的输出是正确的,但如何删除重复连接列?只是...
val resultDF = workerDF.join(workplacesDF, Seq("id")); |
输出将是:
+---+-------+------+-------+---------------+ |
任务3.使用select projection的内部连接
另一种解决方案是使用连接表达式 并使用select projection确定输出列...
val resultDF = workerDF.join(workplacesDF, workerDF("id")===workplacesDF("id"), "inner"). |
输出应该是:
+---+-------+-------+---------------+ |
任务4.使用SQL查询的内连接
如果使用函数不合适,可以使用SQL查询来获取数据:
workerDF.registerTempTable("worker"); |
输出:
+------+---+-------+-------+---------------+ |
正如你所看到的,我们真的执行了LEFT JOIN,因为Melania没有部门。
任务5.内部联接和过滤功能(Scala表达式)
让我们联系工人和他们的部门,并列出只有“QA”的人......
val resultDF = workerDF.join(workplacesDF,Seq(“id”))。filter(workplacesDF(“department”)===“qa”); |
输出:
+---+------+------+-------+----------+ |
任务6.内部联接和过滤功能(SQL表达式)
同样,如果您对Scala语法的过滤函数不满意,可以使用类似SQL的语法:
val resultDF = workerDF.join(workplacesDF, Seq("id")).filter("department = 'qa'"); |
不知何故,我更喜欢scala语法,但输出再次相同:
+---+------+------+-------+----------+ |
任务7.显示没有部门的工人
为了获得没有部门的工人,我们需要执行LEFT JOIN并仅过滤具有NULL部门的行 :
val resultDF = workerDF.join(workplacesDF, Seq("id"), "left").filter(workplacesDF("department").isNull); |
输出:
+---+-------+------+-------+----------+ |
任务8.如果worker是“qa”,添加名为“isQA”的列为“1”
要解决这个任务,我们将使用带有when子句的withColumn函数:
val resultDF = workerDF.join(workplacesDF, Seq("id")).withColumn("isQA", |
输出:
+---+-------+------+-------+---------------+----+ |
任务9.哪个部门的工作人员最多(scala语法)
这是经典的简单SQL任务,Spark解决方案看起来很相似。我们需要按“部门”对行进行分组,然后对该组执行计数,然后进行排序。
val resultDF = workerDF.join(workplacesDF, Seq("id")). |
现在输出应该是:
+----------+-----------+ |
任务10.哪个部门拥有最多的工作人员(SQL语法)
如果你是SQL熟练的开发人员,那么就没有必要评论这个解决方案......
val resultDF = sqlContext.sql("select wp.department, count(*) as dep_workers from |
输出:
+----------+-----------+ |