大數(shù)據(jù)開(kāi)發(fā)中Spark常見(jiàn)RDD是怎樣的,針對(duì)這個(gè)問(wèn)題,這篇文章詳細(xì)介紹了相對(duì)應(yīng)的分析和解答,希望可以幫助更多想解決這個(gè)問(wèn)題的小伙伴找到更簡(jiǎn)單易行的方法。
巫溪網(wǎng)站建設(shè)公司成都創(chuàng)新互聯(lián)公司,巫溪網(wǎng)站設(shè)計(jì)制作,有大型網(wǎng)站制作公司豐富經(jīng)驗(yàn)。已為巫溪近1000家提供企業(yè)網(wǎng)站建設(shè)服務(wù)。企業(yè)網(wǎng)站搭建\成都外貿(mào)網(wǎng)站建設(shè)公司要多少錢,請(qǐng)找那個(gè)售后服務(wù)好的巫溪做網(wǎng)站的公司定做!
A list of partitions
A function for computing each split
A list of dependencies on other RDDs
Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
這是RDD的源碼中注釋中寫到的,下面介紹這五種特征屬性
一組分片(Partition),即數(shù)據(jù)集的基本組成單位。對(duì)于RDD來(lái)說(shuō),每個(gè)分片都會(huì)被一個(gè)計(jì)算任務(wù)處理,并決 定并行計(jì)算的粒度。用戶可以在創(chuàng)建RDD時(shí)指定RDD的分片個(gè)數(shù),如果沒(méi)有指定,那么就會(huì)采用默認(rèn)值
一個(gè)對(duì)分區(qū)數(shù)據(jù)進(jìn)行計(jì)算的函數(shù)。Spark中RDD的計(jì)算是以分片為單位的,每個(gè)RDD都會(huì)實(shí)現(xiàn) compute 函數(shù)以 達(dá)到該目的。compute函數(shù)會(huì)對(duì)迭代器進(jìn)行組合,不需要保存每次計(jì)算的結(jié)果
RDD之間的存在依賴關(guān)系。RDD的每次轉(zhuǎn)換都會(huì)生成一個(gè)新的RDD,RDD之間形成類似于流水線一樣的前后依 賴關(guān)系(lineage)。在部分分區(qū)數(shù)據(jù)丟失時(shí),Spark可以通過(guò)這個(gè)依賴關(guān)系重新計(jì)算丟失的分區(qū)數(shù)據(jù),而不是 對(duì)RDD的所有分區(qū)進(jìn)行重新計(jì)算
對(duì)于 key-value
的RDD而言,可能存在分區(qū)器(Partitioner
)。Spark 實(shí)現(xiàn)了兩種類型的分片函數(shù),一個(gè)是基于 哈希的HashPartitioner
,另外一個(gè)是基于范圍的RangePartitioner。只有 key-value 的RDD,才可能有 Partitioner
,非key-value的RDD的Parititioner
的值是None。Partitioner
函數(shù)決定了RDD
本身的分片數(shù)量,也 決定了parent RDD Shuffle
輸出時(shí)的分片數(shù)量
一個(gè)列表,存儲(chǔ)存儲(chǔ)每個(gè)Partition
的優(yōu)先位置(preferred location)。對(duì)于一個(gè)HDFS
文件來(lái)說(shuō),這個(gè)列表保 存的就是每個(gè)Partition
所在的塊的位置。按照“移動(dòng)數(shù)據(jù)不移動(dòng)計(jì)算”的理念,Spark
在任務(wù)調(diào)度的時(shí)候,會(huì)盡可 能地將計(jì)算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲(chǔ)位置
從前面的RDD
的基本特征入手,在工作中常編寫的程序是,創(chuàng)建RDD
,RDD
的轉(zhuǎn)換,RDD
的算子的執(zhí)行,創(chuàng)建對(duì)應(yīng)著外部系統(tǒng)的數(shù)據(jù)流入Spark集群的必選步驟,至于之間從集合創(chuàng)建的數(shù)據(jù),一般在測(cè)試時(shí)候使用,所以不細(xì)述,RDD
的轉(zhuǎn)換對(duì)應(yīng)一個(gè)專門的算子叫Transformation
其是惰性加載使用的, 而行動(dòng)對(duì)應(yīng)著觸發(fā)Transformation
執(zhí)行的操作,一般是輸出到集合,或者打印出來(lái),或者返回一個(gè)值,另外就是從集群輸出到別的系統(tǒng),這有一個(gè)專業(yè)詞叫Action
.
轉(zhuǎn)換算子,即從一個(gè)RDD到另外一個(gè)RDD的轉(zhuǎn)換操作,對(duì)應(yīng)一些內(nèi)置的Compute函數(shù),但是這些函數(shù)被有沒(méi)有shuffle來(lái)分為寬依賴算子和窄依賴算子
一般網(wǎng)上文章有兩種,一種是搬運(yùn)定義的,即是否一個(gè)父RDD
分區(qū)會(huì)被多個(gè)子分區(qū)依賴,另外一種是看有沒(méi)有Shuffle
,有Shuffle
就是寬依賴,沒(méi)有則是窄依賴,第一種還靠譜點(diǎn),第二種就是拿本身來(lái)說(shuō)本身,所以沒(méi)有參考價(jià)值,2.1.3 如何區(qū)別寬依賴和窄依賴,可以之間看這個(gè)
map(func)
:對(duì)數(shù)據(jù)集中的每個(gè)元素都使用func,然后返回一個(gè)新的RDD filter(func)
:對(duì)數(shù)據(jù)集中的每個(gè)元素都使用func,然后返回一個(gè)包含使func為true的元素構(gòu)成的RDD flatMap(func)
:與 map 類似,每個(gè)輸入元素被映射為0或多個(gè)輸出元素 mapPartitions(func)
:和map很像,但是map是將func作用在每個(gè)元素上,而mapPartitions是func作用在整個(gè)分 區(qū)上。假設(shè)一個(gè)RDD有N個(gè)元素,M個(gè)分區(qū)(N >> M),那么map的函數(shù)將被調(diào)用N次,而mapPartitions中的函數(shù) 僅被調(diào)用M次,一次處理一個(gè)分區(qū)中的所有元素 mapPartitionsWithIndex(func)
:與 mapPartitions 類似,多了分區(qū)的索引值的信息
glom()
:將每一個(gè)分區(qū)形成一個(gè)數(shù)組,形成新的RDD類型 RDD[Array[T]] sample(withReplacement, fraction, seed)
:采樣算子。以指定的隨機(jī)種子(seed)隨機(jī)抽樣出數(shù)量為fraction的數(shù) 據(jù),withReplacement表示是抽出的數(shù)據(jù)是否放回,true為有放回的抽樣,false為無(wú)放回的抽樣
coalesce(numPartitions,false)
:無(wú)shuffle,一般用來(lái)減少分區(qū)
union(otherRDD)
: 求兩個(gè)RDD的并集
cartesian(otherRDD)
:笛卡爾積
zip(otherRDD)
:將兩個(gè)RDD組合成 key-value 形式的RDD,默認(rèn)兩個(gè)RDD的partition數(shù)量以及元素?cái)?shù)量都相同,否 則會(huì)拋出異常。
map 與 mapPartitions 的區(qū)別map
:每次處理一條數(shù)據(jù) mapPartitions
:每次處理一個(gè)分區(qū)的數(shù)據(jù),分區(qū)的數(shù)據(jù)處理完成后,數(shù)據(jù)才能釋放,資源不足時(shí)容易導(dǎo)致 OOM 最佳實(shí)踐:當(dāng)內(nèi)存資源充足時(shí),建議使用mapPartitions
,以提高處理效率
groupBy(func)
:按照傳入函數(shù)的返回值進(jìn)行分組。將key相同的值放入一個(gè)迭代器
distinct([numTasks]))
:對(duì)RDD元素去重后,返回一個(gè)新的RDD??蓚魅雗umTasks參數(shù)改變RDD分區(qū)數(shù)
coalesce(numPartitions, true)
:有shuffle,無(wú)論增加分區(qū)還是減少分區(qū),一般用repartition來(lái)代替
repartition(numPartitions)
:增加或減少分區(qū)數(shù),有shuffle
sortBy(func, [ascending], [numTasks])
:使用 func 對(duì)數(shù)據(jù)進(jìn)行處理,對(duì)處理后的結(jié)果進(jìn)行排序
intersection(otherRDD)
: 求兩個(gè)RDD的交集
subtract (otherRDD)
: 求兩個(gè)RDD的差集
這里我建議理解不了的算子,直接從Spark
的history
的依賴圖來(lái)看,有沒(méi)有劃分Stage
,如果劃分了就是寬依賴,沒(méi)有劃分就是窄依賴,當(dāng)然這是實(shí)戰(zhàn)派的做法,可以在同事或者同學(xué)說(shuō)明問(wèn)題的時(shí)候,show your code
給他,然后把依賴圖拿給他 ,當(dāng)然作為理論加實(shí)踐的并行者,我這里再拿一種來(lái)判別,是從理解定義開(kāi)始的,定義說(shuō)是父RDD分區(qū)有沒(méi)有被多個(gè)子分區(qū)依賴,那可以從這個(gè)角度想一下,父分區(qū)單個(gè)分區(qū)數(shù)據(jù),有沒(méi)有可能流向不同的子RDD的分區(qū),比如想一想distinct算子,或者sortBy算子,全局去重和全局排序,假設(shè)剛開(kāi)始1,2,3在一個(gè)分區(qū),經(jīng)過(guò)map(x => (x, null)).reduceByKey((x, y) => x).map(_._1)
去重后,雖然分區(qū)數(shù)量沒(méi)有變,但是每個(gè)分區(qū)數(shù)據(jù)必然要看別的分區(qū)的數(shù)據(jù),才能知道最后自己要不要保留,從輸入分區(qū),到輸出分區(qū),必然經(jīng)過(guò)匯合重組,所以必然有shuffle
的。sortBy
同理。
Action觸發(fā)Job。一個(gè)Spark程序(Driver程序)包含了多少 Action 算子,那么就有多少Job; 典型的Action算子: collect / count collect() => sc.runJob() => ... => dagScheduler.runJob() => 觸發(fā)了Job
collect() / collectAsMap() stats / count / mean / stdev / max / min reduce(func) / fold(func) / aggregate(func)
first()
:Return the first element in this RDD take(n)
:Take the first num elements of the RDD top(n)
:按照默認(rèn)(降序)或者指定的排序規(guī)則,返回前num個(gè)元素。 takeSample(withReplacement, num, [seed])
:返回采樣的數(shù)據(jù) foreach(func) / foreachPartition(func)
:與map、mapPartitions類似,區(qū)別是 foreach 是 Action saveAsTextFile(path) / saveAsSequenceFile(path) / saveAsObjectFile(path)
RDD整體上分為 Value 類型和 Key-Value 類型。 前面介紹的是 Value 類型的RDD的操作,實(shí)際使用更多的是 key-value 類型的RDD,也稱為 PairRDD。 Value 類型RDD的操作基本集中在 RDD.scala 中; key-value 類型的RDD操作集中在 PairRDDFunctions.scala 中;
前面介紹的大多數(shù)算子對(duì) Pair RDD 都是有效的,RDD的值為key-value的時(shí)候即可隱式轉(zhuǎn)換為PairRDD, Pair RDD還有屬于自己的 Transformation、Action 算子;
mapValues / flatMapValues / keys / values,這些操作都可以使用 map 操作實(shí)現(xiàn),是簡(jiǎn)化操作。
PariRDD(k, v)使用范圍廣,聚合 groupByKey / reduceByKey / foldByKey / aggregateByKey combineByKey(OLD) / combineByKeyWithClassTag (NEW) => 底層實(shí)現(xiàn) subtractByKey:類似于subtract,刪掉 RDD 中鍵與 other RDD 中的鍵相同的元素
結(jié)論:效率相等用最熟悉的方法;groupByKey在一般情況下效率低,盡量少用
sortByKey:sortByKey函數(shù)作用于PairRDD,對(duì)Key進(jìn)行排序
cogroup / join / leftOuterJoin / rightOuterJoin / fullOuterJoin
val rdd1 = sc.makeRDD(Array((1,"Spark"), (2,"Hadoop"), (3,"Kylin"), (4,"Flink"))) val rdd2 = sc.makeRDD(Array((3,"李四"), (4,"王五"), (5,"趙六"), (6,"馮七"))) val rdd3 = rdd1.cogroup(rdd2) rdd3.collect.foreach(println) rdd3.filter{case (_, (v1, v2)) => v1.nonEmpty & v2.nonEmpty}.collect // 仿照源碼實(shí)現(xiàn)join操作 rdd3.flatMapValues( pair => for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w) ) val rdd1 = sc.makeRDD(Array(("1","Spark"),("2","Hadoop"),("3","Scala"),("4","Java"))) val rdd2 = sc.makeRDD(Array(("3","20K"),("4","18K"),("5","25K"),("6","10K"))) rdd1.join(rdd2).collect rdd1.leftOuterJoin(rdd2).collect rdd1.rightOuterJoin(rdd2).collect rdd1.fullOuterJoin(rdd2).collect
collectAsMap / countByKey / lookup(key)
lookup(key)
:高效的查找方法,只查找對(duì)應(yīng)分區(qū)的數(shù)據(jù)
關(guān)于大數(shù)據(jù)開(kāi)發(fā)中Spark常見(jiàn)RDD是怎樣的問(wèn)題的解答就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,如果你還有很多疑惑沒(méi)有解開(kāi),可以關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道了解更多相關(guān)知識(shí)。
當(dāng)前名稱:大數(shù)據(jù)開(kāi)發(fā)中Spark常見(jiàn)RDD是怎樣的
網(wǎng)站地址:http://chinadenli.net/article2/ppgpoc.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站改版、標(biāo)簽優(yōu)化、網(wǎng)站設(shè)計(jì)、全網(wǎng)營(yíng)銷推廣、ChatGPT、外貿(mào)建站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)