目錄
1. Shopee 數據系統建設中面臨的典型問題
2. 為什么選擇 Hudi
3. Shopee 在 Hudi 落地過程中的實踐
4. 社區貢獻
5. 總結與展望
湖倉一體(LakeHouse)作為大數據領域的重要發展方向,提供了流批一體和湖倉結合的新場景。目前,企業許多業務中會遇到的數據及時性、準確性,以及存儲的成本等問題,都可以通過湖倉一體方案得到解決。 當下,幾個主流的湖倉一體開源方案都在不斷迭代開發中,業界的應用也都是在摸索中前行,在實際的使用中難免會遇到一些不夠完善的地方和未支持的特性。Shopee 內部在使用過程中基于開源的 Apache Hudi 定制了自己的版本,以實現企業級的應用和一些內部業務需求的新特性。 通過引入 Hudi 的 Data lake 方案,Shopee 的 Data Mart、推薦、ShopeeVideo 等產品的數據處理流程實現了流批一體、增量處理的特性,很大程度上簡化了這一流程,并提升了性能。
1. Shopee 數據系統建設中面臨的典型問題 1.1 Shopee 數據系統簡介
上圖是 Shopee Data Infrastructure 團隊為公司內部業務方提供的一套整體解決方案。
-
第一步是數據集成(Data Integration),目前我們提供了基于日志數據、數據庫和業務事件流的數據集成方式;
-
然后通過平臺的 ETL(Extract Transform Load)服務 load 到業務的數倉中,業務同學通過我們提供的開發平臺和計算服務進行數據處理;
-
最后的結果數據通過 Dashboard 進行展示,使用即時查詢引擎進行數據探索,或者通過數據服務反饋到業務系統中。
下面先來分析一下 Shopee 數據系統建設中遇到的三個典型問題。
1.2 流批一體的數據集成
第一個問題:在基于數據庫的數據集成過程中,存在同一份數據同時面臨流處理和批處理的需求。傳統的做法是實現全量導出和 CDC 兩條鏈路。全量導出鏈路滿足批處理的需求,CDC 鏈路用于實時處理和增量處理的場景。
然而,這種做法存在的一個問題是全量導出效率低,導致數據庫負載高。另外,數據一致性也難以得到保證。
同時,在批數據集構建上有一定的存儲效率優化,所以我們希望基于 CDC 數據去構建批數據集,以此同時滿足三種處理場景的需求,提高數據時效性。
1.3 狀態表明細存儲
第二個問題是狀態表明細的存儲。我們可以認為,傳統批數據集是在某一時間點對業務數據整體狀態的一個快照,壓縮到一個點的快照會合并掉業務流程中的過程信息。這些變化過程反映了用戶使用我們服務的過程,是非常重要的分析對象。一旦被合并掉,將無法展開。
另外,在很多場景下,業務數據每天變化的部分只占全量數據的一小部分,每個批次都全量存儲會帶來很大的資源浪費。
1.4 大寬表創建
第三個問題是大寬表的創建。近實時寬表構建是數據處理中常見的一種場景,它存在的問題是傳統的批處理延遲過高,使用流式計算引擎資源浪費嚴重。因此,我們基于多個數據集合構建了業務寬表,支持 Ad hoc 類 OLAP 查詢。
2. 為什么選擇 Hudi
針對上述業務中遇到的問題,基于以下三點考量,最終我們選擇 Apache Hudi 來作為解決方案。
2.1 生態支持豐富
我們期望使用純流式的方式建設數據集成環境,而 Hudi 對流式場景有著良好的支持。
第二點是對各個大數據生態的兼容。我們構建的數據集將會同時存在批處理、流處理、增量處理和動態探索等多種需求的負載。目前這些工作負載運行在各種計算引擎中,所以,對多種計算引擎的支持也在我們的考慮范圍之內。
另一個考量點則是和 Shopee 業務需求的契合。當前,我們亟待處理的數據集大部分來源于業務系統,都帶有唯一性標識信息,所以 Hudi 的設計更加符合我們的數據特性。
2.2 插件化的能力
目前我們平臺提供 Flink 和 Spark 作為通用計算引擎,作為數據集成和數倉建設負載的承載者,同時也使用 Presto 承載數據探索的功能。Hudi 對這三者都支持。
在實際使用中,根據業務數據的重要程度不同,我們也會給用戶提供不同的數據索引方式。
2.3 業務特性匹配
在數據集成過程中,用戶的 schema 變化是一個非常常見的需要。ODS 的數據變化可能導致下游的計算任務出錯。同時,在增量處理時,我們需要時間處理的語義。支持主鍵數據的存儲對于我們業務數據庫的數據來說,意義重大。
3. Shopee 在 Hudi 落地過程中的實踐 3.1 實時數據集成
目前 Shopee 內部有大量的業務數據來自業務數據庫,我們采用類似 CDC 的技術獲取數據庫中的變更數據,給業務方構建支持批處理和近實時增量處理的 ODS 層數據。
當一個業務方的數據需要接入時,我們會在進行增量實時集成之前先做一次全量 Bootstrap,構建基礎表,然后基于新接入的 CDC 數據進行實時構建。
構建的過程中,我們一般根據用戶需求選擇構建的 COW 表或者 MOR 表。
1)問題構建與解決方案
在進行實時構建的過程中,存在以下兩種較為常見的問題:
一種是用戶將有大量變更的數據集的類型配置為 COW 表,導致數據寫放大。此時我們需要做的事情是建立相應的監控來識別這種配置。同時,我們基于 MOR 表的配置化數據合并邏輯,支持數據文件的同步或者異步更新。
第二個問題是默認的 Bloom filter 導致數據存在性判斷的問題。這里比較好的方式是采用 HBase Index 解決超大數據集的寫入問題。
2)問題解決的效果
這是將我們的某些數據集成鏈路換成基于 Hudi 的實時集成后的效果。上圖是數據可見性占比與時延的關系,目前我們能保證 80% 的數據在 10 分鐘內可見可用,所有的數據 15 分鐘內可見可用。
下圖是我們統計的資源消耗占比圖。藍色部分是實時鏈路的資源消耗,紅色是歷史的按批數據集成的資源消耗。
因為切換成了實時鏈路,對于一些大表重復率低的數據減少了重復處理,同時也減少了集中式處理效率降低導致的資源消耗。因此,我們的資源消耗遠低于批處理方式。
3.2 增量視圖
針對用戶需要狀態明細的場景,我們提供了基于 Hudi Savepoint 功能的服務,按照用戶需要的時間周期,定期構建快照(Snapshot),這些快照以分區的形式存在元數據管理系統中。
用戶可以方便地在 Flink、Spark,或者 Presto 中利用 SQL 去使用這些數據。因為數據存儲是完整且沒有合并的明細,所以數據本身支持全量計算,也支持增量處理。
在使用增量視圖的存儲時,對于一些變化數據占比不大的場景,會取得比較好的存儲節省效果。
這里有一則簡單的公式,用于計算空間使用率:(1 + (t - 1) * p ) / t
。
其中,P 表示變化數據的占比,t 表示需要保存的時間周期數。變化數據占比越低,所帶來的存儲節省越好。對于長周期數據,也會有一個比較好的節省效果。
同時,這種方式對增量計算的資源節省效果也比較好。缺點是按批全量計算會有一定的讀放大的問題。
3.3 增量計算
當我們的數據集基于 Hudi MOR 表來構建時,就可以同時支持批處理、增量處理和近實時處理負載。
以圖為例,Table A 是一個增量的 MOR 表,當我們基于 Table A 來構建后續的表 B 和表 C 時,如果計算邏輯都支持增量的構建,那我們在計算的過程中,只需要獲取新增的數據和變化的數據。這樣在計算的過程中就顯著減少了參與計算的數據量。
這里是離線計算平臺基于 Hudi 的增量計算來構建的一個近實時的用戶作業分析。當用戶提交一個 Spark 任務到集群運行,任務結束后會自動收集用戶的日志,并從中提取相關的 Metric 和關鍵日志寫入到 Hudi 表。然后一個處理任務增量讀取這些日志,分析出任務的優化項,以供用戶參考。
當一個用戶作業運行完后,一分鐘之內就可以分析出用戶的作業情況,并形成分析報告提供給用戶。
增量 Join
除了增量計算,增量的 Join 也是一個非常重要的應用場景。
相對于傳統的 Join,增量計算只需要根據增量數據查找到需要讀取的數據文件,進行讀取,并分析出需要重寫的分區,重新寫入。
相對于全量來說,增量計算顯著減少了參與計算的數據量。
merge Into
Merge Into 是在 Hudi 中非常實用的一個用于構建實時寬表的技術,它主要基于 partial update 來實現。
MERGE INTO target_table t0
USING SOURCE TABLE s0
ON t0.id = s0.id
WHEN matched THEN UPDATE SET
t0.price = s0.price+5,
_ts = s0.ts;MERGE INTO target_table_name [target_alias]
USING source_table_reference [source_alias]
ON merge_condition
[ WHEN MATCHED [ AND condition ] THEN matched_action ] [...]
[ WHEN NOT MATCHED [ AND condition ] THEN not_matched_action ] [...]
matched_action
{ DELETE |
UPDATE SET * |
UPDATE SET { column1 = value1 } [, ...] }
not_matched_action
{ INSERT * |
INSERT (column1 [, ...] ) VALUES (value1 [, ...])
這里展示了基于 Spark SQL 的 Merge Into 語法,它讓用戶構建寬表的作業開發變得非常簡單。
基于 Merge Into 的增量 Join 實現
Hudi 的實現是采用 Payload 的方式,在一個 Payload 中可以只存在一張表的部分列。
增量數據的 Payload 被寫入到 log 文件中,然后在后續的合并中生成用戶使用的寬表。因為后續合并存在時間延遲,所以我們優化了合并的寫入邏輯。
在數據合并完成后,我們會在元數據管理中寫入一個合并的數據時間和相關的 DML,然后在讀取這張 MOR 表的過程中分析 DML 和時間,為數據可見性提供保障。
而采用 Partial Update 的好處是:
-
顯著降低了流式構建大寬表的資源使用;
-
文件級別的數據修改時,處理效率增高。
4. 社區貢獻
在解決處理 Shopee 內部業務問題的同時,我們也貢獻了一批代碼到社區,將內部的優化和新特性分享出來,比較大的 feature 有 Meta sync(RFC-55 已完成)
、snapshot view(RFC-61)
、partial update(HUDI-3304)
、FileSystemLocker(HUDI-4065 已完成)
等等;同時也幫助社區修復了很多 bug。后續也希望能夠用這種方式,更好地滿足業務需求的同時,參與社區共建。
4.1 snapshot View
增量視圖(snapshot view)有以下幾個典型應用場景:
-
每天在基礎表上生成名稱為
compacted-YYYYMMDD
的快照,用戶使用快照表生成每日的衍生數據表,并計算報表數據。當用戶下游的計算邏輯發生變化時,能夠選擇對應快照進行重新計算。還可以設置留存期為 X 天,每天清理掉過期數據。這里其實也可以在多快照的數據上自然地實現 SCD-2。 -
一個命名為
yyyy-archived
的存檔分支可以每年在數據進行壓縮和優化之后生成,如果我們的保存策略有變化(例如要刪除敏感信息),那么可以在進行相關的操作之后,在這個分支上生成一個新的快照。 -
一個命名為
preprod-xx
的快照可以在進行了必要的質量檢查之后再正式發布給用戶,避免外部工具與 pipeline 本身的耦合。
對于 snapshot view 的需求,Hudi 已經可以在一定程度上通過兩個關鍵特性來做支持:
-
Time travel:用戶可以提供一個時間點來查詢對應時間上的 Hudi 表快照數據。
-
Savepoint:可以保證某個 commit 時間點的快照數據不會被清理,而在 savepoint 之外的中間數據仍然可以被清理。
簡單的實現如下圖所示:
但是在實際的業務場景中,為了滿足用戶的 snapshot view 需求,還需要從易用性和可用性上考慮更多。
例如,用戶如何得知一個 snapshot 已經正確地發布出來了?這其中包含的一個問題是可見性,也就是說,用戶應該可以在整個 pipeline 中顯式地拿到 snapshot 表,這里就需要提供類似 Git 的 tag 功能,增強易用性。
另外,在打快照的場景中,一個常見的需求是數據的精準切分。一個例子就是用戶其實不希望 event time 在 1 號的數據漂移到 2 號的快照之中,更希望的做法是在每個 FileGroup 下結合 watermark 做精細的 instant 切分。
為了更好地滿足生產環境中的需求,我們實現了以下優化:
-
擴展了 savepoint metadata,在此基礎上實現快照的 tag、branch 以及 lifecycle 管理,和自動的 meta 同步功能;
-
在 MergeOnRead 表上實現精細化的 ro 表 base file 切分,在 compaction 的時候通過 watermark 切分日志文件,保證 snapshot 的精確性。也就是說,我們可以在流式寫入的基礎上,給下游的離線處理提供精確到 0 點的數據。
目前我們正在將整體功能通過 RFC-61 貢獻回社區,實際落地過程的收益前面章節已有介紹,這里不再贅述。
4.2 多源 Partial update
前文簡單介紹了多源部分列更新(大寬表拼接)的場景,我們依賴 Hudi 的多源合并能力在存儲層實現 Join 的操作,大大降低了計算層在 state 和 shuffle 上的壓力。
目前,我們主要是通過 Hudi 內部的 Payload 接口實現多源的部分列更新。下面這張圖展示了 Payload 在 Hudi 的寫端和讀端的交互流程。
實現的原理基本上就是通過自定義的 Payload class 來實現相同 key 不同源數據的合并邏輯,寫端會在批次內做多源的合并并寫入 log,讀端在讀時合并時也會調用相同的邏輯來處理跨批次的情況。
這里需要注意的是亂序和遲到數據(out-of-order and late events)的問題。如果不做處理,在下游經常會導致舊數據覆蓋新數據,或者列更新不完整的情況。
針對亂序和遲到數據,我們對 Hudi 做了 Multiple ordering value 的增強,保證每個源只能更新屬于自己那部分列的數據,并且可以根據設置的 event time (ordering value)
列,確保只會讓新數據覆蓋舊數據。
后續我們還準備結合 lock less multiple writers 來實現多 Job 多源的并發寫入。
5. 總結與展望
針對在 Shopee 數據系統建設中面臨的問題,我們提出了湖倉一體的解決方案,通過對比選型選擇了 Hudi 作為核心組件。
在落地過程中,我們通過使用 Hudi 的核心特性以及在此之上的擴展改造,分別滿足了三個主要的用戶需求場景:實時數據集成、增量視圖和增量計算。并為用戶帶來了低延時(約 10 分鐘)、降低計算資源消耗、降低存儲消耗等收益。
接下來,我們還將提供更多特性,并針對以下兩個方面做進一步完善,從而滿足用戶更多的場景,提供更好的性能。
5.1 跨任務并發寫支持
當前 Hudi 支持了基于文件鎖的單個任務單 writer 的寫入方式。
但是在實際中,有一些場景需要多個任務多 writer 同時寫入,且寫入分區有交叉,目前的 OCC 對這種情況支持不佳。目前我們正在與社區合作解決 Flink 與 Spark 多重 writer 的場景。
5.2 性能優化
元數據讀取以及 File listing 操作無論是在寫入端還是讀取端都會有很大的性能消耗,海量的分區對外部元數據系統(比如 HMS)也會造成很大壓力。
針對這一問題,我們計劃第一步將 schema 之外的信息存儲從 HMS 過渡到 MDT;第二步是在未來使用一個獨立的 MetaStore 和 Table service 的 server,不再強耦合于 HDFS。
在這個 server 中,我們可以更容易地優化讀取性能,更靈活地進行資源調整。