眾所周知,flink在開啟checkpoint之后,source 任務(wù)收到觸發(fā)檢查點(diǎn)保存的指令后,會(huì)立即在當(dāng)前處理的數(shù)據(jù)中插入一個(gè)標(biāo)識(shí)字段(Barrier),然后再向下游任務(wù)發(fā)出。我們平時(shí)使用比較多的是對(duì)齊的barrier,那你知道非對(duì)齊的barrier嗎?如何使用呢?讓我們通過(guò)下面的閱讀一起了解一下吧。
一、Barrier
流的barrier是Flink Checkpoint中的一個(gè)核心概念。多個(gè)barrier被插入到數(shù)據(jù)流中, 然后作為數(shù)據(jù)流的一部分隨著數(shù)據(jù)流動(dòng)(有點(diǎn)類似于Watermark)。這些barrier不會(huì)跨越流中的數(shù)據(jù)。
每個(gè)barrier會(huì)把數(shù)據(jù)流分成兩部分:一部分?jǐn)?shù)據(jù)進(jìn)入當(dāng)前的快照,另一部分?jǐn)?shù)據(jù)進(jìn)入下一個(gè)快照。每個(gè)barrier攜帶著快照的id。barrier 不會(huì)暫停數(shù)據(jù)的流動(dòng),所以非常輕量級(jí)。在流中, 同一時(shí)間可以有來(lái)源于多個(gè)不同快照的多個(gè)barrier,這意味著可以并發(fā)地出現(xiàn)不同的快照。
二、對(duì)齊的barrier
在多并行度下,如果要實(shí)現(xiàn)嚴(yán)格一次,則要執(zhí)行barrier對(duì)齊。當(dāng) job graph 中的每個(gè) operator 接收到 barrier 時(shí),就會(huì)記錄下其狀態(tài)。擁有兩個(gè)輸入流的 operators(例如 CoProcessFunction)會(huì)執(zhí)行 barrier 對(duì)齊(barrier alignment),以便當(dāng)前快照能夠包含兩個(gè)輸入流 barrier 之前(但不超過(guò))的所有 events 產(chǎn)生的狀態(tài)。
1. 當(dāng)operator收到數(shù)字流的barrier n時(shí), 它就不能處理(但是可以接收)來(lái)自該流的任何數(shù)據(jù)記錄,直到它從字母流的所有輸入中接收到 barrier n 為止。
2. 接收到 barrier n 的流(數(shù)字流)暫時(shí)被擱置。從這些流接收的記錄會(huì)進(jìn)入輸入緩沖區(qū), 不會(huì)被處理。例如圖中的 barrier n 之后的數(shù)據(jù) 123 已經(jīng)到達(dá)了operator, 存入到了輸入緩沖區(qū)沒(méi)有被處理, 只有等到字母流的 barrier n 到達(dá)之后才會(huì)開始處理。
3. 一旦最后所有輸入流都接收到 barrier n,operator 就會(huì)把緩沖區(qū)中待輸出的數(shù)據(jù)發(fā)出去,然后把 barrier n 接著往下游發(fā)送。這里還會(huì)對(duì)自身進(jìn)行快照。
優(yōu)點(diǎn):
- barrier 對(duì)齊不僅保證了狀態(tài)的準(zhǔn)確性,還巧妙地消去了原生C-L算法中記錄輸入流狀態(tài)的步驟,十分輕量級(jí),保存的數(shù)據(jù)體積小。
缺點(diǎn):
- 延遲性高(快的barrier到達(dá)后會(huì)阻塞此條流的數(shù)據(jù)處理)。
- 加劇作業(yè)的反壓(當(dāng)出現(xiàn)反壓時(shí),數(shù)據(jù)本身就處理不過(guò)來(lái),此時(shí)某條流的數(shù)據(jù)又阻塞了,所以就會(huì)加劇反壓)。
- 整體 chenkpoint 時(shí)間變長(zhǎng)(因?yàn)榉磯簳?huì)導(dǎo)致數(shù)據(jù)流速變慢,導(dǎo)致barrier流動(dòng)速度也會(huì)變慢,所以整體chenkpoint時(shí)間就會(huì)變長(zhǎng))。
三、barrier不對(duì)齊
如果barrier不對(duì)齊會(huì)怎么樣?會(huì)重復(fù)消費(fèi),就是至少一次語(yǔ)義。
1. 當(dāng) operator 收到數(shù)字流的 barrier n 時(shí),開啟本地快照記錄自己的狀態(tài),并將這個(gè) barrier 發(fā)往下游(輸出緩沖區(qū))。
2. 接收到 barrier n 的流(數(shù)字流)會(huì)繼續(xù)往下走。字母流的 barrier n 前面的數(shù)據(jù)(abcd)會(huì)被保存到狀態(tài)里面,直到 barrier n 到來(lái)以后,再進(jìn)行checkpoint,將數(shù)據(jù)保存到檢查點(diǎn)中。
優(yōu)點(diǎn):
- 避免了 checkpoint 可能帶來(lái)的阻塞,有利于提升 Flink 的資源利用率。
缺點(diǎn):
- 由于要持久化緩存數(shù)據(jù),State Size 會(huì)有比較大的增長(zhǎng),磁盤負(fù)載會(huì)加重。
- 隨著 State Size 增長(zhǎng),作業(yè)恢復(fù)時(shí)間可能增長(zhǎng),運(yùn)維管理難度增加。
圖片來(lái)源于網(wǎng)絡(luò),侵刪
四、barrier的使用
對(duì)齊
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每 1000ms 開始一次 checkpoint
env.enableCheckpointing(1000);
// 高級(jí)選項(xiàng):
// 設(shè)置模式為精確一次 (這是默認(rèn)值),對(duì)于延遲要求較高的選擇,最少一次
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 確認(rèn) checkpoints 之間的時(shí)間會(huì)進(jìn)行 500 ms
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// Checkpoint 必須在一分鐘內(nèi)完成,否則就會(huì)被拋棄
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 允許兩個(gè)連續(xù)的 checkpoint 錯(cuò)誤
env.getCheckpointConfig().setTolerableCheckpointFAIlureNumber(2);
// 同一時(shí)間只允許一個(gè) checkpoint 進(jìn)行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 使用 externalized checkpoints,這樣 checkpoint 在作業(yè)取消后仍就會(huì)被保留
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 開啟實(shí)驗(yàn)性的 unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();
非對(duì)齊
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 啟用非對(duì)齊 Checkpoint
env.getCheckpointConfig().enableUnalignedCheckpoints();
或者在 flink-conf.yml 配置文件中增加配置:
execution.checkpointing.unaligned: true
五、總結(jié)
非對(duì)齊barrier主要是解決嚴(yán)重反壓情況下作業(yè)難以完成 checkpoint 的問(wèn)題,同時(shí)它以磁盤資源為代價(jià),避免了 checkpoint 可能帶來(lái)的阻塞,有利于提升 Flink 的資源利用率。