欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

大數(shù)據(jù)開(kāi)發(fā)中Spark調(diào)優(yōu)常用手段是什么

大數(shù)據(jù)開(kāi)發(fā)中Spark調(diào)優(yōu)常用手段是什么,針對(duì)這個(gè)問(wèn)題,這篇文章詳細(xì)介紹了相對(duì)應(yīng)的分析和解答,希望可以幫助更多想解決這個(gè)問(wèn)題的小伙伴找到更簡(jiǎn)單易行的方法。

10多年的黃島網(wǎng)站建設(shè)經(jīng)驗(yàn),針對(duì)設(shè)計(jì)、前端、開(kāi)發(fā)、售后、文案、推廣等六對(duì)一服務(wù),響應(yīng)快,48小時(shí)及時(shí)工作處理。成都全網(wǎng)營(yíng)銷(xiāo)推廣的優(yōu)勢(shì)是能夠根據(jù)用戶(hù)設(shè)備顯示端的尺寸不同,自動(dòng)調(diào)整黃島建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無(wú)論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計(jì),從而大程度地提升瀏覽體驗(yàn)。成都創(chuàng)新互聯(lián)從事“黃島網(wǎng)站設(shè)計(jì)”,“黃島網(wǎng)站推廣”以來(lái),每個(gè)客戶(hù)項(xiàng)目都認(rèn)真落實(shí)執(zhí)行。

Spark調(diào)優(yōu)

spark調(diào)優(yōu)常見(jiàn)手段,在生產(chǎn)中常常會(huì)遇到各種各樣的問(wèn)題,有事前原因,有事中原因,也有不規(guī)范原因,spark調(diào)優(yōu)總結(jié)下來(lái)可以從下面幾個(gè)點(diǎn)來(lái)調(diào)優(yōu)。

1. 分配更多的資源

分配更多的資源:
  它是性能優(yōu)化調(diào)優(yōu)的王道,就是增加和分配更多的資源,這對(duì)于性能和速度上的提升是顯而易見(jiàn)的,
  基本上,在一定范圍之內(nèi),增加資源與性能的提升,是成正比的;寫(xiě)完了一個(gè)復(fù)雜的spark作業(yè)之后,進(jìn)行性能調(diào)優(yōu)的時(shí)候,首先第一步,就是要來(lái)調(diào)節(jié)最優(yōu)的資源配置;
  在這個(gè)基礎(chǔ)之上,如果說(shuō)你的spark作業(yè),能夠分配的資源達(dá)到了你的能力范圍的頂端之后,無(wú)法再分配更多的資源了,公司資源有限;那么才是考慮去做后面的這些性能調(diào)優(yōu)的點(diǎn)。

相關(guān)問(wèn)題:
(1)分配哪些資源?
(2)在哪里可以設(shè)置這些資源?
(3)剖析為什么分配這些資源之后,性能可以得到提升?

1.1 分配哪些資源

executor-memory、executor-cores、driver-memory

1.2 在哪里可以設(shè)置這些資源

在實(shí)際的生產(chǎn)環(huán)境中,提交spark任務(wù)時(shí),使用spark-submit shell腳本,在里面調(diào)整對(duì)應(yīng)的參數(shù)。
   
 提交任務(wù)的腳本:
 spark-submit \
 --master spark://node1:7077 \
 --class com.hoult.WordCount \
 --num-executors 3 \    配置executor的數(shù)量
 --driver-memory 1g \   配置driver的內(nèi)存(影響不大)
 --executor-memory 1g \ 配置每一個(gè)executor的內(nèi)存大小
 --executor-cores 3 \   配置每一個(gè)executor的cpu個(gè)數(shù)
 /export/servers/wordcount.jar

1.2 參數(shù)調(diào)節(jié)到多大,算是最大

  • ==Standalone模式==

先計(jì)算出公司spark集群上的所有資源 每臺(tái)節(jié)點(diǎn)的內(nèi)存大小和cpu核數(shù),
   比如:一共有20臺(tái)worker節(jié)點(diǎn),每臺(tái)節(jié)點(diǎn)8g內(nèi)存,10個(gè)cpu。
   實(shí)際任務(wù)在給定資源的時(shí)候,可以給20個(gè)executor、每個(gè)executor的內(nèi)存8g、每個(gè)executor的使用的cpu個(gè)數(shù)10。
  • ==Yarn模式==

先計(jì)算出yarn集群的所有大小,比如一共500g內(nèi)存,100個(gè)cpu;
   這個(gè)時(shí)候可以分配的最大資源,比如給定50個(gè)executor、每個(gè)executor的內(nèi)存大小10g,每個(gè)executor使用的cpu個(gè)數(shù)為2。
  • 使用原則

在資源比較充足的情況下,盡可能的使用更多的計(jì)算資源,盡量去調(diào)節(jié)到最大的大小

1.3 為什么調(diào)大資源以后性能可以提升

--executor-memory

--total-executor-cores

2. 提高并行度

2.1 Spark的并行度指的是什么

spark作業(yè)中,各個(gè)stage的task的數(shù)量,也就代表了spark作業(yè)在各個(gè)階段stage的并行度!
    當(dāng)分配完所能分配的最大資源了,然后對(duì)應(yīng)資源去調(diào)節(jié)程序的并行度,如果并行度沒(méi)有與資源相匹配,那么導(dǎo)致你分配下去的資源都浪費(fèi)掉了。同時(shí)并行運(yùn)行,還可以讓每個(gè)task要處理的數(shù)量變少(很簡(jiǎn)單的原理。合理設(shè)置并行度,可以充分利用集群資源,減少每個(gè)task處理數(shù)據(jù)量,而增加性能加快運(yùn)行速度。)

2.2 如何提高并行度

2.2.1 可以設(shè)置task的數(shù)量

