8 月 25 日,字節跳動宣布,正式開源 Cloud Shuffle Service。
Cloud Shuffle Service(以下簡稱 css) 是字節自研的通用 Remote Shuffle Service 框架,支持 Spark/FlinkBatch/MapReduce 等計算引擎,提供了相比原生方案穩定性更好、性能更高、更彈性的數據 Shuffle 能力,同時也為存算分離 / 在離線混部等場景提供了 Remote Shuffle 解決方案。
目前,CSS 已在 Github 上開源,歡迎感興趣的同學一起參與共建!
項目地址:
https://github.com/bytedance/CloudShuffleService
開源背景
在大數據計算引擎中,Pull-Based Sort Shuffle 是一種常見的 Shuffle 方案,比如 Spark/MapReduce/FlinkBatch (高于 1.15 版本) 等都將 Sort Shuffle 作為引擎默認方案,但是 Sort Shuffle 實現機制有一定的缺陷,在大規模生產環境下經常因為 Shuffle 問題影響作業穩定性。
以 Spark 的 Sort Shuffle 為例:
如上圖所示鏈路,Sort Shuffle 會存在以下一些問題:
-
將多個 Spill 文件合并成一個文件,會額外消耗讀寫 IO;
-
假設有 m 個 MapTask & n 個 ReduceTask,會產生 m*n 個網絡鏈接,當數量特別多時:
- 大量的網絡請求會導致 Shuffle Service 容易形成積壓;
- Shuffle Service 會產生大量的隨機讀取,容易導致 IO 瓶頸,特別是 HDD 集群;
-
Shuffle Service 無法做到 Application 的資源隔離,當有一個異常作業時,可能會影響同一個 Shuffle Service 節點上其它所有作業,問題容易放大;
-
MapTask 生成的 Shuffle Data File 只存儲一份到本地,當磁盤壞了也會導致數據丟失,同樣引起 FetchFailed 問題;
-
Shuffle Data File 寫到本地磁盤的方式,依賴計算節點上的磁盤,無法做到存算分離
這些都很容易導致 ShuffleRead 慢或者超時,引起 FetchFailed 相關錯誤,嚴重影響線上作業的穩定性,ShuffleRead 慢也會大大降低資源利用率 (CPU&Memory),同時 FetchFailed 也會導致 Stage 中相關 Task 重算,浪費大量資源,拖慢整個集群作業運行;無法存算分離的架構,在在離線混部 (在線資源磁盤不足)/Serverless 云原生等場景下,也很難滿足要求。
字節跳動使用 Spark 作為主要的離線大數據處理引擎,每天線上運行作業數過百萬,日均 Shuffle 量 300+PB。在 HDFS 混部 & 在離線混部等場景,Spark 作業的穩定性經常無法得到保障,影響業務 SLA:
-
受限 HDD 磁盤 IO 能力 / 磁盤壞等情況,導致大量的 Shuffle FetchFailed 引起的作業慢 / 失敗 /Stage 重算等問題,影響穩定性 & 資源利用率
-
External Shuffle Service (以下簡稱 ESS) 存算無法分離,遇到磁盤容量低的機器經常出現磁盤打滿影響作業運行
在此背景下,字節跳動自研了 CSS,用來解決 Spark 原生 ESS 方案的痛點問題。自 CSS 在內部上線一年半以來,當前線上節點數 1500+,日均 Shuffle 量 20+PB,大大提高了 Spark 作業的 Shuffle 穩定性,保障了業務的 SLA
Cloud Shuffle Service 介紹
CSS 是字節自研的 Push-Based Shuffle Service,所有 MapTask 通過 Push 的方式將同一個 Partition 的 Shuffle 數據發送給同一個 CSS Worker 節點進行存儲,ReduceTask 直接從該節點通過 CSS Worker 順序讀取該 Partition 的數據,相對于 ESS 的隨機讀取,順序讀的 IO 效率大大提升。
CSS 架構
Cloud Shuffle Service(CSS) 架構圖
CSS Cluster 是獨立部署的 Shuffle Service 服務,主要涉及的組件為:
-
CSS Worker
CSS Worker 啟動后會向 ZooKeeper 節點注冊節點信息,它提供 Push/Fetch 兩種服務請求,Push 服務接受來自 MapTask 的 Push 數據請求,并將同一個 Partition 的數據寫到同一個文件;Fetch 服務接受來自 ReduceTask 的 Fetch 數據請求,讀取對應 Partition 數據文件返回;CSS Worker 還負責 Shuffle 數據清理的工作,當 Driver 進行 UnregisterShuffle 請求刪除 ZooKeeper 對應 ShuffleId 的 Znode 時,或者 Application 結束刪除 ZooKeeper 中 ApplicationId 的 Znode 時,CSS Workers 會 Watch 相關事件對 Shuffle 數據進行清理。
-
CSS Master
作業啟動后會在 Spark Driver 中啟動 CSS Master,CSS Master 會從 ZooKeeper 中獲取到 CSS Worker 的節點列表,然后為后續 MapTask 產生的各個 Partition 分配 n 個副本 (默認為 2) 的 CSS Worker 節點,并對這些 Meta 信息進行管理,供 ReduceTask 獲取 PartitionId 所在的 CSS Worker 節點進行拉取,同時在 RegisterShuffle/UnregisterShuffle 過程中會在 ZooKeeper 中創建對應的 ApplicationId/ShuffleId 的 Znode,CSS Worker 會 Watch Delete 事件對 Shuffle 數據進行清理。
-
ZooKeeper
如前描述,用來存儲 CSS Worker 節點信息以及 ShuffleId 等信息。
CSS 特性
-
多引擎支持
CSS 除了支持 Spark(2.x&3.x) 之外,也可以接入其他引擎,目前在字節跳動內部,CSS 還接入了 MapReduce/FlinkBatch 引擎。
-
PartitionGroup 支持
為了解決單個 Partition 太小,Push 效率比較低的問題,實際會將多個連續的 Partition 組合成更大的 PartitionGroup 進行 Push。
-
高效統一的內存管理
跟 ESS 類似,MapTask 中的 CSS Buffer 將所有 Partition 的數據都存儲在一起,在 Spill 之前會對數據按照 PartitionId 進行排序,然后按照 PartitionGroup 維度進行數據推送;同時 CSS Buffer 完全納入 Spark 的 UnifiedMemoryManager 內存管理體系,內存相關參數由 Spark 統一管理。
-
容錯處理
Push 失敗:當觸發 Spill 進行 Push PartitionGroup 數據時,每次 Push 的數據大小為 4MB(一個 Batch),當某次 Push batch 失敗時,并不影響之前已經 Push 成功的數據,只需要重新分配節點(Reallocate)繼續 Push 當前失敗的數據以及后續還未 Push 的數據,后續 ReduceTask 會從新老節點讀取完整的 Partition 數據;
多副本存儲:ReduceTask 從 CSS Worker 讀取某個 Partition 數據是按照 Batch 粒度進行拉取的,當 CSS Worker 異常(如網絡問題 / 磁盤壞等)導致無法獲取該 Batch 數據,可以繼續選擇另外一個副本節點繼續讀取該 Batch 以及后續 Batch 的數據;
數據去重:當作業開啟 Speculative 推測執行會有多個 AttempTask 并發跑,需要在讀取的時候進行去重。在 Push Batch 的時候,會給 Batch 數據加上 Header 信息,Header 信息中包含 MapId + AttempId + BatchId 等信息,ReduceTask 讀取時可以根據這些 ID 信息進行去重。
-
Adaptive Query Execution(AQE) 適配
CSS 完整支持 AQE 相關的功能,包括動態調整 Reduce 個數 / SkewJoin 優化 /Join 策略優化。對于 SkewJoin,CSS 做了更多的適配優化工作,解決了 Skew Partition 數據被多個 ReduceTask 重復讀取問題,大大提高了性能。
CSS 性能測試
我們將 CSS 與開源的 ESS 使用獨占 Label 計算資源進行 1TB 的 TPC-DS Benchmark 測試對比,整體端到端的性能提升15%左右,部分 Query 有30%以上的性能提升。同時我們也使用線上混部資源隊列 (ESS 穩定較差) 進行 1TB 的 TPC-DS Benchmark 測試對比,整體端到端性能提升4 倍左右。
圖片CSS 1TB 測試提升 30% 以上的 Query
未來規劃
CSS 目前開源了部分 Feature,還有一些 Feature & 優化后續會陸續開放:
支持 MapReduce/FlinkBatch 引擎;
CSS 集群增加 ClusterManager 服務角色,管理 CSS Worker 的狀態 & 負載信息,同時將當前 CSS Master 分配 CSS Worker 的功能提到 ClusterManager;
基于異構機器 (如磁盤能力不同)/ 負載 等維度的 CSS Worker 分配策略。