日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長(zhǎng)提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請(qǐng)做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會(huì)員:747

眾所周知,flink在開啟checkpoint之后,source 任務(wù)收到觸發(fā)檢查點(diǎn)保存的指令后,會(huì)立即在當(dāng)前處理的數(shù)據(jù)中插入一個(gè)標(biāo)識(shí)字段(Barrier),然后再向下游任務(wù)發(fā)出。我們平時(shí)使用比較多的是對(duì)齊的barrier,那你知道非對(duì)齊的barrier嗎?如何使用呢?讓我們通過(guò)下面的閱讀一起了解一下吧。

一、Barrier

流的barrier是Flink Checkpoint中的一個(gè)核心概念。多個(gè)barrier被插入到數(shù)據(jù)流中, 然后作為數(shù)據(jù)流的一部分隨著數(shù)據(jù)流動(dòng)(有點(diǎn)類似于Watermark)。這些barrier不會(huì)跨越流中的數(shù)據(jù)。

每個(gè)barrier會(huì)把數(shù)據(jù)流分成兩部分:一部分?jǐn)?shù)據(jù)進(jìn)入當(dāng)前的快照,另一部分?jǐn)?shù)據(jù)進(jìn)入下一個(gè)快照。每個(gè)barrier攜帶著快照的id。barrier 不會(huì)暫停數(shù)據(jù)的流動(dòng),所以非常輕量級(jí)。在流中, 同一時(shí)間可以有來(lái)源于多個(gè)不同快照的多個(gè)barrier,這意味著可以并發(fā)地出現(xiàn)不同的快照。

二、對(duì)齊的barrier

在多并行度下,如果要實(shí)現(xiàn)嚴(yán)格一次,則要執(zhí)行barrier對(duì)齊。當(dāng) job graph 中的每個(gè) operator 接收到 barrier 時(shí),就會(huì)記錄下其狀態(tài)。擁有兩個(gè)輸入流的 operators(例如 CoProcessFunction)會(huì)執(zhí)行 barrier 對(duì)齊(barrier alignment),以便當(dāng)前快照能夠包含兩個(gè)輸入流 barrier 之前(但不超過(guò))的所有 events 產(chǎn)生的狀態(tài)。

1. 當(dāng)operator收到數(shù)字流的barrier n時(shí), 它就不能處理(但是可以接收)來(lái)自該流的任何數(shù)據(jù)記錄,直到它從字母流的所有輸入中接收到 barrier n 為止。

2. 接收到 barrier n 的流(數(shù)字流)暫時(shí)被擱置。從這些流接收的記錄會(huì)進(jìn)入輸入緩沖區(qū), 不會(huì)被處理。例如圖中的 barrier n 之后的數(shù)據(jù) 123 已經(jīng)到達(dá)了operator, 存入到了輸入緩沖區(qū)沒(méi)有被處理, 只有等到字母流的 barrier n 到達(dá)之后才會(huì)開始處理。

3. 一旦最后所有輸入流都接收到 barrier n,operator 就會(huì)把緩沖區(qū)中待輸出的數(shù)據(jù)發(fā)出去,然后把 barrier n 接著往下游發(fā)送。這里還會(huì)對(duì)自身進(jìn)行快照。

優(yōu)點(diǎn):

  • barrier 對(duì)齊不僅保證了狀態(tài)的準(zhǔn)確性,還巧妙地消去了原生C-L算法中記錄輸入流狀態(tài)的步驟,十分輕量級(jí),保存的數(shù)據(jù)體積小。

缺點(diǎn):

  • 延遲性高(快的barrier到達(dá)后會(huì)阻塞此條流的數(shù)據(jù)處理)。
  • 加劇作業(yè)的反壓(當(dāng)出現(xiàn)反壓時(shí),數(shù)據(jù)本身就處理不過(guò)來(lái),此時(shí)某條流的數(shù)據(jù)又阻塞了,所以就會(huì)加劇反壓)。
  • 整體 chenkpoint 時(shí)間變長(zhǎng)(因?yàn)榉磯簳?huì)導(dǎo)致數(shù)據(jù)流速變慢,導(dǎo)致barrier流動(dòng)速度也會(huì)變慢,所以整體chenkpoint時(shí)間就會(huì)變長(zhǎng))。