至少設(shè)置成與spark Application 的總cpu core 數(shù)量相同。
最理想情況,150個(gè)core,分配150task,一起運(yùn)行,差不多同一時(shí)間運(yùn)行完畢
官方推薦,task數(shù)量,設(shè)置成spark Application 總cpu core數(shù)量的2~3倍 。
  
比如150個(gè)cpu core ,基本設(shè)置task數(shù)量為300~500. 與理想情況不同的,有些task會(huì)運(yùn)行快一點(diǎn),比如50s就完了,有些task 可能會(huì)慢一點(diǎn),要一分半才運(yùn)行完,所以如果你的task數(shù)量,剛好設(shè)置的跟cpu core 數(shù)量相同,可能會(huì)導(dǎo)致資源的浪費(fèi)。
  因?yàn)楸热?50個(gè)task中10個(gè)先運(yùn)行完了,剩余140個(gè)還在運(yùn)行,但是這個(gè)時(shí)候,就有10個(gè)cpu core空閑出來(lái)了,導(dǎo)致浪費(fèi)。如果設(shè)置2~3倍,那么一個(gè)task運(yùn)行完以后,另外一個(gè)task馬上補(bǔ)上來(lái),盡量讓cpu core不要空閑。同時(shí)盡量提升spark運(yùn)行效率和速度。提升性能。

2.2.2 如何設(shè)置task數(shù)量來(lái)提高并行度

設(shè)置參數(shù)spark.default.parallelism
   默認(rèn)是沒(méi)有值的,如果設(shè)置了值為10,它會(huì)在shuffle的過(guò)程才會(huì)起作用。
   比如 val rdd2 = rdd1.reduceByKey(_+_) 
   此時(shí)rdd2的分區(qū)數(shù)就是10
   
可以通過(guò)在構(gòu)建SparkConf對(duì)象的時(shí)候設(shè)置,例如:
   new SparkConf().set("spark.defalut.parallelism","500")

2.2.3 給RDD重新設(shè)置partition的數(shù)量

使用rdd.repartition 來(lái)重新分區(qū),該方法會(huì)生成一個(gè)新的rdd,使其分區(qū)數(shù)變大。
此時(shí)由于一個(gè)partition對(duì)應(yīng)一個(gè)task,那么對(duì)應(yīng)的task個(gè)數(shù)越多,通過(guò)這種方式也可以提高并行度。

2.2.4 提高sparksql運(yùn)行的task數(shù)量

http://spark.apache.org/docs/2.3.3/sql-programming-guide.html

通過(guò)設(shè)置參數(shù) spark.sql.shuffle.partitions=500  默認(rèn)為200;
可以適當(dāng)增大,來(lái)提高并行度。 比如設(shè)置為 spark.sql.shuffle.partitions=500

專(zhuān)門(mén)針對(duì)sparkSQL來(lái)設(shè)置的

3. RDD的重用和持久化

3.1 實(shí)際開(kāi)發(fā)遇到的情況說(shuō)明

如上圖所示的計(jì)算邏輯:
(1)當(dāng)?shù)谝淮问褂胷dd2做相應(yīng)的算子操作得到rdd3的時(shí)候,就會(huì)從rdd1開(kāi)始計(jì)算,先讀取HDFS上的文件,然后對(duì)rdd1做對(duì)應(yīng)的算子操作得到rdd2,再由rdd2計(jì)算之后得到rdd3。同樣為了計(jì)算得到rdd4,前面的邏輯會(huì)被重新計(jì)算。

(3)默認(rèn)情況下多次對(duì)一個(gè)rdd執(zhí)行算子操作,去獲取不同的rdd,都會(huì)對(duì)這個(gè)rdd及之前的父rdd全部重新計(jì)算一次。
這種情況在實(shí)際開(kāi)發(fā)代碼的時(shí)候會(huì)經(jīng)常遇到,但是我們一定要避免一個(gè)rdd重復(fù)計(jì)算多次,否則會(huì)導(dǎo)致性能急劇降低。

總結(jié):可以把多次使用到的rdd,也就是公共rdd進(jìn)行持久化,避免后續(xù)需要,再次重新計(jì)算,提升效率。

3.2 如何對(duì)rdd進(jìn)行持久化

  • 可以調(diào)用rdd的cache或者persist方法。

(1)cache方法默認(rèn)是把數(shù)據(jù)持久化到內(nèi)存中 ,例如:rdd.cache ,其本質(zhì)還是調(diào)用了persist方法
(2)persist方法中有豐富的緩存級(jí)別,這些緩存級(jí)別都定義在StorageLevel這個(gè)object中,可以結(jié)合實(shí)際的應(yīng)用場(chǎng)景合理的設(shè)置緩存級(jí)別。例如: rdd.persist(StorageLevel.MEMORY_ONLY),這是cache方法的實(shí)現(xiàn)。

3.3 rdd持久化的時(shí)可以采用序列化

(1)如果正常將數(shù)據(jù)持久化在內(nèi)存中,那么可能會(huì)導(dǎo)致內(nèi)存的占用過(guò)大,這樣的話,也許會(huì)導(dǎo)致OOM內(nèi)存溢出。
(2)當(dāng)純內(nèi)存無(wú)法支撐公共RDD數(shù)據(jù)完全存放的時(shí)候,就優(yōu)先考慮使用序列化的方式在純內(nèi)存中存儲(chǔ)。將RDD的每個(gè)partition的數(shù)據(jù),序列化成一個(gè)字節(jié)數(shù)組;序列化后,大大減少內(nèi)存的空間占用。
(3)序列化的方式,唯一的缺點(diǎn)就是,在獲取數(shù)據(jù)的時(shí)候,需要反序列化。但是可以減少占用的空間和便于網(wǎng)絡(luò)傳輸
(4)如果序列化純內(nèi)存方式,還是導(dǎo)致OOM,內(nèi)存溢出;就只能考慮磁盤(pán)的方式,內(nèi)存+磁盤(pán)的普通方式(無(wú)序列化)。
(5)為了數(shù)據(jù)的高可靠性,而且內(nèi)存充足,可以使用雙副本機(jī)制,進(jìn)行持久化
  持久化的雙副本機(jī)制,持久化后的一個(gè)副本,因?yàn)闄C(jī)器宕機(jī)了,副本丟了,就還是得重新計(jì)算一次;
  持久化的每個(gè)數(shù)據(jù)單元,存儲(chǔ)一份副本,放在其他節(jié)點(diǎn)上面,從而進(jìn)行容錯(cuò);
  一個(gè)副本丟了,不用重新計(jì)算,還可以使用另外一份副本。這種方式,僅僅針對(duì)你的內(nèi)存資源極度充足。
   比如: StorageLevel.MEMORY_ONLY_2

