這篇文章主要講解了“Flink checkpoint機制是什么”,文中的講解內(nèi)容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Flink checkpoint機制是什么”吧!
成都創(chuàng)新互聯(lián)成立于2013年,是專業(yè)互聯(lián)網(wǎng)技術(shù)服務(wù)公司,擁有項目成都做網(wǎng)站、網(wǎng)站建設(shè)網(wǎng)站策劃,項目實施與項目整合能力。我們以讓每一個夢想脫穎而出為使命,1280元略陽做網(wǎng)站,已為上家服務(wù),為略陽各地企業(yè)和個人服務(wù),聯(lián)系電話:18980820575
checkpoint機制是Flink可靠性的基石,可以保證Flink集群在某個算子因為某些原因(如 異常退出)出現(xiàn)故障時,能夠?qū)⒄麄€應用流圖的狀態(tài)恢復到故障之前的某一狀態(tài),保 證應用流圖狀態(tài)的一致性。Flink的checkpoint機制原理來自“Chandy-Lamport algorithm”算法。
每個需要checkpoint的應用在啟動時,F(xiàn)link的JobManager為其創(chuàng)建一個 CheckpointCoordinator(檢查點協(xié)調(diào)器),CheckpointCoordinator全權(quán)負責本應用的快照制作。

1) CheckpointCoordinator(檢查點協(xié)調(diào)器) 周期性的向該流應用的所有source算子發(fā)送 barrier(屏障)。
2) 當某個source算子收到一個barrier時,便暫停數(shù)據(jù)處理過程,然后將自己的當前狀態(tài)制作成快照,并保存到指定的持久化存儲中,最后向CheckpointCoordinator報告自己快照制作情況,同時向自身所有下游算子廣播該barrier,恢復數(shù)據(jù)處理
3) 下游算子收到barrier之后,會暫停自己的數(shù)據(jù)處理過程,然后將自身的相關(guān)狀態(tài)制作成快照,并保存到指定的持久化存儲中,最后向CheckpointCoordinator報告自身快照情況,同時向自身所有下游算子廣播該barrier,恢復數(shù)據(jù)處理。
4) 每個算子按照步驟3不斷制作快照并向下游廣播,直到最后barrier傳遞到sink算子,快照制作完成。
5) 當CheckpointCoordinator收到所有算子的報告之后,認為該周期的快照制作成功; 否則,如果在規(guī)定的時間內(nèi)沒有收到所有算子的報告,則認為本周期快照制作失敗。
如果一個算子有兩個輸入源,則暫時阻塞先收到barrier的輸入源,等到第二個輸入源相 同編號的barrier到來時,再制作自身快照并向下游廣播該barrier。具體如下圖所示:
1) 假設(shè)算子C有A和B兩個輸入源
2) 在第i個快照周期中,由于某些原因(如處理時延、網(wǎng)絡(luò)時延等)輸入源A發(fā)出的 barrier 先到來,這時算子C暫時將輸入源A的輸入通道阻塞,僅收輸入源B的數(shù)據(jù)。
3) 當輸入源B發(fā)出的barrier到來時,算子C制作自身快照并向 CheckpointCoordinator 報告自身的快照制作情況,然后將兩個barrier合并為一個,向下游所有的算子廣播。
4) 當由于某些原因出現(xiàn)故障時,CheckpointCoordinator通知流圖上所有算子統(tǒng)一恢復到某個周期的checkpoint狀態(tài),然后恢復數(shù)據(jù)流處理。分布式checkpoint機制保證了數(shù)據(jù)僅被處理一次(Exactly Once)。
該持久化存儲主要將快照數(shù)據(jù)保存到JobManager的內(nèi)存中,僅適合作為測試以及快照的數(shù)據(jù)量非常小時使用,并不推薦用作大規(guī)模商業(yè)部署。
MemoryStateBackend 的局限性:
默認情況下,每個狀態(tài)的大小限制為 5 MB。可以在MemoryStateBackend的構(gòu)造函數(shù)中增加此值。
無論配置的最大狀態(tài)大小如何,狀態(tài)都不能大于akka幀的大小(請參閱配置)。
聚合狀態(tài)必須適合 JobManager 內(nèi)存。
建議MemoryStateBackend 用于:
本地開發(fā)和調(diào)試。
狀態(tài)很少的作業(yè),例如僅包含一次記錄功能的作業(yè)(Map,F(xiàn)latMap,F(xiàn)ilter,…),kafka的消費者需要很少的狀態(tài)。
該持久化存儲主要將快照數(shù)據(jù)保存到文件系統(tǒng)中,目前支持的文件系統(tǒng)主要是 HDFS和本地文件。如果使用HDFS,則初始化FsStateBackend時,需要傳入以 “hdfs://”開頭的路徑(即: new FsStateBackend("hdfs:///hacluster/checkpoint")), 如果使用本地文件,則需要傳入以“file://”開頭的路徑(即:new FsStateBackend("file:///Data"))。在分布式情況下,不推薦使用本地文件。如果某 個算子在節(jié)點A上失敗,在節(jié)點B上恢復,使用本地文件時,在B上無法讀取節(jié)點 A上的數(shù)據(jù),導致狀態(tài)恢復失敗。
建議FsStateBackend:
具有大狀態(tài),長窗口,大鍵 / 值狀態(tài)的作業(yè)。
所有高可用性設(shè)置。
RocksDBStatBackend介于本地文件和HDFS之間,平時使用RocksDB的功能,將數(shù) 據(jù)持久化到本地文件中,當制作快照時,將本地數(shù)據(jù)制作成快照,并持久化到 FsStateBackend中(FsStateBackend不必用戶特別指明,只需在初始化時傳入HDFS 或本地路徑即可,如new RocksDBStateBackend("hdfs:///hacluster/checkpoint")或new RocksDBStateBackend("file:///Data"))。
如果用戶使用自定義窗口(window),不推薦用戶使用RocksDBStateBackend。在自定義窗口中,狀態(tài)以ListState的形式保存在StatBackend中,如果一個key值中有多個value值,則RocksDB讀取該種ListState非常緩慢,影響性能。用戶可以根據(jù)應用的具體情況選擇FsStateBackend+HDFS或RocksStateBackend+HDFS。
val env = StreamExecutionEnvironment.getExecutionEnvironment()// start a checkpoint every 1000 msenv.enableCheckpointing(1000)// advanced options:// 設(shè)置checkpoint的執(zhí)行模式,最多執(zhí)行一次或者至少執(zhí)行一次env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)// 設(shè)置checkpoint的超時時間env.getCheckpointConfig.setCheckpointTimeout(60000)// 如果在只做快照過程中出現(xiàn)錯誤,是否讓整體任務(wù)失敗:true是 false不是env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)//設(shè)置同一時間有多少 個checkpoint可以同時執(zhí)行 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
第一種:單任務(wù)調(diào)整
修改當前任務(wù)代碼
env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
或者new MemoryStateBackend()
或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依賴】
第二種:全局調(diào)整
修改flink-conf.yaml
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
注意:state.backend的值可以是下面幾種:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)
默認checkpoint功能是disabled的,想要使用的時候需要先啟用checkpoint開啟之后,默認的checkPointMode是Exactly-once
//配置一秒鐘開啟一個checkpointenv.enableCheckpointing(1000)//指定checkpoint的執(zhí)行模式//兩種可選://CheckpointingMode.EXACTLY_ONCE:默認值//CheckpointingMode.AT_LEAST_ONCEenv.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) 一般情況下選擇CheckpointingMode.EXACTLY_ONCE,除非場景要求極低的延遲(幾毫秒) 注意:如果需要保證EXACTLY_ONCE,source和sink要求必須同時保證EXACTLY_ONCE
//如果程序被cancle,保留以前做的checkpoint env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)默認情況下,檢查點不被保留,僅用于在故障中恢復作業(yè),可以啟用外部持久化檢查點,同時指定保留策略:ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:在作業(yè)取消時保留檢查點,注意,在這種情況下,您必須在取消后手動清理檢查點狀態(tài)ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:當作業(yè)在被cancel時,刪除檢查點,檢查點僅在作業(yè)失敗時可用
//設(shè)置checkpoint超時時間env.getCheckpointConfig.setCheckpointTimeout(60000)//Checkpointing的超時時間,超時時間內(nèi)沒有完成則被終止
//Checkpointing最小時間間隔,用于指定上一個checkpoint完成之后//最小等多久可以觸發(fā)另一個checkpoint,當指定這個參數(shù)時,maxConcurrentCheckpoints的值為1env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
//設(shè)置同一個時間是否可以有多個checkpoint執(zhí)行env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) 指定運行中的checkpoint最多可以有多少個 env.getCheckpointConfig.setFailOnCheckpointingErrors(true) 用于指定在checkpoint發(fā)生異常的時候,是否應該fail該task,默認是true,如果設(shè)置為false,則task會拒絕checkpoint然后繼續(xù)運行
Flink支持不同的重啟策略,這些重啟策略控制著job失敗后如何重啟。集群可以通過默認的重啟策略來重啟,這個默認的重啟策略通常在未指定重啟策略的情況下使用,而如果Job提交的時候指定了重啟策略,這個重啟策略就會覆蓋掉集群的默認重啟策略。
默認的重啟策略是通過Flink的 flink-conf.yaml來指定的,這個配置參數(shù) restart-strategy定義了哪種策略會被采用。如果checkpoint未啟動,就會采用 no restart策略,如果啟動了checkpoint機制,但是未指定重啟策略的話,就會采用 fixed-delay策略,重試 Integer.MAX_VALUE次。請參考下面的可用重啟策略來了解哪些值是支持的。
每個重啟策略都有自己的參數(shù)來控制它的行為,這些值也可以在配置文件中設(shè)置,每個重啟策略的描述都包含著各自的配置值信息。
除了定義一個默認的重啟策略之外,你還可以為每一個Job指定它自己的重啟策略,這個重啟策略可以在 ExecutionEnvironment中調(diào)用 setRestartStrategy()方法來程序化地調(diào)用,注意這種方式同樣適用于 StreamExecutionEnvironment。
下面的例子展示了如何為Job設(shè)置一個固定延遲重啟策略,一旦有失敗,系統(tǒng)就會嘗試每10秒重啟一次,重啟3次。
val env = ExecutionEnvironment.getExecutionEnvironment() env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, // 重啟次數(shù) Time.of(10, TimeUnit.SECONDS) // 延遲時間間隔))
固定延遲重啟策略會嘗試一個給定的次數(shù)來重啟Job,如果超過了最大的重啟次數(shù),Job最終將失敗。在連續(xù)的兩次重啟嘗試之間,重啟策略會等待一個固定的時間。
重啟策略可以配置flink-conf.yaml的下面配置參數(shù)來啟用,作為默認的重啟策略:
restart-strategy: fixed-delay
例子:
restart-strategy.fixed-delay.attempts: 3restart-strategy.fixed-delay.delay: 10 s
固定延遲重啟也可以在程序中設(shè)置:
val env = ExecutionEnvironment.getExecutionEnvironment() env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, // 重啟次數(shù) Time.of(10, TimeUnit.SECONDS) // 重啟時間間隔))
失敗率重啟策略在Job失敗后會重啟,但是超過失敗率后,Job會最終被認定失敗。在兩個連續(xù)的重啟嘗試之間,重啟策略會等待一個固定的時間。
失敗率重啟策略可以在flink-conf.yaml中設(shè)置下面的配置參數(shù)來啟用:
restart-strategy:failure-rate
例子:
restart-strategy.failure-rate.max-failures-per-interval: 3restart-strategy.failure-rate.failure-rate-interval: 5 minrestart-strategy.failure-rate.delay: 10 s
失敗率重啟策略也可以在程序中設(shè)置:
val env = ExecutionEnvironment.getExecutionEnvironment() env.setRestartStrategy(RestartStrategies.failureRateRestart( 3, // 每個測量時間間隔最大失敗次數(shù) Time.of(5, TimeUnit.MINUTES), //失敗率測量的時間間隔 Time.of(10, TimeUnit.SECONDS) // 兩次連續(xù)重啟嘗試的時間間隔))
Job直接失敗,不會嘗試進行重啟
restart-strategy: none
無重啟策略也可以在程序中設(shè)置
val env = ExecutionEnvironment.getExecutionEnvironment() env.setRestartStrategy(RestartStrategies.noRestart())
感謝各位的閱讀,以上就是“Flink checkpoint機制是什么”的內(nèi)容了,經(jīng)過本文的學習后,相信大家對Flink checkpoint機制是什么這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!
分享題目:Flinkcheckpoint機制是什么
文章鏈接:http://chinadenli.net/article6/jpcjog.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站維護、建站公司、網(wǎng)站策劃、用戶體驗、企業(yè)建站、ChatGPT
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)