三、barrier不對(duì)齊

如果barrier不對(duì)齊會(huì)怎么樣?會(huì)重復(fù)消費(fèi),就是至少一次語(yǔ)義。

1. 當(dāng) operator 收到數(shù)字流的 barrier n 時(shí),開啟本地快照記錄自己的狀態(tài),并將這個(gè) barrier 發(fā)往下游(輸出緩沖區(qū))。

2. 接收到 barrier n 的流(數(shù)字流)會(huì)繼續(xù)往下走。字母流的 barrier n 前面的數(shù)據(jù)(abcd)會(huì)被保存到狀態(tài)里面,直到 barrier n 到來(lái)以后,再進(jìn)行checkpoint,將數(shù)據(jù)保存到檢查點(diǎn)中。

優(yōu)點(diǎn):

  • 避免了 checkpoint 可能帶來(lái)的阻塞,有利于提升 Flink 的資源利用率。

缺點(diǎn):

  • 由于要持久化緩存數(shù)據(jù),State Size 會(huì)有比較大的增長(zhǎng),磁盤負(fù)載會(huì)加重。
  • 隨著 State Size 增長(zhǎng),作業(yè)恢復(fù)時(shí)間可能增長(zhǎng),運(yùn)維管理難度增加。

圖片來(lái)源于網(wǎng)絡(luò),侵刪

四、barrier的使用

對(duì)齊

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 每 1000ms 開始一次 checkpoint

env.enableCheckpointing(1000);

// 高級(jí)選項(xiàng):

// 設(shè)置模式為精確一次 (這是默認(rèn)值),對(duì)于延遲要求較高的選擇,最少一次

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// 確認(rèn) checkpoints 之間的時(shí)間會(huì)進(jìn)行 500 ms

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// Checkpoint 必須在一分鐘內(nèi)完成,否則就會(huì)被拋棄

env.getCheckpointConfig().setCheckpointTimeout(60000);

// 允許兩個(gè)連續(xù)的 checkpoint 錯(cuò)誤

env.getCheckpointConfig().setTolerableCheckpointFAIlureNumber(2);

// 同一時(shí)間只允許一個(gè) checkpoint 進(jìn)行

env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// 使用 externalized checkpoints,這樣 checkpoint 在作業(yè)取消后仍就會(huì)被保留

env.getCheckpointConfig().setExternalizedCheckpointCleanup(

ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// 開啟實(shí)驗(yàn)性的 unaligned checkpoints

env.getCheckpointConfig().enableUnalignedCheckpoints();

非對(duì)齊

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 啟用非對(duì)齊 Checkpoint

env.getCheckpointConfig().enableUnalignedCheckpoints();

或者在 flink-conf.yml 配置文件中增加配置:

execution.checkpointing.unaligned: true

五、總結(jié)

非對(duì)齊barrier主要是解決嚴(yán)重反壓情況下作業(yè)難以完成 checkpoint 的問(wèn)題,同時(shí)它以磁盤資源為代價(jià),避免了 checkpoint 可能帶來(lái)的阻塞,有利于提升 Flink 的資源利用率。

分享到:
標(biāo)簽:Flink
用戶無(wú)頭像

網(wǎng)友整理

注冊(cè)時(shí)間:

網(wǎng)站:5 個(gè)   小程序:0 個(gè)  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會(huì)員

趕快注冊(cè)賬號(hào),推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過(guò)答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫(kù),初中,高中,大學(xué)四六

運(yùn)動(dòng)步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動(dòng)步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績(jī)?cè)u(píng)定2018-06-03

通用課目體育訓(xùn)練成績(jī)?cè)u(píng)定