4. 廣播變量的使用

4.1 場(chǎng)景描述

在實(shí)際工作中可能會(huì)遇到這樣的情況,由于要處理的數(shù)據(jù)量非常大,這個(gè)時(shí)候可能會(huì)在一個(gè)stage中出現(xiàn)大量的task,比如有1000個(gè)task,這些task都需要一份相同的數(shù)據(jù)來(lái)處理業(yè)務(wù),這份數(shù)據(jù)的大小為100M,該數(shù)據(jù)會(huì)拷貝1000份副本,通過(guò)網(wǎng)絡(luò)傳輸?shù)礁鱾€(gè)task中去,給task使用。這里會(huì)涉及大量的網(wǎng)絡(luò)傳輸開(kāi)銷(xiāo),同時(shí)至少需要的內(nèi)存為1000*100M=100G,這個(gè)內(nèi)存開(kāi)銷(xiāo)是非常大的。不必要的內(nèi)存的消耗和占用,就導(dǎo)致了你在進(jìn)行RDD持久化到內(nèi)存,也許就沒(méi)法完全在內(nèi)存中放下;就只能寫(xiě)入磁盤(pán),最后導(dǎo)致后續(xù)的操作在磁盤(pán)IO上消耗性能;這對(duì)于spark任務(wù)處理來(lái)說(shuō)就是一場(chǎng)災(zāi)難。

    由于內(nèi)存開(kāi)銷(xiāo)比較大,task在創(chuàng)建對(duì)象的時(shí)候,可能會(huì)出現(xiàn)堆內(nèi)存放不下所有對(duì)象,就會(huì)導(dǎo)致頻繁的垃圾回收器的回收GC。GC的時(shí)候一定是會(huì)導(dǎo)致工作線程停止,也就是導(dǎo)致Spark暫停工作那么一點(diǎn)時(shí)間。頻繁GC的話,對(duì)Spark作業(yè)的運(yùn)行的速度會(huì)有相當(dāng)可觀的影響。

4.2 廣播變量引入

Spark中分布式執(zhí)行的代碼需要傳遞到各個(gè)executor的task上運(yùn)行。對(duì)于一些只讀、固定的數(shù)據(jù),每次都需要Driver廣播到各個(gè)Task上,這樣效率低下。廣播變量允許將變量只廣播給各個(gè)executor。該executor上的各個(gè)task再?gòu)乃诠?jié)點(diǎn)的BlockManager(負(fù)責(zé)管理某個(gè)executor對(duì)應(yīng)的內(nèi)存和磁盤(pán)上的數(shù)據(jù))獲取變量,而不是從Driver獲取變量,從而提升了效率。
廣播變量,初始的時(shí)候,就在Drvier上有一份副本。通過(guò)在Driver把共享數(shù)據(jù)轉(zhuǎn)換成廣播變量。

  task在運(yùn)行的時(shí)候,想要使用廣播變量中的數(shù)據(jù),此時(shí)首先會(huì)在自己本地的Executor對(duì)應(yīng)的BlockManager中,嘗試獲取變量副本;如果本地沒(méi)有,那么就從Driver遠(yuǎn)程拉取廣播變量副本,并保存在本地的BlockManager中;
  
  此后這個(gè)executor上的task,都會(huì)直接使用本地的BlockManager中的副本。那么這個(gè)時(shí)候所有該executor中的task都會(huì)使用這個(gè)廣播變量的副本。也就是說(shuō)一個(gè)executor只需要在第一個(gè)task啟動(dòng)時(shí),獲得一份廣播變量數(shù)據(jù),之后的task都從本節(jié)點(diǎn)的BlockManager中獲取相關(guān)數(shù)據(jù)。

  executor的BlockManager除了從driver上拉取,也可能從其他節(jié)點(diǎn)的BlockManager上拉取變量副本,網(wǎng)絡(luò)距離越近越好。

4.3 使用廣播變量后的性能分析

比如一個(gè)任務(wù)需要50個(gè)executor,1000個(gè)task,共享數(shù)據(jù)為100M。
(1)在不使用廣播變量的情況下,1000個(gè)task,就需要該共享數(shù)據(jù)的1000個(gè)副本,也就是說(shuō)有1000份數(shù)需要大量的網(wǎng)絡(luò)傳輸和內(nèi)存開(kāi)銷(xiāo)存儲(chǔ)。耗費(fèi)的內(nèi)存大小1000*100=100G.

(2)使用了廣播變量后,50個(gè)executor就只需要50個(gè)副本數(shù)據(jù),而且不一定都是從Driver傳輸?shù)矫總€(gè)節(jié)點(diǎn),還可能是就近從最近的節(jié)點(diǎn)的executor的blockmanager上拉取廣播變量副本,網(wǎng)絡(luò)傳輸速度大大增加;內(nèi)存開(kāi)銷(xiāo) 50*100M=5G

