欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

GraphX的基礎(chǔ)知識(shí)有哪些

本篇內(nèi)容主要講解“GraphX的基礎(chǔ)知識(shí)有哪些”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“GraphX的基礎(chǔ)知識(shí)有哪些”吧!

創(chuàng)新互聯(lián)公司自2013年創(chuàng)立以來(lái),先為徐水等服務(wù)建站,徐水等地企業(yè),進(jìn)行企業(yè)商務(wù)咨詢服務(wù)。為徐水企業(yè)網(wǎng)站制作PC+手機(jī)+微官網(wǎng)三網(wǎng)同步一站式服務(wù)解決您的所有建站問(wèn)題。

        Spark GraphX是一個(gè)分布式圖處理框架,Spark GraphX基于Spark平臺(tái)提供對(duì)圖計(jì)算和圖挖掘簡(jiǎn)潔易用的而豐富多彩的接口,極大的方便了大家對(duì)分布式圖處理的需求。Spark GraphX由于底層是基于Spark來(lái)處理的,所以天然就是一個(gè)分布式的圖處理系統(tǒng)。圖的分布式或者并行處理其實(shí)是把這張圖拆分成很多的子圖,然后我們分別對(duì)這些子圖進(jìn)行計(jì)算,計(jì)算的時(shí)候可以分別迭代進(jìn)行分階段的計(jì)算,即對(duì)圖進(jìn)行并行計(jì)算。

       設(shè)計(jì)GraphX時(shí),點(diǎn)分割和GAS都已成熟,在設(shè)計(jì)和編碼中針對(duì)它們進(jìn)行了優(yōu)化,并在功能和性能之間尋找最佳的平衡點(diǎn)。如同Spark本身,每個(gè)子模塊都有一個(gè)核心抽象。GraphX的核心抽象是Resilient Distributed Property Graph,一種點(diǎn)和邊都帶屬性的有向多重圖。它擴(kuò)展了Spark RDD的抽象,有Table和Graph兩種視圖,而只需要一份物理存儲(chǔ)。兩種視圖都有自己獨(dú)有的操作符,從而獲得了靈活操作和執(zhí)行效率。

GraphX基礎(chǔ)

