本文為 Apache Hudi 技術社區分享會嘉賓分享文章,主要介紹火山引擎 LAS 團隊自研的多場景樣本離線存儲技術,用于處理機器學習系統的離線數據流。同時,還會為大家揭秘流批一體樣本生成的過程,分享對 Hudi 內核所做出的優化和改造,探索其在數據處理領域的實際應用和效果。
以下為分享原文。
1. 業務場景
為了讓大家更容易理解接下來要講的基于數據湖的樣本存儲和樣本生成問題,文章先給大家簡單介紹一些相關的基礎概念。首先是機器學習系統的離線數據流架構,機器學習系統和其他線上服務系統類似,其中和樣本有關的角色也比較集中。如下圖所示,整個離線數據流架構分為流式和批式兩種類型,其中的樣本數據由兩部分構成,分別是特征和標簽。
在流式架構中,特征由在線預估服務在 serving 時 dump 對應的快照并發送到消息隊列中。標簽則來自實時行為采集服務,通過日志上報等方法采集得到。在線樣本生成服務消費兩個數據流,通過關聯得到完整的樣本,并發送到下游的流式訓練服務中進行模型訓練,完成樣本數據的消費。
批式架構是流式架構的補充,批式架構在訂閱流式數據的同時,還會加入批式的特征或者批式生成的標簽。比如風控反作弊或者廣告類的業務,會有批式生產的數據,并使用批式的樣本生成模塊生成樣本,進而被模型訓練組件消費。
流式和批式數據流架構中,還有元數據服務,元數據服務記錄了特征的相關元數據,流式批式數據流都會訪問元數據服務獲取 meta 信息。因此,我們對于批式的特征存儲有若干種特定的訪問 pattern。
讀方面有以下讀數據 pattern:大范圍的按天批式讀取,關注吞吐指標;秒級的點查;高效的謂詞下推查詢能力;存在基于主鍵/外建的 join。
在寫方面需支持以下能力:基于主鍵的 upsert;針對部分 cell 的插入與更新;針對行/列/cell 的刪除;基于外鍵的 upsert。
在這樣的背景下,我們了解 Hudi 在機器學習離線數據流中的若干應用場景。
2.離線樣本存儲與迭代
我們希望設計的樣本離線存儲方案能夠適用于多種場景,主要包含以下三類情況。
第一,模型的重新訓練,回放流式訓練的過程,迭代/糾偏模型等等。
第二,樣本的數據迭代,增加修改或者刪除對應的特征/標簽,并重新訓練模型。
第三,樣本的 OLAP 查詢,用于日常 debug 等。
為了能夠支持以上的場景的樣本存儲與迭代,我們提出的存儲方案整體架構設計如下。在邏輯建模上,構建樣本存儲和構建特定 pattern 的 Hive 表非常類似,樣本包含主鍵、分區鍵、內部元數據列等功能性 column,然后包含若干特征列和若干標簽列。在物理架構上,通過流式和批式生產/采集的特征數據和標簽數據通過多個作業混合 upsert 的方式寫入 Hudi,更新位于 KV 存儲的索引信息,并將實際的數據寫入 HDFS 中。由于 Hudi 基于主鍵/外鍵 upsert 的特性,數據會被自然地拼接在一起,形成完整的包含特征和標簽的樣本數據,供消費使用。
在對離線特征進行調研時,我們需要面臨以下挑戰:基于 HDFS 這種不可變的文件存儲,如何實現低成本低讀寫放大的數據修改。在沒有使用數據湖之前,用戶做離線特征調研之前需要復制樣本,修改并另存一份。其中消耗了巨大的計算和存儲資源,伴隨樣本量的增大,這樣的方案將消耗數個 EB 的存儲,使得迭代變得不可能。
我們基于 Hudi 實現了 ColumnFamily 的能力。這個方案受到了經典 BigTable 存儲 Apache Hbase 的啟發,將 IO pattern 不同的數據使用不同的文件進行存儲,以減少不必要的讀寫放大。原理是將同一個 FileGroup 的不同列數據存儲在不同的文件中,在讀時進行合并。這種方法會將新增列的數據單獨進行文件存儲,發生修改或者新增成本很低。
我們通過為調研特征列賦予單獨的 CF 的方式來減少讀寫放大,其他列復用線上的特征所在的 CF。這樣資源的使用量只會和新增特征相關。這種方式極大得減少了迭代所需的存儲使用,并且不會引入任何 shuffle 操作。
上文介紹了離線樣本的存儲與迭代方案,接下來我們進一步為大家介紹在線樣本生成時的流批一體生成方案,討論其如何降低在線存儲的使用成本。
3. 流批一體的樣本生成
在線樣本生成服務中,我們使用 KV 或者 BigTable 類存儲來滿足樣本拼接的需求,比如 RocksDB 等。這類存儲點查性能好,延遲低,但是存儲成本也較高。如果在數據有明顯的冷熱分層的情況下,這類存儲本身并不能很好的滿足這樣的存儲需求。Hudi 是一個具有 KV 語義的離線存儲,存儲成本較低,我們將冷數據存在 Hudi 上的方式來降低在線存儲的使用成本,并通過統一的讀寫接口來屏蔽差異。這一架構也受到了目前市面的多種 HSAP 系統的啟發。
為了能夠讓 Hudi 支持更好的點查,我們復用了寫時的 Hbase 索引。點查請求會先訪問 Hbase 索引找到數據所在文件,然后根據文件進行點查。整體端到端的延遲可以做到秒級。適合存儲數據量大,qps 較低的場景。
4. 功能與優化
在使用 Hudi 滿足諸多業務需求的過程中,我們也對其內核做了一些改造,以更好得服務我們的業務場景。
4.1 Local Sort
我們支持了單文件內的主鍵排序。排序是較為常見的查詢性能優化手段。通過對主鍵的排序,享受以下收益
● CF 在讀時,多 CF 合并使用 Sort Merge 的方式,內存使用更低。
● Compaction 時支持 Sort Merge。不會觸發 spill,內存使用低。我們之前使用 SSD 隊列來做 Compaction 以保證性能,現在可以使用一些廉價的資源(比如無盤的潮汐資源)來進行 Compaction。
● 在流批一體的樣本生成中,由于主鍵是排好序的,我們點查時基于主鍵的謂詞下推效果非常好。提升了點查性能。
4.2 Bulkload 并發寫
并發寫一直是 Hudi 的比較大的挑戰。我們的業務場景中會發生行級別/列級別的寫沖突,這種沖突無法通過樂觀鎖來避免?;跈C器學習對于數據沖突的解決需求,我們之前就支持了 MVCC 的沖突解決方式。更進一步得,為了能夠讓 Hudi 支持并發讀寫,我們參考 Hbase 支持了 Bulkload 的功能來解決并發寫需求。所有寫數據都會寫成功,并由數據內部的 mvcc 來決定數據沖突。
我們首先將數據文件生成到一個臨時緩沖區,每個緩沖區對應一個 commit 請求,多個寫臨時緩沖區的請求可以并發進行。當數據完整寫入臨時緩沖區之后,我們有一個常駐的任務會接收數據 load 的請求,將數據從緩沖區中通過文件移動的方式 load 進 Hudi,并生成對應的 commit 信息。多個 load 請求是線性進行的,由 Hudi Timeline 的表鎖保證,但是每個 load 請求中只涉及文件的移動,所以 load 請求執行時間是秒級,這樣就實現了大吞吐的數據多并發寫和最終一致性。
4.3 Compaction Service
關于 Compaction,Hudi 社區提供了若干 Compaction 的開箱即用的策略。但是業務側的需求非常靈活多變,無法歸類到一種開箱即用的策略上。因此我們提供了 Compaction Service 這樣的組件用來處理用戶的 Compaction 請求,允許用戶主動觸發一次 Compaction,并可指定 Compaction 的數據范圍,資源使用等等。用戶也可以選擇按照時間周期性觸發 Compaction,以達到自動化數據生效的效果。
在底層我們針對 Compaction 的業務場景做了冷熱隊列分層,根據不同的 SLA 的 Compaction 任務,會選擇對應的隊列資源來執行。用來降低 Compaction 的整體成本。比如每天天級別的數據生效是一個高保障的 Compaction 任務,會有獨占隊列來執行。但是進行歷史數據的單次修復觸發的 Compaction,對執行時間不敏感,會被調度到低優先級隊列以較低成本完成。
針對數據湖的樣本存儲與生成問題,我們搭建了適用于多種場景的存儲方案架構,實現了批流一體的樣本生成,并且通過對 Hudi 內核進行一定的改造,實現更加滿足實際業務需求的功能設計。
以上就是字節跳動在 Hudi 的實踐,目前均已通過火山引擎 湖倉一體分析服務 LAS 產品對外服務,歡迎對這方面有需求、感興趣的用戶都可以積極地來體驗一下我們的 LAS 湖倉一體分析服務 。
湖倉一體分析服務 LAS(Lakehouse Analytics Service)是面向湖倉一體架構的 Serverless 數據處理分析服務,提供字節跳動最佳實踐的一站式 EB 級海量數據存儲計算和交互分析能力,兼容 Spark、Presto 生態,幫助企業輕松構建智能實時湖倉。