總結(jié):
  不使用廣播變量的內(nèi)存開(kāi)銷(xiāo)為100G,使用后的內(nèi)存開(kāi)銷(xiāo)5G,這里就相差了20倍左右的網(wǎng)絡(luò)傳輸性能損耗和內(nèi)存開(kāi)銷(xiāo),使用廣播變量后對(duì)于性能的提升和影響,還是很可觀的。
  
  廣播變量的使用不一定會(huì)對(duì)性能產(chǎn)生決定性的作用。比如運(yùn)行30分鐘的spark作業(yè),可能做了廣播變量以后,速度快了2分鐘,或者5分鐘。但是一點(diǎn)一滴的調(diào)優(yōu),積少成多。最后還是會(huì)有效果的。

4.4 廣播變量使用注意事項(xiàng)

(1)能不能將一個(gè)RDD使用廣播變量廣播出去?

       不能,因?yàn)镽DD是不存儲(chǔ)數(shù)據(jù)的??梢詫DD的結(jié)果廣播出去。

(2)廣播變量只能在Driver端定義,不能在Executor端定義。

(3)在Driver端可以修改廣播變量的值,在Executor端無(wú)法修改廣播變量的值。

(4)如果executor端用到了Driver的變量,如果不使用廣播變量在Executor有多少task就有多少Driver端的變量副本。

(5)如果Executor端用到了Driver的變量,如果使用廣播變量在每個(gè)Executor中只有一份Driver端的變量副本。

4.5 如何使用廣播變量

  • 例如

(1) 通過(guò)sparkContext的broadcast方法把數(shù)據(jù)轉(zhuǎn)換成廣播變量,類(lèi)型為Broadcast,
  val broadcastArray: Broadcast[Array[Int]] = sc.broadcast(Array(1,2,3,4,5,6))
  
(2) 然后executor上的BlockManager就可以拉取該廣播變量的副本獲取具體的數(shù)據(jù)。
    獲取廣播變量中的值可以通過(guò)調(diào)用其value方法
   val array: Array[Int] = broadcastArray.value

5. 盡量避免使用shuffle類(lèi)算子

5.1 shuffle描述

spark中的shuffle涉及到數(shù)據(jù)要進(jìn)行大量的網(wǎng)絡(luò)傳輸,下游階段的task任務(wù)需要通過(guò)網(wǎng)絡(luò)拉取上階段task的輸出數(shù)據(jù),shuffle過(guò)程,簡(jiǎn)單來(lái)說(shuō),就是將分布在集群中多個(gè)節(jié)點(diǎn)上的同一個(gè)key,拉取到同一個(gè)節(jié)點(diǎn)上,進(jìn)行聚合或join等操作。比如reduceByKey、join等算子,都會(huì)觸發(fā)shuffle操作。
  
  如果有可能的話,要盡量避免使用shuffle類(lèi)算子。
  因?yàn)镾park作業(yè)運(yùn)行過(guò)程中,最消耗性能的地方就是shuffle過(guò)程。

5.2 哪些算子操作會(huì)產(chǎn)生shuffle

spark程序在開(kāi)發(fā)的過(guò)程中使用reduceByKey、join、distinct、repartition等算子操作,這里都會(huì)產(chǎn)生shuffle,由于shuffle這一塊是非常耗費(fèi)性能的,實(shí)際開(kāi)發(fā)中盡量使用map類(lèi)的非shuffle算子。這樣的話,沒(méi)有shuffle操作或者僅有較少shuffle操作的Spark作業(yè),可以大大減少性能開(kāi)銷(xiāo)。

5.3 如何避免產(chǎn)生shuffle

  • 小案例

//錯(cuò)誤的做法:
// 傳統(tǒng)的join操作會(huì)導(dǎo)致shuffle操作。
// 因?yàn)閮蓚€(gè)RDD中,相同的key都需要通過(guò)網(wǎng)絡(luò)拉取到一個(gè)節(jié)點(diǎn)上,由一個(gè)task進(jìn)行join操作。
val rdd3 = rdd1.join(rdd2)
    
//正確的做法:
// Broadcast+map的join操作,不會(huì)導(dǎo)致shuffle操作。
// 使用Broadcast將一個(gè)數(shù)據(jù)量較小的RDD作為廣播變量。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)

// 在rdd1.map算子中,可以從rdd2DataBroadcast中,獲取rdd2的所有數(shù)據(jù)。
// 然后進(jìn)行遍歷,如果發(fā)現(xiàn)rdd2中某條數(shù)據(jù)的key與rdd1的當(dāng)前數(shù)據(jù)的key是相同的,那么就判定可以進(jìn)行join。
// 此時(shí)就可以根據(jù)自己需要的方式,將rdd1當(dāng)前數(shù)據(jù)與rdd2中可以連接的數(shù)據(jù),拼接在一起(String或Tuple)。
val rdd3 = rdd1.map(rdd2DataBroadcast...)

// 注意,以上操作,建議僅僅在rdd2的數(shù)據(jù)量比較少(比如幾百M(fèi),或者一兩G)的情況下使用。
// 因?yàn)槊總€(gè)Executor的內(nèi)存中,都會(huì)駐留一份rdd2的全量數(shù)據(jù)。

5.4 使用map-side預(yù)聚合的shuffle操作

  • map-side預(yù)聚合

