導讀:本文將討論關于使用 Calcite 解析單條 SQL 獲取源表和結果表的思路。
實現思路
Apache Calcite是一款開源的動態數據管理框架,它提供了標準的SQL語言、多種查詢優化和連接各種數據源的能力,但不包括數據存儲、處理數據的算法和存儲元數據的存儲庫。
我們可以借助 Calcite SqlParser 解析器分析 SQL 并生成 AST 語法樹,并通過訪問 AST 的各個節點獲取到我們想要的信息。
具體實現
構建 Calcite SqlParser 解析器解析 SQL 并生成 AST 語法樹
@Test
void test() {
String sql = "insert into ... select ...";
// 在解析前可以對 SQL 語句進行預處理,比如將不支持的 && 替換為 AND, != 替換為 <>
SqlParser.Config config =
SqlParser.configBuilder()
.setParserFactory(FlinkSqlParserImpl.FACTORY)
.setLex(Lex.JAVA)
.setIdentifierMaxLength(256)
.build();
// 創建解析器
SqlParser sqlParser = SqlParser
.create(sql, config);
// 生成 AST 語法樹
SqlNode sqlNode;
try {
sqlNode = sqlParser.parseStmt();
} catch (SqlParseException e) {
throw new RuntimeException("使用 Calcite 進行語法分析發生了異常", e);
}
SqlBloodRes res = new SqlBloodRes();
// 遞歸遍歷語法樹
getDependencies(sqlNode, res, false);
}
此處筆者參考了 從 SQL 語句中解析出源表和結果表 - JR's Blog 所提供的思路。在 Calcite 解析出來的 AST 是以 SqlNode 的形式表現的,一個 SqlNode 即是 AST 中的一個節點。SqlNode 有許多類型,我們關注的 Source 和 Sink 表表名在 AST 中會是一個 SqlIdentifier 的葉子結點。(注意:并非所有 SqlIdentifier 葉子結點都對應表名,列名也對應 SqlIdentifier)
在一條 SQL 中,最終出現表的引用的情況歸結于以下兩種情況:
- SELECT 語句的 FROM clause 中的直接引用
- JOIN 語句中 LEFT 和 RIGHT clause 中的直接引用
嵌套子查詢的 SQL 語句中,最終進入到子查詢的 AST 子樹中,只要出現了對表的引用,一定會分解出以上兩種結構。因此,對于一個 SqlIdentifier 類型的葉子節點,在以下兩種情況下,該葉子結點就是一個表的引用:
- 父節點是 SqlSelect,且當前節點是父節點的 FROM 子句派生出的子節點
- 父節點是 SqlJoin(如果是 Lookup join 則節點為 SNAPSHOT 類型,需繼續深入子節點)
另外,一種特殊的情況需要加以考慮。在 SQL 中 AS 常用作起別名,因而可能 SqlIdentifier 的父節點是 AS,而 AS 的父節點是 SELECT 或 JOIN。這種情況下,我們可以將 AS 看作一種 “轉發” 結點,即 AS 的父節點和子節點忽略掉 AS 結點,直接構成父子關系。
從根結點開始遍歷 AST,解析所有的子查詢,找到符合上述兩種情況的子結構,就可以提取出所有對表的引用。
private SqlBloodRes getDependencies(SqlNode sqlNode, SqlBloodRes res, Boolean fromOrJoin) {
if (sqlNode.getKind() == JOIN) {
SqlJoin sqlKind = (SqlJoin) sqlNode;
getDependencies(sqlKind.getLeft(), res, true);
getDependencies(sqlKind.getRight(), res, true);
} else if (sqlNode.getKind() == IDENTIFIER) {
if (fromOrJoin) {
// 獲取 source 表名
res.getSourceTables().put(sqlNode.toString(), sqlNode.toString());
}
} else if (sqlNode.getKind() == AS) {
SqlBasicCall sqlKind = (SqlBasicCall) sqlNode;
if (sqlKind.getOperandList().size() >= 2) {
getDependencies(sqlKind.getOperandList().get(0), res, fromOrJoin);
}
} else if (sqlNode.getKind() == INSERT) {
SqlInsert sqlKind = (SqlInsert) sqlNode;
// 獲取 sink 表名
res.setSinkTable(sqlKind.getTargetTable().toString());
getDependencies(sqlKind.getSource(), res, false);
} else if (sqlNode.getKind() == SELECT) {
SqlSelect sqlKind = (SqlSelect) sqlNode;
List<SqlNode> list = sqlKind.getSelectList().getList();
for (SqlNode i : list) {
getDependencies(i, res, false);
}
getDependencies(sqlKind.getFrom(), res, true);
} else if (sqlNode.getKind() == SNAPSHOT) {
// 處理 Lookup join 的情況
SqlSnapshot sqlKind = (SqlSnapshot) sqlNode;
getDependencies(sqlKind.getTableRef(), res, true);
} else {
// TODO 這里可根據需求拓展處理其他類型的 sqlNode
}
return res;
}
結果封裝
@Data
public class SqlBloodRes {
private Map<String, String> sourceTables = new HashMap<>();
private String sinkTable;
}
最后
上面就是使用 Calcite 解析 SQL 獲取源表和結果表的思路,Demo 實現比較粗糙,各位可以根據實際場景及自身需求進行優化豐富。