作者 | HamaWhite
審校| 蔡芳芳
在當(dāng)今數(shù)字化時(shí)代,數(shù)據(jù)已經(jīng)成為企業(yè)和組織中不可或缺的重要資產(chǎn),包括個(gè)人信息、商業(yè)機(jī)密、財(cái)務(wù)數(shù)據(jù)等等。然而,隨著數(shù)據(jù)泄露和安全問題的不斷增加,數(shù)據(jù)脫敏已經(jīng)成為了一項(xiàng)非常重要的工作。隨著以 Flink 為代表的實(shí)時(shí)數(shù)倉的興起,企業(yè)對(duì)實(shí)時(shí)數(shù)據(jù)安全的需求越來越迫切。但由于 Flink 實(shí)時(shí)數(shù)倉領(lǐng)域發(fā)展相對(duì)較短,Apache Ranger 尚不支持 FlinkSQL,且依賴 Ranger 會(huì)導(dǎo)致系統(tǒng)的部署和運(yùn)維愈加復(fù)雜。
因此,自研出 FlinkSQL 的數(shù)據(jù)脫敏方案,支持面向用戶級(jí)別的數(shù)據(jù)脫敏訪問控制,即特定用戶只能訪問到脫敏后的數(shù)據(jù)。在技術(shù)實(shí)現(xiàn)上做到對(duì) Flink 和 Calcite 源碼的零侵入,可以快速集成到已有實(shí)時(shí)平臺(tái)產(chǎn)品中。
一、基礎(chǔ)知識(shí)
1.1 數(shù)據(jù)脫敏
數(shù)據(jù)脫敏 (Data Masking) 是一種數(shù)據(jù)安全技術(shù),用于保護(hù)敏感數(shù)據(jù),以防止未經(jīng)授權(quán)的訪問。該技術(shù)通過將敏感數(shù)據(jù)替換為虛假數(shù)據(jù)或不可識(shí)別的數(shù)據(jù)來實(shí)現(xiàn)。例如可以使用數(shù)據(jù)脫敏技術(shù)將信用卡號(hào)碼、社會(huì)安全號(hào)碼等敏感信息替換為隨機(jī)生成的數(shù)字或字母,以保護(hù)這些信息的隱私和安全。
1.2 業(yè)務(wù)流程
下面用訂單表orders的兩行數(shù)據(jù)來舉例,示例數(shù)據(jù)如下:
1.2.1 設(shè)置脫敏策略
管理員配置用戶、表、字段、脫敏條件,例如下面的配置。
1.2.2 用戶訪問數(shù)據(jù)
當(dāng)用戶在 Flink 上查詢orders表的數(shù)據(jù)時(shí),會(huì)在底層結(jié)合該用戶的脫敏條件重新生成 SQL,即讓數(shù)據(jù)脫敏生效。當(dāng)用戶 A 和用戶 B 在執(zhí)行下面相同的 SQL 時(shí),會(huì)看到不同的結(jié)果數(shù)據(jù)。
用戶 A 查看到的結(jié)果數(shù)據(jù)如下,customer_name字段的數(shù)據(jù)被全部掩蓋掉。
用戶 B 查看到的結(jié)果數(shù)據(jù)如下,customer_name字段的數(shù)據(jù)只顯示前 4 位,剩下的用 x 代替。
二、Hive 數(shù)據(jù)脫敏解決方案
在離線數(shù)倉工具 Hive 領(lǐng)域,由于發(fā)展多年已有 Ranger Column Masking 方案來支持字段數(shù)據(jù)的脫敏控制,詳見參考文獻(xiàn) [1]。下圖是在 Ranger 里配置 Hive 表數(shù)據(jù)脫敏條件的頁面,供參考。
但由于 Flink 實(shí)時(shí)數(shù)倉領(lǐng)域發(fā)展相對(duì)較短,Ranger 還不支持 FlinkSQL,以及依賴 Ranger 的話會(huì)導(dǎo)致系統(tǒng)部署和運(yùn)維過重,因此開始 自研實(shí)時(shí)數(shù)倉的數(shù)據(jù)脫敏解決工具。當(dāng)然本文中的核心思想也適用于 Ranger 中,可以基于此較快開發(fā)出 ranger-flink 插件。
三、FlinkSQL 數(shù)據(jù)脫敏解決方案
3.1 解決方案
3.1.1 FlinkSQL 執(zhí)行流程
可以參考作者文章 [FlinkSQL 字段血緣解決方案及源碼],本文根據(jù) Flink1.16 修正和簡(jiǎn)化后的執(zhí)行流程如下圖所示。
在CalciteParser進(jìn)行parse()和validate()處理后會(huì)得到一個(gè) SqlNode 類型的抽象語法樹 (Abstract Syntax Tree,簡(jiǎn)稱 AST),本文會(huì)針對(duì)此抽象語法樹來組裝行級(jí)過濾條件后生成新的 AST,以實(shí)現(xiàn)數(shù)據(jù)脫敏控制。
3.1.2 Calcite 對(duì)象繼承關(guān)系
下面章節(jié)要用到 Calcite 中的 SqlNode、SqlCall、SqlIdentifier、SqlJoin、SqlBasicCall 和 SqlSelect 等類,此處進(jìn)行簡(jiǎn)單介紹以及展示它們間繼承關(guān)系,以便讀者閱讀本文源碼。
3.1.3 解決思路
針對(duì)輸入的 Flink SQL,在CalciteParser進(jìn)行語法解析 (parse) 和語法校驗(yàn) (validate) 后生成抽象語法樹 (Abstract Syntax Tree,簡(jiǎn)稱 AST) 后,采用自定義Calcite SqlBasicVisitor的方法遍歷 AST 中的所有SqlSelect,獲取到里面的每個(gè)輸入表。如果輸入表中字段有配置脫敏條件,則針對(duì)輸入表生成子查詢語句,并把脫敏字段改寫成CAST(脫敏函數(shù) (字段名) AS 字段類型) AS 字段名, 再通過CalciteParser.parseExpression()把子查詢轉(zhuǎn)換成 SqlSelect,并用此 SqlSelect 替換原 AST 中的輸入表來生成新的 AST,最后得到新的 SQL 來繼續(xù)執(zhí)行。
3.2 詳細(xì)方案
3.2.1 解析輸入表
通過對(duì) Flink SQL 語法的分析和研究,最終出現(xiàn)輸入表的只包含以下兩種情況:
- SELECT 語句的 FROM 子句,如果是子查詢,則遞歸繼續(xù)遍歷。
- SELECT ... JOIN 語句的 Left 和 Right 子句,如果是多表 JOIN,則遞歸查詢遍歷。
因此,下面的主要步驟會(huì)根據(jù) FROM 子句的類型來尋找輸入表。
3.2.2 主要步驟
主要通過 Calcite 提供的訪問者模式自定義 DataMaskVisitor 來實(shí)現(xiàn),遍歷 AST 中所有的 SqlSelect 對(duì)象用子查詢替換里面的輸入表。下面詳細(xì)描述替換輸入表的步驟,整體流程如下圖所示。
- 遍歷 AST 中的 SELECT 語句。
- 判斷是否自定義的 SELECT 語句 (由下面步驟 9 生成),是則跳轉(zhuǎn)到步驟 10,否則繼續(xù)步驟 3。
- 判斷 SELECT 語句中的 FROM 類型,按照不同類型對(duì)應(yīng)執(zhí)行下面的步驟 4、5 和 10。
- 如果 FROM 是 SqlJoin 類型,則分別遍歷其左 Left 和 Right 右節(jié)點(diǎn),即執(zhí)行當(dāng)前步驟 4 和步驟 6。由于可能是三張表及以上的 Join,因此進(jìn)行遞歸處理,即針對(duì)其左節(jié)點(diǎn)跳回到步驟 3。
- 如果 FROM 是 SqlBasicCall 類型,還需要判斷是否來自子查詢,是則跳轉(zhuǎn)到步驟 10 繼續(xù)遍歷 AST,后續(xù)步驟 1 會(huì)對(duì)子查詢中的 SELECT 語句進(jìn)行處理。否則跳轉(zhuǎn)到步驟 7。
- 遞歸處理 Join 的右節(jié)點(diǎn),即跳回到步驟 3。
- 遍歷表中的每個(gè)字段,如果某個(gè)字段有定義脫敏條件,則把改字段改寫成格式CAST(脫敏函數(shù) (字段名) AS 字段類型) AS 字段名,否則用原字段名。
- 針對(duì)步驟 7 處理后的字段,構(gòu)建子查詢語句,形如 (SELECT 字段名 1, 字段名 2, CAST(脫敏函數(shù) (字段名 3) AS 字段類型) AS 字段名 3、字段名 4 FROM 表名) AS 表別名。
- 對(duì)步驟 8 的子查詢調(diào)用CalciteParser.parseExpression()進(jìn)行解析,生成自定義的 SELECT 語句,并替換掉原 FROM。
- 繼續(xù)遍歷 AST,找到里面的 SELECT 語句進(jìn)行處理,跳回到步驟 1。
3.2.3 Hive 及 Ranger 兼容性
在 Ranger 中,默認(rèn)的脫敏策略的如下所示。通過調(diào)研發(fā)現(xiàn) Ranger 的大部分脫敏策略是通過調(diào)用 Hive 自帶或自定義的系統(tǒng)函數(shù)實(shí)現(xiàn)的。
由于 Flink 支持 Hive Catalog,在 Flink 能調(diào)用 Hive 系統(tǒng)函數(shù)。因此,本方案也支持在 Flink SQL 配置 Ranger 的脫敏策略。
四、用例測(cè)試
源碼地址:https://Github.com/HamaWhiteGG/flink-sql-security
注: 如果用 IntelliJ IDEA 打開源碼,請(qǐng)?zhí)崆鞍惭b Manifold插件。
用例測(cè)試數(shù)據(jù)來自于 CDC Connectors for Apache Flink 官網(wǎng),本文給orders表增加一個(gè) region 字段,再增加'connector'='print'類型的 print_sink 表,其字段和orders表的一樣,數(shù)據(jù)庫建表及初始化 SQL 位于 data/database 目錄下。
下載本文源碼后,可通過 Maven 運(yùn)行單元測(cè)試,測(cè)試用例中的 catalog 名稱是hive,database 名稱是default。
$cdflink-sql-security $mvn test詳細(xì)測(cè)試用例可查看源碼中的單測(cè)RewriteDataMaskTest和ExecuteDataMaskTest,下面只描述兩個(gè)案例。
4.1 測(cè)試 SELECT
4.1.1 輸入 SQL
用戶 A 執(zhí)行下述 SQL:
SELECTorder_id, customer_name, product_id, region FROMorders
4.1.2 根據(jù)脫敏條件重新生成 SQL
- 輸入 SQL 是一個(gè)簡(jiǎn)單 SELECT 語句,經(jīng)過語法分析和語法校驗(yàn)后 FROM 類型是SqlBasicCall,SQL 中的表名orders會(huì)被替換為完整的hive.default.orders,別名是orders。
- 由于用戶 A 針對(duì)字段customer_name定義脫敏條件 MASK(對(duì)應(yīng)函數(shù)是脫敏函數(shù)是mask),該字段在流程圖中的步驟 8 中被改寫為CAST(mask(customer_name) AS STRING) AS customer_name,其余字段未定義脫敏條件則保持不變。
- 然后在步驟 8 的操作中,表名hive.default.orders被改寫成如下子查詢,子查詢兩側(cè)用括號(hào)()進(jìn)行包裹,并且用 AS 別名來增加表別名。
4.1.3 輸出 SQL 和運(yùn)行結(jié)果
最終執(zhí)行的改寫后 SQL 如下所示,這樣用戶 A 查詢到的顧客姓名customer_name字段都是掩蓋后的數(shù)據(jù)。
SELECT orders.order_id, orders.customer_name, orders.product_id, orders.region FROM( SELECT order_id, order_date, CAST(mask(customer_name) ASSTRING) AScustomer_name, product_id, price, order_status, region FROM hive.default.orders ) ASorders4.2 測(cè)試 INSERT-SELECT
4.2.1 輸入 SQL
用戶 A 執(zhí)行下述 SQL:
INSERTINTOprint_sink SELECT* FROMorders
4.2.2 根據(jù)脫敏條件重新生成 SQL
通過自定義 Calcite DataMaskVisitor 訪問生成的 AST,能找到對(duì)應(yīng)的 SELECT 語句SELECT * FROM orders,注意在語法校驗(yàn)階段 * 會(huì)被改寫成表中所有字段。針對(duì)此 SELECT 語句的改寫邏輯同上,不再闡述。
4.2.3 輸出 SQL 和運(yùn)行結(jié)果
最終執(zhí)行的改寫后 SQL 如下所示,注意插入到print_sink表的customer_name字段是掩蓋后的數(shù)據(jù)。
INSERTINTOprint_sink ( SELECT orders.order_id, orders.order_date, orders.customer_name, orders.product_id, orders.price, orders.order_status, orders.region FROM( SELECT order_id, order_date, CAST(mask(customer_name) ASSTRING) AScustomer_name, product_id, price, order_status, region FROM hive.default.orders ) ASorders )五、參考文獻(xiàn)
- Apache Ranger Column Masking in Hive(https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.0/authorization-ranger/content/dynamic_resource_based_column_masking_in_hive_with_ranger_policies.html)
- FlinkSQL 字段血緣解決方案及源碼 (https://github.com/HamaWhiteGG/flink-sql-lineage/blob/main/README_CN.md)
- 從 SQL 語句中解析出源表和結(jié)果表 (https://blog.jrwang.me/2018/parse-table-in-sql)
- 基于 Flink CDC 構(gòu)建 MySQL 和 Postgres 的 Streaming ETL(https://ververica.github.io/flink-cdc-connectors/master/content/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B/mysql-postgres-tutorial-zh.html)
- HiveQL—數(shù)據(jù)脫敏函數(shù) (https://blog.csdn.NET/CPP_MAYIBO/article/details/104065839)