如果因?yàn)闃I(yè)務(wù)需要,一定要使用shuffle操作,無(wú)法用map類(lèi)的算子來(lái)替代,那么盡量使用可以map-side預(yù)聚合的算子。

  所謂的map-side預(yù)聚合,說(shuō)的是在每個(gè)節(jié)點(diǎn)本地對(duì)相同的key進(jìn)行一次聚合操作,類(lèi)似于MapReduce中的本地combiner。
  map-side預(yù)聚合之后,每個(gè)節(jié)點(diǎn)本地就只會(huì)有一條相同的key,因?yàn)槎鄺l相同的key都被聚合起來(lái)了。其他節(jié)點(diǎn)在拉取所有節(jié)點(diǎn)上的相同key時(shí),就會(huì)大大減少需要拉取的數(shù)據(jù)數(shù)量,從而也就減少了磁盤(pán)IO以及網(wǎng)絡(luò)傳輸開(kāi)銷(xiāo)。
  通常來(lái)說(shuō),在可能的情況下,建議使用reduceByKey或者aggregateByKey算子來(lái)替代掉groupByKey算子。因?yàn)閞educeByKey和aggregateByKey算子都會(huì)使用用戶(hù)自定義的函數(shù)對(duì)每個(gè)節(jié)點(diǎn)本地的相同key進(jìn)行預(yù)聚合。
  而groupByKey算子是不會(huì)進(jìn)行預(yù)聚合的,全量的數(shù)據(jù)會(huì)在集群的各個(gè)節(jié)點(diǎn)之間分發(fā)和傳輸,性能相對(duì)來(lái)說(shuō)比較差。
  
  比如如下兩幅圖,就是典型的例子,分別基于reduceByKey和groupByKey進(jìn)行單詞計(jì)數(shù)。其中第一張圖是groupByKey的原理圖,可以看到,沒(méi)有進(jìn)行任何本地聚合時(shí),所有數(shù)據(jù)都會(huì)在集群節(jié)點(diǎn)之間傳輸;第二張圖是reduceByKey的原理圖,可以看到,每個(gè)節(jié)點(diǎn)本地的相同key數(shù)據(jù),都進(jìn)行了預(yù)聚合,然后才傳輸?shù)狡渌?jié)點(diǎn)上進(jìn)行全局聚合。
  • ==groupByKey進(jìn)行單詞計(jì)數(shù)原理==

  • ==reduceByKey單詞計(jì)數(shù)原理==

6. 使用高性能的算子

6.1 使用reduceByKey/aggregateByKey替代groupByKey

  • reduceByKey/aggregateByKey 可以進(jìn)行預(yù)聚合操作,減少數(shù)據(jù)的傳輸量,提升性能

  • groupByKey 不會(huì)進(jìn)行預(yù)聚合操作,進(jìn)行數(shù)據(jù)的全量拉取,性能比較低

6.2 使用mapPartitions替代普通map

mapPartitions類(lèi)的算子,一次函數(shù)調(diào)用會(huì)處理一個(gè)partition所有的數(shù)據(jù),而不是一次函數(shù)調(diào)用處理一條,性能相對(duì)來(lái)說(shuō)會(huì)高一些。
  但是有的時(shí)候,使用mapPartitions會(huì)出現(xiàn)OOM(內(nèi)存溢出)的問(wèn)題。因?yàn)閱未魏瘮?shù)調(diào)用就要處理掉一個(gè)partition所有的數(shù)據(jù),如果內(nèi)存不夠,垃圾回收時(shí)是無(wú)法回收掉太多對(duì)象的,很可能出現(xiàn)OOM異常。所以使用這類(lèi)操作時(shí)要慎重!

6.3 使用foreachPartition替代foreach

原理類(lèi)似于“使用mapPartitions替代map”,也是一次函數(shù)調(diào)用處理一個(gè)partition的所有數(shù)據(jù),而不是一次函數(shù)調(diào)用處理一條數(shù)據(jù)。
  在實(shí)踐中發(fā)現(xiàn),foreachPartitions類(lèi)的算子,對(duì)性能的提升還是很有幫助的。比如在foreach函數(shù)中,將RDD中所有數(shù)據(jù)寫(xiě)MySQL,那么如果是普通的foreach算子,就會(huì)一條數(shù)據(jù)一條數(shù)據(jù)地寫(xiě),每次函數(shù)調(diào)用可能就會(huì)創(chuàng)建一個(gè)數(shù)據(jù)庫(kù)連接,此時(shí)就勢(shì)必會(huì)頻繁地創(chuàng)建和銷(xiāo)毀數(shù)據(jù)庫(kù)連接,性能是非常低下;  但是如果用foreachPartitions算子一次性處理一個(gè)partition的數(shù)據(jù),那么對(duì)于每個(gè)partition,只要?jiǎng)?chuàng)建一個(gè)數(shù)據(jù)庫(kù)連接即可,然后執(zhí)行批量插入操作,此時(shí)性能是比較高的。實(shí)踐中發(fā)現(xiàn),對(duì)于1萬(wàn)條左右的數(shù)據(jù)量寫(xiě)MySQL,性能可以提升30%以上。

6.4 使用filter之后進(jìn)行coalesce操作

通常對(duì)一個(gè)RDD執(zhí)行filter算子過(guò)濾掉RDD中較多數(shù)據(jù)后(比如30%以上的數(shù)據(jù)),建議使用coalesce算子,手動(dòng)減少RDD的partition數(shù)量,將RDD中的數(shù)據(jù)壓縮到更少的partition中去。
  因?yàn)閒ilter之后,RDD的每個(gè)partition中都會(huì)有很多數(shù)據(jù)被過(guò)濾掉,此時(shí)如果照常進(jìn)行后續(xù)的計(jì)算,其實(shí)每個(gè)task處理的partition中的數(shù)據(jù)量并不是很多,有一點(diǎn)資源浪費(fèi),而且此時(shí)處理的task越多,可能速度反而越慢。
  因此用coalesce減少partition數(shù)量,將RDD中的數(shù)據(jù)壓縮到更少的partition之后,只要使用更少的task即可處理完所有的partition。在某些場(chǎng)景下,對(duì)于性能的提升會(huì)有一定的幫助。

6.5 使用repartitionAndSortWithinPartitions替代repartition與sort類(lèi)操作

