導讀:ClickHouse已經成為行業主流且熱門的開源引擎。隨著業務數據量擴大,場景覆蓋變廣泛,在復雜query場景下,ClickHouse容易存在查詢異常問題,影響業務正常推進。本次主要分享字節跳動如何解決ClickHouse復雜查詢問題,并詳細解讀技術實現細節,目前該能力已經通過火山引擎ByteHouse面向開發者輸出。
全文將圍繞以下幾方面展開:
- 項目背景
- 技術方案
- 優化與診斷
- 效果及展望
01
項目背景
1. ClickHouse執行模式
ClickHouse 的執行模式相對比較簡單,和Druid、ES 類似,其基本查詢模式分為兩個階段:
第一階段,Coordinator 收到查詢后將請求發送給對應的 worker 節點;
第二階段,Coordinator 收到各個 worker 節點的結果后匯聚起來處理后返回。
以下面的SQL為例:
Select name from student_distribute where id = 5
①當 Coordinator 收到請求后,由于student_distribute是一個分布式表,因此需要將sql 改寫為對local表查詢,并轉發請求給每一個shard的worker;
②Worker收到請求后查詢本地的local表數據,返回結果給coordinator;
③Coordinator匯總每一個shard的數據并把結果返回給client。
Select name from student_local where id = 5
第二階段執行的模式能夠高效地支持很多常見場景,比如常見的針對大寬表的各類查詢,但是隨著業務場景的復雜化,也存在以下三點問題:
其一,第一階段返回的數據比較多且第二階段的計算比較復雜時,對于Coordinator的壓力會比較大,容易成為query的瓶頸,且shard越多可能計算越慢,瓶頸越大。例如一些重計算的agg算子count distinct。如果我們使用hash表去重時,第二階段需要在coordinator單機上merge各個worker的hash表,計算量很重且不能并行;又比如說group by基數比較大或者window計算。
其二,join是SQL的重要場景。由于不支持Shuffle操作,對于Join來說右表必須是全量數據。無論是普通Join還是Global Join,當Join的右表比較大時都放到內存里容易OOM,而Spill到磁盤雖然解決內存問題,可能會因為有磁盤 io和序列化計算的開銷影響性能。特別是當Join為最常見的Hash Join 時,右表如果是大表構建也比較慢。雖然社區最近也做了一些右表構建的優化,通過單機按照 join key split 來達到并行構建hash table。但是額外的代價是左右表都增加了一次 split 操作。
其三,對于復雜查詢(如多表 Join、嵌套多個子查詢、window function等)的支持并不友好,由于不能通過shuffle來分散數據,生成的pipeline在一些case下不能充分并行,難以充分發揮集群的全部資源。
2. 其他MMP數據庫
目前主流的MPP數據庫基本都支持Stage執行的方式。以Presto為例,如下圖所示,一個兩表join的agg sql可拆分為5個 Stage。
其中 Stage3、Stage4分別對應左右表數據讀取,Stage2完成兩表Join和partial agg 計算,Stage1完成final agg計算,Stage0收集Stage1的數據后匯總和輸出。在這個過程中,Stage 3、4、2、1可以在多個節點上并行執行,單個復雜的query被拆分成若干Stage,從而實現了Stage之間,不同worker的數據傳輸。
3. 業務背景和目標
隨著業務復雜程度提高,業務并不希望所有的數據都通過etl 產生大寬表;復雜查詢(特別是多輪分布式 Join和比較多的agg)的需求越來越強烈,而整體的數據量又在不斷增長。在集群資源有限的情況下,我們希望能夠充分利用機器資源,基于ClickHouse 高效地支持復雜查詢。
ByteHouse是字節跳動研發同學基于開源ClickHouse 進行了深度優化和改造的版本,提供海量數據上更強的查詢服務和數據寫入性能,支持多種應用場景。如圖所示,ByteHouse在內部多個場景如行為分析、畫像分析、智能營銷分析、App 日志分析上得到充分的驗證和使用,并在多個方面進行了增強,具備特有的能力。
02
技術方案
1. 設計思想
基于 ClickHouse 的復雜查詢的實現采用分Stage的方式,替換目前 ClickHouse的兩階段執行方式。類似其他分布式數據庫引擎(如 Presto、Impala 等),將一個復雜的Query按照數據交換情況切分成多個Stage,Stage和Stage之間通過 exchange完成數據的交換,單個Stage內不存在數據交換。Stage間的數據交換主要有以下三種形式:
①按照單(多)個 key 進行 Shuffle(shuffle)
②由1個或者多個節點匯聚到一個節點(我們稱為 gather)
③同一份數據復制到多個節點(也稱為 broadcast 或者說廣播)
按照不同的功能切分不同的模塊,設計目標如下:
①各個模塊約定好接口,盡量減少彼此的依賴和耦合。一旦某個模塊有變動不會影響別的模塊,例如Stage生成邏輯的調整不影響調度的邏輯。
②模塊采用插件的架構,允許模塊根據配置靈活支持不同的策略。
2. 相關術語
- ExchangeNode 在語法樹中表示數據交換的節點
- PlanSegment 單個 Stage 對應的執行的計劃片段
- ExchangeManager 管理數據的 exchange,負責不同 Stage 節點之間的數據交換
- SegmentScheduler 計劃片段調度器,負責下發計劃片段給 worker,由 Coordinator 節點調用
- InterpreterPlanSegment 計劃片段執行器,執行一個具體的計劃片段
3. 執行流程
①Coordinator 接受復雜查詢后,在目前 ClickHouse 語法樹的基礎上,根據節點類型和數據分布情況插入 Exchange 節點并生成分布式 Plan。
②Coordinator 根據 Exchange Node 類型,切分分布式 Plan 生成每個 Stage 的執行片段 PlanSegment。
③Coordinator 調用 SegmentScheduler 將各階段的 PlanSegment 發送到 Worker 節點。
④Worker 節點接受 PlanSegment 通過 InterpreterPlanSegment 完成數據的讀取和執行,通過 ExchangeManager 完成數據的交互。
⑤Coordinator 從最后一輪 Stage 對應節點的 ExchangeManager 讀取數據后處理后返回給 client。
4. Plan切分
下面是一個Plan切分的例子,這是1個2表Join的查詢場景,根據Exchange信息,將整個分布式 Plan切分成4個Stage。
5. 查詢片段調度器(SegmentScheduler)
查詢片段調度器SegmentScheduler 根據上下游依賴關系和數據分布,以及 Stage 并行度和worker 分布和狀態信息,按照一定的調度策略,將 PlanSemgent 發給不同的 Worker 節點。
目前支持的2種策略是:
①依賴調度:根據 Stage 依賴關系定義拓撲結構,產生 DAG 圖,根據 DAG 圖調度 stage,類似于拓撲排序,等到依賴的 Stage 啟動后再啟動新的 Stage。例如剛才的兩表 join,會先調度左右表讀取 stage,再調度 join stage。
②AllAtOnce:類似于Presto的AllAtOnce策略,會先計算每一個 Stage 的相關信息,一次性調度所有的Stage。
相比而言,這兩種策略是在容錯、資源使用和延時上做取舍。
第一種調度策略可以實現更好的容錯,由于 ClickHouse 可以有多個副本,當前一個 Stage 部分節點連接失敗時可以嘗試切換到副本節點,對后續依賴 stage 無感知。這里指的是讀數據的 Stage,我們稱為 Source Stage,非 Source Stage 因為沒有數據依賴,容錯能力會更強,只要保證并行度的節點數即可,甚至極端情況下可以降低 stage 并行度來支持更好的容錯。缺點是調度有依賴,不能完全并行,會增加調度時長,對于一些數據量和計算量小,但是 stage 多的節點調度延時可能會占 SQL 整體時間不小的比例。我們也做了一些針對性的優化,對于無依賴關系的盡可能支持并行。
第二種調度策略通過并行可以極大降低調度延時,為防止大量網絡 io 線程,我們通過異步化并且控制線程數目;這種策略的缺點是容錯性沒有依賴調度好,因為每一個 stage 的 worker 在調度前就已經確定,如果有一個 worker 出現連接異常則整個查詢會直接失敗。并且可能有一些 Stage 上游數據還沒有 Ready 就被調度執行了,需要長時間等數據。例如 final agg stage,需要等 partial agg 完成后才能拿到數據。雖然我們做了一些優化,并不會長時間空跑浪費 CPU 資源,但是畢竟也消耗了一部分資源,比如創建了執行的線程。
6. 查詢片段執行器(InterpreterPlanSegment)
下面介紹下計劃片段是如何執行的,原本 ClickHouse的查詢和節點執行主要是 SQL 形式,切分Stag后需要支持執行一個單獨的PlanSemgent。因此 InterpreterPlanSegment 的主要功能就是接受一個序列化后的 PlanSemgent,能夠在 Worker 節點上運行整個 PlanSemgent 的邏輯。主要的步驟為:
①根據 input 信息讀取數據,如果 input 是具體的 table,則從本地讀取數據;如果 input 是一個 exchange input,則從對應的 ExchangeManager 讀取數據;
②執行 PlanSemgent 的邏輯;
③輸出處理后的結果數據,如果是 Coordinator 節點,就將數據發給 Client;如果是非Coordinator 節點,就按照數據的exchange方式寫給本實例對應的 ExchangeManager。
Interpreter部分我們盡量復用當前ClickHouse的執行邏輯,例如processor 執行方式,process list管理等等。相比于InterpreterSelect邏輯要更簡單一些,可以認為1 個Stage只有1個階段。當然我們也做了很多功能和性能的增強,例如我們支持1個 stage處理多個join等,這樣可以減少stage數目和不必要的數據傳輸,在一張大表(通常情況下是事實表) join 多個維度表的場景有比較好的幫助。
InterpreterPlan Segment執行完會向coordinator上報對應的狀態信息。執行異常的時候會將異常信息報告給查詢片段調度器,取消Query其他worker的執行。
7. 數據交換(ExchangeManager)
ExchangeManager是PlanSegment數據交換的媒介,更是平衡數據上下游處理能力的重要組件。整體上采用 push 的方式,當上游數據 ready 時主動推送給下游,并支持反壓。其架構如下圖所示:
具體的流程如下:
①下游PlanSegment執行時,當input為exchange input時,根據一定的 token 規則 (通常由 query_id+segment_id+index_id 等組成)和數據 source 信息,向上游 ExchangeManager 注冊對應的數據請求;
②上游ExchangeManager收到請求后,建立上下游數據通道,并將上游的數據推送到下游,如果通道一直建立不了會 block 上游的執行。
在這個過程中,上下游都會通過隊列來優化發送和讀取,當隊列飽和的時候通過反壓的機制控制上游的執行速度。由于采用了 push 和隊列,這里我們要考慮一個特殊的場景,在某些 case 下下游的 Stage 并不需要讀取全部的上游數據,一個典型的場景是 limit。例如 limit 100,下游 stage 是需要讀取 100 條數據即可,而上游可能會輸出更大規模的數據,因此在這種情況下,當下游 stage 讀到足夠的數據后,需要能主動取消上游數據的執行并清空隊列。這是一個特定場景的優化,能夠大大加速查詢時間。
ExchangeManager 需要考慮和優化的點還有:
①細粒度的內存控制,能夠按照實例、query、segment 多層次進行內存控制,避免 OOM,更長期的考慮是支持 spill 到磁盤上,降低對內存的使用。為了提升傳輸效率,小數據需要進行 merge,大數據要 split。同時,網絡處理在某些場景要保證有序性,比如 sort 時,partial sort 和 merge sort 的網絡傳輸必須有序,否則數據可能是有問題的。
②連接復用和網絡優化,包括針對上下游在同一個節的場景下選擇走內存的交換不走網絡,可以減少網絡的開銷和減少數據序列化、反序列化的代價。另外,由于 ClickHouse 在計算方面做了非常充足的優化,有些場景下甚至內存帶寬成為瓶頸,我們在ExchangeManager的一些場景上也應用zero copy等技術來減少內存的拷貝。
③異常處理和監控,相比于單機執行,分布式情況下異常情況更復雜且不好感知。通過重試能避免一些節點的暫時高負載或者異常,以及出問題時能夠快速感知、排查和做針對性解決和優化。這里的工程實踐更多一些。
03
優化與診斷
1. Join 多種實現
根據數據的規模和分布,我們支持了多種Join實現,目前已經支持的有:
①Shuffle Join,最通用的 Join;
②Broadcast Join,針對大表Join小表的場景,通過把右表廣播到左表的所有 worker 節點來減少左表的傳輸;
③Colocate Join,針對左右表根據Join key保持相通分布的場景,減少左右表數據傳輸。
2. 網絡連接優化
網絡連接的優化的核心本質就是減少連接的使用。特別是數據需要Shuffle 的時候,下一輪 Stage的每一個節點需要從上一輪Stage的每一個節點拉取數據。當一個集群的節點比較多的時候,如果存在比較多的復雜 Query(Stage多,并行度(節點數)比較大),集群的Worker節點會建立非常多的連接,如下圖所示,單節點建立的連接數與集群節點數、并發stage數成正比。
字節內部的clickhouse集群規模非常大,最大的集群(單集群幾千臺規模)在目前 ClickHouse 的執行模式下單機最大可能會建立上幾萬個網絡連接。因此如果支持復雜 Query 執行,由于stage變多了,需要優化網絡連接,特別是支持連接復用。我們通過盡可能復用連接,在不同節點之間只會建立固定數目的連接,不同的查詢會復用這些連接,不隨 query 和 stage 的規模而增長。
3. 網絡傳輸優化
在數據中心領域,遠程直接內存訪問(RDMA)是一種繞過遠程主機操作系統內核訪問其內存中數據的技術,由于不經過操作系統,不僅節省了大量CPU資源,同樣也提高了系統吞吐量、降低了系統的網絡通信延遲,尤其適合在大規模并行計算機集群中有廣泛應用。
由于ClickHouse在計算層面做了很多優化,而網絡帶寬相比于內存帶寬要小不少,在一些數據量傳輸特別大的場景,網絡傳輸會成為一定的瓶頸。為了提升網絡傳輸的效率和提升數據exchange的吞吐,一方面我們引入壓縮來降低傳輸數據量,另一方面我們引入 RDMA 來減少一定的開銷。經過測試,在一些數據傳輸量大的場景,有不小的收益。
4. runtime Filter
Join算子通常是OLAP引擎中最耗時的算子。如果想優化 Join 算子,可以有兩種思路,一方面可以提升Join算子的性能,例如更好的Hash Table實現和Hash算法,以及更好的并行。另一方面可以盡可能減少參與Join計算的數據。
Runtime Filter在一些場景,特別是事實表join維度表的星型模型場景下會有比較大的效果。因為這種情況下通常事實表的規模比較大,而大部分過濾條件都在維度表上,事實表可能要全量join維度表。Runtime Filter的作用是通過在 Join 的 probe 端(就是左表)提前過濾掉那些不會命中Join的輸入數據來大幅減少 Join 中的數據傳輸和計算,從而減少整體的執行時間。以下圖為例:
左表并沒有直接過濾條件,右表帶有過濾條件item.proce > 1000。當完成右表查詢時,可以確定item.id 的范圍和集合,根據join類型inner join和join條件sales.item_id=item.id可以推斷出sales.item的范圍和集合。我們可以把sales.item 的范圍和集合作為一個過濾條件,在join前過濾sales的數據。
我們在復雜查詢上支持了Runtime Filter,目前主要支持minmax和bloomfilter。
總體執行流程如下:
①build plan segment worker(right table)會將生成的單節點 runtime filter 發送到coordinator節點;
②coordinator 在等待各個 worker的 runtime filter 都發送完成之后進行一次merge操作,將合并好的 runtime filter 分發到各個 execute plan segment worker(left table)節點中去;
③在 runtime filter 構造期間,execute plan segment(left table) 需要等待一定的時間,在超時之前如果runtime filter已經下發,則通過 runtime filter 執行過濾。
這里需要思考一個問題,Runtime filter column 是否構建索引(主鍵、skip index等)和命中prewhere?如果runtime filter的列(join column)構建了索引是需要重新生成 pipeline 的。因為命中索引后,可能會減少數據的讀取,pipeline并行度和對應數據的處理range都可能發生變化。如果runtime filter的列跟索引無關,可以在計劃生成的時候預先帶上過濾條件,只不過一開始作為占位是空的,runtime filter下發的時候把占位信息改成真正的過濾條件即可。這樣即使runtime filter 下發超時了,查詢片段已經開始執行了,只要查詢片段沒有執行完,之后的數據仍然可以進行過濾。
需要注意的是,runtime filter 是一種特殊場景下的優化,其針對的場景是右表數據量不大,且構建的 runtime filter 對左表有比較強的過濾效果。如果右表數據量比較大,構建runtime filter比較慢,或者對左表的數據過濾效果很差甚至沒有,那么 runtime filter 反而會增加查詢的耗時。因此,要根據數據的特征和規模來決定是否開啟。
5. 診斷和分析
引入復雜查詢的多Stage 執行模型后,SQL的執行模式變得復雜了。特別是當用戶查詢一些非常復雜的查詢,幾百行的sql生成的stage會非常多,把stage都看一遍并理解sql的含義要花比較長的時間。題外話:我們很早之前就完整的跑通了所有的tpcds query,這里面就有一些sql可能會產生幾十個 stage。那么在這種情況下,如何定位 SQL 的瓶頸并加以優化是一個難題。
我們做了如下兩點優化:
首先,最常見的做法是增加各類完善的metrics,包括整個Query的執行時間和不同Stage的執行時間、IO數據量、算子處理數據和執行情況、算子 metrics 和profile event等。
其次,我們記錄了反壓信息和上下游隊列長度,以此來推斷 stage 執行情況和瓶頸。
坦率地說,SQL 場景包括萬象,很多非常復雜的場景目前還是需要對引擎比較熟悉的同學才能診斷和分析SQL才能給出優化建議。在不斷積累經驗的過程中,我們希望通過能夠不斷完善 metrics 和分析路徑,不斷減輕oncall的負擔,并且在某些場景下可以更智能地給出優化提示,這對于使用同學來說也是有好處的。
04
效果及展望
1. 復雜查詢效果
根據上面的執行模型的三個缺點,分別測試如下三個場景:
①第二階段的計算比較復雜
②Hash Join 右表為大表
③多表 Join
以SSB 1T數據作為數據集,集群包含8個節點。
2. 第二階段的計算比較復雜
這個case SQL 如下圖所示:
uniqExact是count distinct的默認算法,采用hash table進行數據去重。使用復雜查詢后,query 執行時間從 8.514s=>2.198s,第二階段 agg uniqExact 算子的合并原本由 coordinator單點合并,現在通過按照group by key shuffle 后可以由多個節點并行完成。因此通過shuffle減輕了coordinator的merge agg 壓力。
3. Hash Join 右表為大表
這個 case 演示了右表是一個大表的場景,由于 ClickHouse 對多表的優化做的還不是很到位。這里采用子查詢來下推過濾的條件。
在這個case中,采用復雜查詢模式后,query 執行時間從17.210=>1.749s。lineorder 是一張大表,通過shuffle可以將大表數據按照join key shuffle到每個worker節點,減少了右表構建的壓力。
4. 多表 Join
這個 case 是一個 5 表 join 的 case。
開啟復雜查詢模式后,query 執行時間從8.583s=>4.464s,所有的右表可同時開始數據讀取和構建。為了和現有模式對比,針對復雜查詢沒有開啟 runtime filter,開啟 runtime filter后效果會更快。
這里還要重點說一下,今天的分享主要是從執行模式上講解如何支持復雜查詢。實際上,優化器對于復雜查詢的性能提升也非常大。通過一些rbo的規則,比如常見的謂詞下推、相關子查詢處理等。實際上這里的優化規則非常多,可以極大的提升 SQL 的執行效率。上面的 SQL 其實原本比較簡單,5 表 join 和一些維表的過濾條件,這里寫成子查詢是為了在 ClickHouse 現有模式下右表過濾條件更好下推。其實對于我們來說,在復雜查詢的模式下,由于有優化器的存在,用戶不用寫的這么復雜,優化器會自動完成下推和rbo優化。
上面是一些規則的優化,實際上在復雜查詢中,cbo 的優化也有很大作用。舉一個例子,在 ClickHouse 中,相同的兩個表,大表 join 小表的性能比小表 join 大表要好很多。前一個效果 2 中如果把表順序調整一下會快很多;另外,選用哪一種 join 的實現對 join 性能影響比較大,如果滿足 join key 分布,colcate join 比 shuffle join 來說完全減少了數據的 shuffle。多表 join 中,join 的順序和 join 的實現方式對執行的時長影響會比 2 表 join 影響更大。借助數據的統計信息,通過一些 cbo 優化,可以得到一個比較優的執行模式。
有了優化器,業務同學可以按照業務邏輯來寫任何的 SQL,引擎自動計算出相對最優的 SQL 計劃并執行,加速查詢的執行。
5. 展望
CLickHouse 目前的模式其實在很多單表查詢的場景上表現優異。我們主要是針對復雜的查詢場景做優化,主要是實現多stage的執行模式,并實現了stage之間數據傳輸。工程實踐上來說,做了比較多的嘗試和優化來提升執行和網絡傳輸的性能,并且希望通過完善metrics和智能診斷來降低SQL分析和調優的門檻,并減少oncall 的壓力。
目前的實現只是第一步,未來我們還有很多努力的方向。
首先,肯定是繼續提升執行和 Exchange 的性能。這里不談論引擎執行通用的優化,比如更好的索引或者算子的優化,主要是跟復雜查詢模式有關。
其次是Metrics 和智能診斷加強,就如同剛才提到的,SQL 的靈活度太高了,對于一些復雜的查詢沒有 metrics 幾乎難以診斷和調優,這個我們會長期持續的去做。
今天的分享就到這里,謝謝大家。
分享嘉賓:董一峰 字節跳動
編輯整理:胡勝達 蔚來汽車
出品平臺:DataFunTalk