一 摘要
Apache Flink是目前大數(shù)據(jù)領(lǐng)域最流行的流批一體化計算引擎,而數(shù)據(jù)湖技術(shù)也是互聯(lián)網(wǎng)時代的產(chǎn)物,以Iceberg、Hudi和Delta為代表的數(shù)據(jù)湖技術(shù)應(yīng)運而生。Iceberg目前已經(jīng)提供對Apache Flink 1.11.x的集成支持,F(xiàn)link可以通過DataStream API/Table API將數(shù)據(jù)寫入Iceberg。
二 背景及痛點
新浪和微博有強大的用戶群體,目前積累的數(shù)據(jù)已經(jīng)達到幾百PB。微博的技術(shù)通常會采集應(yīng)用App的埋點數(shù)據(jù)以及應(yīng)用服務(wù)日志之類的數(shù)據(jù),這些數(shù)據(jù)通過Kafka消息中間件接入數(shù)據(jù)倉庫。
2.1 數(shù)據(jù)平臺架構(gòu)
在微博的數(shù)據(jù)倉庫中,有多種大數(shù)據(jù)存儲組件,譬如Hive/HBase/HDFS,計算引擎有MapReduce、Spark、Flink,根據(jù)用戶不同的需求,會應(yīng)用不同的技術(shù)在大數(shù)據(jù)平臺中計算,將結(jié)果保存到MySQL、ES等支持快速查詢的關(guān)系型、非關(guān)系型數(shù)據(jù)庫中,接下來應(yīng)用層就可以基于這些數(shù)據(jù)進行BI報表開發(fā)、用戶畫像,或者基于Presto這種OLAP工具進行統(tǒng)計查詢。
2.2 Lambda架構(gòu)的痛點
在整個數(shù)據(jù)處理的過程中,我們會借用調(diào)用系統(tǒng)來調(diào)度應(yīng)用程序,定期(T+1或者H+1)去執(zhí)行一些Spark任務(wù)。離線數(shù)據(jù)處理的整個過程中存在著大量的數(shù)據(jù)延遲現(xiàn)象,這些數(shù)據(jù)可能是T+1輸出或者是H+1輸出。但是,業(yè)務(wù)方已經(jīng)不再滿足于這些離線處理數(shù)據(jù)的方式,因此,我們也用Flink+Kafka去構(gòu)建了實時流數(shù)據(jù)處理系統(tǒng)。
如下圖,就是原來使用的Lambda架構(gòu),Lambda架構(gòu)將數(shù)倉分成離線層和實時層。也就是說,同一份數(shù)據(jù)會被處理兩次以上,同一套業(yè)務(wù)邏輯需要適配兩次開發(fā)。
例如在實時場景下計算PV、UV時,我們會用實時技術(shù)計算,這些數(shù)據(jù)指標(biāo)會離開呈現(xiàn)出來。但是,我們有時候需要做趨勢分析,需要每天再重新計算一次PV、UV數(shù)據(jù),比如在凌晨3點的時候,用Spark在調(diào)度系統(tǒng)上把前一天的數(shù)據(jù)全部重新再跑一遍。
很顯然,這兩個過程運行的時間不一致,跑的數(shù)據(jù)卻是完全相同的。重新跑一遍離線分析的數(shù)據(jù),數(shù)據(jù)的更新成本很高,更嚴(yán)重的是二者的數(shù)據(jù)可能不一致(比如有延遲數(shù)據(jù)產(chǎn)生,離線數(shù)據(jù)比實時更準(zhǔn)確)。
為了解決Lambda架構(gòu)的痛點,就產(chǎn)生了Kappa架構(gòu),相信大家對這個架構(gòu)也非常熟悉。
2.3 Kappa架構(gòu)的痛點
Kappa架構(gòu)解決了Lambda架構(gòu)中離線和實時數(shù)據(jù)間不一致、運營和開發(fā)成本加班的問題,但是Kappa架構(gòu)也有痛點。
首先,我們需要借用Kafka來構(gòu)建實時場景,但是如果需要對ODS層數(shù)據(jù)做進一步的分析時,就要接入Flink計算引擎把數(shù)據(jù)寫入到DWD層的Kafka,同樣也會將一部分結(jié)果數(shù)據(jù)寫入到DWS層的Kafka。但是,如果想做簡單的數(shù)據(jù)分析時,又要將DWD和DWS層的數(shù)據(jù)寫入到ClickHouse、ES、MySQL或者是Hive里做進一步分析,這無疑帶來了鏈路的復(fù)雜性。
其次,Kappa架構(gòu)是嚴(yán)重依賴于消息隊列的,我們知道消息隊列本身的準(zhǔn)確性嚴(yán)格依賴它上游數(shù)據(jù)的順序,但是,消息隊列越多,發(fā)生亂序的可能性越大。通常情況下,ODS層的數(shù)據(jù)是絕對準(zhǔn)確的,把ODS層數(shù)據(jù)經(jīng)過計算之后寫入到DWD層時就會產(chǎn)生亂序,DWD到DWS更容易產(chǎn)生亂序,這樣的數(shù)據(jù)不一致性問題非常大。
那么有沒有一種架構(gòu),既能滿足實時性的需求,又能滿足離線計算的需求,同時還能減輕運營開發(fā)成本?解決Kappa架構(gòu)的痛點呢?
2.4 痛點總結(jié)
2.5 實時數(shù)據(jù)倉庫建設(shè)需求
是否有一種技術(shù),既能夠保證數(shù)據(jù)高效的回溯能力,支持?jǐn)?shù)據(jù)更新,又能夠?qū)崿F(xiàn)數(shù)據(jù)的流批讀寫,并且還能夠?qū)崿F(xiàn)分鐘級別的數(shù)據(jù)接入。
這也是建設(shè)實時數(shù)據(jù)倉庫的迫切需要,實際上需要對Kappa架構(gòu)進行改進升級,以解決Kappa架構(gòu)中遇到的問題,接下來我們會進一步探討數(shù)據(jù)湖技術(shù)--Iceberg。
三 數(shù)據(jù)湖Apache Iceberg介紹
3.1 Iceberg是什么
官網(wǎng)對Iceberg的描述如下:
Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to Trino and Spark that use a high-performance format that works just like a SQL table.
Iceberg的官方定義是一種表格式,可以理解為是基于計算層(Spark、Flink)和存儲層(ORC、Parquet、Avro)的中間介質(zhì)層,用Flink或者Spark將數(shù)據(jù)寫入Iceberg,然后通過Presto、Flink、Spark來讀取這些表。
3.2 Iceberg的Table格式介紹
Iceberg主要是為分析海量數(shù)據(jù)計算的,被定義為Table Format,Table Format介于計算層和存儲層之間。
Table Format向下管理存儲系統(tǒng)上的文件,向上為計算層提供接口。比如一張Hive表,在HDFS上會有Partition,存儲格式,壓縮格式和數(shù)據(jù)的HDFS目錄等,這些信息都維護在元數(shù)據(jù)中,這些元數(shù)據(jù)被稱為一種文件的組織形式。
Iceberg能夠高效支撐上層的計算層訪問磁盤上的文件。
3.3 Iceberg的功能總結(jié)
Iceberg目前支持三種文件格式,Parquet、ORC、Avro,Iceberg的主要功能如下:
3.4 Iceberg的設(shè)計
3.4.1 設(shè)計目標(biāo)
- 和HIVE模式類似,它也是一種開放的靜態(tài)數(shù)據(jù)存儲形式,和計算層使用的語言不同。
- 具有強大的擴展性和可靠性:簡單透明的使用方式,用戶只需要關(guān)心寫入數(shù)據(jù)的邏輯,Iceberg會自動識別所有元數(shù)據(jù)的變更。Iceberg也支持并發(fā)寫。
- 存儲結(jié)構(gòu)高可用:Iceberg有非常合理的Schema管理模式,具有多版本管理機制,支持版本回滾。
3.4.2 詳細(xì)設(shè)計
自帶ACID能力:保障每次寫入后的數(shù)據(jù)都是一個完整的快照(snapshot),每個snapshot包含著一系列的文件列表,落地任務(wù)把數(shù)據(jù)直接寫入Iceberg表中,不需要任務(wù)再做額外的success狀態(tài)維護。Iceberg會根據(jù)分區(qū)字段自動處理延時到來的數(shù)據(jù),把延時的數(shù)據(jù)及時的寫入到正確的分區(qū),因為有ACID的保障,延時數(shù)據(jù)寫入過程中Iceberg表依然提供可靠的讀取能力。
基于MVCC(Multi Version Concurrency Control)的機制,默認(rèn)讀取文件會從最新的的版本,每次寫入都會產(chǎn)生一個新的snapshot,讀寫相互不干擾。
基于多版本的機制可以可用輕松實現(xiàn)回滾和時間旅行的功能,讀取或者回滾任意版本的snapshot數(shù)據(jù)。
3.4.3 組織架構(gòu)
下圖是 Iceberg 整個文件的組織架構(gòu)。從上往下看:
- 最上層是 snapshot 模塊。snapshot 是用戶可讀取的基本數(shù)據(jù)單位,也就是說,每次讀取一張表里面的所有數(shù)據(jù),都是一個snapshot 里面的數(shù)據(jù)。
- 中間層manifest。一個 snapshot 下面會有多個 manifest,如圖 snapshot-0 有兩個 manifest,而 snapshot-1 有三個 manifest,每個 manifest 下面會管理一個或多個 DataFiles 文件。
- 數(shù)據(jù)層DataFiles。manifest 文件里面存放的就是數(shù)據(jù)的元信息,我們可以打開 manifest 文件,可以看到里面其實是一行行的 datafiles 文件路徑。
3.5 Iceberg的讀寫過程介紹
3.5.1 Iceberg的讀寫
如下圖所示,虛線框(snapshot-1)表示正在進行寫操作,但是還沒有發(fā)生commit操作,這時候 snapshot-1 是不可讀的,用戶只能讀取已經(jīng) commit 之后的 snapshot。同理, snapshot-2,snapshot-3表示已經(jīng)可讀。
可以支持并發(fā)讀,例如可以同時讀取S1、S2、S3的快照數(shù)據(jù),同時,可以回溯到snapshot-2或者snapshot-3。在snapshot-4 commit完成之后,這時候snapshot-4已經(jīng)變成實線,就可以讀取數(shù)據(jù)了。
例如,現(xiàn)在current Snapshot 的指針移到S3,用戶對一張表的讀操作,都是讀 current Snapshot 指針?biāo)赶虻?Snapshot,但不會影響前面的 snapshot 的讀操作。
3.5.2 增量讀取
Iceberg的每個snapshot都包含前一個snapshot的所有數(shù)據(jù),每次都相當(dāng)于全量讀取數(shù)據(jù),對于整個鏈路來說,讀取數(shù)據(jù)的代價是非常高的。
如果我們只想讀取當(dāng)前時刻的增量數(shù)據(jù),就可以根據(jù)Iceberg中Snapshot的回溯機制來實現(xiàn),僅讀取Snapshot1到Snapshot2的增量數(shù)據(jù),也就是下圖中的紫色數(shù)據(jù)部分。
同理,S3也可以只讀取紅色部分的增量數(shù)據(jù),也可以讀取S1-S3的增量數(shù)據(jù)。
Iceberg支持讀寫分離,也就是說可以支持并發(fā)讀和增量讀。
3.6 小文件問題
3.6.1 實時小文件問題
目前Flink社區(qū)現(xiàn)在已經(jīng)重構(gòu)了 Flink 里面的 FlinkIcebergSink,提供了 global committee 的功能,我們采用的也是社區(qū)提供的FlinkIcebergSink,曲線框中的這塊內(nèi)容是 FlinkIcebergSink。
多個 IcebergStreamWriter 和一個 IcebergFileCommitter 的情況下,在上游的數(shù)據(jù)寫到 IcebergStreamWriter 的時候,每個 writer 里面做的事情都是去寫 datafiles 文件。
當(dāng)每個 writer 寫完自己當(dāng)前這一批 datafiles 小文件的時候,就會發(fā)送消息給 IcebergFileCommitter,告訴它可以提交了。而 IcebergFileCommitter 收到信息的時,就一次性將 datafiles 的文件提交,進行一次 commit 操作。
commit 操作本身只是對一些原始信息的修改,讓其從不可見變成可見。
3.6.2 實時合并小文件
在實際的生產(chǎn)環(huán)境中,F(xiàn)link 實時作業(yè)會一直在集群中運行,為了要保證數(shù)據(jù)的時效性,一般會把 Iceberg commit 操作的時間周期設(shè)成 30 秒或者是一分鐘。當(dāng) Flink 作業(yè)跑一天時,如果是一分鐘一次 commit,一天需要 1440 個 commit,如果 Flink 作業(yè)跑一個月commit 操作會更多。甚至 snapshot commit 的時間間隔越短,生成的 snapshot 的數(shù)量會越多。當(dāng)流式作業(yè)運行后,就會生成大量的小文件。
Iceberg 小文件合并是在
org.apache.iceberg.actions.RewriteDataFilesAction 類里面實現(xiàn)的。社區(qū)中小文件合并其實是通過 Spark 并行計算的,我們參考了社區(qū)Spark的實現(xiàn)方法,自己封裝了使用Flink合并小文件的方法。
四 Flink+Iceberg構(gòu)建實時數(shù)倉
4.1 準(zhǔn)實時數(shù)據(jù)倉庫分析系統(tǒng)
我們知道Iceberg支持讀寫分離,又支持并發(fā)讀、增量讀、合并小文件,而且還能做到秒級/分鐘級的數(shù)據(jù)延遲。我們基于Iceberg這些優(yōu)勢,采用Flink+Iceberg的方式構(gòu)建了流批一體化的實時數(shù)據(jù)倉庫。
在數(shù)據(jù)倉庫處理層,可以用 presto 進行一些簡單的查詢,因為 Iceberg 支持 Streaming read,所以在系統(tǒng)的中間層也可以直接接入 Flink,直接在中間層用 Flink 做一些批處理或者流式計算的任務(wù),把中間結(jié)果做進一步計算后輸出到下游。
4.2 采用Iceberg替代Kafka實時數(shù)倉的優(yōu)劣勢
五 未來規(guī)劃
5.1Iceberg 內(nèi)核能力提升
- Row-level delete 功能。目前社區(qū)還不支持行級別的刪除功能,Iceberg 當(dāng)前只支持 copy on write 的 update 的能力。如果要真正的構(gòu)建一個實時數(shù)據(jù)倉庫,還是需要一個高效的 merge on read 的 update 能力。我們會繼續(xù)根據(jù)社區(qū)的更新動態(tài),逐步迭代升級。
- 建立統(tǒng)一索引加速數(shù)據(jù)檢索。期待社區(qū)會有一個完善的統(tǒng)一索引加速功能。
5.2 內(nèi)部大數(shù)據(jù)平臺升級
希望借助Alluxio構(gòu)建一個數(shù)據(jù)湖加速功能,以便在查詢層實現(xiàn)秒級分析功能。
建立自動Schema建表的功能。
和所有業(yè)務(wù)系統(tǒng)打通,年內(nèi)遷移完成所有業(yè)務(wù)線的數(shù)據(jù),完整全部數(shù)據(jù)入湖建設(shè)。
原創(chuàng)不易,歡迎點贊加關(guān)注,您的關(guān)注是我持續(xù)創(chuàng)作的動力。