repartitionAndSortWithinPartitions是Spark官網(wǎng)推薦的一個(gè)算子,官方建議,如果需要在repartition重分區(qū)之后,還要進(jìn)行排序,建議直接使用repartitionAndSortWithinPartitions算子。
  因?yàn)樵撍阕涌梢砸贿呥M(jìn)行重分區(qū)的shuffle操作,一邊進(jìn)行排序。shuffle與sort兩個(gè)操作同時(shí)進(jìn)行,比先shuffle再sort來(lái)說(shuō),性能可能是要高的。

7. 使用Kryo優(yōu)化序列化性能

7.1 spark序列化介紹

Spark在進(jìn)行任務(wù)計(jì)算的時(shí)候,會(huì)涉及到數(shù)據(jù)跨進(jìn)程的網(wǎng)絡(luò)傳輸、數(shù)據(jù)的持久化,這個(gè)時(shí)候就需要對(duì)數(shù)據(jù)進(jìn)行序列化。Spark默認(rèn)采用Java的序列化器。默認(rèn)java序列化的優(yōu)缺點(diǎn)如下:
其好處:
  處理起來(lái)方便,不需要我們手動(dòng)做其他操作,只是在使用一個(gè)對(duì)象和變量的時(shí)候,需要實(shí)現(xiàn)Serializble接口。
其缺點(diǎn):
  默認(rèn)的序列化機(jī)制的效率不高,序列化的速度比較慢;序列化以后的數(shù)據(jù),占用的內(nèi)存空間相對(duì)還是比較大。

Spark支持使用Kryo序列化機(jī)制。Kryo序列化機(jī)制,比默認(rèn)的Java序列化機(jī)制,速度要快,序列化后的數(shù)據(jù)要更小,大概是Java序列化機(jī)制的1/10。所以Kryo序列化優(yōu)化以后,可以讓網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)變少;在集群中耗費(fèi)的內(nèi)存資源大大減少。

7.2 Kryo序列化啟用后生效的地方

Kryo序列化機(jī)制,一旦啟用以后,會(huì)生效的幾個(gè)地方:
(1)算子函數(shù)中使用到的外部變量
  算子中的外部變量可能來(lái)著與driver需要涉及到網(wǎng)絡(luò)傳輸,就需要用到序列化。
      最終可以?xún)?yōu)化網(wǎng)絡(luò)傳輸?shù)男阅?,?yōu)化集群中內(nèi)存的占用和消耗
    
(2)持久化RDD時(shí)進(jìn)行序列化,StorageLevel.MEMORY_ONLY_SER
  將rdd持久化時(shí),對(duì)應(yīng)的存儲(chǔ)級(jí)別里,需要用到序列化。
      最終可以?xún)?yōu)化內(nèi)存的占用和消耗;持久化RDD占用的內(nèi)存越少,task執(zhí)行的時(shí)候,創(chuàng)建的對(duì)象,就不至于頻繁的占滿(mǎn)內(nèi)存,頻繁發(fā)生GC。
    
(3)  產(chǎn)生shuffle的地方,也就是寬依賴(lài)
  下游的stage中的task,拉取上游stage中的task產(chǎn)生的結(jié)果數(shù)據(jù),跨網(wǎng)絡(luò)傳輸,需要用到序列化。最終可以?xún)?yōu)化網(wǎng)絡(luò)傳輸?shù)男阅?/pre>

7.3 如何開(kāi)啟Kryo序列化機(jī)制

// 創(chuàng)建SparkConf對(duì)象。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 設(shè)置序列化器為KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// 注冊(cè)要序列化的自定義類(lèi)型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

8. 使用fastutil優(yōu)化數(shù)據(jù)格式

8.1 fastutil介紹

fastutil是擴(kuò)展了Java標(biāo)準(zhǔn)集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的類(lèi)庫(kù),提供了特殊類(lèi)型的map、set、list和queue;

fastutil能夠提供更小的內(nèi)存占用,更快的存取速度;我們使用fastutil提供的集合類(lèi),來(lái)替代自己平時(shí)使用的JDK的原生的Map、List、Set.

8.2 fastutil好處

fastutil集合類(lèi),可以減小內(nèi)存的占用,并且在進(jìn)行集合的遍歷、根據(jù)索引(或者key)獲取元素的值和設(shè)置元素的值的時(shí)候,提供更快的存取速度

8.3 Spark中應(yīng)用fastutil的場(chǎng)景和使用

8.3.1 算子函數(shù)使用了外部變量

(1)你可以使用Broadcast廣播變量?jī)?yōu)化;

(2)可以使用Kryo序列化類(lèi)庫(kù),提升序列化性能和效率;

(3)如果外部變量是某種比較大的集合,那么可以考慮使用fastutil改寫(xiě)外部變量;

首先從源頭上就減少內(nèi)存的占用(fastutil),通過(guò)廣播變量進(jìn)一步減少內(nèi)存占用,再通過(guò)Kryo序列化類(lèi)庫(kù)進(jìn)一步減少內(nèi)存占用。

8.3.2 算子函數(shù)里使用了比較大的集合Map/List

在你的算子函數(shù)里,也就是task要執(zhí)行的計(jì)算邏輯里面,如果有邏輯中,出現(xiàn),要?jiǎng)?chuàng)建比較大的Map、List等集合,
可能會(huì)占用較大的內(nèi)存空間,而且可能涉及到消耗性能的遍歷、存取等集合操作; 
那么此時(shí),可以考慮將這些集合類(lèi)型使用fastutil類(lèi)庫(kù)重寫(xiě),

使用了fastutil集合類(lèi)以后,就可以在一定程度上,減少task創(chuàng)建出來(lái)的集合類(lèi)型的內(nèi)存占用。 
避免executor內(nèi)存頻繁占滿(mǎn),頻繁喚起GC,導(dǎo)致性能下降。

8.3.3 fastutil的使用

