轉載本文需注明出處:微信公眾號EAWorld,違者必究。
引言:
傳統 ETL 主要以 SQL 為主要技術手段,把數據經抽取、清洗轉換之后加載到數據倉庫。但是在如今移動互聯網大力發展的場景下,產生大量碎片化和不規則的數據。政府,公安等行業,傳統數據庫已經遠遠無法滿足需求。數據原始文件通過文件導入到基礎庫,再通過大數據 HQL等技術手段提取出二級庫,這中間的數據導入和 SQL ETL 的提取的過程,大量消耗 IO 性能和計算資源,在很多場景下已經是數據處理的瓶頸所在。
普元在實施公安項目過程中開發了一種基于 XML 描述的可編程的函數 ETL 轉換方法。主要用于大數據文件處理領域,能從原始數據文件直接、快速加載到專題庫的技術手段。技術方案主要解決了用 XML 的技術手段描述數據文件的格式,包含文件字段切分、字段類型、默認值、異常值校驗、時間格式校驗。在處理時可添加自行開發的 JAVA UDF 函數,函數實參支持變量、常量、表達式、函數和運算符重載。同時函數支持多層嵌套,即內部函數的返回值最為外部函數的實參。該方案實現了 XML 內函數體的語法解析并在運行過程中直接編譯為 Java 字節碼的技術。有效的解決了政府、公安、電信行業巨量的數據處理需要的大量計算資源和 IO 性能瓶頸,有效的提高了數據處理效率和降低了數據處理開發難度。
目錄:
一、基于 XML 控制文件解析數據文件方案介紹
二、XML 控制文件結構和語法
三、函數和多層嵌套函數傳參
四、UDF 函數編寫方法
五、數據測試工具
六、FlumeOnYarn 架構和分布式部署
一、基于 XML 控制文件解析數據文件方案介紹
對于數據開發項目,我們常常會面臨眾多的數據對接,部分場景不僅數據量大,且數據種類多,數據解析開發工作量巨大。對于大量數據對接,一般設計的 RPC 接口和 WebService 一般都達不到數據性能要求的。并且他們都是點對點的服務,一旦上下游系統故障,都會造成整個數據對接異常。因此大部分都會選擇使用文件的方式進行數據對接。
對于非實時數據對接需求,這種方式的優點:
- 在數據量大的情況下,可以通過文件傳輸,上游只寫入,無需關心數據業務和故障;
- 方案簡單,避免了網絡協議相關的概念;
- 維護簡單,只需保證磁盤寫入穩定性即可;
我們常常會面臨基于此架構的數據對接。但基于此架構數據處理工作都在下游(即數據使用方)。
面對大量數據對接和眾多的數據類型,我們對于每種數據文件解析、解碼、清洗消耗大量的人力,并且基于編碼的方式對于較多數據類型的場景代碼量大,且難以管理。因此經過多次數據開發實踐,我們開發了一種基于 XML 描述的方式來解析和清洗數據文件的實現。
本架構實現適合以下幾個方面:
- 基于文件的數據對接;
- 文件無法直接導入到目標數據庫,需要做轉換,清洗為目標格式;
如上數據對接架構圖,Flume 基本實現了基于文件系統的自動掃描和讀取,因此架構實現了基于 Flume Sink 的模塊。本架構也可作為SDK 作為框架集成到現有數據處理方案中。
二、XML數據控制文件結構和語法
<?xml version="1.0" encoding="UTF-8"?> <schema> <key>JD_TYPE_V1</key> <type>textfile</type> <delimiter>,</delimiter> <fields> <field type="int">exp_flag</field> <field type="string">sender_id</field> <field type="string">sender_num</field> <field type="string" value="unknown">sender_address</field> <field type="string">receiver_num</field> <field type="date" pattern="yyyy-MM-dd HH:mm:ss">expect_time</field> <field type="string" default="true" value="location(receiver_num)">receiver_num_origin</field> <field type="string" default="true" value="yn(none(sender_num))">is_sender_num_null</field> <field type="string" default="true" value="concat(caller_number, '-', called_number)">number_connect</field> <field type="string" default="true" value="yn(all_true(none(sender_num), none(receiver_num)))">all_num_null</field> <field type="string" default="true" value="province_code(sender_province)">sender_province_code</field> </fields> </schema>
如上 XML 描述了一種數據文件類型及該類型的切分方法,數據每行經過切分后,產生的多個數據列的轉換方法。
理論上,每種數據類型應該對應一個控制文件,意味著控制文件來描述該種數據類型如何解析和轉換。
- Key 主要標注該控制文件處理的類型ID;
- Delimiter 為文件列切割字符;
- Fields 中包含每列的字段描述;
- 數據類型支持Java基本類型和date類型;
- Skip為數據對齊語法,控制在列中忽略某列的值;
- Default = true 屬性為數據對齊語法,給某列提供默認值,提供默認值的列在數據列中不移動位移;
- Value 提供了給該字段提供當列中無值時提供默認值;value=null則指定列值為null;
- Date 類型需 pattern 屬性;
三、函數和多層嵌套函數傳參
默認值
詞法分析時字段field 的value 屬性值沒有以英文小括號閉合的實體。如下示例中的primeton:
<field type="string" default="true" value="primeton">data_vendor</field>
函數
函數是由一組字符串、數字、下劃線組成的合法函數名和0 到多個形式參數組成。在詞法分析時字段field 的 value 屬性值由英文小括號閉合的實體。如下示例中的:
location(),yn(),concat(); <field type="string" default="true" value=" unix_timestamp ">curr_time</field> <field type="string" default="true" value="location(receiver_num)">receiver_num_origin</field> <field type="string" default="true" value="yn(none(sender_num))">is_sender_num_null</field> <field type="string" default="true" value="concat(caller_number, '-', called_number)">number_connect</field>
函數名
函數體小括號前面的部分。一般由字符串、數字、下劃線組成的一組特定的名稱。如location(receiver_tel),location 即為該函數的函數名稱。
函數的形式參數:
1.無參數
詞法分析時value的值滿足函數條件且函數體內無參數。如下示例中:unix_timestamp() 獲得當前系統內的 Unix 時間戳;
<field type="string" default="true" value=" unix_timestamp()">curr_time</field>
2.常量型形參
詞法分析時函數體內以英文單引號引用的值為函數體的常量型形參。如’100’,函數示例為:random_int(‘100’),生成 0-100 以內的隨機整形數值;
<field type="string" default="true" value="random_int(‘100’)">rand_num</field>
3.變量型形參
詞法分析時函數體內參數沒有英文單引號引用并且不以英文小括號閉合的為函數體的變量型形參。如下示例中的receiver_tel;
<field type="string" default="true" value="location(receiver_tel)">r_num_loc</field>
4.函數型形參
詞法分析時函數體內沒有英文單引號并且以英文小括號閉合的參數類型參數為函數體的函數型參數。如下示例中的:none(sender_num)和none(receiver_num);
<field type="string" default="true" value="yn(all_true(none(sender_num), none(receiver_num)))">all_num_null</field>
詞法分析獲得到函數體的同時,使用函數名調用UdfRegistors.getUdf(udfName) 函數,以檢驗當前系統必要存在該函數,否則則拋出無法識別的函數異常。
5.類型校驗
詞法分析階段獲得了字段 field 的取值是默認值或者函數,下一步需校驗其默認值或函數的返回值是否能和定義的字段類型相匹配。如果是函數同時校驗函數的形參和實參類型是否相匹配。
<field type="string" default="true" value="primeton">data_vendor</field> <field type="int" default="true" value="2">call_flag</field>
如上示例中的primeton 需能轉換為 string 類型,call_flag 需能轉換為 int 類型。如果類型不能轉換,則會拋出類型無法轉換異常。對于函數,通過 returnType 返回類型和字段類型進行校驗,可匹配或者是該類型的子類型則類型驗證通過。
四、UDF 函數編寫方法
編寫一個UDF函數的步驟:
- 繼承 UDF 類,實現 eval 方法;
- Eval 方法傳入的是一個數組參數;
- 判斷參數長度是否和預期的一致;
- 判斷位置參數類型是否和預期的一致;
- 實現函數體;
- 返回eval函數執行的返回值,理論上該返回值的類型應該一致,不應該同一函數返回多種類型值;
- 函數編寫者應該保證函數體內是線程安全的;
UDF 實現如下:
public abstract class UDF { /** * 是否支持該組參數類型,不支持拋出UnsupportedTypeException異常。默認返回 true */ public void support(Class<?>... paramsClass)throws UnsupportedTypeException; /** * 該 UDF 返回值類型,用于校驗嵌套函數類型是否匹配。可返回簡單類型,map,array,record 等類型.默認返回 String 類型 */ public Class<?> returnType(); /** * UDF 執行函數,當輸入不符合預期時,向外拋出異常 * @param params 函數的輸入實參 * @return 函數輸出結果,簡單類型或者復雜類型,支持簡單類型,map,array,record 類型 */ public abstract Object eval(Object... params); }
一個判斷是否包含子串的UDF 寫法:
所有的UDF都通過一個核心注冊類(這點類似 Hive 的FunctionRegistry)
public final class UdfRegistors { /** * UDF 函數映射 */ static final Map<String, UDF> UDF_CACHED = new HashMap<String, UDF>(); static { UDF_CACHED.put("copy", new CopyUDF()); // 復制一個變量的值 UDF_CACHED.put("eq", new EqUDF()); // 判斷兩個變量是否相等 UDF_CACHED.put("yn", new YnUDF()); // 根據輸入true,false 轉換為 Y、N UDF_CACHED.put("null", new NullUDF()); // 判斷變量是否為null // add udf method UDF_CACHED.put("location", new LocationUDF()); // 獲得手機號碼的歸屬地 UDF_CACHED.put("nation_code", new NationCodeUDF()); // 根據國家名稱獲取國家代碼 UDF_CACHED.put("province_code", new ProvinceCodeUDF()); //根據省名稱獲取省代碼 UDF_CACHED.put("city_code", new CityCodeUDF()); // 根據城市名稱獲取城市代碼 UDF_CACHED.put("phone_num", new PhoneNumUDF()); // 校驗是否是手機號或者固話 UDF_CACHED.put("number_format", new NumberFormatUDF()); //校驗是否可以轉化成數字 } /** * 添加一個UDF函數 * @param key UDF 函數 * @param value UDF 函數 eval 應線程安全 * @return */ public static boolean addUdf(String key, UDF value) { return UDF_CACHED.put(Optional.of(key).map((it)->it.toLowerCase()).get(), value) != null; } /** * 獲得內置的 udf 函數 */ public static UDF getUdf(String udfName) { return UDF_CACHED.get(udfName.toLowerCase()); } }
UDF 函數注冊時期:
- 可在編譯期綁定內置的 UDF 函數;
- 可在系統啟動時配置自加載的 UDF 函數;
- 可在運行期動態注入UDF 函數;
五、數據測試工具
數據對接過程,面對數據是否能轉換為目標結果常常無從所知。基于XML 控制文件的數據解析,可實現一個測試工具。該工具通過上傳數據文件和上傳 XML 控制文件,可對數據文件隨機的讀取行進行匹配測試,只要數據列和目標 XML文件能通過列匹配測試,則數據可通過 ETL 解析清洗。否則繼續修改 XML 控制文件,直到順利通過匹配。
六、FlumeOnYarn 架構和分布式部署
本架構適合以文件作為數據對接的方案,另一方面,通過擴展 Flume 即可實現拿來主義。Flume 內部實現對 Channel 的 Transaction,對于每個以文件構造的 Event 對象是原子操作,要么全部成功,要么失敗。flume依賴事務來保證event的可靠性。Flume 默認沒有分布式實現,因此開發了 FlumeOnYarn 的架構,用于支持 Flume 的分布式部署。
FlumeOnYarn優勢:
- 無需每個節點安裝 Flume,可一鍵啟動和停止;
- 配置文件在客戶端節點修改,自動復制到 Yarn 上各實例,無需每個節點修改;
- 基于 CDH或HDP的發行版,即使實現了 Web 可視化化的配置和分布式部署,但是對于 Flume 只能實現單配置文件實例,無法實現多配置實例;
- 集群的規模可以根據數據量大小進行實時的調整(增減節點),實現彈性處理。通過命令或者 api 即可控制(CDH 等需要在頁面添加 host,繁瑣且不易動態調整);
- 多個租戶或者同一租戶多個處理實例互不影響,且能隔離(Yarn Container);
FlumeOnYarn 架構
上圖所示,提交FlumeOnYarn 需要客戶端,該客戶端沒有太多和Flume安裝包結構特殊的地方,只是在 lib 下添加了 flume-yarn 的架構支持和 bin 下 flume-on-yarn 的啟動腳本。
Flume OnYarn 客戶端程序
通過 bin/flume-on-yarn 即可提交 FlumeOnYarn Application 集群。如下的命令即可一次性申請多個 Yarn 資源節點,實現一鍵部署:
bin/flume-on-yarn yarn -s --name agent_name –conf conf/flume-hdfs.conf --num-instances 5
總結
關于作者:震秦,普元資深開發工程師,專注于大數據開發 8 年,擅長 Hadoop 生態內各工具的使用和優化。參與某公關廣告(上市)公司DMP 建設,負責數據分層設計和批處理,調度實現,完成交付使用;參與國內多省市公安社交網絡項目部署,負責產品開發(Spark 分析應用);參與數據清洗加工為我方主題庫并部署上層應用。
關于EAWorld:微服務,DevOps,數據治理,移動架構原創技術分享。