隨著數據湖的發展和日漸增長的需求,對數據湖進行統一元數據和存儲管理也顯得日趨重要。本文將分享阿里云在數據湖統一元數據與存儲管理方面的實踐。
一、云上數據湖架構
首先介紹一下數據湖相關的概念和架構。
不同的云產商對數據湖有著不同的定義。但是從關鍵詞上來看,基本上都是圍繞這幾個特性和目標:
(1)統一存儲,即數據湖是一個統一的中心化的數據存儲。
(2)可以用來放一些原始數據。
(3)支持多種格式,包括結構化的數據和非結構化的數據。
首先,統一存儲主要是為了解決數據孤島的問題。因為傳統的數據庫或者是數據倉庫在設計上是存算一體的,也就是在不同的查詢引擎之間,數據需要經過清洗和同步。這樣不管是在存儲空間上,還是效率上,都存在一定的浪費。而數據湖上則是使用存算分離的查詢引擎,典型的比如 Hadoop 生態的 Hive 和 Spark。再加上開放的存儲格式,如 Parquet、ORC 等,來實現使用不同的引擎同時可以查詢同一個數據的功能。這就是早期數據湖的架構。
在存儲實現上,數據湖通常會使用擴展性比較高的,廉價的存儲,比如 HDFS,或者云上的 OSS、S3 等對象存儲。這樣大家可以把更多的原始數據,非結構化數據直接放入,避免原始數據的丟失。
為了能夠讀取這些原始數據,計算引擎通常支持類似 schema on read 的方式,采取事后建模的高靈活性的解析方式,對數據格式沒有很強的約束。這種靈活性也帶來了一些弊端,比如其高度開放性可能導致對安全和權限的管理相比于數倉是有所差距的。另外,因為開放存儲,并發寫入的場景尤其是流式寫入的場景,事務上對 ACID 的要求會更高。
是否有一種辦法使我們既能夠利用數據湖的優勢,也能讓數據湖擁有數倉的功能特性呢?
前兩年 Databrick 提出了 Lakehouse 湖倉一體的概念,希望讓數據湖能夠實現更多數倉的企業級能力,讓用戶可以像使用數倉一樣地使用數據湖。
Lakehouse 概念是在數據庫的基礎之上,添加了幾層內容。
首先在存儲上層,做了元數據的統一。對上層提供統一的元數據結構化 SQL 的接口,讓不同的應用,可以使用相同的元數據訪問數據。
另外在性能上,支持 cache,來優化數據湖讀取性能。
并且,利用數據湖格式實現事務層。目前很火熱的數據湖格式,Delta Lake、Hudi 和 Iceberg,使得我們現在提到數據湖場景,基本就跟這幾個數據湖格式劃等號了。雖然這有些夸張,但也足以證明它們在數據湖架構中的重要地位。
最后在底層的數據湖存儲的實現上,相比于 HDFS,目前在云上也有使用對象存儲作為數據湖存儲的趨勢。因為云上對象存儲的擴展性相比于自建 HDFS 要高很多。不管是在成本上,還是在可用性上其實都是會高一些。
阿里云提供了一些產品功能來幫助用戶使用數據湖的架構。
首先支持多引擎計算分析,比如常見的 EMR 的 Spark 和 Hive、Presto、StarRocks 這些引擎,以及阿里云自研的引擎,如 MaxCompute、Hologres,都可以進行湖上數據分析。可以根據不同場景來選擇合適的引擎。
另一方面這些引擎為了能夠無縫對接湖上的結構化數據,DLF(data lake formation)產品提供了統一的元數據和湖上的權限管理,作為整個 lakehouse 架構里的元數據管理層。在后面還會展開介紹。
最后在存儲層上,云上的對象存儲 OSS 是天生適合做數據湖存儲的,并且成本不高。同時現在 OSS 也支持兼容 HDFS 接口的產品,OSS-HDFS,是完全支持 HDFS 接口的,更適合對接一些老版本的大數據引擎。
DLF 的核心能力是提供一個全托管的統一元數據服務,因為數據都已經放在數據湖上了,元數據需要一個中心化的管理才能實現多個引擎的無縫對接,這體現了元數據服務在數據湖里的重要性。這樣不同引擎讀寫同一份數據是圍繞統一的 schema 做操作的,而不是每個引擎都需要單獨建外表。
同時圍繞元數據,我們提供對數據的細粒度的權限管控。
另外也提供了數據湖上的一些存儲管理的功能。
二、數據湖統一元數據
下面就來具體介紹阿里云數據湖的一個重要能力,數據湖上的統一元數據。
在開源大數據體系里,從早期的 map-reduce 到類似 SQL 查詢語言 Hive 的誕生之后,Hive 逐漸成為了開源數倉的事實標準,圍繞著 Hive 的元數據 Hive Metastore 也成為了對接開源數倉的元數據標準。從此以后各個引擎,包括 Spark、Presto 等都是支持對接 Hive Metastore,圍繞 Hive Metastore 做元數據管理。
Hive Metastore 是一個常駐的無狀態的服務,它可以部署一個或者多個實例。大數據引擎通過 thrift 協議連接 Hive Metastore 進行元數據的讀寫。
Hive Metastore 的元數據本身是需要存儲到數據庫上,通常會用 MySQL 作為 Hive Metastore 元數據的底層存儲。
這就形成了常見的開源大數據元數據體系。
使用 Hive Metastore 管理元數據也存在著一些問題和挑戰。
首先在功能層面上它是沒有做多版本的,不能追溯之前的元數據版本。ACID 的特性和 LOCK 接口是和 Hive 引擎綁定的,在湖上多引擎的場景下,是沒有辦法利用到它的一些功能的。
另外因為它暴露的是 thrift 協議的接口,如果你自有服務,或者自研引擎需要去對接會相對麻煩一些。有時可能還需要直接連 MySQL 去讀一些元數據,這也不是一個比較好的方法。
還有一個問題是它存在性能瓶頸,存在單點問題和運維成本,尤其是對元數據量比較大的用戶,這是一個比較常見的問題。因為單點的 Hive Metastore Server 和 Metastore 后端連接的 MySQL 都可能會成為瓶頸,需要一些性能調優的工作。
上圖中還列出了一些真實的客戶問題。在 Hive Metastore 的使用過程中,首先會遇到的就是 JDBC 連接的問題,可能會遇到一些錯誤。比如有的時候我們查詢元數據的所有請求都突然變慢了,這時首先要檢查 MySQL 的狀態,查看 MySQL 監控是否有慢 SQL。如果分區數總量很大的話,MySQL 表數量可能會達到上千萬,會導致查詢比較慢。這個時候,需要做一些數據清理,刪除一些分區來緩解這個問題。另外在自建的數據管理系統或者外部系統中,通常不會用 thrift 協議去調用 Hive 的 Metastore Server,而是直連 JDBC,這樣連接數多的話,也可能會帶來一些額外的壓力。
在內存方面,Hive Metastore Server 的內存存在 OOM 的風險。因為有些操作,比如 list partition,會加載全部分區對象,如果有人寫了一個糟糕的查詢,比如在一個很大的分區表上,沒有加分區查詢條件,就可能會拿到上百萬的分區,最后導致整個 Hive Metastore 內存出現 full gc 或者 OOM 的情況,一旦 Hive Metastore 出問題,整個集群的作業都會受到影響。
列舉幾個我們遇到過的 StackoverflowError 的情況。如果 drop partition 的分區數量很多的話,在 Hive Metastore 的內部實現是遞歸的,可能會導致堆棧溢出報錯,無法直接執行。
最后就是超時問題,因為 HMS 的客戶端設計沒有分頁,是全量返回的。所以在拉取元數據的時候,可能會出現超時的情況,這也是一個風險點。
這些都是我們在使用 HMS 時候遇到的一些問題。
因此在云上,我們提供了全托管的元數據服務的 DLF(data lake formation),采用的是完全不同的架構,來解決上面大部分問題和痛點。
首先作為云產品,我們通過標準的 open API 暴露接口,提供了兼容 Hive2 和 Hive3 的 Metastore 接口的 client。這個 client 可以直接替換掉引擎的 Hive Metastore client 實現類,原本訪問 Hive 元數據的地方可以直接替換為訪問我們客戶端的實現類,實現了無縫對接。
另外除了開源體系的引擎以外,我們也對接了阿里云上的其它大數據引擎,包括 Max Compute、Hologres、Flink 等等。云上其他大數據引擎也可以利用我們的統一元數據來進行元數據管理。這樣真正做到了統一 catalog,用一個引擎寫入,其它引擎讀取。比如用 Flink 入湖,之后可以直接使用 Spark 查,再用 Hologres 等做 OLAP 分析,這些都可以直接采用同一個元數據來完成。
不同于 HMS 使用 MySQL,擴展性比較差,我們的元數據服務底層實現是用阿里云的表格存儲。表格存儲也是阿里云提供的一種服務,面向海量數據有非常強的伸縮能力,擴展性很高,所以不用擔心分區數過大帶來的擴展性問題。
因為我們是一個全托管的服務,對使用方提供 SLA,高可用保障,前面提到的運維問題也可以避免。
總結一下,我們的統一元數據的優勢為,一方面因為是全托管,可以減少元數據運維成本;另一方面真正實現了對接云上多引擎。
再補充一些關于元數據本身實現的細節。
首先元數據的客戶端是兼容 Hive Metastore 行為的,實現了 Hive Metastore 的接口,可以直接去對接 Hive 生態相關的大數據引擎。Hive Metastore 內部的有些行為,比如在創建 partition 的時候統計 table size 等動作,都會保留在客戶端里,所以不用擔心兼容性問題。
另外客戶端會做一些性能優化,包括異常重試、并發讀取、分頁查詢等。對于重復提交的請求,客戶端也會做一些合并壓縮,減少 IO 開銷。
在服務內部,除了剛才提到的存儲層的高擴展性以外,我們也通過一些自動的分區索引,再做一些分區過濾的性能提升。
總體來講在元數據的性能上,在一些小表上可能跟 RDS 有些差距,但是并不明顯。在大分區表上,比如單表有 300 萬分區的場景下,我們的查詢性能會有比較明顯的優勢。比如在 300 萬分區表下,如果分區條件全部命中,list partition by filter 在我們的元數據可以在 0.5 秒內返回,但是在 RDS 上因為它的分區值沒有索引,需要花 5 秒左右才能返回。
在元數據的功能上再舉幾個例子。
首先是元數據多版本,我們會記住元數據每一次更新的前后狀態,可以看到什么時間點加了什么字段,是誰做的修改的。有比較好的回溯機制,實現元數據審計。在元數據檢索上,我們的元數據本身會把內容同步到 ES 搜索引擎里,對外暴露,可以通過字段搜表,也可以做全局搜索。
再來看一下權限相關的問題。
在開源大數據場景下做用戶級別的權限控制,通常會有這么幾種方案:
Hive 本身提供的認證能力,storage-based authorization和sql-standard-based authorization。但是 Hive 的實現都是跟 Hive 引擎綁定的。通常在其它引擎是無法使用到它的功能的,基本上也沒有人真正會在其它引擎上去使用。
大家通常做法是用 Ranger 來做權限管理。Ranger 是一個通用的多引擎方案,它可以對 SQL 進行權限管理,也可以對文件系統做權限管理。它的原理是從 LDAP 同步用戶信息,提供 UI 供用戶配置權限。在大數據引擎這一側,可以添加各種插件,通過插件來實現權限的攔截和檢查。Ranger 是目前一個可行的方案,但是在公有云上面對我們自研的大數據引擎,是沒法直接對接的。另一方面雖然它包括了如 SparkSQL 等類插件,但是官方的支持并不好,更多還是需要自研一些插件,或者找第三方插件,整體部署起來并沒有那么簡單。
因此在權限這一塊,DLF 統一元數據也提供了鑒權的能力。
權限控制默認是沒有開啟的,因為不一定所有用戶都需要,但是用戶可以按 catalog 級別進行開關。catalog 是基于 database 之上的一層管理模型,如果基于 catalog 設置權限之后,管理員就可以在控制臺進行具體的授權操作。包括 database、table、column、function 這些粒度都可以進行授權。也可以設置不同 action 的權限,比如只給某個人對某個 table 設置 select 權限,而不設置 insert 的權限。同時也支持 RBAC,可以把權限包在 role 里,統一賦權給一組用戶,這些基本的能力都是具備的。
在鑒權環節的實現上,我們提供了兩個層面的鑒權,第一層面是元數據的 API,我想要查看 table 或者 create table,這種動作會在服務端上鑒權。因為我們的云服務會直接去鑒權,判斷發送請求的用戶角色是否有相應動作的權限,如果沒有就會進行攔截。另外因為有些 SQL 操作在元數據層面感知不到,比如在元數據上可能就是查一張表,但是并不知道是在往里寫數據還是在讀數據,這個時候和 Ranger 類似,我們也提供了引擎的插件,可以放在 Spark、Hive 上做一層攔截。和 Ranger 類似,會在內部檢查代理用戶到底有沒有 select 權限,沒有的話去做攔截。這兩層的鑒權模型,適用于不同的場景。
再介紹一個額外功能,就是元數據遷移。元數據本身無論在云上,還是自建的 MySQL 的元數據,如果想要遷移,都需要一個遷移的過程。為了簡化這個過程,我們在產品上做了元數據遷移的功能,在控制臺上就可以做數據遷移。
簡單來講我們會去連遠端的 MySQL 數據庫,如果這個數據庫在阿里云 VPC 內,會自動打通網絡,通過 JDBC 直接拉取元數據,轉換成我們云上的 DLF 元數據,這是直接產品化的。除了導入需求,可能還會有導出需求,包括兩邊元數據對比的需求。這些也提供了現成的工具可以直接使用。在元數據遷移方面,不管是導入導出還是其它方面的需求,我們都保持開放性,不需要擔心元數據被綁定的問題。
除了元數據遷移,可能在有些場景下還需要做元數據抽取,快速構建出湖上的元數據。元數據抽取適合于這樣的場景,比如數據湖上已經有一些數據文件了,可能是從其它數倉拷貝過來的,或者是一些零散的 CSV 數據集文件等等。這個時候因為我們沒有對應表的元數據,就需要用 DDL 語句自己去建表,再做查詢,比較麻煩的,也容易出錯。尤其是對于像 JSON 這種半結構化的嵌套類型,更難去寫建表語句。這種情況下使用我們這個元數據抽取功能就比較方便,可以直接把元數據給推斷出來。用戶只需要填寫 OSS 路徑,我們會根據路徑格式自動掃描下面的表,包括分區值,創建好之后,就會寫入到元數據里進行直接查詢了。包括 CSV、JSON、Parquet 、ORC 等各種格式,也包括湖格式都是可以識別出來的。值得注意的是因為我們做格式推斷需要掃描所有數據,會比較耗時,于是我們采用了快速采樣的方式。
三、數據湖存儲管理與優化
接下來介紹我們在數據湖存儲分流方面做的一些管理和優化。
首先介紹一下元倉,元倉是我們在元數據存儲之外做的一個在線的元數據的數據倉庫。因為元數據存儲本身是在線服務,需要比較高的讀寫事務保障,有些后臺分析,包括一些聚合查詢是不適合在這里做的。于是我們做了一個實時的元數據倉庫。元倉底層是基于 Max Compute 和 Hologres 實現的,它會收集元數據的變更信息,也會收集計算引擎的查詢和寫入的信息,包括存儲上的信息都會實時收集到。這樣我們就形成了圍繞 database 的 table partition 做的指標庫,即 data profile 指標。我們會把這些指標通過標準的 API 暴露出來。一方面可以在控制臺上可以做統計分析,包括對接我們的一些云產品,如 dataworks 之類,做一些數據展示和預估。另一方面這些指標可以用來做存儲生命周期的優化和管理。
接下來舉例介紹一下 Data Profile 指標的幾個實現。
首先是表和分區的大小,這是一個比較基礎的屬性。通常來講,表和分區大小是寫在元數據層,即 Hive 元數據的 table property 里面,本身就定義了,計算引擎會在創建表或者分區的時候寫入。但是不同引擎寫入的標準會不一樣,比如 Hive 是叫 totalSize,Spark 是以 Spark 開頭的屬性值。另外,這些寫入也需要一些參數去開啟,不開啟是不會進行寫入的。所以在實際情況中會發現元數據本身存儲的表大小是不準確的。
在元倉里,因為我們默認大部分數據湖使用的是 OSS,我們會通過 OSS 的底層存儲來獲取表分區的大小,這樣可以最大限度保證數據的準確性。因為 OSS 提供了一個 t+1 更新的存儲清單,這一點類似于 LAMBDA 架構,會 t+1 更新存儲清單的表和分區的存儲大小。另外對于實時表和分區的變更,我們也會監聽到,再實時的從 OSS 那邊拿到最新的大小去做更新。也就是通過存量加增量的流程去獲取表分區的大小,拿到大小之后,會每天產出一些分析報表,比如表的存儲排名,文件大小占比等等。因此我們可以看到哪些表,哪些分區的存儲占用比較大,去做相應的優化。
上面是一個比較完整的湖上管理視圖。
另外再介紹兩個關鍵指標。
第一個指標是表和分區的訪問頻次,通過訪問頻次可以鑒別那些仍然在用但訪問不頻繁的表。這些表可以在 OSS 底層置為低頻存儲,照常讀取的同時可以節省一些成本。在原理上我們通過使用引擎的 Hook 來實現對訪問頻次的獲取,我們解析 SQL 的 plan,拿到它讀取的表和分區,再提交到元數據服務里去做記錄,最后把訪問頻次指標統計出來。
第二個指標是表和分區的最后訪問時間。它可以用來識別這個表和分區是否還有人在訪問。為了保證指標的準確性,最后訪問時間是通過 OSS 底層的訪問日志獲取的。這樣不管通過任何引擎任何途徑讀這里面的數據,訪問時間都會獲取到。最后對于沒有人使用的表和分區,就可以考慮做歸檔或者刪除。
結合這幾個指標,更有利于我們做庫表分區的生命周期管理。因為湖上生命周期管理也是一大重點,因為數倉是有存儲分層的概念,但在數據湖上是沒有一個比較完整的管理能力。我們目前就在做這方面相關的事情。
首先我們使用的標準型 OSS 對象存儲是提供了存儲分層能力的,也可以按需設置成低頻歸檔,冷歸檔這些層次。設置好歸檔之后,會對數據訪問方式產生影響,但是存儲成本會大幅降低。
用戶首先可以設置一些規則,包括基于分區值,分區的創建時間,上面提到的訪問頻次等指標,配置規則設定閾值,比如多長時間沒人訪問或者 30 天內訪問頻次低于幾次。后臺就會定期把符合這些條件的分區的整個目錄做歸檔,或置為低頻等。
另外歸檔和冷歸檔做了之后是不能直接訪問的,需要一個解凍的流程。如果用戶有一天需要訪問已經歸檔的數據,可以一鍵解凍,整個目錄就可以直接使用,而不需要像 OSS 那樣逐個文件進行解凍操作。這種存儲生命周期管理的存儲優化,對于存儲量比較高的數據湖用戶來說會是一個比較好的實踐。
四、數據湖格式管理與優化
最后介紹一下在數據湖格式層面,我們做的管理和優化。
常見湖格式 Hudi、Iceberg 有幾個特點,為了實現 ACID,它們的底層數據文件更新,copy on write 之后,讀取的都是新版本的數據文件,但是老版本的數據還會保留在存儲側。時間一長就需要清理歷史版本的數據文件。另一方面頻繁流式寫入會產生很多小文件,通常可以使用命令手動清理,或者結合在 streamming 任務當中,配置一些參數,比如多少 commit 清理一次,但是這對流式寫入本身的性能會產生影響。針對這種情況,業內很多公司都使用額外部署 table service 的方式,不影響流式寫入,另起一個批作業去清理和優化。DLF 把這種 table service 做在了云服務里面,這樣使用 DLF 湖格式的用戶,可以直接在控制臺上配置規則,比如基于版本號更新多少次就做一次清理。后臺就會跑任務做 vacuum 或者 optimize 命令,整個過程也是全托管的,用戶不用關心背后使用的資源。
具體實現原理為,元倉會維護很多元數據的變化和引擎消息,也會感知到哪些湖格式表發生了寫入和變化。每一次表的寫入,就會觸發規則引擎去做一次判斷是否滿足條件,如果滿足條件就會觸發動作的執行。目前我們對 Delta Lake 已經有比較完整的支持了,對 Hudi 的支持也在進行當中。這是一個比較新的模塊。
再具體介紹下湖格式管理的幾種優化策略。
第一種也是最常見的,基于版本間隔,清理清理歷史文件或者合并小文件。比如寫入了 20 個 commit 之后就會自動觸發整個表的清理,或者小文件合并。這個閾值是可以隨用戶級別或者作業級別做配置的。內部會把這些合并的任務放在一個隊列里,這樣前一個合并任務還沒有跑完,是不會跑下一個合并任務的,避免并發執行,產生寫沖突現象。
第二種合并規則是我們在客戶實踐過程當中覺得比較實用的,基于時間分區自動合并上一個分區的小文件。因為在流式寫入的場景下,通常會按時間順序去命名分區值,每寫入一個新分區就代表上一個分區寫入停止。在這個時候,一旦發現有新分區創建,就可以去對上一個分區做一些優化和合并的動作。這樣上一個分區后續的查詢性能就能得到保證,同時這種做法也能最大程度避免合并任務和寫入流任務的寫沖突。當然為了實現這個方案,我們內部也是做了時間格式的支持,自動處理了很多分區值的時間格式。這樣我們就可以自動識別這些時間分區哪個分區是最新的,哪個分區是上一分區的。
五、問答環節
Q1:DLF 元數據的管理,跟 Databricks 推的 unity catalog 有什么區別?
A1:DLF 元數據管理有點類似于 Hive Metastore 的升級。Databricks 推的 unity catalog 其實是跟它的執行引擎,Databricks 的 Spark 的綁定比較多,它是基于 Databricks 的引擎去做很多事情。我們對單個引擎的集成沒有 unity catalog 那么完整,但是更聚焦在云上的統一元數據,即同一份元數據可以被云上各種各樣的引擎,包括自研的和開源的引擎,都能統一的進行讀寫。總結:我們對云上統一的數據這個角度做的比較多,針對的是多引擎的打通。對某一個引擎內部做的集成沒有 unity catalog 那么深入。
Q2:DLF 的 OPEN API 是開源的嗎?
A2:首先我們是一個全托管的云產品,內部的實現是做成云服務。然后我們會提供標準的 API,用戶可以通過阿里云的 SDK 對 API 的調用和使用。最后我們的元數據 client,適配 Hive client,同時 client 本身也是開源的,內部的元數據服務是在云上實現的。
Q3:DLF 針對小文件治理,計算資源控制。跟湖格式相關的小文件合并的問題。
A3:目前因為我們的湖格式的小文件治理產品還處在公測階段,還沒有進行真正的計費。底層的資源我們是內部提供的,不使用用戶的資源。我們內部是會做一些針對單租戶的,最大的使用量的限制的。目前計費策略還沒有明確的推出。這個可能會等到后續足夠完善之后再去做相關事情。
Q4:現在的 Hive Hook 解析 HSQ 的 SQL,Matestore 的 listener 能監聽 DDL 嗎?
A4:我們現在實現的 listener 是能夠監聽到 DDL 的。首先 DLF 元數據本身,因為剛才提到了我們也有元倉。其實內部是會監聽到所有元數據的變更,同時我們也會基于引擎的 Hook 去監聽表查詢的信息,維護到 DLF 元倉里面。因為我們的實現是沒有 Metastore 的,用戶可以從 DLF 的 Data Profile 的 API 進行獲取。如果想自己實現像以前的 Hive Metastore 一樣的 Metastore 的 listener,這是不支持的。但是可以基于我們云上的 API 去獲取元信息。