1、場景描述
例如訂單庫進行了分庫分表,其實例如下圖所示:
現在的需求是希望創建一個任務就將數據同步到MQ集群,而不是為每一個數據庫實例單獨創建一個任務,將其數據導入到MQ集群,因為同步任務除了庫不同之外,表的結構、數據映射規則都是一致的。
2、flinkx 的解決方案詳解
2.1 fink Stream API 開發基本流程
使用 Flink Stream API 編程的通用步驟如下圖所示:
溫馨提示:有關 Stream API 的詳細內容將在后續的文章中展開,本文主要是關注 InputFormatSourceFunction,重點關注數據源的拆分。
2.2 flinkx Reader(數據源)核心類圖
在 flinkx 中將不同的數據源封裝成一個個 Reader,其基類為 BaseDataReader,上圖中主要羅列了如下幾個關鍵的類體系:
- InputFormat
flink 核心API,主要是對輸入源進行數據切分、讀取數據的抽象,其核心接口說明如下:1)void configure(Configuration parameters)對輸入源進行額外的配置,該方法在 Input 的生命周期中只需調用一次。2)BaseStatistics getStatistics(BaseStatistics cachedStatistics)返回 input 的統計數據,如果不需要統計,在實現的時候可以直接返回 null。3)T[] createInputSplits(int minNumSplits)對輸入數據進行數據切片,使之支持并行處理,數據切片相關類體系見:InputSplit。4)InputSplitAssigner getInputSplitAssigner(T[] inputSplits)獲取 InputSplit 分配器,主要是在具體執行任務時如何獲取下一個 InputSplit,其聲明如下圖所示:
5)void open(T split)
根據指定的數據分片 (InputSplit) 打開數據通道。為了加深對該方法的理解,下面看一下 Flink
x 關于 jdbc、es 的寫入示例:
6)boolean reachedEnd()
數據是否已結束,在 Flink 中通常 InputFormat 的數據源通常表示有界數據 (DataSet)。
7)OT nextRecord(OT reuse)
從通道中獲取下一條記錄。
8)void close()
關閉。
- InputSplit
數據分片根接口,只定義了如下方法:1) int getSplitNumber()獲取當前分片所在所有分片中的序號。本文先簡單介紹一下其通用實現子類:GenericInputSplit。1)int partitionNumber當前 split 所在的序號2)int totalNumberOfPartitions總分片數為了方便理解我們可以思考一下如下場景,對于一個數據量超過千萬級別的表,在進行數據切分時可以考慮使用10個線程,即切割成 10分,那每一個數據線程查詢數據時可以 id % totalNumberOfPartitions = partitionNumber,進行數據讀取。 - SourceFunction
Flink 源的抽象定義。 - RichFunction
富函數,定義了生命周期、可獲取運行時環境上下文。 - ParallelSourceFunction
支持并行的 source function。 - RichParallelSourceFunction
并行的富函數 - InputFormatSourceFunction
Flink 默認提供的 RichParallelSourceFunction 實現類,可以當成是RichParallelSourceFunction 的通用寫法,其內部的數據讀取邏輯由 InputFormat 實現。 - BaseDataReader
flinkx 數據讀取基類,在 flinkx 中將所有的數據讀取源封裝成 Reader 。
2.3 flinkx Reader構建 DataStream 流程
經過了上面類圖的梳理,大家應該 flink 中提到的上述類的含義有了一個大概的理解,但如何運用呢?接下來將通過查閱 flinkx 的 DistributedJdbcDataReader(BaseDataReader的子類)的 readData 調用流程,體會一下其使用方法。
基本遵循創建 InputFormat、從而創建對應的 SourceFunction,然后通過 StreamExecutionEnvironment 的 addSource 方法將 SourceFunction 創建對應的 DataStreamSource。
2.4 flinkx 針對數據庫分庫分表任務拆分解決方案
正如本文開頭部分的場景描述那樣,某訂單系統被設計成4庫8表,每一個庫(Schema)中包含2個表,如何提高數據導出的性能呢,如何提高數據的抽取性能呢?通常的解決方案如下:
- 首先按庫按表進行拆分,即4庫8表,可以進行切分8份,每一個數據分配處理一個實例中的1個表。
- 單個表的數據抽取再進行拆分,例如按ID進行取模進一步分解。
flinkx 就是采取上面的策略,我們來看一下其具體做法。
Step1:首先先根據數據庫實例、表進行拆分,按表維度組織成一個 DataSource 列表,后續將基于這個原始數據執行拆分算法。
接下來具體的任務拆分在 InputFormat 中實現,本實例在 DistributedJdbcInputFormat 的 createInputSplitsInternal 中。
DistributedJdbcInputFormat#createInputSplitsInternal
Step2:根據分區創建 inputSplit 數組,這里分區的概念就相當于上文提到方案中的第一條。
DistributedJdbcInputFormat#createInputSplitsInternal
Step3:如果指定了 splitKey 的任務拆分算法,首先 DistributedJdbcInputSplit 繼承自 GenericInputSplit,總分區數為 numPartitions,然后生成數據庫的參數,這里主要是生成 SQL Where 語句中的 splitKey mod totalNumberOfPartitions = partitionNumber,其中 splitKey 為分片鍵,例如 id,而 totalNumberOfPartitions 表示分區總數,partitionNumber 表示當前分片的序號,通過 SQL 取模函數進行數據拆分。
DistributedJdbcInputFormat#createInputSplitsInternal
Step4:如果未指定表級別的數據拆分鍵,則拆分策略是對 sourceList 進行拆分,即一些分區處理其中幾個表。
關于 flinkx 中關于任務切分的介紹就到這里了。
3、總結
本文主要是基于 flinkx 介紹 MySQL 分庫分表情況下如何基于 flink 進行任務切分,簡單介紹了 Flink 中關于基本的編程范式、InputFormat、SourceFunction 的基本類體系。
溫馨提示:本文并沒有太詳細對 Flink API 進行深入研究,后續會單獨對 Flink 內容進行逐一剖析,但 Flink 系列的文章組織,其文章的組織并不具備順序性,筆者會在不斷實踐 Flink 的過程中對 FLink 進行剖析。