這篇文章給大家分享的是有關(guān)spark sql是如何變成執(zhí)行計(jì)劃的的內(nèi)容。小編覺(jué)得挺實(shí)用的,因此分享給大家做個(gè)參考,一起跟隨小編過(guò)來(lái)看看吧。
成都創(chuàng)新互聯(lián)公司服務(wù)項(xiàng)目包括鯉城網(wǎng)站建設(shè)、鯉城網(wǎng)站制作、鯉城網(wǎng)頁(yè)制作以及鯉城網(wǎng)絡(luò)營(yíng)銷(xiāo)策劃等。多年來(lái),我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢(shì)、行業(yè)經(jīng)驗(yàn)、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機(jī)構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,鯉城網(wǎng)站推廣取得了明顯的社會(huì)效益與經(jīng)濟(jì)效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到鯉城省份的部分城市,未來(lái)相信會(huì)繼續(xù)擴(kuò)大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!
Spark SQL模塊,主要就是處理跟SQL解析相關(guān)的一些內(nèi)容,說(shuō)得更通俗點(diǎn)就是怎么把一個(gè)SQL語(yǔ)句解析成Dataframe或者說(shuō)RDD的任務(wù)。以Spark 2.4.3為例,Spark SQL這個(gè)大模塊分為三個(gè)子模塊,如下圖所示

其中Catalyst可以說(shuō)是Spark內(nèi)部專門(mén)用來(lái)解析SQL的一個(gè)框架,在Hive中類(lèi)似的框架是Calcite(將SQL解析成MapReduce任務(wù))。Catalyst將SQL解析任務(wù)分成好幾個(gè)階段
而Core模塊其實(shí)就是Spark SQL主要解析的流程,當(dāng)然這個(gè)過(guò)程中會(huì)去調(diào)用Catalyst的一些內(nèi)容。這模塊里面比較常用的類(lèi)包括SparkSession,DataSet等。

