Spark GraphX简介与教程


GraphX是 Apache Spark 用于图形和图形并行计算的 API。GraphX 在单个系统内统一了 ETL(提取、转换和加载)过程、探索性分析和迭代图计算。图的用法可以在 Facebook 的朋友、LinkedIn 的连接、互联网的路由器、天体物理学中星系和恒星之间的关系以及谷歌的地图中看到。尽管图计算的概念看起来很简单,但图的应用在灾难检测、银行、股票市场、银行和地理系统中的用例实际上是无限的.
  
什么是图Graph?
图是一种数学结构,相当于一组对象,其中一些对象对在某种意义上是相关的。这些关系可以使用形成图的边和顶点来表示。顶点代表对象,边显示这些对象之间的各种关系。
在计算机科学中,图是一种抽象的数据类型,旨在实现数学中的无向图和有向图概念,特别是图论领域。图数据结构还可以将某些边值与每条边相关联,例如符号标签或数字属性(成本、容量、长度、等等。)。
 
图计算的用例
以下用例给出了图计算的视角以及使用图实现其他解决方案的进一步范围。

  1. 灾害检测系统  图可用于检测飓风、地震、海啸、森林火灾和火山等灾害,从而提供警告以提醒人们。
  2. Page Rank :Page Rank 可用于在任何网络(如论文引用网络或社交媒体网络)中寻找影响者。
  3. 金融欺诈检测 图分析可用于监控金融交易并检测参与金融欺诈和洗钱的人员。
  4. 商业分析 图表与机器学习一起使用时,有助于了解客户的购买趋势。例如优步、麦当劳等。
  5. 地理信息系统 图被广泛用于开发地理信息系统的功能,如流域划分和天气预报。
  6. 谷歌Pregel  Pregel 是 Google 的可扩展和容错平台,具有足够灵活的 API 来表达任意图 算法。

 
什么是 Spark GraphX?
GraphX是用于图形和图形并行计算的 Spark API。它包括越来越多的图形算法和构建器,以简化图形分析任务。
GraphX 使用弹性分布式属性图扩展了 Spark RDD。 属性图是一个有向多重图,可以有多个平行边。每条边和顶点都有与之关联的用户定义属性。平行边允许多个相同顶点之间的关系。
 
Spark GraphX 特性
以下是 Spark GraphX 的特点:
  1. 灵活性:Spark GraphX 适用于图形和计算。GraphX 在单个系统中统一了 ETL(提取、转换和加载)、探索性分析和迭代图计算。我们可以查看与图和集合相同的数据,使用 RDD 有效地转换和连接图,并使用 Pregel API 编写自定义迭代图算法。
  2. 速度:Spark GraphX 提供与最快的专业图形处理系统相当的性能。它可与最快的图形系统相媲美,同时保留了 Spark 的灵活性、容错性和易用性。
  3. 不断增长的算法库:我们可以从 Spark GraphX 提供的不断增长的图算法库中进行选择。一些流行的算法是页面排名、连通分量、标签传播、S​​VD++、强连通分量和三角形计数。

  
用例子理解GraphX
我们现在将通过一个例子来理解 Spark GraphX 的概念。

查看图表,我们可以提取有关人(顶点Vertices)和他们之间的关系(边)的信息。此处的图表代表 Twitter 用户以及他们在 Twitter 上关注的人。例如,Bob 在 Twitter 上关注了 Davide 和 Alice。
 
让我们使用 Apache Spark 实现相同的功能。首先,我们将为 GraphX 导入必要的类。
//Importing the necessary classes 
import org.apache.spark._
import org.apache.spark.rdd.RDD 
import org.apache.spark.util.IntParam 
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators 

显示顶点Vertices: 此外,我们现在将显示用户(顶点Vertices)的所有姓名和年龄。

val vertexRDD: RDD[(Long, (String, Int))] = sc.parallelize(vertexArray)
val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)
val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)
graph.vertices.filter { case (id, (name, age)) => age > 30 }
.collect.foreach { case (id, (name, age)) => println(s"$name is $age")}

上述代码的输出如下:

David is 42 
Fran is 50 
Ed is 55 
Charlie is 65

显示边:让我们看看推特上谁喜欢谁。

for (triplet <- graph.triplets.collect)
{
println(s"${triplet.srcAttr._1} likes ${triplet.dstAttr._1}")
}

输出结果:

Bob likes Alice
Bob likes David
Charlie likes Bob
Charlie likes Fran
David likes Alice
Ed likes Bob
Ed likes Charlie
Ed likes Fran

现在我们已经了解了 GraphX 的基础知识,让我们深入一点,并对其进行一些高级计算。
关注者数量:我们图表中的每个用户都有不同数量的关注者。让我们看看每个用户的所有关注者。

// Defining a class to more clearly model the user property
case class User(name: String, age: Int, inDeg: Int, outDeg: Int)
// Creating a user Graph
val initialUserGraph: Graph[User, Int] = graph.mapVertices{ case (id, (name, age)) => User(name, age, 0, 0) }
 
// Filling in the degree information
val userGraph = initialUserGraph.outerJoinVertices(initialUserGraph.inDegrees) {
case (id, u, inDegOpt) => User(u.name, u.age, inDegOpt.getOrElse(0), u.outDeg)
}.outerJoinVertices(initialUserGraph.outDegrees) {
case (id, u, outDegOpt) => User(u.name, u.age, u.inDeg, outDegOpt.getOrElse(0))
}
for ((id, property) <- userGraph.vertices.collect) {
println(s
"User $id is called ${property.name} and is liked by ${property.inDeg} people.")
}

上述代码的输出如下:

用户 1名为Alice ,被2个人喜欢。
用户 2名为Bob ,被2个人喜欢。
用户 3名为Charlie ,被1人喜欢。
用户 4名为David ,被1人喜欢。
用户 5名为Ed ,被0人喜欢。
用户 6名为Fran ,被点赞2人。


最老的追随者:我们还可以根据追随者的特征对追随者进行排序。让我们按年龄找出每个用户的最老粉丝。

// Finding the oldest follower for each user
val oldestFollower: VertexRDD[(String, Int)] = userGraph.mapReduceTriplets[(String, Int)](
// For each edge send a message to the destination vertex with the attribute of the source vertex
 edge => Iterator((edge.dstId, (edge.srcAttr.name, edge.srcAttr.age))),
 
// To combine messages take the message for the older follower
 (a, b) => if (a._2 > b._2) a else b
 )

上述代码的输出如下:

大卫是爱丽丝最年长的追随者。
查理是鲍勃最年长的追随者。
Ed是查理最年长的追随者。
鲍勃是大卫最年长的追随者。
Ed没有任何追随者。
查理是最古老的追随者弗兰。

 
用例:使用 Spark GraphX 进行飞行数据分析
现在我们已经了解了 Spark GraphX 的核心概念,让我们使用 GraphX 解决一个现实生活中的问题。这将有助于让我们有信心在未来开展任何 Spark 项目。
问题陈述:要使用 Spark GraphX 分析实时航班数据,提供近乎实时的计算结果并使用 Google Data Studio 将结果可视化。

  • 计算航线总数
  • 计算并排序最长的航线
  • 显示顶点度数最高的机场
  • 根据 PageRank 列出最重要的机场
  • 列出飞行成本最低的航线

我们将使用 Spark GraphX 进行上述计算,并使用 Google Data Studio 将结果可视化。
。。。。
详细点击标题