目錄
- 背景 & 目標
- Doris 數據劃分
- Partition
- Bucket
- Join 方式
- 總覽
- Broadcast / Shuffle Join
- Bucket Shuffle Join
- Plan Rule
- Colocate Join
- Runtime Filter 優化
- Join Reorder 優化
- Join 調優建議
背景 & 目標
- 掌握 Apache Doris Join 優化手段及其實現原理
- 為代碼閱讀提供理論基礎
Doris 數據劃分
不同的 Join 方式非常依賴于對 Doris 中數據劃分方式的透徹理解。因此先在這里列舉出必要的基礎知識。
首先,在 Doris 中數據都以表(Table)的形式進行邏輯上的描述。
在 Doris 的存儲引擎中,用戶數據被水平劃分為若干個數據分片(Tablet,也稱作數據分桶 Bucket)。每個 Tablet 包含若干數據行。各個 Tablet 之間的數據沒有交集,并且在物理上是獨立存儲的。
一個 Tablet 只屬于一個數據分區(Partition)。而一個 Partition 包含若干個 Tablet。因為 Tablet 在物理上是獨立存儲的,所以可以視為 Partition 在物理上也是獨立的。Tablet 是數據移動、復制等操作的最小物理存儲單元。
若干個 Partition 組成一個 Table。Partition 可以視為是邏輯上最小的管理單元。數據的導入與刪除,僅能針對一個 Partition 進行。
Doris 支持兩層的數據劃分。第一層是 Partition,支持 Range 和 List 的劃分方式。第二層是 Bucket(Tablet),僅支持 Hash 的劃分方式。也可以僅使用一層分區。使用一層分區時,只支持 Bucket 劃分。
下圖說明 Table、Partition、Bucket(Tablet) 的關系:
- Table 按照 Range 的方式按照 date 字段進行分區,得到了 N 個 Partition
- 每個 Partition 通過相同的 Hash 方式將其中的數據劃分為 M 個 Bucket(Tablet)
- 從邏輯上來說,Bucket 1 可以包含 N 個 Partition 中劃分得到的數據,比如下圖中的 Tablet 11、Tablet 21、Tablet N1
特別注意:
Doris 中的 Partition 和 Bucket 定義可能和某些其它數據庫系統的定義有一些差異,下面配以一個具體的建表語句為例來說明:
CREATE TABLE IF NOT EXISTS example_db.expamle_range_tbl ( `user_id` LARGEINT NOT NULL COMMENT "用戶id", `date` DATE NOT NULL COMMENT "數據灌入日期時間", `timestamp` DATETIME NOT NULL COMMENT "數據灌入的時間戳", `city` VARCHAR(20) COMMENT "用戶所在城市", `age` SMALLINT COMMENT "用戶年齡", `sex` TINYINT COMMENT "用戶性別", `last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用戶最后一次訪問時間", `cost` BIGINT SUM DEFAULT "0" COMMENT "用戶總消費", `max_dwell_time` INT MAX DEFAULT "0" COMMENT "用戶最大停留時間", `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用戶最小停留時間" ) ENGINE=OLAP AGGREGATE KEY(`user_id`, `date`, `timestamp`, `city`, `age`, `sex`) PARTITION BY RANGE(`date`) ( PARTITION `p201701` VALUES LESS THAN ("2017-02-01"), PARTITION `p201702` VALUES LESS THAN ("2017-03-01"), PARTITION `p201703` VALUES LESS THAN ("2017-04-01") ) DISTRIBUTED BY HASH(`user_id`) BUCKETS 16 PROPERTIES ( "replication_num" = "3" );
綠色高亮:Partition,此例中使用一個 date 字段進行分區
藍色高亮:Bucket,此例中使用 user_id 字段為作為分布列
Partition
- Partition 列可以指定一列或多列,分區列必須為 KEY 列
- 分區數量理論上沒有上限
- 當不使用 Partition 建表時,系統會自動生成一個和表名同名的,全值范圍的 Partition。該 Partition 對用戶不可見,并且不可刪改
創建分區時不可添加范圍重疊的分區
有兩種分區方式:
分區方式 | 一般用法 |
---|---|
Range | 通常按時間分區,以方便地管理新舊數據 |
List | 支持的類型更豐富,分區值為枚舉值。只有當數據為目標分區枚舉值其中之一時,才可以命中分區 |
Bucket
- 如果使用了 Partition,則 DISTRIBUTED 語句描述的是數據在各個分區內的劃分規則。如果不使用 Partition,則描述的是對整個表的數據劃分規則
- 分桶列的選擇,是在 查詢吞吐 和 查詢并發 之間的一種權衡:
- 如果選擇多個分桶列,則數據分布更均勻。如果一個查詢條件不包含所有分桶列的等值條件(意味著無法做桶裁剪以減少數據查詢范圍),那么該查詢會觸發所有分桶同時掃描,這樣查詢的吞吐會增加,單個查詢的延遲隨之降低。這個方式適合大吞吐低并發的查詢場景
- 如果僅選擇一個或少數分桶列,則對應的點查詢可以僅觸發一個分桶掃描(意味著可以做桶裁剪以減少數據查詢范圍)。此時,當多個點查詢并發時,這些查詢有較大的概率分別觸發不同的分桶掃描,各個查詢之間的 IO 影響較小,尤其當不同桶分布在不同磁盤上時),所以這種方式適合高并發的點查詢場景
- 分桶的數量理論上沒有上限
Join 方式
總覽
作為分布式的 MPP 數據庫, 在 Join 的過程中是需要進行數據的 Shuffle。數據需要進行拆分調度,才能保證最終的 Join 結果是正確的。舉個簡單的例子,假設關系 S 和 R 進行Join,N 表示參與 Join 計算的節點的數量;T 則表示關系的 Tuple 數目。
目前 Doris 支持的 Join 方式有以上 4 種,這 4 種方式靈活度和適用性是從高到低的,對數據分布的要求越來越嚴,但 Join 計算的性能則通過降低網絡開銷而越來越好。
Join 方式的選擇是 FE 生成分布式計劃階段會考慮的事項之一。在 FE 進行分布式計劃時,優先選擇的順序為(總是會優先選擇預期性能最好的):Colocate Join -> Bucket Shuffle Join -> Broadcast Join -> Shuffle Join。
Colocate 以及 Bucket Shuffle 是可遇不可求的。當無法使用它們時,Doris會自動嘗試進行 Broadcast Join,如果預估小表過大則會自動切換至 Shuffle Join。
但是用戶可以通過顯式 Hint 來強制使用期望的 Join 類型,比如:
select * from test join [shuffle] baseall on test.k1 = baseall.k1;
Broadcast / Shuffle Join
原理比較簡單,這里不展開。
Bucket Shuffle Join
當 Join 條件命中了左表的數據分布列時,Broadcast 以及 Shuffle Join 會有非必要的網絡傳輸開銷。而 Bucket Shuffle Join 旨在解決這類問題,通過對左表實現本地性計算優化,來減少左表數據在節點間的傳輸耗時,從而加速查詢。
以上的例子中,Join 的等值表達式命中了表 A(左表)的數據分布列。Bucket Shuffle Join 會根據表 A 的數據分布信息,將表 B(右表)的數據發送到對應表 A 的數據計算節點。
定性分析上:
- 降低了網絡與內存開銷(相比 Broadcast 以及 Shuffle Join 都不會更差),使一類 Join 查詢有更好的性能。尤其是當 FE 能夠執行左表的分區裁剪與桶裁剪時
- 與 Colocate Join 不同,它對于表的數據分布方式沒有侵入性,對于用戶來說是透明的。對于表的數據分布沒有強制性的要求(體現在建表語句中不需要顯式地設置 colocate_with 屬性),不容易導致數據傾斜的問題
- 可以為 Join Reorder 提供更多可能的優化空間
Plan Rule
- Bucket Shuffle Join 只生效于 Join 條件為等值的場景,原因與 Colocate Join 類似,它們都依賴 Hash 來計算確定的數據分布
- 在等值 Join 條件之中包含兩張表的分桶列,當左表的分桶列為等值的 Join 條件時,它有很大概率會被規劃為 Bucket Shuffle Join
- 由于不同的數據類型的 Hash 值計算結果不同,所以 Bucket Shuffle Join 要求左表的分桶列的類型與右表等值 Join 列的類型需要保持一致,否則無法進行對應的規劃
- Bucket Shuffle Join 只作用于 Doris 原生的 OLAP 表,對于 ODBC,MySQL,ES 等外表,當其作為左表時是無法規劃生效的
- 對于分區表,由于每一個分區的數據分布規則可能不同,所以 Bucket Shuffle Join 只能保證左表為單分區時生效。所以在 SQL 執行之中,需要盡量使用 where 條件使分區裁剪的策略能夠生效
- 假如左表為 Colocate 的表,那么它每個分區的數據分布規則是確定的,Bucket Shuffle Join 能在Colocate 表上表現更好
Colocate Join
可以理解為在數據分布滿足一定條件的前提下,減少一切不必要的網絡傳輸開銷,實現完全的計算本地化來加速查詢。同時因為沒有網絡傳輸開銷,BE 節點可以擁有更高的并發度,從而進一步提升 Join 性能。
要理解這個算法,需要先了解兩個術語:
- Colocation Group(CG):一個 CG 中會包含一張及以上的 Table。在同一個 Group 內的 Table 有著相同的 Colocation Group Schema,并且有著相同的數據分片分布
- Colocation Group Schema(CGS):用于描述一個 CG 中的 Table,和 Colocation 相關的通用 Schema 信息。包括分桶列類型,分桶數以及副本數等
和 Buckets Sequence 這一概念:
一個表的數據,最終會根據分桶列值 Hash、對桶數取模后落在某一個分桶內。假設一個 Table 的分桶數為 8,則共有 [0, 1, 2, 3, 4, 5, 6, 7] 8 個分桶(Bucket),我們稱這樣一個序列為一個 BucketsSequence。每個 Bucket 內會有一個或多個數據分片(Tablet)。當表為單分區表時,一個 Bucket 內僅有一個 Tablet。如果是多分區表,則會有多個(因為多個 Partition 中的不同 Tablet 會被劃分到相同的 Bucket)。
Colocation Join 功能,是將一組擁有相同 CGS 的 Table 組成一個 CG。并保證這些 Table 對應的數據分片會落在同一個 BE 節點上。使得當 CG 內的表進行分桶列上的 Join 操作時,可以通過直接進行本地數據 Join,減少數據在節點間的傳輸耗時。
因此關鍵問題就轉變為了「如何保證這些 Table 對應的數據分片會落在同一個 BE 節點上?」
通過同一 CG 內的 Table 必須保證以下屬性相同實現:
- 分桶列和分桶數
分桶列,即在建表語句中 DISTRIBUTED BY HASH(col1, col2, …) 中指定的列。分桶列決定了一張表的數據通過哪些列的值進行 Hash 劃分到不同的 Tablet 中。同一 CG 內的 Table 必須保證分桶列的類型和數量完全一致,并且桶數一致,才能保證多張表的數據分片能夠一一對應的進行分布控制。
- 副本數
同一個 CG 內所有表的所有分區(Partition)的副本數必須一致。如果不一致,可能出現某一個 Tablet 的某一個副本,在同一個 BE 上沒有其他的表分片的副本對應。不過,同一個 CG 內的表,分區的個數、范圍以及分區列的類型不要求一致。
在固定了分桶列和分桶數后,同一個 CG 內的表會擁有相同的 BucketsSequence。而副本數決定了每個分桶內的 Tablet 的多個副本,存放在哪些 BE 上。假設 BucketsSequence 為 [0, 1, 2, 3, 4, 5, 6, 7],BE 節點有 [A, B, C, D] 4個。則一個可能的數據分布如下:
+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ | 0 | | 1 | | 2 | | 3 | | 4 | | 5 | | 6 | | 7 | +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+ | A | | B | | C | | D | | A | | B | | C | | D | | | | | | | | | | | | | | | | | | B | | C | | D | | A | | B | | C | | D | | A | | | | | | | | | | | | | | | | | | C | | D | | A | | B | | C | | D | | A | | B | +---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+
CG 內所有表的數據都會按照上面的規則進行統一分布,這樣就保證了,分桶列值相同的數據都在同一個 BE 節點上,可以進行本地數據 Join。其核心思想是「兩次映射」,保證相同的 Distributed Key 的數據會被映射到相同的 Bucket Sequence,再保證 Bucket Sequence 對應的 Bucket 映射到相同的 BE 節點:
通過查詢計劃可以檢查一個查詢是否使用了 Colocate Join,同時計劃中的 Exchange Node 也被去掉了,會將 ScanNode 直接設置為 Hash Join Node 的孩子節點。
DESC SELECT * FROM tbl1 INNER JOIN tbl2 ON (tbl1.k2 = tbl2.k2); -- 在 Hash Join 節點會顯示: -- colocate: true/false
Colocate Join 十分適合幾張表按照相同字段分桶,并高頻根據固定的字段 Join 的場景。這樣可以將數據預先存儲到相同的分桶中,實現本地計算。
Runtime Filter 優化
Doris 在進行 Hash Join 計算時會在右表構建一個 Hash Table,左表流式地通過右表的 Hash Table 從而得出 Join 結果。而 Runtime Filter 就是充分利用了右表的 Hash Table 構建階段去做一些額外的事情。
在右表生成 Hash Table 的時,同時生成一個基于 Hash Table 數據的一個過濾條件,然后下推到左表的數據掃描節點。通過這樣的方式,Doris 可以在運行時進行數據過濾。
假如左表是一張大表,右表是一張小表,那么利用下推到左表的過濾條件就可以把絕大多數 Join 層要過濾的數據在數據讀取時就提前過濾(如果能夠下推到引擎層,還能夠利用 Doris 針對 Key 列過濾的延遲物化),從而大幅度地提升 Join 查詢的性能。
Runtime Filter 在查詢規劃時生成,在 HashJoinNode 中構建,在 ScanNode 中應用。比如 T1(行數 10w) 和 T2(行數 2k) 的 Join 操作:
| > HashJoinNode < | | | | | 100000 | 2000 | | | | OlapScanNode OlapScanNode | ^ ^ | | 100000 | 2000 | T1 T2 |
顯而易見對 T2 掃描數據要遠遠快于 T1,如果我們主動等待一段時間再掃描 T1,等 T2 將掃描的數據記錄交給 HashJoinNode 后,HashJoinNode 根據 T2 的數據計算出一個過濾條件,比如 T2 數據的最大和最小值,或者構建一個 Bloom Filter,接著將這個過濾條件發給等待掃描 T1 的 ScanNode,后者應用這個過濾條件,將過濾后的數據交給 HashJoinNode,從而減少 probe hash table 的次數和網絡開銷,這個過濾條件就是 Runtime Filter,效果如下:
| > HashJoinNode < | | | | | 6000 | 2000 | | | | OlapScanNode OlapScanNode | ^ ^ | | 100000 | 2000 | T1 T2 |
如果能將過濾條件(Runtime Filter)下推到存儲引擎,則某些情況可以利用索引(比如 Join 列為 Key 列,可以利用延遲物化能力)來直接減少掃描的數據量,從而大大減少掃描耗時,效果如下:
| > HashJoinNode < | | | | | 6000 | 2000 | | | | OlapScanNode OlapScanNode | ^ ^ | | 6000 | 2000 | T1 T2 |
可見,和謂詞下推、分區裁剪不同,Runtime Filter 是在運行時動態生成的過濾條件,即在查詢運行時解析 Join 條件確定過濾表達式,并將表達式下推給正在讀取左表的 ScanNode,從而減少掃描的數據量,進而減少 probe hash table 的次數,避免不必要的 IO 和網絡傳輸。因為其運行時生效的特性,官方認為它是 Adaptive Query Execution 的一種應用。
根據上面的例子,可以推導出場景滿足以下的條件時,使用 Runtime Filter 的效果會比較好:
- 左表大右表小(當右表上還有額外的過濾條件會更理想),因為構建 Runtime Filter 是需要承擔計算成本的,包括一些內存的開銷,而開銷直接取決于右表的實際數據量
- 左右表 Join 出來的結果很少,說明通過 Runtime Filter 可以過濾掉左表的絕大部分數據
Doris 支持 3 種 Runtime Filter:
- 一種是 IN,很好理解,將一個 hashset 下推到數據掃描節點。
- 第二種就是 BloomFilter,就是利用哈希表的數據構造一個 BloomFilter,然后把這個 BloomFilter 下推到查詢數據的掃描節點。
- 最后一種就是 MinMax,就是個 Range 范圍,通過右表數據確定 Range 范圍之后,下推給數據掃描節點。
工作原理和優劣總結如下:
Runtime Filter 類型 | 工作原理 | 適用場景 | 優點 | 缺點 |
---|---|---|---|---|
IN | 子查詢的方式,實現上是將一個 Hashset 下推到 Scan 節點 | Broadcast Join | 開銷小,過濾效果明顯且快速 | 右表超過一定數據量時會失效,目前 Doris 配置的閾值是 1024 |
Min/Max | 通過右表構建一個 Range 范圍,然后將它下推到 Scan 節點 | 通用 | 開銷小 | 僅對數值類型有效果;對數值以外類型無法使用 |
BloomFilter | 通過右表構建一個 BloomFilter,然后將它下推到 Scan 節點 | 通用 | 通用性較好,適用于各種類型、效果也較好 | 配置比較復雜且計算成本較高;當過濾率較低或者左表數據較少時,可能導致性能降低 |
一些使用的注意事項(比較細節了,后面考慮結合代碼再深入理解):
開啟 Runtime Filter 后,左表的 ScanNode 會為每一個分配給自己的 Runtime Filter 等待一段時間再掃描數據,即如果 ScanNode 被分配了 3 個 Runtime Filter,那么它最多會等待 3000ms。
因為 Runtime Filter 的構建和合并均需要時間,ScanNode 會嘗試將等待時間內到達的 Runtime Filter 下推到存儲引擎,如果超過等待時間后,ScanNode 會使用已經到達的 Runtime Filter 直接開始掃描數據。
如果 Runtime Filter 在 ScanNode 開始掃描之后到達,則 ScanNode 不會將該 Runtime Filter 下推到存儲引擎,而是對已經從存儲引擎掃描上來的數據,在 ScanNode 上基于該 Runtime Filter 使用表達式過濾,之前已經掃描的數據則不會應用該 Runtime Filter,這樣得到的中間數據規模會大于最優解,但可以避免嚴重的劣化。
如果集群比較繁忙,并且集群上有許多資源密集型或長耗時的查詢,可以考慮增加等待時間,以避免復雜查詢錯過優化機會。如果集群負載較輕,并且集群上有許多只需要幾秒的小查詢,可以考慮減少等待時間,以避免每個查詢增加 1s 的延遲。
Join Reorder 優化
有了前面兩表 Join 的 Runtime Filter 鋪墊,再來看 Join Reorder 的優化,邏輯關系上就能夠理順了。
Doris 目前的 Join Reorder 算法是基于 RBO 的,邏輯描述如下:
- 盡量讓大表跟小表做 Join,它生成的中間結果是盡可能小的
- 把有條件的 Join 表往前放,也就是說盡量讓有條件的 Join 表進行過濾
- Hash Join 的優先級高于 Nest Loop Join,因為 Hash join 本身是比 Nest Loop Join 快很多的
可以發現前兩條,都是在朝著讓「右表」更小的方向去優化,而最后一條則是從算法的性能上來考慮。
Join 調優建議
- Join 列最好是相同的簡單類型;同類型避免 Cast 操作,簡單類型則有不錯的 Join 計算性能
- Join 列最好是 Key 列,原因是 Key 列能夠充分利用 Doris 延遲物化的特性,減少 IO 提升性能
- 大表之間的 Join 最好能夠利用上 Colocate,相當于已經做好了預 Shuffle,實際查詢的時候可以直接 Join 計算不再有 Shuffle 操作,徹底避免了大表的 Shuffle 網絡開銷
- 利用 Runtime Filter,Join 過濾性高時效果顯著。根據 3 種 Runtime Filter 特點選擇最適合的
- 涉及多表 Join,需要判斷 Join 的合理性。盡量保證“左大右小”的原則,HashJoin 優于 NLJ。必要時可以通過 SQL Rewrite,通過 Hint 來調整 Join 順序
REF
https://www.jb51.net/article/266004.htm
https://www.jb51.net/article/266000.htm
以上就是Apache Doris Join 優化原理詳解的詳細內容,更多關于Apache Doris Join 優化的資料請關注其它相關文章!