第一步:在pom.xml中引用fastutil的包
    <dependency>
      <groupId>fastutil</groupId>
      <artifactId>fastutil</artifactId>
      <version>5.0.9</version>
    </dependency>
    
第二步:平時(shí)使用List (Integer)的替換成IntList即可。 
  List<Integer>的list對(duì)應(yīng)的到fastutil就是IntList類(lèi)型
  
  
使用說(shuō)明:
基本都是類(lèi)似于IntList的格式,前綴就是集合的元素類(lèi)型; 
特殊的就是Map,Int2IntMap,代表了key-value映射的元素類(lèi)型。

9. 調(diào)節(jié)數(shù)據(jù)本地化等待時(shí)長(zhǎng)

Spark在Driver上對(duì)Application的每一個(gè)stage的task進(jìn)行分配之前,都會(huì)計(jì)算出每個(gè)task要計(jì)算的是哪個(gè)分片數(shù)據(jù),RDD的某個(gè)partition;Spark的task分配算法,優(yōu)先會(huì)希望每個(gè)task正好分配到它要計(jì)算的數(shù)據(jù)所在的節(jié)點(diǎn),這樣的話就不用在網(wǎng)絡(luò)間傳輸數(shù)據(jù);

  但是通常來(lái)說(shuō),有時(shí)事與愿違,可能task沒(méi)有機(jī)會(huì)分配到它的數(shù)據(jù)所在的節(jié)點(diǎn),為什么呢,可能那個(gè)節(jié)點(diǎn)的計(jì)算資源和計(jì)算能力都滿(mǎn)了;所以這種時(shí)候,通常來(lái)說(shuō),Spark會(huì)等待一段時(shí)間,默認(rèn)情況下是3秒(不是絕對(duì)的,還有很多種情況,對(duì)不同的本地化級(jí)別,都會(huì)去等待),到最后實(shí)在是等待不了了,就會(huì)選擇一個(gè)比較差的本地化級(jí)別,比如說(shuō)將task分配到距離要計(jì)算的數(shù)據(jù)所在節(jié)點(diǎn)比較近的一個(gè)節(jié)點(diǎn),然后進(jìn)行計(jì)算。

9.1 本地化級(jí)別

(1)PROCESS_LOCAL:進(jìn)程本地化
  代碼和數(shù)據(jù)在同一個(gè)進(jìn)程中,也就是在同一個(gè)executor中;計(jì)算數(shù)據(jù)的task由executor執(zhí)行,數(shù)據(jù)在executor的BlockManager中;性能最好
(2)NODE_LOCAL:節(jié)點(diǎn)本地化
  代碼和數(shù)據(jù)在同一個(gè)節(jié)點(diǎn)中;比如說(shuō)數(shù)據(jù)作為一個(gè)HDFS block塊,就在節(jié)點(diǎn)上,而task在節(jié)點(diǎn)上某個(gè)executor中運(yùn)行;或者是數(shù)據(jù)和task在一個(gè)節(jié)點(diǎn)上的不同executor中;數(shù)據(jù)需要在進(jìn)程間進(jìn)行傳輸;性能其次
(3)RACK_LOCAL:機(jī)架本地化  
  數(shù)據(jù)和task在一個(gè)機(jī)架的兩個(gè)節(jié)點(diǎn)上;數(shù)據(jù)需要通過(guò)網(wǎng)絡(luò)在節(jié)點(diǎn)之間進(jìn)行傳輸; 性能比較差
(4)  ANY:無(wú)限制
  數(shù)據(jù)和task可能在集群中的任何地方,而且不在一個(gè)機(jī)架中;性能最差

9.2 數(shù)據(jù)本地化等待時(shí)長(zhǎng)

spark.locality.wait,默認(rèn)是3s
首先采用最佳的方式,等待3s后降級(jí),還是不行,繼續(xù)降級(jí)...,最后還是不行,只能夠采用最差的。

9.3 如何調(diào)節(jié)參數(shù)并且測(cè)試

修改spark.locality.wait參數(shù),默認(rèn)是3s,可以增加

下面是每個(gè)數(shù)據(jù)本地化級(jí)別的等待時(shí)間,默認(rèn)都是跟spark.locality.wait時(shí)間相同,
默認(rèn)都是3s(可查看spark官網(wǎng)對(duì)應(yīng)參數(shù)說(shuō)明,如下圖所示)
spark.locality.wait.node
spark.locality.wait.process
spark.locality.wait.rack
在代碼中設(shè)置:
new SparkConf().set("spark.locality.wait","10")

然后把程序提交到spark集群中運(yùn)行,注意觀察日志,spark作業(yè)的運(yùn)行日志,推薦大家在測(cè)試的時(shí)候,先用client模式,在本地就直接可以看到比較全的日志。 
日志里面會(huì)顯示,starting task .... PROCESS LOCAL、NODE LOCAL.....
例如:
Starting task 0.0 in stage 1.0 (TID 2, 192.168.200.102, partition 0, NODE_LOCAL, 5254 bytes)

觀察大部分task的數(shù)據(jù)本地化級(jí)別 
如果大多都是PROCESS_LOCAL,那就不用調(diào)節(jié)了。如果是發(fā)現(xiàn),好多的級(jí)別都是NODE_LOCAL、ANY,那么最好就去調(diào)節(jié)一下數(shù)據(jù)本地化的等待時(shí)長(zhǎng)。應(yīng)該是要反復(fù)調(diào)節(jié),每次調(diào)節(jié)完以后,再來(lái)運(yùn)行,觀察日志 
看看大部分的task的本地化級(jí)別有沒(méi)有提升;看看整個(gè)spark作業(yè)的運(yùn)行時(shí)間有沒(méi)有縮短。

