目錄
- What Colocate Join
- Why Colocate Join
- How Colocate Join
- 核心思路
- 術語定義
- 1 數據導入時保證本地性
- 2 Colocate Join Query Plan
- 3 Colocate Join Query Schedule
- 4 Colocate Join At Bucket Seq Level
- 5 Colocate Join Metadata Maintenance
- 6 How to decide a query can colocate join
- 7 Colocate Join Support Balance
- Colocate Join Performance
- How To Use Colocate Join
- Colocate Join 目前限制
- Colocate Join 適用場景
- Colocate Join FAQ
- 總結
What Colocate Join
我們都知道 Join 的常見連接類型分為以下幾種:
- INNER JOIN
- OUTER JOIN
- CROSS JOIN
- SEMI JOIN
- ANTI JOIN
Join 的常見算法實現包含以下幾種:
- Nested Loop Join
- Sort Merge Join
- Hash Join
分布式系統實現 Join 數據分布的常見策略有:
- Shuffle Join
- Broadcast Join
- Colocate/Local Join
Colocate/Local Join 就是指多個節點 Join 時沒有數據移動和網絡傳輸,每個節點只在本地進行 Join,能夠本地進行 Join 的前提是相同 Join Key 的數據分布在相同的節點。
Why Colocate Join
相比 Shuffle Join 和 Broadcast Join,Colocate Join 在查詢時沒有數據的網絡傳輸,性能會更高。 在 Doris 的具體實現中,Colocate Join 相比 Shuffle Join 可以擁有更高的并發粒度,也可以顯著提升 Join 的性能,這一點在后面會解釋。
How Colocate Join
核心思路
對于 colocate tables,在任何情況下都要保證數據的本地性。 具體包括:
- 數據導入時保證數據本地性
- 查詢調度時保證數據本地性
- 數據 balance 后保證數據本地性
實現中最復雜是第 3 點: 處理 colocate tables 的 balance。
術語定義
Colocate Group
我們將一組具體相同 Colocate 屬性的 Table 稱為 Group,下圖中 t1 和 t2 擁有相同的 Colocate Group。
Colocate Parent Table
我們將決定一個 Group 數據分布的 Table 稱為 Parent Table,下圖中 t1 是 Colocate Parent Table.
Colocate Child Table
我們將一個 Group 中除 Parent Table 之外的 Table 稱為 Child Table,下圖中 t2 是 Colocate Child Table.
Bucket Seq
如下圖,如果一個表有 N 個 Partition, 則每個 Partition 的第 M 個 bucket 的 Bucket Seq 是 M。
1 數據導入時保證本地性
Doris 的分區方式如下所示,先根據分區字段 Range 分區,再根據指定的 Distributed Key Hash 分桶:
所以我們在數據導入時保證本地性的核心思想就是兩次映射,對于 colocate tables,我們保證相同 Distributed Key 的數據映射到相同的 Bucket Seq,再保證相同 Bucket Seq 的 buckets 映射到相同的 BE。
具體來說,第一步:我們計算 Distributed Key 的 hash 值,并對 bucket num 取模,保證相同 Distributed Key 的數據映射到相同的 Bucket Seq。
第二步:將同一個 Colocate Group 下所有相同 Bucket Seq 的 Bucket 映射到相同的 BE,方法如下:
- Group 中所有 Table 的 Bucket Seq 和 BE 節點的映射關系和 Parent Table 一致
- Parent Table 中所有 Partition 的 Bucket Seq 和 BE 節點的映射關系和第一個 Partition 一致
- Parent Table 第一個 Partition 的 Bucket Seq 和 BE 節點的映射關系利用原生的 Round Robin 算法決定
2 Colocate Join Query Plan
對 HashJoinFragment,由于 Join 的多張表有了數據本地性保證,所以可以去掉 Exchange Node,避免網絡傳輸,將 ScanNode 直接設置為 Hash Join Node 的 Child。
3 Colocate Join Query Schedule
查詢調度的目標: 一個 Colocate join 中所有 ScanNode 中所有 Bucket Seq 相同的 Buckets 被調度到同一個 BE。
查詢調度的策略:第一個 ScanNode 的 Buckets 隨機選擇 BE,其余的 ScanNode 和第一個 ScanNode 保持一致。
4 Colocate Join At Bucket Seq Level
目前,Doris 的 Hash Join 是 Server 粒度的:
對于 colocate join,由于同一個 Colocate Group 下相同 Bucket Seq 的 Bucket 分布在相同的 BE,所以我們將 Join 的粒度從 Server 粒度降至 Bucket Seq 粒度:
5 Colocate Join Metadata Maintenance
對于 colocate join,我們需要維護以下幾個核心元數據:
- 代碼中,colocate group id 就是 colocate parent table id
- group2BackendsPerBucketSeq 代表每個 colocate group 中每個 bucket seq 映射到哪些 BE
- 為了支持 balance,以及保證元數據的一致性,這些元數據都需要持久化
6 How to decide a query can colocate join
- Join 的 tables 是 colocate able
- The colocate group 是 stable 狀態,沒有 balancing
- Join 的 Key 包含分桶的 Distributed Key
7 Colocate Join Support Balance
核心思路:
新增一個 daemon 線程專門處理 colocate table 的 balance,并讓正常的 balance 線程不處理 colocate table 的 balance。
何時 balance:
有 BE 節點新增,刪除,down 掉時。
balance 的粒度:
正常 balance 的粒度是 bucket,但是對于 colocate table,我們必須保證同一個 colocate group 下所有 bucket 的數據本地性,所以我們 balance 的單位是 colocate group。
balance 對查詢的影響:
當一個 colocate group 正在 balance 時,colocate join 會退化為原始的 shuffle join 或 broadcast join。
balance 流程:
- 為需要復制或遷移的 Bucket 選擇目標 BE
- 標記 colocate group 的轉態為 balancing
- 對于需要復制或遷移的 Bucket,發起 Clone Job,Clone Job 會從 Bucket 的現有副本復制一個新副本目標 BE
- 更新 backendsPerBucketSeq(維護 Bucket Seq 到 BE 映射關系的元數據)
- 當一個 colocate group 下的所有 Clone Job 都完成時,標記 colocate group 的轉態為 stable
- 刪除冗余的副本
當有 BE 節點刪除或長時間掛掉時,選擇目標 BE 的策略:
和正常 balance 時的選擇策略相同,考慮集群的整體負載,盡量選擇負載較低的 BE。
當有 BE 節點新增時,選擇目標 BE 的策略:
- 對于當前 colocate group,計算每個新增 BE 需要增加的 bucket seqs 個數:假如我們有 3 個 BE,8 個 bucket,每個 bucket 是 3 副本,則每個 BE 負責 8 個 bucket 副本,我們新增 1 個 BE 后,可以計算出每個 BE 負責的平均 bucket 副本數應該是 3 * 8 / 4 = 6,每個新增 BE 需要增加的 bucket seqs 個數為 6 / 1 = 6.
- 對于每個 bucket seqs, 隨機選擇從哪個舊的 BE 遷移副本到新增的 BE。
Colocate Join Performance
測試數據:
Table A,B,C 都有 10 天數據,1 天一個 partitions,每個 partition 有 570 萬數據。
測試集群:
4 臺低配物理機,每個 BE 24CPU,96MEM
測試 SQL:
SQL1:
select count(*) FROM A t1 INNER JOIN [shuffle] B t5 ON ((t1.dt = t5.dt) AND (t1.id = t5.id)) INNER JOIN [shuffle] C t6 ON ((t1.dt = t6.dt) AND (t1.id = t6.id)) where t1.dt in (xxx days);
SQL2:
select t1.dt, t1.id, t1.name, t1.second_id,t1.second_name, t5.id, t5.weight_time,t5.list, t6.ord_id, t6._id FROM A t1 INNER JOIN B t5 ON ((t1.dt = t5.dt) AND (t1.id = t5.id)) INNER JOIN C t6 ON ((t1.dt = t6.dt) AND (t1.id = t6.id)) where t1.dt in (xxx days) limit 10000;
Test Result for SQL1:
Test Result for SQL2:
可以看到,Colocate Join 相比 Shuffle Join 有明顯的性能提升,而且隨著集群規模越大,Join 的數據量越多,Colocate Join 的優勢會更明顯。
How To Use Colocate Join
社區最新代碼已經支持 Colocate Join,只不過默認是關閉的,只需要在 FE 配置中設置 disable_colocate_join 為 false,即可開啟 Colocate Join 功能。
具體使用時只需要在建表時增加 colocate_with 這個屬性即可,colocate_with 的值可以設置成同一組 colocate 表中的任意一個,不過需要保證 colocate_with 屬性中的表要先建立。
假如需要對 table t1 和 t2 進行 Colocate Join,可以按以下語句建表:
CREATE TABLE `t1` ( `id` int(11) COMMENT "", `value` varchar(8) COMMENT "" ) ENGINE=OLAP DUPLICATE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 10 PROPERTIES ( "colocate_with" = "t1" ); CREATE TABLE `t2` ( `id` int(11) COMMENT "", `value` varchar(8) COMMENT "" ) ENGINE=OLAP DUPLICATE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 10 PROPERTIES ( "colocate_with" = "t1" );
Colocate Join 目前限制
- Colocate Table 必須是 OLAP 類型的表
- colocate_with 屬性相同表的 BUCKET 數必須一樣
- colocate_with 屬性相同表的 副本數必須一樣 (這個限制之后可能會去掉,但對用戶應該沒有實際影響)
- colocate_with 屬性相同表的 DISTRIBUTED Columns 的數據類型必須一樣
Colocate Join 適用場景
Colocate Join 十分適合幾張表按照相同字段分桶,并高頻根據相同字段 Join 的場景,比如電商的不少應用都按照商家 Id 分桶,并高頻按照商家 Id 進行 Join。
Colocate Join FAQ
一句話總結,凡是不能進行 Colocate Join 的場景都會自動退化為原始的 Shuffle Join 或者 Broadcast Join。
Q1: 支持多張表進行 Colocate Join 嗎?
A: 支持
Q2: 支持 Colocate 表和正常表 Join 嗎?
A: 支持
Q3: Colocate 表支持用非分桶的 Key 進行 Join 嗎?
A: 支持:不符合 Colocate Join 條件的 Join 會使用 Shuffle Join 或 Broadcast Join
Q4: 如何確定 Join 是按照 Colocate Join 執行的?
A: explain 的結果中 Hash Join 的孩子節點如果直接是 OlapScanNode, 沒有 Exchange Node,就說明是 Colocate Join
Q5: 如何修改 colocate_with 屬性?
A: ALTER TABLE example_db.my_table set ("colocate_with"="target_table");
Q6: 如何禁用 colocate join?
A: set disable_colocate_join = true; 就可以禁用 Colocate Join,查詢時就會使用 Shuffle Join 或 Broadcast Join
總結
大多數支持 Join 的 OLAP 系統都會考慮支持 Colocate Join,比如 MemSQL, SnappyData, 阿里 AnalyticDB 等,阿里 AnalyticDB 更是在數據模型中就引入了 Table Group 的概念。總的來講,Colocate Join 通過在數據導入,查詢 Plan,查詢調度,數據 balance 時對數據本地性的保證和考慮,可以顯著加速特定場景的下 Join 查詢,是一個十分有用的 Feature。
以上就是Apache Doris Colocate Join 原理實踐教程的詳細內容,更多關于Apache Doris Colocate Join 原理的資料請關注其它相關文章!