大數(shù)據(jù)開發(fā)中Spark調(diào)優(yōu)常用手段是什么,針對這個問題,這篇文章詳細(xì)介紹了相對應(yīng)的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。
10多年的黃島網(wǎng)站建設(shè)經(jīng)驗,針對設(shè)計、前端、開發(fā)、售后、文案、推廣等六對一服務(wù),響應(yīng)快,48小時及時工作處理。成都全網(wǎng)營銷推廣的優(yōu)勢是能夠根據(jù)用戶設(shè)備顯示端的尺寸不同,自動調(diào)整黃島建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計,從而大程度地提升瀏覽體驗。成都創(chuàng)新互聯(lián)從事“黃島網(wǎng)站設(shè)計”,“黃島網(wǎng)站推廣”以來,每個客戶項目都認(rèn)真落實執(zhí)行。
spark調(diào)優(yōu)常見手段,在生產(chǎn)中常常會遇到各種各樣的問題,有事前原因,有事中原因,也有不規(guī)范原因,spark調(diào)優(yōu)總結(jié)下來可以從下面幾個點來調(diào)優(yōu)。
分配更多的資源: 它是性能優(yōu)化調(diào)優(yōu)的王道,就是增加和分配更多的資源,這對于性能和速度上的提升是顯而易見的, 基本上,在一定范圍之內(nèi),增加資源與性能的提升,是成正比的;寫完了一個復(fù)雜的spark作業(yè)之后,進行性能調(diào)優(yōu)的時候,首先第一步,就是要來調(diào)節(jié)最優(yōu)的資源配置; 在這個基礎(chǔ)之上,如果說你的spark作業(yè),能夠分配的資源達(dá)到了你的能力范圍的頂端之后,無法再分配更多的資源了,公司資源有限;那么才是考慮去做后面的這些性能調(diào)優(yōu)的點。 相關(guān)問題: (1)分配哪些資源? (2)在哪里可以設(shè)置這些資源? (3)剖析為什么分配這些資源之后,性能可以得到提升?
executor-memory、executor-cores、driver-memory
1.2 在哪里可以設(shè)置這些資源
在實際的生產(chǎn)環(huán)境中,提交spark任務(wù)時,使用spark-submit shell腳本,在里面調(diào)整對應(yīng)的參數(shù)。 提交任務(wù)的腳本: spark-submit \ --master spark://node1:7077 \ --class com.hoult.WordCount \ --num-executors 3 \ 配置executor的數(shù)量 --driver-memory 1g \ 配置driver的內(nèi)存(影響不大) --executor-memory 1g \ 配置每一個executor的內(nèi)存大小 --executor-cores 3 \ 配置每一個executor的cpu個數(shù) /export/servers/wordcount.jar
==Standalone模式==
先計算出公司spark集群上的所有資源 每臺節(jié)點的內(nèi)存大小和cpu核數(shù), 比如:一共有20臺worker節(jié)點,每臺節(jié)點8g內(nèi)存,10個cpu。 實際任務(wù)在給定資源的時候,可以給20個executor、每個executor的內(nèi)存8g、每個executor的使用的cpu個數(shù)10。
==Yarn模式==
先計算出yarn集群的所有大小,比如一共500g內(nèi)存,100個cpu; 這個時候可以分配的最大資源,比如給定50個executor、每個executor的內(nèi)存大小10g,每個executor使用的cpu個數(shù)為2。
使用原則
在資源比較充足的情況下,盡可能的使用更多的計算資源,盡量去調(diào)節(jié)到最大的大小
--executor-memory --total-executor-cores
spark作業(yè)中,各個stage的task的數(shù)量,也就代表了spark作業(yè)在各個階段stage的并行度! 當(dāng)分配完所能分配的最大資源了,然后對應(yīng)資源去調(diào)節(jié)程序的并行度,如果并行度沒有與資源相匹配,那么導(dǎo)致你分配下去的資源都浪費掉了。同時并行運行,還可以讓每個task要處理的數(shù)量變少(很簡單的原理。合理設(shè)置并行度,可以充分利用集群資源,減少每個task處理數(shù)據(jù)量,而增加性能加快運行速度。)
至少設(shè)置成與spark Application 的總cpu core 數(shù)量相同。 最理想情況,150個core,分配150task,一起運行,差不多同一時間運行完畢 官方推薦,task數(shù)量,設(shè)置成spark Application 總cpu core數(shù)量的2~3倍 。 比如150個cpu core ,基本設(shè)置task數(shù)量為300~500. 與理想情況不同的,有些task會運行快一點,比如50s就完了,有些task 可能會慢一點,要一分半才運行完,所以如果你的task數(shù)量,剛好設(shè)置的跟cpu core 數(shù)量相同,可能會導(dǎo)致資源的浪費。 因為比如150個task中10個先運行完了,剩余140個還在運行,但是這個時候,就有10個cpu core空閑出來了,導(dǎo)致浪費。如果設(shè)置2~3倍,那么一個task運行完以后,另外一個task馬上補上來,盡量讓cpu core不要空閑。同時盡量提升spark運行效率和速度。提升性能。
設(shè)置參數(shù)spark.default.parallelism 默認(rèn)是沒有值的,如果設(shè)置了值為10,它會在shuffle的過程才會起作用。 比如 val rdd2 = rdd1.reduceByKey(_+_) 此時rdd2的分區(qū)數(shù)就是10 可以通過在構(gòu)建SparkConf對象的時候設(shè)置,例如: new SparkConf().set("spark.defalut.parallelism","500")
使用rdd.repartition 來重新分區(qū),該方法會生成一個新的rdd,使其分區(qū)數(shù)變大。 此時由于一個partition對應(yīng)一個task,那么對應(yīng)的task個數(shù)越多,通過這種方式也可以提高并行度。
http://spark.apache.org/docs/2.3.3/sql-programming-guide.html
通過設(shè)置參數(shù) spark.sql.shuffle.partitions=500 默認(rèn)為200; 可以適當(dāng)增大,來提高并行度。 比如設(shè)置為 spark.sql.shuffle.partitions=500
專門針對sparkSQL來設(shè)置的
如上圖所示的計算邏輯: (1)當(dāng)?shù)谝淮问褂胷dd2做相應(yīng)的算子操作得到rdd3的時候,就會從rdd1開始計算,先讀取HDFS上的文件,然后對rdd1做對應(yīng)的算子操作得到rdd2,再由rdd2計算之后得到rdd3。同樣為了計算得到rdd4,前面的邏輯會被重新計算。 (3)默認(rèn)情況下多次對一個rdd執(zhí)行算子操作,去獲取不同的rdd,都會對這個rdd及之前的父rdd全部重新計算一次。 這種情況在實際開發(fā)代碼的時候會經(jīng)常遇到,但是我們一定要避免一個rdd重復(fù)計算多次,否則會導(dǎo)致性能急劇降低。 總結(jié):可以把多次使用到的rdd,也就是公共rdd進行持久化,避免后續(xù)需要,再次重新計算,提升效率。
可以調(diào)用rdd的cache或者persist方法。
(1)cache方法默認(rèn)是把數(shù)據(jù)持久化到內(nèi)存中 ,例如:rdd.cache ,其本質(zhì)還是調(diào)用了persist方法 (2)persist方法中有豐富的緩存級別,這些緩存級別都定義在StorageLevel這個object中,可以結(jié)合實際的應(yīng)用場景合理的設(shè)置緩存級別。例如: rdd.persist(StorageLevel.MEMORY_ONLY),這是cache方法的實現(xiàn)。
(1)如果正常將數(shù)據(jù)持久化在內(nèi)存中,那么可能會導(dǎo)致內(nèi)存的占用過大,這樣的話,也許會導(dǎo)致OOM內(nèi)存溢出。 (2)當(dāng)純內(nèi)存無法支撐公共RDD數(shù)據(jù)完全存放的時候,就優(yōu)先考慮使用序列化的方式在純內(nèi)存中存儲。將RDD的每個partition的數(shù)據(jù),序列化成一個字節(jié)數(shù)組;序列化后,大大減少內(nèi)存的空間占用。 (3)序列化的方式,唯一的缺點就是,在獲取數(shù)據(jù)的時候,需要反序列化。但是可以減少占用的空間和便于網(wǎng)絡(luò)傳輸 (4)如果序列化純內(nèi)存方式,還是導(dǎo)致OOM,內(nèi)存溢出;就只能考慮磁盤的方式,內(nèi)存+磁盤的普通方式(無序列化)。 (5)為了數(shù)據(jù)的高可靠性,而且內(nèi)存充足,可以使用雙副本機制,進行持久化 持久化的雙副本機制,持久化后的一個副本,因為機器宕機了,副本丟了,就還是得重新計算一次; 持久化的每個數(shù)據(jù)單元,存儲一份副本,放在其他節(jié)點上面,從而進行容錯; 一個副本丟了,不用重新計算,還可以使用另外一份副本。這種方式,僅僅針對你的內(nèi)存資源極度充足。 比如: StorageLevel.MEMORY_ONLY_2
在實際工作中可能會遇到這樣的情況,由于要處理的數(shù)據(jù)量非常大,這個時候可能會在一個stage中出現(xiàn)大量的task,比如有1000個task,這些task都需要一份相同的數(shù)據(jù)來處理業(yè)務(wù),這份數(shù)據(jù)的大小為100M,該數(shù)據(jù)會拷貝1000份副本,通過網(wǎng)絡(luò)傳輸?shù)礁鱾€task中去,給task使用。這里會涉及大量的網(wǎng)絡(luò)傳輸開銷,同時至少需要的內(nèi)存為1000*100M=100G,這個內(nèi)存開銷是非常大的。不必要的內(nèi)存的消耗和占用,就導(dǎo)致了你在進行RDD持久化到內(nèi)存,也許就沒法完全在內(nèi)存中放下;就只能寫入磁盤,最后導(dǎo)致后續(xù)的操作在磁盤IO上消耗性能;這對于spark任務(wù)處理來說就是一場災(zāi)難。 由于內(nèi)存開銷比較大,task在創(chuàng)建對象的時候,可能會出現(xiàn)堆內(nèi)存放不下所有對象,就會導(dǎo)致頻繁的垃圾回收器的回收GC。GC的時候一定是會導(dǎo)致工作線程停止,也就是導(dǎo)致Spark暫停工作那么一點時間。頻繁GC的話,對Spark作業(yè)的運行的速度會有相當(dāng)可觀的影響。
Spark中分布式執(zhí)行的代碼需要傳遞到各個executor的task上運行。對于一些只讀、固定的數(shù)據(jù),每次都需要Driver廣播到各個Task上,這樣效率低下。廣播變量允許將變量只廣播給各個executor。該executor上的各個task再從所在節(jié)點的BlockManager(負(fù)責(zé)管理某個executor對應(yīng)的內(nèi)存和磁盤上的數(shù)據(jù))獲取變量,而不是從Driver獲取變量,從而提升了效率。
廣播變量,初始的時候,就在Drvier上有一份副本。通過在Driver把共享數(shù)據(jù)轉(zhuǎn)換成廣播變量。 task在運行的時候,想要使用廣播變量中的數(shù)據(jù),此時首先會在自己本地的Executor對應(yīng)的BlockManager中,嘗試獲取變量副本;如果本地沒有,那么就從Driver遠(yuǎn)程拉取廣播變量副本,并保存在本地的BlockManager中; 此后這個executor上的task,都會直接使用本地的BlockManager中的副本。那么這個時候所有該executor中的task都會使用這個廣播變量的副本。也就是說一個executor只需要在第一個task啟動時,獲得一份廣播變量數(shù)據(jù),之后的task都從本節(jié)點的BlockManager中獲取相關(guān)數(shù)據(jù)。 executor的BlockManager除了從driver上拉取,也可能從其他節(jié)點的BlockManager上拉取變量副本,網(wǎng)絡(luò)距離越近越好。
比如一個任務(wù)需要50個executor,1000個task,共享數(shù)據(jù)為100M。 (1)在不使用廣播變量的情況下,1000個task,就需要該共享數(shù)據(jù)的1000個副本,也就是說有1000份數(shù)需要大量的網(wǎng)絡(luò)傳輸和內(nèi)存開銷存儲。耗費的內(nèi)存大小1000*100=100G. (2)使用了廣播變量后,50個executor就只需要50個副本數(shù)據(jù),而且不一定都是從Driver傳輸?shù)矫總€節(jié)點,還可能是就近從最近的節(jié)點的executor的blockmanager上拉取廣播變量副本,網(wǎng)絡(luò)傳輸速度大大增加;內(nèi)存開銷 50*100M=5G 總結(jié): 不使用廣播變量的內(nèi)存開銷為100G,使用后的內(nèi)存開銷5G,這里就相差了20倍左右的網(wǎng)絡(luò)傳輸性能損耗和內(nèi)存開銷,使用廣播變量后對于性能的提升和影響,還是很可觀的。 廣播變量的使用不一定會對性能產(chǎn)生決定性的作用。比如運行30分鐘的spark作業(yè),可能做了廣播變量以后,速度快了2分鐘,或者5分鐘。但是一點一滴的調(diào)優(yōu),積少成多。最后還是會有效果的。
(1)能不能將一個RDD使用廣播變量廣播出去? 不能,因為RDD是不存儲數(shù)據(jù)的。可以將RDD的結(jié)果廣播出去。 (2)廣播變量只能在Driver端定義,不能在Executor端定義。 (3)在Driver端可以修改廣播變量的值,在Executor端無法修改廣播變量的值。 (4)如果executor端用到了Driver的變量,如果不使用廣播變量在Executor有多少task就有多少Driver端的變量副本。 (5)如果Executor端用到了Driver的變量,如果使用廣播變量在每個Executor中只有一份Driver端的變量副本。
例如
(1) 通過sparkContext的broadcast方法把數(shù)據(jù)轉(zhuǎn)換成廣播變量,類型為Broadcast, val broadcastArray: Broadcast[Array[Int]] = sc.broadcast(Array(1,2,3,4,5,6)) (2) 然后executor上的BlockManager就可以拉取該廣播變量的副本獲取具體的數(shù)據(jù)。 獲取廣播變量中的值可以通過調(diào)用其value方法 val array: Array[Int] = broadcastArray.value
spark中的shuffle涉及到數(shù)據(jù)要進行大量的網(wǎng)絡(luò)傳輸,下游階段的task任務(wù)需要通過網(wǎng)絡(luò)拉取上階段task的輸出數(shù)據(jù),shuffle過程,簡單來說,就是將分布在集群中多個節(jié)點上的同一個key,拉取到同一個節(jié)點上,進行聚合或join等操作。比如reduceByKey、join等算子,都會觸發(fā)shuffle操作。 如果有可能的話,要盡量避免使用shuffle類算子。 因為Spark作業(yè)運行過程中,最消耗性能的地方就是shuffle過程。
spark程序在開發(fā)的過程中使用reduceByKey、join、distinct、repartition等算子操作,這里都會產(chǎn)生shuffle,由于shuffle這一塊是非常耗費性能的,實際開發(fā)中盡量使用map類的非shuffle算子。這樣的話,沒有shuffle操作或者僅有較少shuffle操作的Spark作業(yè),可以大大減少性能開銷。
小案例
//錯誤的做法: // 傳統(tǒng)的join操作會導(dǎo)致shuffle操作。 // 因為兩個RDD中,相同的key都需要通過網(wǎng)絡(luò)拉取到一個節(jié)點上,由一個task進行join操作。 val rdd3 = rdd1.join(rdd2) //正確的做法: // Broadcast+map的join操作,不會導(dǎo)致shuffle操作。 // 使用Broadcast將一個數(shù)據(jù)量較小的RDD作為廣播變量。 val rdd2Data = rdd2.collect() val rdd2DataBroadcast = sc.broadcast(rdd2Data) // 在rdd1.map算子中,可以從rdd2DataBroadcast中,獲取rdd2的所有數(shù)據(jù)。 // 然后進行遍歷,如果發(fā)現(xiàn)rdd2中某條數(shù)據(jù)的key與rdd1的當(dāng)前數(shù)據(jù)的key是相同的,那么就判定可以進行join。 // 此時就可以根據(jù)自己需要的方式,將rdd1當(dāng)前數(shù)據(jù)與rdd2中可以連接的數(shù)據(jù),拼接在一起(String或Tuple)。 val rdd3 = rdd1.map(rdd2DataBroadcast...) // 注意,以上操作,建議僅僅在rdd2的數(shù)據(jù)量比較少(比如幾百M,或者一兩G)的情況下使用。 // 因為每個Executor的內(nèi)存中,都會駐留一份rdd2的全量數(shù)據(jù)。
map-side預(yù)聚合
如果因為業(yè)務(wù)需要,一定要使用shuffle操作,無法用map類的算子來替代,那么盡量使用可以map-side預(yù)聚合的算子。 所謂的map-side預(yù)聚合,說的是在每個節(jié)點本地對相同的key進行一次聚合操作,類似于MapReduce中的本地combiner。 map-side預(yù)聚合之后,每個節(jié)點本地就只會有一條相同的key,因為多條相同的key都被聚合起來了。其他節(jié)點在拉取所有節(jié)點上的相同key時,就會大大減少需要拉取的數(shù)據(jù)數(shù)量,從而也就減少了磁盤IO以及網(wǎng)絡(luò)傳輸開銷。 通常來說,在可能的情況下,建議使用reduceByKey或者aggregateByKey算子來替代掉groupByKey算子。因為reduceByKey和aggregateByKey算子都會使用用戶自定義的函數(shù)對每個節(jié)點本地的相同key進行預(yù)聚合。 而groupByKey算子是不會進行預(yù)聚合的,全量的數(shù)據(jù)會在集群的各個節(jié)點之間分發(fā)和傳輸,性能相對來說比較差。 比如如下兩幅圖,就是典型的例子,分別基于reduceByKey和groupByKey進行單詞計數(shù)。其中第一張圖是groupByKey的原理圖,可以看到,沒有進行任何本地聚合時,所有數(shù)據(jù)都會在集群節(jié)點之間傳輸;第二張圖是reduceByKey的原理圖,可以看到,每個節(jié)點本地的相同key數(shù)據(jù),都進行了預(yù)聚合,然后才傳輸?shù)狡渌?jié)點上進行全局聚合。
==groupByKey進行單詞計數(shù)原理==
==reduceByKey單詞計數(shù)原理==
reduceByKey/aggregateByKey 可以進行預(yù)聚合操作,減少數(shù)據(jù)的傳輸量,提升性能
groupByKey 不會進行預(yù)聚合操作,進行數(shù)據(jù)的全量拉取,性能比較低
mapPartitions類的算子,一次函數(shù)調(diào)用會處理一個partition所有的數(shù)據(jù),而不是一次函數(shù)調(diào)用處理一條,性能相對來說會高一些。 但是有的時候,使用mapPartitions會出現(xiàn)OOM(內(nèi)存溢出)的問題。因為單次函數(shù)調(diào)用就要處理掉一個partition所有的數(shù)據(jù),如果內(nèi)存不夠,垃圾回收時是無法回收掉太多對象的,很可能出現(xiàn)OOM異常。所以使用這類操作時要慎重!
原理類似于“使用mapPartitions替代map”,也是一次函數(shù)調(diào)用處理一個partition的所有數(shù)據(jù),而不是一次函數(shù)調(diào)用處理一條數(shù)據(jù)。 在實踐中發(fā)現(xiàn),foreachPartitions類的算子,對性能的提升還是很有幫助的。比如在foreach函數(shù)中,將RDD中所有數(shù)據(jù)寫MySQL,那么如果是普通的foreach算子,就會一條數(shù)據(jù)一條數(shù)據(jù)地寫,每次函數(shù)調(diào)用可能就會創(chuàng)建一個數(shù)據(jù)庫連接,此時就勢必會頻繁地創(chuàng)建和銷毀數(shù)據(jù)庫連接,性能是非常低下; 但是如果用foreachPartitions算子一次性處理一個partition的數(shù)據(jù),那么對于每個partition,只要創(chuàng)建一個數(shù)據(jù)庫連接即可,然后執(zhí)行批量插入操作,此時性能是比較高的。實踐中發(fā)現(xiàn),對于1萬條左右的數(shù)據(jù)量寫MySQL,性能可以提升30%以上。
通常對一個RDD執(zhí)行filter算子過濾掉RDD中較多數(shù)據(jù)后(比如30%以上的數(shù)據(jù)),建議使用coalesce算子,手動減少RDD的partition數(shù)量,將RDD中的數(shù)據(jù)壓縮到更少的partition中去。 因為filter之后,RDD的每個partition中都會有很多數(shù)據(jù)被過濾掉,此時如果照常進行后續(xù)的計算,其實每個task處理的partition中的數(shù)據(jù)量并不是很多,有一點資源浪費,而且此時處理的task越多,可能速度反而越慢。 因此用coalesce減少partition數(shù)量,將RDD中的數(shù)據(jù)壓縮到更少的partition之后,只要使用更少的task即可處理完所有的partition。在某些場景下,對于性能的提升會有一定的幫助。
repartitionAndSortWithinPartitions是Spark官網(wǎng)推薦的一個算子,官方建議,如果需要在repartition重分區(qū)之后,還要進行排序,建議直接使用repartitionAndSortWithinPartitions算子。 因為該算子可以一邊進行重分區(qū)的shuffle操作,一邊進行排序。shuffle與sort兩個操作同時進行,比先shuffle再sort來說,性能可能是要高的。
Spark在進行任務(wù)計算的時候,會涉及到數(shù)據(jù)跨進程的網(wǎng)絡(luò)傳輸、數(shù)據(jù)的持久化,這個時候就需要對數(shù)據(jù)進行序列化。Spark默認(rèn)采用Java的序列化器。默認(rèn)java序列化的優(yōu)缺點如下: 其好處: 處理起來方便,不需要我們手動做其他操作,只是在使用一個對象和變量的時候,需要實現(xiàn)Serializble接口。 其缺點: 默認(rèn)的序列化機制的效率不高,序列化的速度比較慢;序列化以后的數(shù)據(jù),占用的內(nèi)存空間相對還是比較大。 Spark支持使用Kryo序列化機制。Kryo序列化機制,比默認(rèn)的Java序列化機制,速度要快,序列化后的數(shù)據(jù)要更小,大概是Java序列化機制的1/10。所以Kryo序列化優(yōu)化以后,可以讓網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)變少;在集群中耗費的內(nèi)存資源大大減少。
Kryo序列化機制,一旦啟用以后,會生效的幾個地方: (1)算子函數(shù)中使用到的外部變量 算子中的外部變量可能來著與driver需要涉及到網(wǎng)絡(luò)傳輸,就需要用到序列化。 最終可以優(yōu)化網(wǎng)絡(luò)傳輸?shù)男阅?,?yōu)化集群中內(nèi)存的占用和消耗 (2)持久化RDD時進行序列化,StorageLevel.MEMORY_ONLY_SER 將rdd持久化時,對應(yīng)的存儲級別里,需要用到序列化。 最終可以優(yōu)化內(nèi)存的占用和消耗;持久化RDD占用的內(nèi)存越少,task執(zhí)行的時候,創(chuàng)建的對象,就不至于頻繁的占滿內(nèi)存,頻繁發(fā)生GC。 (3) 產(chǎn)生shuffle的地方,也就是寬依賴 下游的stage中的task,拉取上游stage中的task產(chǎn)生的結(jié)果數(shù)據(jù),跨網(wǎng)絡(luò)傳輸,需要用到序列化。最終可以優(yōu)化網(wǎng)絡(luò)傳輸?shù)男阅?/pre>7.3 如何開啟Kryo序列化機制
// 創(chuàng)建SparkConf對象。 val conf = new SparkConf().setMaster(...).setAppName(...) // 設(shè)置序列化器為KryoSerializer。 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 注冊要序列化的自定義類型。 conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))8. 使用fastutil優(yōu)化數(shù)據(jù)格式
8.1 fastutil介紹
fastutil是擴展了Java標(biāo)準(zhǔn)集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的類庫,提供了特殊類型的map、set、list和queue; fastutil能夠提供更小的內(nèi)存占用,更快的存取速度;我們使用fastutil提供的集合類,來替代自己平時使用的JDK的原生的Map、List、Set.8.2 fastutil好處
fastutil集合類,可以減小內(nèi)存的占用,并且在進行集合的遍歷、根據(jù)索引(或者key)獲取元素的值和設(shè)置元素的值的時候,提供更快的存取速度8.3 Spark中應(yīng)用fastutil的場景和使用
8.3.1 算子函數(shù)使用了外部變量
(1)你可以使用Broadcast廣播變量優(yōu)化; (2)可以使用Kryo序列化類庫,提升序列化性能和效率; (3)如果外部變量是某種比較大的集合,那么可以考慮使用fastutil改寫外部變量; 首先從源頭上就減少內(nèi)存的占用(fastutil),通過廣播變量進一步減少內(nèi)存占用,再通過Kryo序列化類庫進一步減少內(nèi)存占用。8.3.2 算子函數(shù)里使用了比較大的集合Map/List
在你的算子函數(shù)里,也就是task要執(zhí)行的計算邏輯里面,如果有邏輯中,出現(xiàn),要創(chuàng)建比較大的Map、List等集合, 可能會占用較大的內(nèi)存空間,而且可能涉及到消耗性能的遍歷、存取等集合操作; 那么此時,可以考慮將這些集合類型使用fastutil類庫重寫, 使用了fastutil集合類以后,就可以在一定程度上,減少task創(chuàng)建出來的集合類型的內(nèi)存占用。 避免executor內(nèi)存頻繁占滿,頻繁喚起GC,導(dǎo)致性能下降。8.3.3 fastutil的使用
第一步:在pom.xml中引用fastutil的包 <dependency> <groupId>fastutil</groupId> <artifactId>fastutil</artifactId> <version>5.0.9</version> </dependency> 第二步:平時使用List (Integer)的替換成IntList即可。 List<Integer>的list對應(yīng)的到fastutil就是IntList類型 使用說明: 基本都是類似于IntList的格式,前綴就是集合的元素類型; 特殊的就是Map,Int2IntMap,代表了key-value映射的元素類型。9. 調(diào)節(jié)數(shù)據(jù)本地化等待時長
Spark在Driver上對Application的每一個stage的task進行分配之前,都會計算出每個task要計算的是哪個分片數(shù)據(jù),RDD的某個partition;Spark的task分配算法,優(yōu)先會希望每個task正好分配到它要計算的數(shù)據(jù)所在的節(jié)點,這樣的話就不用在網(wǎng)絡(luò)間傳輸數(shù)據(jù); 但是通常來說,有時事與愿違,可能task沒有機會分配到它的數(shù)據(jù)所在的節(jié)點,為什么呢,可能那個節(jié)點的計算資源和計算能力都滿了;所以這種時候,通常來說,Spark會等待一段時間,默認(rèn)情況下是3秒(不是絕對的,還有很多種情況,對不同的本地化級別,都會去等待),到最后實在是等待不了了,就會選擇一個比較差的本地化級別,比如說將task分配到距離要計算的數(shù)據(jù)所在節(jié)點比較近的一個節(jié)點,然后進行計算。9.1 本地化級別
(1)PROCESS_LOCAL:進程本地化 代碼和數(shù)據(jù)在同一個進程中,也就是在同一個executor中;計算數(shù)據(jù)的task由executor執(zhí)行,數(shù)據(jù)在executor的BlockManager中;性能最好 (2)NODE_LOCAL:節(jié)點本地化 代碼和數(shù)據(jù)在同一個節(jié)點中;比如說數(shù)據(jù)作為一個HDFS block塊,就在節(jié)點上,而task在節(jié)點上某個executor中運行;或者是數(shù)據(jù)和task在一個節(jié)點上的不同executor中;數(shù)據(jù)需要在進程間進行傳輸;性能其次 (3)RACK_LOCAL:機架本地化 數(shù)據(jù)和task在一個機架的兩個節(jié)點上;數(shù)據(jù)需要通過網(wǎng)絡(luò)在節(jié)點之間進行傳輸; 性能比較差 (4) ANY:無限制 數(shù)據(jù)和task可能在集群中的任何地方,而且不在一個機架中;性能最差9.2 數(shù)據(jù)本地化等待時長
spark.locality.wait,默認(rèn)是3s 首先采用最佳的方式,等待3s后降級,還是不行,繼續(xù)降級...,最后還是不行,只能夠采用最差的。9.3 如何調(diào)節(jié)參數(shù)并且測試
修改spark.locality.wait參數(shù),默認(rèn)是3s,可以增加 下面是每個數(shù)據(jù)本地化級別的等待時間,默認(rèn)都是跟spark.locality.wait時間相同, 默認(rèn)都是3s(可查看spark官網(wǎng)對應(yīng)參數(shù)說明,如下圖所示) spark.locality.wait.node spark.locality.wait.process spark.locality.wait.rack在代碼中設(shè)置: new SparkConf().set("spark.locality.wait","10") 然后把程序提交到spark集群中運行,注意觀察日志,spark作業(yè)的運行日志,推薦大家在測試的時候,先用client模式,在本地就直接可以看到比較全的日志。 日志里面會顯示,starting task .... PROCESS LOCAL、NODE LOCAL..... 例如: Starting task 0.0 in stage 1.0 (TID 2, 192.168.200.102, partition 0, NODE_LOCAL, 5254 bytes) 觀察大部分task的數(shù)據(jù)本地化級別 如果大多都是PROCESS_LOCAL,那就不用調(diào)節(jié)了。如果是發(fā)現(xiàn),好多的級別都是NODE_LOCAL、ANY,那么最好就去調(diào)節(jié)一下數(shù)據(jù)本地化的等待時長。應(yīng)該是要反復(fù)調(diào)節(jié),每次調(diào)節(jié)完以后,再來運行,觀察日志 看看大部分的task的本地化級別有沒有提升;看看整個spark作業(yè)的運行時間有沒有縮短。 注意注意: 在調(diào)節(jié)參數(shù)、運行任務(wù)的時候,別本末倒置,本地化級別倒是提升了, 但是因為大量的等待時長,spark作業(yè)的運行時間反而增加了,那就還是不要調(diào)節(jié)了。10. 基于Spark內(nèi)存模型調(diào)優(yōu)
10.1 spark中executor內(nèi)存劃分
Executor的內(nèi)存主要分為三塊
第一塊是讓task執(zhí)行我們自己編寫的代碼時使用;
第二塊是讓task通過shuffle過程拉取了上一個stage的task的輸出后,進行聚合等操作時使用
第三塊是讓RDD緩存時使用
在spark1.6版本以前 spark的executor使用的靜態(tài)內(nèi)存模型,但是在spark1.6開始,多增加了一個統(tǒng)一內(nèi)存模型。 通過spark.memory.useLegacyMode 這個參數(shù)去配置 默認(rèn)這個值是false,代表用的是新的動態(tài)內(nèi)存模型; 如果想用以前的靜態(tài)內(nèi)存模型,那么就要把這個值改為true。
實際上就是把我們的一個executor分成了三部分, 一部分是Storage內(nèi)存區(qū)域, 一部分是execution區(qū)域, 還有一部分是其他區(qū)域。如果使用的靜態(tài)內(nèi)存模型,那么用這幾個參數(shù)去控制: spark.storage.memoryFraction:默認(rèn)0.6 spark.shuffle.memoryFraction:默認(rèn)0.2 所以第三部分就是0.2 如果我們cache數(shù)據(jù)量比較大,或者是我們的廣播變量比較大, 那我們就把spark.storage.memoryFraction這個值調(diào)大一點。 但是如果我們代碼里面沒有廣播變量,也沒有cache,shuffle又比較多,那我們要把spark.shuffle.memoryFraction 這值調(diào)大。
靜態(tài)內(nèi)存模型的缺點
我們配置好了Storage內(nèi)存區(qū)域和execution區(qū)域后,我們的一個任務(wù)假設(shè)execution內(nèi)存不夠用了,但是它的Storage內(nèi)存區(qū)域是空閑的,兩個之間不能互相借用,不夠靈活,所以才出來我們新的統(tǒng)一內(nèi)存模型。
動態(tài)內(nèi)存模型先是預(yù)留了300m內(nèi)存,防止內(nèi)存溢出。動態(tài)內(nèi)存模型把整體內(nèi)存分成了兩部分, 由這個參數(shù)表示spark.memory.fraction 這個指的默認(rèn)值是0.6 代表另外的一部分是0.4, 然后spark.memory.fraction 這部分又劃分成為兩個小部分。這兩小部分共占整體內(nèi)存的0.6 .這兩部分其實就是:Storage內(nèi)存和execution內(nèi)存。由spark.memory.storageFraction 這個參數(shù)去調(diào)配,因為兩個共占0.6。如果spark.memory.storageFraction這個值配的是0.5,那說明這0.6里面 storage占了0.5,也就是executor占了0.3 。
統(tǒng)一內(nèi)存模型有什么特點呢?
Storage內(nèi)存和execution內(nèi)存 可以相互借用。不用像靜態(tài)內(nèi)存模型那樣死板,但是是有規(guī)則的
為什么受傷的都是storage呢? 是因為execution里面的數(shù)據(jù)是馬上就要用的,而storage里的數(shù)據(jù)不一定馬上就要用。
以下是一份spark-submit命令的示例,大家可以參考一下,并根據(jù)自己的實際情況進行調(diào)節(jié)
bin/spark-submit \ --master yarn-cluster \ --num-executors 100 \ --executor-memory 6G \ --executor-cores 4 \ --driver-memory 1G \ --conf spark.default.parallelism=1000 \ --conf spark.storage.memoryFraction=0.5 \ --conf spark.shuffle.memoryFraction=0.3 \
java.lang.OutOfMemoryError ExecutorLostFailure Executor exit code 為143 executor lost hearbeat time out shuffle file lost 如果遇到以上問題,很有可能就是內(nèi)存除了問題,可以先嘗試增加內(nèi)存。如果還是解決不了,那么請聽下一次數(shù)據(jù)傾斜調(diào)優(yōu)的課。
關(guān)于大數(shù)據(jù)開發(fā)中Spark調(diào)優(yōu)常用手段是什么問題的解答就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道了解更多相關(guān)知識。
網(wǎng)站欄目:大數(shù)據(jù)開發(fā)中Spark調(diào)優(yōu)常用手段是什么
分享鏈接:http://chinadenli.net/article48/gidhep.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供服務(wù)器托管、外貿(mào)建站、響應(yīng)式網(wǎng)站、小程序開發(fā)、品牌網(wǎng)站建設(shè)、域名注冊
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)