(1)SparkStreaming 整合 kafka 兩種方式對比
Direct 方式的優(yōu)缺點(diǎn)分析:

創(chuàng)新互聯(lián)公司2013年至今,是專業(yè)互聯(lián)網(wǎng)技術(shù)服務(wù)公司,擁有項(xiàng)目成都網(wǎng)站建設(shè)、成都網(wǎng)站制作網(wǎng)站策劃,項(xiàng)目實(shí)施與項(xiàng)目整合能力。我們以讓每一個夢想脫穎而出為使命,1280元義馬做網(wǎng)站,已為上家服務(wù),為義馬各地企業(yè)和個人服務(wù),聯(lián)系電話:18982081108
- 優(yōu)點(diǎn):
- 簡化并行(Simplified Parallelism)。不現(xiàn)需要創(chuàng)建以及 union 多輸入源,Kafka topic 的partition 與 RDD 的 partition 一一對應(yīng)。
- 高效(Efficiency)?;?Receiver-based 的方式保證數(shù)據(jù)零丟失(zero-data loss)需要配置 spark.streaming.receiver.writeAheadLog.enable=true,此種方式需要保存兩份數(shù)據(jù),浪費(fèi)存儲空間也影響效率。而 Direct 方式則不存在這個問題。
- 強(qiáng)一致語義(Exactly-once semantics)。High-level 數(shù)據(jù)由 Spark Streaming 消費(fèi),但是Offsets 則是由 Zookeeper 保存。通過參數(shù)配置,可以實(shí)現(xiàn) at-least once 消費(fèi),此種情況有重復(fù)消費(fèi)數(shù)據(jù)的可能。
- 降低資源。Direct 不需要 Receivers,其申請的 Executors 全部參與到計(jì)算任務(wù)中;而Receiver-based 則需要專門的 Receivers 來讀取 Kafka 數(shù)據(jù)且不參與計(jì)算。因此相同的資源申請,Direct 能夠支持更大的業(yè)務(wù)。
- 降低內(nèi)存。Receiver-based 的 Receiver 與其他 Exectuor 是異步的,并持續(xù)不斷接收數(shù)據(jù),對于小業(yè)務(wù)量的場景還好,如果遇到大業(yè)務(wù)量時,需要提高 Receiver 的內(nèi)存,但是參與計(jì)算的 Executor 并無需那么多的內(nèi)存。而 Direct 因?yàn)闆]有 Receiver,而是在計(jì)算時讀取數(shù)據(jù),然后直接計(jì)算,所以對內(nèi)存的要求很低。
- 缺點(diǎn):
- 提高成本。Direct 需要用戶采用 checkpoint 或者第三方存儲來維護(hù) offsets,而不像Receiver-based 那樣,通過 ZooKeeper 來維護(hù) Offsets,此提高了用戶的開發(fā)成本。
- 監(jiān)控可視化。Receiver-based 方式指定 topic 指定 consumer 的消費(fèi)情況均能通過ZooKeeper 來監(jiān)控,而 Direct 則沒有這種便利,不能自動保存 offset 到 zookeeper,如果做到監(jiān)控并可視化,則需要投入人力開發(fā)。
Receiver 方式的優(yōu)缺點(diǎn)分析:
- 優(yōu)點(diǎn):
- 專注計(jì)算。Kafka 的 high-level 數(shù)據(jù)讀取方式讓用戶可以專注于所讀數(shù)據(jù),而不用關(guān)注或維護(hù) consumer 的 offsets,這減少用戶的工作量以及代碼量而且相對比較簡單。
- 缺點(diǎn):
- 防數(shù)據(jù)丟失。做 checkpoint 操作以及配置 spark.streaming.receiver.writeAheadLog.enable參數(shù),配置 spark.streaming.receiver.writeAheadLog.enable 參數(shù),每次處理之前需要將該batch 內(nèi)的日志備份到 checkpoint 目錄中,這降低了數(shù)據(jù)處理效率,反過來又加重了Receiver 端的壓力;另外由于數(shù)據(jù)備份機(jī)制,會受到負(fù)載影響,負(fù)載一高就會出現(xiàn)延遲的風(fēng)險,導(dǎo)致應(yīng)用崩潰。
- 單 Receiver 內(nèi)存。由于 receiver 也是屬于 Executor 的一部分,那么為了提高吞吐量
- 重復(fù)消費(fèi)。在程序失敗恢復(fù)時,有可能出現(xiàn)數(shù)據(jù)部分落地,但是程序失敗,未更新 offset的情況,這導(dǎo)致數(shù)據(jù)重復(fù)消費(fèi)。
- Receiver 和計(jì)算的 Executor的異步的,那么遇到網(wǎng)絡(luò)等因素原因,導(dǎo)致計(jì)算出現(xiàn)延遲,計(jì)算隊(duì)列一直在增加,而Receiver 則在一直接收數(shù)據(jù),這非常容易導(dǎo)致程序崩潰。
(2)對kafka消費(fèi)的offset的管理
- spark自帶的checkpoint:
- 啟用spark streaming的checkpoint是存儲偏移量的最簡單方法
- 流式checkpoint專門保存用戶應(yīng)用程序的狀態(tài)
- 但是checkpoint的目錄是不能共享的,無法跨越應(yīng)用程序進(jìn)行恢復(fù)
- 一般不使用checkpoint管理offset
- 使用zookeeper管理offset
- 如果Zookeeper中未保存offset,根據(jù)kafkaParam的配置使用最新或者最舊的offset
- 如果 zookeeper中有保存offset,我們會利用這個offset作為kafkaStream 的起始位置
- 使用hbase保存offset
- Rowkey的設(shè)計(jì):topic名稱 + groupid + streaming的batchtime.milliSeconds
- 使用hdfs管理offset:當(dāng)然這種情況不推薦使用,因?yàn)樵趆dfs中會生成大量的小文件,導(dǎo)致,hdfs的性能急劇下降
(3)Driver的HA
介紹:他能夠在driver失敗的時候,通過讀取checkpoint目錄下的元數(shù)據(jù),恢復(fù)當(dāng)前streamingContext對象的狀態(tài);它能夠察覺到driver進(jìn)程異常退出之后,自動重啟。
具體流程:當(dāng)?shù)谝淮芜\(yùn)行程序時,發(fā)現(xiàn)checkpoint中沒有數(shù)據(jù),則根據(jù)定義的函數(shù)來第一次創(chuàng)建StreamingContext對象,當(dāng)程序異常退出的時候,此時會根據(jù)checkpoint中的元數(shù)據(jù)恢復(fù)一個StreamingContext對象,達(dá)到異常退出之前的狀態(tài),而實(shí)現(xiàn)異常退出并自動啟動則是sparkStreaming應(yīng)用程序?qū)river進(jìn)行監(jiān)控,并且在他失敗的時候感知,并進(jìn)行重啟。
必要條件:
- spark-submit提交作業(yè)的時候,必須是集群模式(cluster),并且必須在spark-standalong下。
spark-submit \
--class com.aura.mazh.spark.streaming.kafka.SparkStreamDemo_Direct \
//這里只能使用spark的standalong模式,所以配置為spark集群
--master spark://hadoop02:7077,hadoop04:7077 \
--driver-memory 512m \
--total-executor-cores 3 \
--executor-memory 512m \
#這句代碼一定要加,他可以使異常退出的driver程序,重新啟動
--supervise \
--name SparkStreamDemo_Direct \
--jars /home/hadoop/lib/kafka_2.11-0.8.2.1.jar,\
/home/hadoop/lib/metrics-core-2.2.0.jar,\
/home/hadoop/lib/spark-streaming_2.11-2.3.2.jar,\
/home/hadoop/lib/spark-streaming-kafka-0-8_2.11-2.3.2.jar,\
/home/hadoop/lib/zkclient-0.3.jar \
/home/hadoop/original-spark-1.0-SNAPSHOT.jar \
spark://hadoop02:7077,hadoop04:7077
- 需要添加--supervise \,才能實(shí)現(xiàn)失敗自啟動
- 需要配置checkpoint目錄,并且是存儲在hdfs上,jar也要放置在hdfs上
本文標(biāo)題:SparkStreaming整合kafka的補(bǔ)充
當(dāng)前地址:http://chinadenli.net/article28/giegjp.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供電子商務(wù)、定制網(wǎng)站、自適應(yīng)網(wǎng)站、品牌網(wǎng)站制作、網(wǎng)站營銷、營銷型網(wǎng)站建設(shè)
廣告
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源:
創(chuàng)新互聯(lián)