類成員

       在GraphX中,圖的基礎(chǔ)類為Garph,它包含兩個(gè)RDD:一個(gè)為邊RDD,另一個(gè)為頂點(diǎn)RDD??梢杂媒o定的邊RDD和頂點(diǎn)RDD構(gòu)建一個(gè)圖。一旦構(gòu)建好圖,就可以用edges()和vertices()來(lái)訪問(wèn)邊和頂點(diǎn)的集合。VD和ED代表了用戶自定義的頂點(diǎn)和邊類,對(duì)應(yīng)的圖是參數(shù)化類型的泛類型Graph[VD,ED]。GraphX中圖必須要有頂點(diǎn)和邊屬性。GraphX中Vertice和Edge持有VerticeId值,而不是頂點(diǎn)的引用。圖在集群中是分布式存儲(chǔ)的,不屬于單個(gè)JVM,因此一條邊的頂點(diǎn)可能在不同的集群節(jié)點(diǎn)上。

  • 頂點(diǎn): Vertice(VertexId, VD)       

    abstract class VertexRDD[VD] extends RDD[(VertexId, VD)]

     

    • 抽象值成員
      innerJoin leftJoin mapValues ···

    • 具體值成員
      collect count distinct filter foreach groupBy isEmpty persist map reduce sortBy toString ···

  • 邊: Edge(VertexId, VertexId, ED)   

    class Edge[ED](srcId:VertexId, dstId:VertexId, attire:E

    abstract class EdgeRDD[ED] extends RDD[Edge[ED]]

     

    class EdgeTriplet[VD, ED] extends Edge[ED]

     

    • 值成員
      Attr srcId srcAttr dstId dstAttr

    • 抽象值成員
      innerJoin mapValues reverse

    • 具體值成員
      ++ aggregate cache collect count distinct filter foreach groupBy isEmpty map persist reduce sortBy toString ···

  • 圖: Graph(VD, ED)       

    abstract class Graph[VD,ED] extend Serializable

     

    class GraphOps[VD,ED] extends Serializable

     

    • 值成員
      collectEdges collectNeiborIds collectNeibors degrees filter inDegrees joinVertices numEdges numVertices outDegrees pageRank personalizedPageRank pickRandomVertex pregel triangleCount ···

    • 抽象值成員
      cache edges mapEdges mapTriplets mapVertices mask outerJoinVertices persist reverse subgraph triplets vertices ···

    • 具體值成員
      aggregateMessages mapEdges mapTriplets ···

GraphX實(shí)例

GraphX的基礎(chǔ)知識(shí)有哪些

  • 引用

    import org.apache.spark._
    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD

  • 構(gòu)圖

           有很多方式從一個(gè)原始文件、RDD構(gòu)造一個(gè)屬性圖。最一般的方法是利用Graph object。 下面的代碼從RDD集合生成屬性圖。

    // 假設(shè)SparkContext已經(jīng)被構(gòu)造
    val sc: SparkContext
    // 創(chuàng)建點(diǎn)RDD
    val users: RDD[(VertexId, (String, String))] =sc.parallelize(
                              Array((3L, ("rxin", "student")),
                                    (7L, ("jgonzal","postdoc")),
                                    (5L, ("franklin", "prof")), 
                                    (2L, ("istoica", "prof"))))
    // 創(chuàng)建邊RDD
    val relationships: RDD[Edge[String]] =  sc.parallelize(
                               Array(Edge(3L, 7L, "collab"),    
                                     Edge(5L, 3L, "advisor"),
                                     Edge(2L, 5L, "colleague"), 
                                     Edge(5L, 7L, "pi")
                                     Edge(5L, 0L, "colleague")))
    // 定義一個(gè)默認(rèn)用戶,避免有不存在用戶的關(guān)系
    val defaultUser = ("John Doe", "Missing")
    // 構(gòu)造Graph
    val graph = Graph(users, relationships, defaultUser)

  • 緩存

      //緩存。默認(rèn)情況下,緩存在內(nèi)存的圖會(huì)在內(nèi)存緊張的時(shí)候被強(qiáng)制清理,采用的是LRU算法
      graph.cache()
      graph.persist(StorageLevel.MEMORY_ONLY)
      graph.unpersistVertices(true)

  • 點(diǎn)、邊和三元組

          下面的代碼用到了Edge樣本類。邊有一個(gè)srcId和dstId分別對(duì)應(yīng)于源和目標(biāo)頂點(diǎn)的標(biāo)示符。另外,Edge類有一個(gè)attr成員用來(lái)存儲(chǔ)邊屬性??梢苑謩e用graph.vertices和graph.edges成員將一個(gè)圖解構(gòu)為相應(yīng)的頂點(diǎn)和邊。graph.vertices返回一個(gè)VertexRDD[(String, String)],它繼承于 RDD[(VertexID, (String, String))]。所以我們可以用scala的case表達(dá)式解構(gòu)這個(gè)元組。另一方面,graph.edges返回一個(gè)包含Edge[String]對(duì)象的EdgeRDD,我們也可以用到case類的類型構(gòu)造器。

           除了屬性圖的頂點(diǎn)和邊視圖,GraphX也包含了一個(gè)三元組視圖,三元視圖邏輯上將頂點(diǎn)和邊的屬性保存為一個(gè)RDD[EdgeTriplet[VD, ED]],它包含EdgeTriplet類的實(shí)例。EdgeTriplet類繼承于Edge類,并且加入了srcAttr和dstAttr成員,這兩個(gè)成員分別包含源和目的的屬性。我們可以用一個(gè)三元組視圖渲染字符串集合用來(lái)描述用戶之間的關(guān)系。

    // 找出職業(yè)為postdoc的人
    graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.collect
    // 計(jì)算源頂點(diǎn)ID大于目標(biāo)頂點(diǎn)ID的邊的數(shù)量
    graph.edges.filter(e => e.srcId > e.dstId).count
    graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count
    // 使用三元組視圖描述關(guān)系事實(shí)
    val facts: RDD[String] = graph.triplets.map(triplet =>triplet.srcAttr._1 + 
                       " is the " + triplet.attr + " of " + triplet.dstAttr._1)
    facts.collect.foreach(println(_))

  • 度、入度、出度

           正如RDDs有基本的操作map, filter和reduceByKey一樣,屬性圖也有基本的集合操作,這些操作采用用戶自定義的函數(shù)并產(chǎn)生包含轉(zhuǎn)換特征和結(jié)構(gòu)的新圖。定義在Graph中的 核心操作是經(jīng)過(guò)優(yōu)化的實(shí)現(xiàn)。表示為核心操作的組合的便捷操作定義在GraphOps中。然而, 因?yàn)橛蠸cala的隱式轉(zhuǎn)換,定義在GraphOps中的操作可以作為Graph的成員自動(dòng)使用。例如,我們可以通過(guò)下面的方式計(jì)算每個(gè)頂點(diǎn)(定義在GraphOps中)的入度。區(qū)分核心圖操作和GraphOps的原因是為了在將來(lái)支持不同的圖表示。每個(gè)圖表示都必須提供核心操作的實(shí)現(xiàn)并重用很多定義在GraphOps中的有用操作。

    val degrees: VertexRDD[Int] = graph.degrees;
    degrees.collect().foreach(println)
    val inDegrees: VertexRDD[Int] = graph.inDegrees
    inDegrees.collect().foreach(println)
    val outDegrees: VertexRDD[Int] = graph.outDegrees
    outDegrees.collect().foreach(println)

  • 屬性操作:修改頂點(diǎn)和邊的屬性

           屬性操作每個(gè)操作都產(chǎn)生一個(gè)新的圖,這個(gè)新的圖包含通過(guò)用戶自定義的map操作修改后的頂點(diǎn)或邊的屬性。Map操作根據(jù)原圖的一些特性得到新圖,原圖結(jié)構(gòu)是不變的。這些操作的一個(gè)重要特征是它允許所得圖形重用原有圖形的結(jié)構(gòu)索引(indices)。下面的兩行代碼在邏輯上是等價(jià)的,但是第一個(gè)不是圖操作,它不保存結(jié)構(gòu)索引,所以不會(huì)從GraphX系統(tǒng)優(yōu)化中受益。Map操作根據(jù)原圖的一些特性得到新圖,原圖結(jié)構(gòu)是不變的。這些操作經(jīng)常用來(lái)初始化的圖形,用作特定計(jì)算或者用來(lái)處理項(xiàng)目不需要的屬性。例如,給定一個(gè)圖,這個(gè)圖的頂點(diǎn)特征包含出度,我們?yōu)镻ageRank初始化它。

    //頂點(diǎn)轉(zhuǎn)換,頂點(diǎn)age+1
    //RDD操作,再構(gòu)造新圖,不保存結(jié)構(gòu)索引,不會(huì)被系統(tǒng)優(yōu)化
    val newVertices = graph.vertices.map { case (id, attr) => 
                              (id, (attr._1 + "-1", attr._2 + "-2")) }
    val newGraph2 = Graph(newVertices, graph.edges)
    //圖Map操作,被系統(tǒng)優(yōu)化  
    val newGraph3 = graph.mapVertices((id, attr) => 
                               (id, (attr._1 + "-1", attr._2 + "-2")))

     

    //構(gòu)造一個(gè)新圖,頂點(diǎn)屬性是出度
    val inputGraph: Graph[Int, String] = graph.outerJoinVertices(
                   graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0))
    //根據(jù)頂點(diǎn)屬性為出度的圖構(gòu)造一個(gè)新圖,依據(jù)PageRank算法初始化邊與點(diǎn)
    val outputGraph: Graph[Double, Double] =inputGraph.mapTriplets(
                  triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)

     

    //創(chuàng)建一個(gè)新圖,頂點(diǎn) VD 的數(shù)據(jù)類型為 User,并從 graph 做類型轉(zhuǎn)換
    case class User(name: String, pos: String, inDeg: Int, outDeg: Int)
    val initialUserGraph: Graph[User, String] = graph.mapVertices { 
                    case (id, (name, age)) => User(name, pos, 0, 0)}
    //initialUserGraph 與 inDegrees、outDegrees(RDD)進(jìn)行連接,并修改 initialUserGraph中 inDeg 值、outDeg 值
    val userGraph = initialUserGraph.outerJoinVertices(
          initialUserGraph.inDegrees) {
             case (id, u, inDegOpt) => 
                  User(u.name, u.pos, inDegOpt.getOrElse(0), u.outDeg)
             }.outerJoinVertices(
          initialUserGraph.outDegrees) {
             case (id, u, outDegOpt) => 
                  User(u.name, u.pos, u.inDeg, outDegOpt.getOrElse(0))
          }
    userGraph.vertices.collect.foreach(v => 
             println(s"${v._2.name} inDeg: ${v._2.inDeg} outDeg: ${v._2.outDeg}"))
    //出度和入讀相同的人員
    userGraph.vertices.filter {
          case (id, u) => u.inDeg == u.outDeg
        }.collect.foreach {
          case (id, property) => println(property.name)
        }

    • 自定義類型

    • Join 操作

    • map 操作

  • 結(jié)構(gòu)操作

    //由已定義的頂點(diǎn)構(gòu)成的子圖
    val subGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing" )
    subGraph.vertices.collect().foreach(println(_))
    subGraph.triplets.map(triplet => 
                   triplet.srcAttr._1 + " is the " + triplet.attr + 
                   " of " + triplet.dstAttr._1).collect().foreach(println(_))

     

    //圖的反向操作,新的圖形的所有邊的方向相反,不修改頂點(diǎn)或邊性屬性、不改變的邊的數(shù)目,它可以有效地實(shí)現(xiàn)不必要的數(shù)據(jù)移動(dòng)或復(fù)制        
      var rGraph = graph.reverse

     

    //Mask操作也是根據(jù)輸入圖構(gòu)造一個(gè)新圖,達(dá)到一個(gè)限制制約的效果
    val ccGraph = graph.connectedComponents()
    val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
    val validCCGraph = ccGraph.mask(validGraph)

    • Mask

    • 圖反向

    • 子圖

  • 聚合操作

    //計(jì)算年齡大于自己的關(guān)注者的總?cè)藬?shù)和總年齡
    val olderFollowers: VertexRDD[(Int, Double)] = 
                             graph.mapReduceTriplets[(Int, Double)](
      //Map函數(shù)
        triplet => {
          if (triplet.srcAttr > triplet.dstAttr) {
            Iterator((triplet.dstId, (1, triplet.srcAttr)))
          } else {
            Iterator.empty
          }
        },
        //Reduce函數(shù)
        (a, b) => (a._1 + b._1, a._2 + b._2)
      )
    //計(jì)算年齡大于自己的關(guān)注者的平均年齡
    val avgAgeOfOlderFollowers: VertexRDD[Double] =
            olderFollowers.mapValues((id, value) => 
                 value match {case (count, totalAge) => totalAge / count })
    avgAgeOfOlderFollowers.collect.foreach(println(_))
      
    //定義一個(gè)Reduce函數(shù)來(lái)計(jì)算圖中較大度的點(diǎn)
    def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
        if (a._2 > b._2) a else b
      }
    val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
    println(s"maxInDegree: $maxInDegree")
    val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
    println(s"maxOutDegree: $maxOutDegree")
    val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)
    println(s"maxDegrees: $maxDegrees")

  • 相鄰聚合

    //計(jì)算鄰居相關(guān)函數(shù),這些操作是相當(dāng)昂貴的,需要大量的重復(fù)信息作為他們的通信,因此相同的計(jì)算還是推薦用mapReduceTriplets 
    val neighboorIds:VertexRDD[Array[VertexId]] =
                          graph.collectNeighborIds(EdgeDirection.Out)
    val neighboors:VertexRDD[Array[(VertexId, Double)]]=
                              graph.collectNeighbors(EdgeDirection.Out);

  • Pregel API

    //Pregel API。計(jì)算單源最短路徑
    //通過(guò)GraphGenerators構(gòu)建一個(gè)隨機(jī)圖
    val numVertices = 100
    val numEParts = 2
    val mu = 4.0
    val sigma = 1.3
    val graph2 = GraphGenerators.logNormalGraph(sc, numVertices, 
                      numEParts, mu, sigma).mapEdges(e=> e.attr.toDouble)
    //定義一個(gè)源值 點(diǎn)
    val sourceId: VertexId = 42
    //初始化圖的所有點(diǎn),除了與指定的源值點(diǎn)相同值的點(diǎn)為0.0以外,其他點(diǎn)為無(wú)窮大
    val initialGraph = graph2.mapVertices((id, _) => 
                     if (id == sourceId) 0.0 else Double.PositiveInfinity)
    //Pregel有兩個(gè)參數(shù)列表,第一個(gè)參數(shù)列表包括的是:初始化消息、迭代較大數(shù)、邊的方向(Out)。第二個(gè)參數(shù)列表包括的是:用戶定義的接受消息、計(jì)算消息、聯(lián)合合并消息的函數(shù)。
    val sssp = initialGraph.pregel(Double.PositiveInfinity)(
        //點(diǎn)程序
        (id, dist, newDist) => math.min(dist, newDist),
        //發(fā)送消息
        triplet => {
          if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
            Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
          } else {
            Iterator.empty
          }
        },
        //合并消息
        (a, b) => math.min(a, b)
        )
    println(sssp.vertices.collect.mkString("\n"))

  • 主要圖算法

    val pageRankGraph = graph2.pageRank(0.001)
    pageRankGraph.vertices.sortBy(_._2,false).saveAsTextFile(
                              "/user/hadoop/data/temp/graph/graph.pr")
    pageRankGraph.vertices.top(5)(Ordering.by(_._2)).foreach(println)

     

    val connectedComponentsGraph = graph2.connectedComponents()
    connectedComponentsGraph.vertices.sortBy(_._2, false).saveAsTextFile(
                                  "/user/hadoop/data/temp/graph/graph.cc")
    connectedComponentsGraph.vertices.top(5)(Ordering.by(_._2)).foreach(println)

     

    //TriangleCount主要用途之一是用于社區(qū)發(fā)現(xiàn) 保持sourceId小于destId
    val graph3 = GraphLoader.edgeListFile(sc, path, true)
    val triangleCountGraph = graph3.triangleCount()
    triangleCountGraph.vertices.sortBy(_._2,false).saveAsTextFile(
                                 "/user/hadoop/data/temp/graph/graph.tc")
    triangleCountGraph.vertices.top(5)(Ordering.by(_._2)).foreach(println)

    • TriangleCount

    • Connected Components

    • PageRank

  • 其他操作

    var path = "/user/Hadoop/data/temp/graph/graph.txt"
    var minEdgePartitions = 1
    var canonicalOrientation = false // if sourceId < destId this value is true
    val graph2 = GraphLoader.edgeListFile(sc, path, canonicalOrientation,
            minEdgePartitions,StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY)

     

    //通過(guò)GraphGenerators構(gòu)建一個(gè)隨機(jī)圖
    val numVertices = 100
    val numEParts = 2
    val mu = 4.0
    val sigma = 1.3
    val graph: Graph[Double, Int] = GraphGenerators.logNormalGraph(sc, 
          numVertices, numEParts, mu, sigma).mapVertices((id, _) => id.toDouble)
    graph.triplets.collect.foreach(triplet => 
            println(triplet.srcId + "-" + triplet.srcAttr + "-" + triplet.attr +
                       "-" + triplet.dstId + "-" + triplet.dstAttr))

     

    val setA: VertexRDD[Int] = VertexRDD(
             sc.parallelize(0L until 100L).map(id => (id, 1)))
    val rddB: RDD[(VertexId, Double)] = sc.parallelize(
             0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
    val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
    val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)

到此,相信大家對(duì)“GraphX的基礎(chǔ)知識(shí)有哪些”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

新聞標(biāo)題:GraphX的基礎(chǔ)知識(shí)有哪些
文章位置:http://chinadenli.net/article44/gopjhe.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供小程序開發(fā)、面包屑導(dǎo)航用戶體驗(yàn)、定制網(wǎng)站自適應(yīng)網(wǎng)站、企業(yè)建站

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都網(wǎng)站建設(shè)公司