注意注意:
在調(diào)節(jié)參數(shù)、運(yùn)行任務(wù)的時(shí)候,別本末倒置,本地化級(jí)別倒是提升了, 但是因?yàn)榇罅康牡却龝r(shí)長(zhǎng),spark作業(yè)的運(yùn)行時(shí)間反而增加了,那就還是不要調(diào)節(jié)了。

10. 基于Spark內(nèi)存模型調(diào)優(yōu)

10.1 spark中executor內(nèi)存劃分

  • Executor的內(nèi)存主要分為三塊

    • 第一塊是讓task執(zhí)行我們自己編寫(xiě)的代碼時(shí)使用;

    • 第二塊是讓task通過(guò)shuffle過(guò)程拉取了上一個(gè)stage的task的輸出后,進(jìn)行聚合等操作時(shí)使用

    • 第三塊是讓RDD緩存時(shí)使用

10.2 spark的內(nèi)存模型

在spark1.6版本以前 spark的executor使用的靜態(tài)內(nèi)存模型,但是在spark1.6開(kāi)始,多增加了一個(gè)統(tǒng)一內(nèi)存模型。
  通過(guò)spark.memory.useLegacyMode 這個(gè)參數(shù)去配置
      默認(rèn)這個(gè)值是false,代表用的是新的動(dòng)態(tài)內(nèi)存模型;
      如果想用以前的靜態(tài)內(nèi)存模型,那么就要把這個(gè)值改為true。

10.2.1 靜態(tài)內(nèi)存模型

實(shí)際上就是把我們的一個(gè)executor分成了三部分,
  一部分是Storage內(nèi)存區(qū)域,
  一部分是execution區(qū)域,
  還有一部分是其他區(qū)域。如果使用的靜態(tài)內(nèi)存模型,那么用這幾個(gè)參數(shù)去控制:
  
spark.storage.memoryFraction:默認(rèn)0.6
spark.shuffle.memoryFraction:默認(rèn)0.2  
所以第三部分就是0.2

如果我們cache數(shù)據(jù)量比較大,或者是我們的廣播變量比較大,
  那我們就把spark.storage.memoryFraction這個(gè)值調(diào)大一點(diǎn)。
  但是如果我們代碼里面沒(méi)有廣播變量,也沒(méi)有cache,shuffle又比較多,那我們要把spark.shuffle.memoryFraction 這值調(diào)大。
  • 靜態(tài)內(nèi)存模型的缺點(diǎn)

我們配置好了Storage內(nèi)存區(qū)域和execution區(qū)域后,我們的一個(gè)任務(wù)假設(shè)execution內(nèi)存不夠用了,但是它的Storage內(nèi)存區(qū)域是空閑的,兩個(gè)之間不能互相借用,不夠靈活,所以才出來(lái)我們新的統(tǒng)一內(nèi)存模型。

10.2.2 統(tǒng)一內(nèi)存模型

動(dòng)態(tài)內(nèi)存模型先是預(yù)留了300m內(nèi)存,防止內(nèi)存溢出。動(dòng)態(tài)內(nèi)存模型把整體內(nèi)存分成了兩部分,
由這個(gè)參數(shù)表示spark.memory.fraction 這個(gè)指的默認(rèn)值是0.6 代表另外的一部分是0.4,

然后spark.memory.fraction 這部分又劃分成為兩個(gè)小部分。這兩小部分共占整體內(nèi)存的0.6 .這兩部分其實(shí)就是:Storage內(nèi)存和execution內(nèi)存。由spark.memory.storageFraction 這個(gè)參數(shù)去調(diào)配,因?yàn)閮蓚€(gè)共占0.6。如果spark.memory.storageFraction這個(gè)值配的是0.5,那說(shuō)明這0.6里面 storage占了0.5,也就是executor占了0.3 。
  • 統(tǒng)一內(nèi)存模型有什么特點(diǎn)呢?

Storage內(nèi)存和execution內(nèi)存 可以相互借用。不用像靜態(tài)內(nèi)存模型那樣死板,但是是有規(guī)則的
為什么受傷的都是storage呢?

是因?yàn)閑xecution里面的數(shù)據(jù)是馬上就要用的,而storage里的數(shù)據(jù)不一定馬上就要用。

10.2.3 任務(wù)提交腳本參考

  • 以下是一份spark-submit命令的示例,大家可以參考一下,并根據(jù)自己的實(shí)際情況進(jìn)行調(diào)節(jié)

bin/spark-submit \
  --master yarn-cluster \
  --num-executors 100 \
  --executor-memory 6G \
  --executor-cores 4 \
  --driver-memory 1G \
  --conf spark.default.parallelism=1000 \
  --conf spark.storage.memoryFraction=0.5 \
  --conf spark.shuffle.memoryFraction=0.3 \

10.2.4 個(gè)人經(jīng)驗(yàn)

java.lang.OutOfMemoryError
ExecutorLostFailure
Executor exit code 為143
executor lost
hearbeat time out
shuffle file lost

如果遇到以上問(wèn)題,很有可能就是內(nèi)存除了問(wèn)題,可以先嘗試增加內(nèi)存。如果還是解決不了,那么請(qǐng)聽(tīng)下一次數(shù)據(jù)傾斜調(diào)優(yōu)的課。

關(guān)于大數(shù)據(jù)開(kāi)發(fā)中Spark調(diào)優(yōu)常用手段是什么問(wèn)題的解答就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,如果你還有很多疑惑沒(méi)有解開(kāi),可以關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道了解更多相關(guān)知識(shí)。

網(wǎng)站欄目:大數(shù)據(jù)開(kāi)發(fā)中Spark調(diào)優(yōu)常用手段是什么
分享鏈接:http://chinadenli.net/article48/gidhep.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供服務(wù)器托管、外貿(mào)建站、響應(yīng)式網(wǎng)站、小程序開(kāi)發(fā)、品牌網(wǎng)站建設(shè)、域名注冊(cè)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都網(wǎng)頁(yè)設(shè)計(jì)公司