本篇內(nèi)容主要講解“GraphX的基礎(chǔ)知識(shí)有哪些”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“GraphX的基礎(chǔ)知識(shí)有哪些”吧!
創(chuàng)新互聯(lián)公司自2013年創(chuàng)立以來(lái),先為徐水等服務(wù)建站,徐水等地企業(yè),進(jìn)行企業(yè)商務(wù)咨詢服務(wù)。為徐水企業(yè)網(wǎng)站制作PC+手機(jī)+微官網(wǎng)三網(wǎng)同步一站式服務(wù)解決您的所有建站問(wèn)題。
Spark GraphX是一個(gè)分布式圖處理框架,Spark GraphX基于Spark平臺(tái)提供對(duì)圖計(jì)算和圖挖掘簡(jiǎn)潔易用的而豐富多彩的接口,極大的方便了大家對(duì)分布式圖處理的需求。Spark GraphX由于底層是基于Spark來(lái)處理的,所以天然就是一個(gè)分布式的圖處理系統(tǒng)。圖的分布式或者并行處理其實(shí)是把這張圖拆分成很多的子圖,然后我們分別對(duì)這些子圖進(jìn)行計(jì)算,計(jì)算的時(shí)候可以分別迭代進(jìn)行分階段的計(jì)算,即對(duì)圖進(jìn)行并行計(jì)算。
設(shè)計(jì)GraphX時(shí),點(diǎn)分割和GAS都已成熟,在設(shè)計(jì)和編碼中針對(duì)它們進(jìn)行了優(yōu)化,并在功能和性能之間尋找最佳的平衡點(diǎn)。如同Spark本身,每個(gè)子模塊都有一個(gè)核心抽象。GraphX的核心抽象是Resilient Distributed Property Graph,一種點(diǎn)和邊都帶屬性的有向多重圖。它擴(kuò)展了Spark RDD的抽象,有Table和Graph兩種視圖,而只需要一份物理存儲(chǔ)。兩種視圖都有自己獨(dú)有的操作符,從而獲得了靈活操作和執(zhí)行效率。
類成員
在GraphX中,圖的基礎(chǔ)類為Garph,它包含兩個(gè)RDD:一個(gè)為邊RDD,另一個(gè)為頂點(diǎn)RDD??梢杂媒o定的邊RDD和頂點(diǎn)RDD構(gòu)建一個(gè)圖。一旦構(gòu)建好圖,就可以用edges()和vertices()來(lái)訪問(wèn)邊和頂點(diǎn)的集合。VD和ED代表了用戶自定義的頂點(diǎn)和邊類,對(duì)應(yīng)的圖是參數(shù)化類型的泛類型Graph[VD,ED]。GraphX中圖必須要有頂點(diǎn)和邊屬性。GraphX中Vertice和Edge持有VerticeId值,而不是頂點(diǎn)的引用。圖在集群中是分布式存儲(chǔ)的,不屬于單個(gè)JVM,因此一條邊的頂點(diǎn)可能在不同的集群節(jié)點(diǎn)上。
頂點(diǎn): Vertice(VertexId, VD)
abstract class VertexRDD[VD] extends RDD[(VertexId, VD)]
抽象值成員innerJoin
leftJoin
mapValues
···
具體值成員collect
count
distinct
filter
foreach
groupBy
isEmpty
persist
map
reduce
sortBy
toString
···
邊: Edge(VertexId, VertexId, ED)
class Edge[ED](srcId:VertexId, dstId:VertexId, attire:E
abstract class EdgeRDD[ED] extends RDD[Edge[ED]]
class EdgeTriplet[VD, ED] extends Edge[ED]
值成員Attr
srcId
srcAttr
dstId
dstAttr
抽象值成員innerJoin
mapValues
reverse
具體值成員++
aggregate
cache
collect
count
distinct
filter
foreach
groupBy
isEmpty
map
persist
reduce
sortBy
toString
···
圖: Graph(VD, ED)
abstract class Graph[VD,ED] extend Serializable
class GraphOps[VD,ED] extends Serializable
值成員collectEdges
collectNeiborIds
collectNeibors
degrees
filter
inDegrees
joinVertices
numEdges
numVertices
outDegrees
pageRank
personalizedPageRank
pickRandomVertex
pregel
triangleCount
···
抽象值成員cache
edges
mapEdges
mapTriplets
mapVertices
mask
outerJoinVertices
persist
reverse
subgraph
triplets
vertices
···
具體值成員aggregateMessages
mapEdges
mapTriplets
···
GraphX實(shí)例
引用
import org.apache.spark._ import org.apache.spark.graphx._ import org.apache.spark.rdd.RDD
構(gòu)圖
有很多方式從一個(gè)原始文件、RDD構(gòu)造一個(gè)屬性圖。最一般的方法是利用Graph object。 下面的代碼從RDD集合生成屬性圖。
// 假設(shè)SparkContext已經(jīng)被構(gòu)造 val sc: SparkContext // 創(chuàng)建點(diǎn)RDD val users: RDD[(VertexId, (String, String))] =sc.parallelize( Array((3L, ("rxin", "student")), (7L, ("jgonzal","postdoc")), (5L, ("franklin", "prof")), (2L, ("istoica", "prof")))) // 創(chuàng)建邊RDD val relationships: RDD[Edge[String]] = sc.parallelize( Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi") Edge(5L, 0L, "colleague"))) // 定義一個(gè)默認(rèn)用戶,避免有不存在用戶的關(guān)系 val defaultUser = ("John Doe", "Missing") // 構(gòu)造Graph val graph = Graph(users, relationships, defaultUser)
緩存
//緩存。默認(rèn)情況下,緩存在內(nèi)存的圖會(huì)在內(nèi)存緊張的時(shí)候被強(qiáng)制清理,采用的是LRU算法 graph.cache() graph.persist(StorageLevel.MEMORY_ONLY) graph.unpersistVertices(true)
點(diǎn)、邊和三元組
下面的代碼用到了Edge樣本類。邊有一個(gè)srcId和dstId分別對(duì)應(yīng)于源和目標(biāo)頂點(diǎn)的標(biāo)示符。另外,Edge類有一個(gè)attr成員用來(lái)存儲(chǔ)邊屬性??梢苑謩e用graph.vertices和graph.edges成員將一個(gè)圖解構(gòu)為相應(yīng)的頂點(diǎn)和邊。graph.vertices返回一個(gè)VertexRDD[(String, String)],它繼承于 RDD[(VertexID, (String, String))]。所以我們可以用scala的case表達(dá)式解構(gòu)這個(gè)元組。另一方面,graph.edges返回一個(gè)包含Edge[String]對(duì)象的EdgeRDD,我們也可以用到case類的類型構(gòu)造器。
除了屬性圖的頂點(diǎn)和邊視圖,GraphX也包含了一個(gè)三元組視圖,三元視圖邏輯上將頂點(diǎn)和邊的屬性保存為一個(gè)RDD[EdgeTriplet[VD, ED]],它包含EdgeTriplet類的實(shí)例。EdgeTriplet類繼承于Edge類,并且加入了srcAttr和dstAttr成員,這兩個(gè)成員分別包含源和目的的屬性。我們可以用一個(gè)三元組視圖渲染字符串集合用來(lái)描述用戶之間的關(guān)系。
// 找出職業(yè)為postdoc的人 graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.collect // 計(jì)算源頂點(diǎn)ID大于目標(biāo)頂點(diǎn)ID的邊的數(shù)量 graph.edges.filter(e => e.srcId > e.dstId).count graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count // 使用三元組視圖描述關(guān)系事實(shí) val facts: RDD[String] = graph.triplets.map(triplet =>triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1) facts.collect.foreach(println(_))
度、入度、出度
正如RDDs有基本的操作map, filter和reduceByKey一樣,屬性圖也有基本的集合操作,這些操作采用用戶自定義的函數(shù)并產(chǎn)生包含轉(zhuǎn)換特征和結(jié)構(gòu)的新圖。定義在Graph中的 核心操作是經(jīng)過(guò)優(yōu)化的實(shí)現(xiàn)。表示為核心操作的組合的便捷操作定義在GraphOps中。然而, 因?yàn)橛蠸cala的隱式轉(zhuǎn)換,定義在GraphOps中的操作可以作為Graph的成員自動(dòng)使用。例如,我們可以通過(guò)下面的方式計(jì)算每個(gè)頂點(diǎn)(定義在GraphOps中)的入度。區(qū)分核心圖操作和GraphOps的原因是為了在將來(lái)支持不同的圖表示。每個(gè)圖表示都必須提供核心操作的實(shí)現(xiàn)并重用很多定義在GraphOps中的有用操作。
val degrees: VertexRDD[Int] = graph.degrees; degrees.collect().foreach(println) val inDegrees: VertexRDD[Int] = graph.inDegrees inDegrees.collect().foreach(println) val outDegrees: VertexRDD[Int] = graph.outDegrees outDegrees.collect().foreach(println)
屬性操作:修改頂點(diǎn)和邊的屬性
屬性操作每個(gè)操作都產(chǎn)生一個(gè)新的圖,這個(gè)新的圖包含通過(guò)用戶自定義的map操作修改后的頂點(diǎn)或邊的屬性。Map操作根據(jù)原圖的一些特性得到新圖,原圖結(jié)構(gòu)是不變的。這些操作的一個(gè)重要特征是它允許所得圖形重用原有圖形的結(jié)構(gòu)索引(indices)。下面的兩行代碼在邏輯上是等價(jià)的,但是第一個(gè)不是圖操作,它不保存結(jié)構(gòu)索引,所以不會(huì)從GraphX系統(tǒng)優(yōu)化中受益。Map操作根據(jù)原圖的一些特性得到新圖,原圖結(jié)構(gòu)是不變的。這些操作經(jīng)常用來(lái)初始化的圖形,用作特定計(jì)算或者用來(lái)處理項(xiàng)目不需要的屬性。例如,給定一個(gè)圖,這個(gè)圖的頂點(diǎn)特征包含出度,我們?yōu)镻ageRank初始化它。
//頂點(diǎn)轉(zhuǎn)換,頂點(diǎn)age+1 //RDD操作,再構(gòu)造新圖,不保存結(jié)構(gòu)索引,不會(huì)被系統(tǒng)優(yōu)化 val newVertices = graph.vertices.map { case (id, attr) => (id, (attr._1 + "-1", attr._2 + "-2")) } val newGraph2 = Graph(newVertices, graph.edges) //圖Map操作,被系統(tǒng)優(yōu)化 val newGraph3 = graph.mapVertices((id, attr) => (id, (attr._1 + "-1", attr._2 + "-2")))
//構(gòu)造一個(gè)新圖,頂點(diǎn)屬性是出度 val inputGraph: Graph[Int, String] = graph.outerJoinVertices( graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0)) //根據(jù)頂點(diǎn)屬性為出度的圖構(gòu)造一個(gè)新圖,依據(jù)PageRank算法初始化邊與點(diǎn) val outputGraph: Graph[Double, Double] =inputGraph.mapTriplets( triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)
//創(chuàng)建一個(gè)新圖,頂點(diǎn) VD 的數(shù)據(jù)類型為 User,并從 graph 做類型轉(zhuǎn)換 case class User(name: String, pos: String, inDeg: Int, outDeg: Int) val initialUserGraph: Graph[User, String] = graph.mapVertices { case (id, (name, age)) => User(name, pos, 0, 0)} //initialUserGraph 與 inDegrees、outDegrees(RDD)進(jìn)行連接,并修改 initialUserGraph中 inDeg 值、outDeg 值 val userGraph = initialUserGraph.outerJoinVertices( initialUserGraph.inDegrees) { case (id, u, inDegOpt) => User(u.name, u.pos, inDegOpt.getOrElse(0), u.outDeg) }.outerJoinVertices( initialUserGraph.outDegrees) { case (id, u, outDegOpt) => User(u.name, u.pos, u.inDeg, outDegOpt.getOrElse(0)) } userGraph.vertices.collect.foreach(v => println(s"${v._2.name} inDeg: ${v._2.inDeg} outDeg: ${v._2.outDeg}")) //出度和入讀相同的人員 userGraph.vertices.filter { case (id, u) => u.inDeg == u.outDeg }.collect.foreach { case (id, property) => println(property.name) }
自定義類型
Join 操作
map 操作
結(jié)構(gòu)操作
//由已定義的頂點(diǎn)構(gòu)成的子圖 val subGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing" ) subGraph.vertices.collect().foreach(println(_)) subGraph.triplets.map(triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1).collect().foreach(println(_))
//圖的反向操作,新的圖形的所有邊的方向相反,不修改頂點(diǎn)或邊性屬性、不改變的邊的數(shù)目,它可以有效地實(shí)現(xiàn)不必要的數(shù)據(jù)移動(dòng)或復(fù)制 var rGraph = graph.reverse
//Mask操作也是根據(jù)輸入圖構(gòu)造一個(gè)新圖,達(dá)到一個(gè)限制制約的效果 val ccGraph = graph.connectedComponents() val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing") val validCCGraph = ccGraph.mask(validGraph)
Mask
圖反向
子圖
聚合操作
//計(jì)算年齡大于自己的關(guān)注者的總?cè)藬?shù)和總年齡 val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Double)]( //Map函數(shù) triplet => { if (triplet.srcAttr > triplet.dstAttr) { Iterator((triplet.dstId, (1, triplet.srcAttr))) } else { Iterator.empty } }, //Reduce函數(shù) (a, b) => (a._1 + b._1, a._2 + b._2) ) //計(jì)算年齡大于自己的關(guān)注者的平均年齡 val avgAgeOfOlderFollowers: VertexRDD[Double] = olderFollowers.mapValues((id, value) => value match {case (count, totalAge) => totalAge / count }) avgAgeOfOlderFollowers.collect.foreach(println(_)) //定義一個(gè)Reduce函數(shù)來(lái)計(jì)算圖中較大度的點(diǎn) def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = { if (a._2 > b._2) a else b } val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max) println(s"maxInDegree: $maxInDegree") val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max) println(s"maxOutDegree: $maxOutDegree") val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max) println(s"maxDegrees: $maxDegrees")
相鄰聚合
//計(jì)算鄰居相關(guān)函數(shù),這些操作是相當(dāng)昂貴的,需要大量的重復(fù)信息作為他們的通信,因此相同的計(jì)算還是推薦用mapReduceTriplets val neighboorIds:VertexRDD[Array[VertexId]] = graph.collectNeighborIds(EdgeDirection.Out) val neighboors:VertexRDD[Array[(VertexId, Double)]]= graph.collectNeighbors(EdgeDirection.Out);
Pregel API
//Pregel API。計(jì)算單源最短路徑 //通過(guò)GraphGenerators構(gòu)建一個(gè)隨機(jī)圖 val numVertices = 100 val numEParts = 2 val mu = 4.0 val sigma = 1.3 val graph2 = GraphGenerators.logNormalGraph(sc, numVertices, numEParts, mu, sigma).mapEdges(e=> e.attr.toDouble) //定義一個(gè)源值 點(diǎn) val sourceId: VertexId = 42 //初始化圖的所有點(diǎn),除了與指定的源值點(diǎn)相同值的點(diǎn)為0.0以外,其他點(diǎn)為無(wú)窮大 val initialGraph = graph2.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity) //Pregel有兩個(gè)參數(shù)列表,第一個(gè)參數(shù)列表包括的是:初始化消息、迭代較大數(shù)、邊的方向(Out)。第二個(gè)參數(shù)列表包括的是:用戶定義的接受消息、計(jì)算消息、聯(lián)合合并消息的函數(shù)。 val sssp = initialGraph.pregel(Double.PositiveInfinity)( //點(diǎn)程序 (id, dist, newDist) => math.min(dist, newDist), //發(fā)送消息 triplet => { if (triplet.srcAttr + triplet.attr < triplet.dstAttr) { Iterator((triplet.dstId, triplet.srcAttr + triplet.attr)) } else { Iterator.empty } }, //合并消息 (a, b) => math.min(a, b) ) println(sssp.vertices.collect.mkString("\n"))
主要圖算法
val pageRankGraph = graph2.pageRank(0.001) pageRankGraph.vertices.sortBy(_._2,false).saveAsTextFile( "/user/hadoop/data/temp/graph/graph.pr") pageRankGraph.vertices.top(5)(Ordering.by(_._2)).foreach(println)
val connectedComponentsGraph = graph2.connectedComponents() connectedComponentsGraph.vertices.sortBy(_._2, false).saveAsTextFile( "/user/hadoop/data/temp/graph/graph.cc") connectedComponentsGraph.vertices.top(5)(Ordering.by(_._2)).foreach(println)
//TriangleCount主要用途之一是用于社區(qū)發(fā)現(xiàn) 保持sourceId小于destId val graph3 = GraphLoader.edgeListFile(sc, path, true) val triangleCountGraph = graph3.triangleCount() triangleCountGraph.vertices.sortBy(_._2,false).saveAsTextFile( "/user/hadoop/data/temp/graph/graph.tc") triangleCountGraph.vertices.top(5)(Ordering.by(_._2)).foreach(println)
TriangleCount
Connected Components
PageRank
其他操作
var path = "/user/Hadoop/data/temp/graph/graph.txt" var minEdgePartitions = 1 var canonicalOrientation = false // if sourceId < destId this value is true val graph2 = GraphLoader.edgeListFile(sc, path, canonicalOrientation, minEdgePartitions,StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY)
//通過(guò)GraphGenerators構(gòu)建一個(gè)隨機(jī)圖 val numVertices = 100 val numEParts = 2 val mu = 4.0 val sigma = 1.3 val graph: Graph[Double, Int] = GraphGenerators.logNormalGraph(sc, numVertices, numEParts, mu, sigma).mapVertices((id, _) => id.toDouble) graph.triplets.collect.foreach(triplet => println(triplet.srcId + "-" + triplet.srcAttr + "-" + triplet.attr + "-" + triplet.dstId + "-" + triplet.dstAttr))
val setA: VertexRDD[Int] = VertexRDD( sc.parallelize(0L until 100L).map(id => (id, 1))) val rddB: RDD[(VertexId, Double)] = sc.parallelize( 0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0))) val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _) val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
到此,相信大家對(duì)“GraphX的基礎(chǔ)知識(shí)有哪些”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!
新聞標(biāo)題:GraphX的基礎(chǔ)知識(shí)有哪些
文章位置:http://chinadenli.net/article44/gopjhe.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供小程序開發(fā)、面包屑導(dǎo)航、用戶體驗(yàn)、定制網(wǎng)站、自適應(yīng)網(wǎng)站、企業(yè)建站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)
網(wǎng)頁(yè)設(shè)計(jì)公司知識(shí)