主要流程大概可以分為以下幾步:
Parser:Sql語(yǔ)句經(jīng)過(guò)Antlr4解析,生成Unresolved Logical Plan
Analysis:analyzer與catalog進(jìn)行綁定(catlog存儲(chǔ)元數(shù)據(jù)),生成Logical Plan;
Logical Optimizations:optimizer對(duì)Logical Plan優(yōu)化,生成Optimized LogicalPlan;
Physical Planning:前面的 logical plan 不能被 Spark 執(zhí)行,而這個(gè)過(guò)程是把 logical plan 轉(zhuǎn)換成多個(gè) physical plans,然后利用代價(jià)模型(cost model)選擇最佳的 physical plan;
prepareForExecution()將 Physical Plan 轉(zhuǎn)換成 executed Physical Plan;
Code Generation:這個(gè)過(guò)程會(huì)把 SQL 查詢生成 Java 字節(jié)碼。
execute()執(zhí)行可執(zhí)行物理計(jì)劃,得到RDD;
-- t1 id,value,cid,did 1,1,1,1 10,1,1,2 -- t2 id,value,cid,did 10,1,1,1 10,1,1,1 SELECT sum(v) FROM ( SELECT t1.id, 1 + 2 + t1.value AS v FROM t1 JOIN t2 WHERE t1.id = t2.id AND t1.cid = 1 AND t1.did = t1.cid + 1 AND t2.id > 5 ) iteblog
調(diào)用詞法分析器 SqlBaseLexer.java 和語(yǔ)法分析器SqlBaseParser.java構(gòu)建語(yǔ)法樹(shù)。生成語(yǔ)法樹(shù)之后,使用 AstBuilder 將語(yǔ)法樹(shù)轉(zhuǎn)換成 LogicalPlan,這個(gè) LogicalPlan 也被稱為 Unresolved LogicalPlan。解析后的邏輯計(jì)劃如下:
== Parsed Logical Plan ==
'Project [unresolvedalias('sum('v), None)]
+- 'SubqueryAlias iteblog
+- 'Project ['t1.id, ((1 + 2) + 't1.value) AS v#16]
+- 'Filter ((('t1.id = 't2.id) AND ('t1.cid = 1)) AND (('t1.did = ('t1.cid + 1)) AND ('t2.id > 5)))
+- 'Join Inner
:- 'UnresolvedRelation [t1]
+- 'UnresolvedRelation [t2]Unresolved LogicalPlan是按照sql直接翻譯過(guò)來(lái)的,可以對(duì)照著SQL從下往上看的,t1 和 t2 兩張表被生成了 UnresolvedRelation。
在 SQL 解析階段生成了 Unresolved LogicalPlan,從上圖可以看出邏輯算子樹(shù)中包含了 UnresolvedRelation 和 unresolvedalias 等對(duì)象。Unresolved LogicalPlan 僅僅是一種數(shù)據(jù)結(jié)構(gòu),不包含任何數(shù)據(jù)信息,比如不知道數(shù)據(jù)源、數(shù)據(jù)類(lèi)型,不同的列來(lái)自于哪張表等。Analyzer 階段會(huì)使用事先定義好的 Rule 以及 SessionCatalog 等信息對(duì) Unresolved LogicalPlan 進(jìn)行 transform。SessionCatalog 主要用于各種函數(shù)資源信息和元數(shù)據(jù)信息(數(shù)據(jù)庫(kù)、數(shù)據(jù)表、數(shù)據(jù)視圖、數(shù)據(jù)分區(qū)與函數(shù)等)的統(tǒng)一管理。而Rule 是定義在 Analyzer 里面的,如下具體如下:
lazy val batches: Seq[Batch] = Seq(
Batch("Hints", fixedPoint,
new ResolveHints.ResolveBroadcastHints(conf),
ResolveHints.ResolveCoalesceHints,
ResolveHints.RemoveAllHints),
Batch("Simple Sanity Check", Once,
LookupFunctions),
Batch("Substitution", fixedPoint,
CTESubstitution,
WindowsSubstitution,
EliminateUnions,
new SubstituteUnresolvedOrdinals(conf)),
Batch("Resolution", fixedPoint,
ResolveTableValuedFunctions :: //解析表的函數(shù)
ResolveRelations :: //解析表或視圖
ResolveReferences :: //解析列
ResolveCreateNamedStruct ::
ResolveDeserializer :: //解析反序列化操作類(lèi)
ResolveNewInstance ::
ResolveUpCast :: //解析類(lèi)型轉(zhuǎn)換
ResolveGroupingAnalytics ::
ResolvePivot ::
ResolveOrdinalInOrderByAndGroupBy ::
ResolveAggAliasInGroupBy ::
ResolveMissingReferences ::
ExtractGenerator ::
ResolveGenerate ::
ResolveFunctions :: //解析函數(shù)
ResolveAliases :: //解析表別名
ResolveSubquery :: //解析子查詢
ResolveSubqueryColumnAliases ::
ResolveWindowOrder ::
ResolveWindowFrame ::
ResolveNaturalAndUsingJoin ::
ResolveOutputRelation ::
ExtractWindowExpressions ::
GlobalAggregates ::
ResolveAggregateFunctions ::
TimeWindowing ::
ResolveInlineTables(conf) ::
ResolveHigherOrderFunctions(catalog) ::
ResolveLambdaVariables(conf) ::
ResolveTimeZone(conf) ::
ResolveRandomSeed ::
TypeCoercion.typeCoercionRules(conf) ++
extendedResolutionRules : _*),
Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
Batch("View", Once,
AliasViewChild(conf)),
Batch("Nondeterministic", Once,
PullOutNondeterministic),
Batch("UDF", Once,
HandleNullInputsForUDF),
Batch("FixNullability", Once,
FixNullability),
Batch("Subquery", Once,
UpdateOuterReferences),
Batch("Cleanup", fixedPoint,
CleanupAliases)
)補(bǔ)充表的信息,比如字段、類(lèi)型,綁定select、where各種字段和表的關(guān)系。綁定之后:
== Analyzed Logical Plan == sum(v): bigint Aggregate [sum(cast(v#16 as bigint)) AS sum(v)#22L] +- SubqueryAlias iteblog +- Project [id#0, ((1 + 2) + value#1) AS v#16] +- Filter (((id#0 = id#8) AND (cid#2 = 1)) AND ((did#3 = (cid#2 + 1)) AND (id#8 > 5))) +- Join Inner :- SubqueryAlias t1 : +- Relation[id#0,value#1,cid#2,did#3] parquet +- SubqueryAlias t2 +- Relation[id#8,value#9,cid#10,did#11] parquet
表的字段信息補(bǔ)全,文件來(lái)自parquet
跟之后的join、filter等等的字段做綁定
sum被解析成 Aggregate 函數(shù)

對(duì) Unresolved LogicalPlan 進(jìn)行相關(guān) transform 操作得到了 Analyzed Logical Plan,這個(gè) Analyzed Logical Plan 是可以直接轉(zhuǎn)換成 Physical Plan 然后在 Spark 中執(zhí)行。但是如果直接這么弄的話,得到的 Physical Plan 很可能不是最優(yōu)的,因?yàn)樵趯?shí)際應(yīng)用中,很多低效的寫(xiě)法會(huì)帶來(lái)執(zhí)行效率的問(wèn)題,需要進(jìn)一步對(duì)Analyzed Logical Plan 進(jìn)行處理,得到更優(yōu)的邏輯算子樹(shù)。于是, 針對(duì) SQL 邏輯算子樹(shù)的優(yōu)化器 Optimizer 應(yīng)運(yùn)而生。
這個(gè)階段的優(yōu)化器主要是基于規(guī)則的(Rule-based Optimizer,簡(jiǎn)稱 RBO),而絕大部分的規(guī)則都是啟發(fā)式規(guī)則,也就是基于直觀或經(jīng)驗(yàn)而得出的規(guī)則,比如列裁剪(過(guò)濾掉查詢不需要使用到的列)、謂詞下推(將過(guò)濾盡可能地下沉到數(shù)據(jù)源端)、常量累加(比如 1 + 2 這種事先計(jì)算好) 以及常量替換(比如 SELECT * FROM table WHERE i = 5 AND j = i + 3 可以轉(zhuǎn)換成 SELECT * FROM table WHERE i = 5 AND j = 8)等等。
與前文介紹綁定邏輯計(jì)劃階段類(lèi)似,這個(gè)階段所有的規(guī)則也是實(shí)現(xiàn) Rule 抽象類(lèi),多個(gè)規(guī)則組成一個(gè) Batch,多個(gè) Batch 組成一個(gè) batches,同樣也是在 RuleExecutor 中進(jìn)行執(zhí)行,由于前文已經(jīng)介紹了 Rule 的執(zhí)行過(guò)程,本節(jié)就不再贅述。
那么針對(duì)前文的 SQL 語(yǔ)句,這個(gè)過(guò)程都會(huì)執(zhí)行哪些優(yōu)化呢?這里按照 Rule 執(zhí)行順序一一進(jìn)行說(shuō)明。
謂詞下推在 Spark SQL 是由 PushDownPredicate 實(shí)現(xiàn)的,這個(gè)過(guò)程主要將過(guò)濾條件盡可能地下推到底層,最好是數(shù)據(jù)源。所以針對(duì)我們上面介紹的 SQL,使用謂詞下推優(yōu)化得到的邏輯計(jì)劃如下

從上圖可以看出,經(jīng)過(guò)列裁剪后,t1 表只需要查詢 id 和 value 兩個(gè)字段;t2 表只需要查詢 id 字段。這樣減少了數(shù)據(jù)的傳輸,而且如果底層的文件格式為列存(比如 Parquet),可以大大提高數(shù)據(jù)的掃描速度的。
常量替換在 Spark SQL 是由 ConstantPropagation 實(shí)現(xiàn)的。也就是將變量替換成常量,比如 SELECT * FROM table WHERE i = 5 AND j = i + 3 可以轉(zhuǎn)換成 SELECT * FROM table WHERE i = 5 AND j = 8。這個(gè)看起來(lái)好像沒(méi)什么的,但是如果掃描的行數(shù)非常多可以減少很多的計(jì)算時(shí)間的開(kāi)銷(xiāo)的。經(jīng)過(guò)這個(gè)優(yōu)化,得到的邏輯計(jì)劃如下:

優(yōu)化后的邏輯計(jì)劃:
== Optimized Logical Plan == Aggregate [sum(cast(v#16 as bigint)) AS sum(v)#22L] +- Project [(3 + value#1) AS v#16] +- Join Inner, (id#0 = id#8) :- Project [id#0, value#1] : +- Filter (((((isnotnull(cid#2) AND isnotnull(did#3)) AND (cid#2 = 1)) AND (did#3 = 2)) AND (id#0 > 5)) AND isnotnull(id#0)) : +- Relation[id#0,value#1,cid#2,did#3] parquet +- Project [id#8] +- Filter (isnotnull(id#8) AND (id#8 > 5)) +- Relation[id#8,value#9,cid#10,did#11] parquet
到這里,優(yōu)化邏輯計(jì)劃階段就算完成了。另外,Spark 內(nèi)置提供了多達(dá)70個(gè)優(yōu)化 Rule
前面介紹的邏輯計(jì)劃在 Spark 里面其實(shí)并不能被執(zhí)行的,為了能夠執(zhí)行這個(gè) SQL,一定需要翻譯成物理計(jì)劃,到這個(gè)階段 Spark 就知道如何執(zhí)行這個(gè) SQL 了。和前面邏輯計(jì)劃綁定和優(yōu)化不一樣,這里使用的是策略(Strategy),而且前面介紹的邏輯計(jì)劃綁定和優(yōu)化經(jīng)過(guò) Transformations 動(dòng)作之后,樹(shù)的類(lèi)型并沒(méi)有改變,也就是說(shuō):Expression 經(jīng)過(guò) Transformations 之后得到的還是 Transformations ;Logical Plan 經(jīng)過(guò) Transformations 之后得到的還是 Logical Plan。而到了這個(gè)階段,經(jīng)過(guò) Transformations 動(dòng)作之后,樹(shù)的類(lèi)型改變了,由 Logical Plan 轉(zhuǎn)換成 Physical Plan 了。
一個(gè)邏輯計(jì)劃(Logical Plan)經(jīng)過(guò)一系列的策略處理之后,得到多個(gè)物理計(jì)劃(Physical Plans),物理計(jì)劃在 Spark 是由 SparkPlan 實(shí)現(xiàn)的。多個(gè)物理計(jì)劃再經(jīng)過(guò)代價(jià)模型(Cost Model)得到選擇后的物理計(jì)劃(Selected Physical Plan)。
Cost Model 對(duì)應(yīng)的就是基于代價(jià)的優(yōu)化(Cost-based Optimizations,CBO),核心思想是計(jì)算每個(gè)物理計(jì)劃的代價(jià),然后得到最優(yōu)的物理計(jì)劃。
== Physical Plan == *(3) HashAggregate(keys=[], functions=[sum(cast(v#16 as bigint))], output=[sum(v)#18L]) +- Exchange SinglePartition, true, [id=#70] +- *(2) HashAggregate(keys=[], functions=[partial_sum(cast(v#16 as bigint))], output=[sum#21L]) +- *(2) Project [(3 + value#1) AS v#16] +- *(2) BroadcastHashJoin [id#0], [id#8], Inner, BuildRight :- *(2) Project [id#0, value#1] : +- *(2) Filter (((((isnotnull(cid#2) AND isnotnull(did#3)) AND (cid#2 = 1)) AND (did#3 = 2)) AND (id#0 > 5)) AND isnotnull(id#0)) : +- *(2) ColumnarToRow : +- FileScan parquet [id#0,value#1,cid#2,did#3] Batched: true, DataFilters: [isnotnull(cid#2), isnotnull(did#3), (cid#2 = 1), (did#3 = 2), (id#0 > 5), isnotnull(id#0)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/bytedance/Documents/Own/project/practice/src/main/data/data1/t1.csv], PartitionFilters: [], PushedFilters: [IsNotNull(cid), IsNotNull(did), EqualTo(cid,1), EqualTo(did,2), GreaterThan(id,5), IsNotNull(id)], ReadSchema: struct<id:int,value:int,cid:int,did:int> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#64] +- *(1) Project [id#8] +- *(1) Filter (isnotnull(id#8) AND (id#8 > 5)) +- *(1) ColumnarToRow +- FileScan parquet [id#8] Batched: true, DataFilters: [isnotnull(id#8), (id#8 > 5)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/bytedance/Documents/Own/project/practice/src/main/data/data1/t2.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id), GreaterThan(id,5)], ReadSchema: struct<id:int>

Join inner變成了broadcastHashJoin
感謝各位的閱讀!關(guān)于“spark sql是如何變成執(zhí)行計(jì)劃的”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,讓大家可以學(xué)到更多知識(shí),如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到吧!
分享文章:sparksql是如何變成執(zhí)行計(jì)劃的
網(wǎng)頁(yè)地址:http://chinadenli.net/article30/jijdso.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站建設(shè)、面包屑導(dǎo)航、網(wǎng)站營(yíng)銷(xiāo)、品牌網(wǎng)站設(shè)計(jì)、關(guān)鍵詞優(yōu)化、域名注冊(cè)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)