分享嘉賓:張海濤 海康威視
編輯整理:Hoh
內容來源:DataFun Talk
出品社區:DataFun
導讀:大家好,很榮幸跟大家分享 Apache Beam 架構原理及應用實踐。講這門課之前大家可以想想,從進入 IT 行業以來,不停的搬運數據,不管職務為前端,還是后臺服務器端開發。隨著這兩年科技的發展,各種數據庫,數據源,應運而生,大數據組件,框架也是千變萬化,從 Hadoop 到現在的 Spark、Flink,數據庫從先前的 oracle、MySQL 到現在的 NOSQL,不斷延伸。那么有沒有統一的框架,統一的數據源搬磚工具呢?
帶著這樣的疑問,開始我們今天的分享,首先是內容概要:
- Apache Beam 是什么?
- Apache Beam 的優勢
- Apache Beam 的架構設計
- Apache Beam 的核心組件刨析
- AloT PB 級實時數據,怎么構建自己的“AI 微服務”?
▌Apache Beam 是什么?
1. Apache Beam 的前世今生

大數據起源于 google 2003年發布的三篇論文 GoogleFS、MapReduce、BigTable 史稱三駕馬車,可惜 Google 在發布論文后并沒有公布其源碼,但是 Apache 開源社區蓬勃發展,先后出現了 Hadoop,Spark,Apache Flink 等產品,而 Google 內部則使用著閉源的 BigTable、Spanner、Millwheel。這次 Google 沒有發一篇論文后便銷聲匿跡,2016年2月 Google 宣布 Google DataFlow 貢獻給 Apache 基金會孵化,成為 Apache 的一個頂級開源項目。然后就出現了 Apache Beam,這次不它不是發論文發出來的,而是谷歌開源出來的。2017年5月17日 發布了第一個穩定版本2.0。
2. Apache Beam 的定義

Apache Beam 的定義如上圖,其定位是做一個統一前后端的模型。其中,管道處理和邏輯處理是自己的,數據源和執行引擎則來自第三方。那么,Apache Beam 有哪些好處呢?
▌Apache Beam 的優勢
1. 統一性

① 統一數據源,現在已經接入的 JAVA 語言的數據源有34種,正在接入的有7種。Python 的13種。這是部分的數據源 logo,還有一些未寫上的,以及正在集成的數據源。基本涵蓋了整個 IT 界每個時代的數據源,數據庫。

② 統一編程模型,Beam 統一了流和批,抽象出統一的 API 接口。

③ 統一大數據引擎,現在支持性最好的是 flink,spark,dataflow 還有其它的大數據引擎接入進來。
2. 可移植性

Beam 的 jar 包程序可以跨平臺運行,包括 Flink、Spark 等。
3. 可擴展性

很多時候,隨著業務需求的不斷變化,用戶的需要也隨之變化,原來 Apache Beam 的功能可能需要進行擴展。程序員就會根據不同的需求擴展出新的技術需求,例如我想用 spark 新特性,能不能重寫一下 sparkrunner 換個版本。我想重寫一下 kafkaIO 可以嗎?對于數據的編碼,我可以自定義嗎?最后干脆我感覺 Pulsar 技術不錯,我想自己寫個 SDKIO,集成進去可以不?答案都是可以的。Apache Beam 是具有可擴展性的,零部件都可以重塑。
4. 支持批處理和流處理

如果在 AIoT 行業,開發過程中,我們可能經常碰到兩種數據:
- 攝像頭等傳感器的實時報警信息
- 不同數據庫的數據,進行一起處理
Beam 對這兩種數據是同時支持的。
5. 支持多語言開發

此外 Beam 支持 java,Python,go,Scala 語言,大家可以利用自己擅長的語言開發自己的 Beam 程序。
6. DAG 高度抽象

DAG,中文名“有向無環圖”。“有向”指的是有方向,準確的說應該是同一個方向,“無環”則指夠不成閉環。如果做一些去重、統計、分組等,開發人員不用再做 Map Reduce ,Beam 已經封裝提供了相應的高級操作。
▌Apache Beam 的架構設計
我們接下來看一下 Beam 架構是怎樣的:
1. Apache Beam 的總體架構

Apache Beam 的總體架構是這樣的,上面有各種語言,編寫了不同的 SDKs,Beam 通過連接這些 SDK 的數據源進行管道的邏輯操作,最后發布到大數據引擎上去執行。需要注意的是,Local 雖然是一個 runner 但是不能用于生產上,它是用于調試/開發使用的。
2. Apache Beam 的部署流程圖

讓我們一起看下 Apache Beam 總體的部署流程。首先我們去構建這個 Beam jobAPI .jar 通過 job 服務器以及設置大數據執行平臺,最后提交 flink 或 spark 的任務集群去執行任務。
▌Apache Beam 的核心組件刨析
1. SDks+Pipeline+Runners (前后端分離)

如上圖,前端是不同語言的 SDKs,讀取數據寫入管道, 最后用這些大數據引擎去運行??梢园l現完整的 beam 程序由 SDks+Pipeline+Runners 構成的。
2. 什么是 SDK?

什么是 SDK,就是一個編寫 beam 管道構成的一部分,一個客戶端或一個類庫組件也可以,最后提交到大數據運行平臺上。
3. Beam 版本和 Kafka-clients 依賴情況表

我們以 kafka 為例,看一下 Kafka-client 對版本的依賴情況,從圖中可以看出 beam 2.6.0 版本的 api 改變基本是穩定的。當然,現在用的比較多的2.4、2.5版本。吐個槽,2.6版本之前的兼容性問題,上個版本還有這個類或方法,下一個版本就沒有了,兼容性不是很好。
4. SDK beam-sdks-java-io-kafka 讀取源碼剖析





① 指定 KafkaIO 的模型,從源碼中不難看出這個地方的 KafkaIO<K,V> 類型是 Long 和 String 類型,也可以換成其他類型。
pipeline.Apply(KafkaIO.<Long, String>read() pipeline.apply(KafkaIO.<Long, String>read()
② 設置 Kafka 集群的集群地址。
.withBootstrapServers("broker_1:9092,broker_2:9092")
③ 設置 Kafka 的主題類型,源碼中使用了單個主題類型,如果是多個主題類型則用withTopics(List) 方法進行設置。設置情況基本跟 Kafka 原生是一樣的。
.withTopic("my_topic") // use withTopics(List<String>) to read from multiple topics.
④ 設置序列化類型。Apache Beam KafkaIO 在序列化的時候做了很大的簡化,例如原生 Kafka 可能要通過 Properties 類去設置 ,還要加上很長一段 jar 包的名字。
Beam KafkaIO 的寫法:
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
原生 Kafka 的設置:
Properties props = new Properties();
props.put("key.deserializer","org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.ByteArrayDeserializer");
⑤ 設置 Kafka 的消費者屬性,這個地方還可以設置其他的屬性。源碼中是針對消費分組進行設置。
.updateConsumerProperties(ImmutableMap.of("group.id", my_beam_app_1"))
⑥ 設置 Kafka 吞吐量的時間戳,可以是默認的,也可以自定義。
.withLogAppendTime()
⑦ 相當于 Kafka 中 "isolation.level" , "read_committed",指定 KafkaConsumer 只應讀取非事務性消息,或從其輸入主題中提交事務性消息。流處理應用程序通常在多個讀取處理寫入階段處理其數據,每個階段使用前一階段的輸出作為其輸入。通過指定read_committed 模式,我們可以在所有階段完成一次處理。針對 "Exactly-once" 語義,支持 Kafka 0.11 版本。
.withReadCommitted()
⑧ 設置 Kafka 是否自動提交屬性 "AUTO_COMMIT",默認為自動提交,使用 Beam 的方法來設置。
set CommitOffsetsInFinalizeEnabled(boolean commitOffsetInFinalize)
.commitOffsetsInFinalize()
⑨ 設置是否返回 Kafka 的其他數據,例如 offset 信息和分區信息,不用可以去掉。
.withoutMetadata() // PCollection<KV<Long, String>>
⑩ 設置只返回 values 值,不用返回 key。例如 PCollection,而不是 PCollection<Long,String>。
.apply(Values.<String>create()) // PCollection<String>
在寫入 Kafka 時完全一次性地提供語義,這使得應用程序能夠在 Beam 管道中的一次性語義之上提供端到端的一次性保證。它確保寫入接收器的記錄僅在 Kafka 上提交一次,即使在管道執行期間重試某些處理也是如此。重試通常在應用程序重新啟動時發生(如在故障恢復中)或者在重新分配任務時(如在自動縮放事件中)。Flink runner 通常為流水線的結果提供精確一次的語義,但不提供變換中用戶代碼的副作用。如果諸如 Kafka 接收器之類的轉換寫入外部系統,則這些寫入可能會多次發生。
在此處啟用 EOS 時,接收器轉換將兼容的 Beam Runners 中的檢查點語義與 Kafka 中的事務聯系起來,以確保只寫入一次記錄。由于實現依賴于 runners checkpoint 語義,因此并非所有 runners 都兼容。Beam 中 FlinkRunner 針對 Kafka 0.11+ 版本才支持,然而 Dataflow runner 和 Spark runner 如果操作 kafkaIO 是完全支持的。
關于性能的注意事項:
"Exactly-once" 在接收初始消息的時候,除了將原來的數據進行格式化轉換外,還經歷了 2 個序列化 - 反序列化循環。根據序列化的數量和成本,CPU 可能會漲的很明顯。通過寫入二進制格式數據(即在寫入 Kafka 接收器之前將數據序列化為二進制數據)可以降低 CPU 成本。
5. Pipeline

- 您輸入的數據存儲在哪里?
首先要確定你要構造幾條數據源,在 Beam 可以構建多條,構建之前可以選擇自己的 SDK 的 IO。
- 您的數據類型是什么樣的?
Beam 提供的是鍵值對的數據類型,你的數據可能是日志文本,格式化設備事件,數據庫的行,所以在 PCollection 就應該確定數據集的類型。
- 您想怎么去處理數據?
對數據進行轉換,過濾處理,窗口計算,SQL 處理等。在管道中提供了通用的 ParDo 轉換類,算子計算以及 BeamSQL 等操作。
- 您打算把數據最后輸出到哪里去?
在管道末尾進行 Write 操作,把數據最后寫入您自己想存放或最后流向的地方。

重要的是要理解變換不消耗 PCollections;相反,他們會考慮 a 的每個元素 PCollection 并創建一個新 PCollection 的輸出。這樣,您可以對不同的元素執行不同的操作 PCollection。這里是出現了兩條管,例如輸入 AR,AI,VAR,BT,BMP。

例如不同的數據源,有數據庫,文件,以及緩存等輸入進行合并。

一種是收費的拓藍公司出品叫 Talend Big Data Studio,有沒有免費的呢?

有的,它叫 kettle-beam。例如不同的數據源,有數據庫,文件,以及緩存等輸入進行合并。大家可以去 github 去看一下插件相應的安裝及使用說明。從圖中可以看出大部分 beam 的輸入輸出現在都是支持的。
https://github.com/mattcasters/kettle-beam
6. Runners

我們在看一下運行平臺,這是運行平臺支持度的截圖。例如不同的數據源,有數據庫,文件,以及緩存等輸入進行合并。
Runners 在 Beam Model 模型中有4個支持的維度:
- What,如何對數據進行計算?例如,機器學習中訓練學習模型可以用 Sum 或者 Join 等。在 Beam SDK 中由 Pipeline 中的操作符指定。
- Where,數據在什么范圍中計算?例如,基于 Process-Time 的時間窗口、基于 Event-Time 的時間窗口、滑動窗口等等。在 Beam SDK 中由 Pipeline 的窗口指定。
- When,何時輸出計算結果?例如,在 1 小時的 Event-Time 時間窗口中,每隔 1 分鐘將當前窗口計算結果輸出。在 Beam SDK 中由 Pipeline 的 Watermark 和觸發器指定。
- How,遲到數據如何處理?例如,將遲到數據計算增量結果輸出,或是將遲到數據計算結果和窗口內數據計算結果合并成全量結果輸出。在 Beam SDK 中由 Accumulation 指定。
① What

對數據如果處理,計算。分組的矩陣圖,提到這里說一下,這些運行平臺已經集成到 Beam,只是沒有更新到官方首頁而已。以及或者是官方不打算主推的,就沒有寫上去。
② Where

窗口處理矩陣能力圖,大家從圖中可以看出很多都是全部支持的。
③ When

對于事件處理,流計算引擎Apache Flink,Google Cloud ,Dataflow 以及 Jstorm 都支持性比較好。
④ How

最后是對遲到數據的數據處理能力矩陣圖。
7. FlinkRunner Beam

我們以最近兩年最火的 Apache Flink 為例子,幫大家解析一下 beam 集成情況。大家可以從圖中看出,flink 集成情況。

然后看一下,FlinkRunner 具體解析了哪些參數,以及代碼中怎樣設置。
8. Beam SQL

Apache Calcite 是一種保準 SQL 的解析器,用于大數據處理和一些流增強功能,基于它做 SQL 引擎的有很多,例如 spark,Cassandra,druid 和我們的 Beam。

我們看一下 Beam SQL 的設計思路:首先是我們寫的 SQL 語句,進行查詢解析,驗證來源的類型,數據格式,建一個執行計劃,然后通過優化,設計計劃規則或邏輯,封裝在 Beam 管道中,進行編譯器編譯,最后提交 job 到運行平臺執行。

表中是 beam SQL 和 Calcite 的類型支持度,是把 Calcite 進行映射。

Beam SQL 和 Apache Calcite 函數的支持度。里面有一些現在不支持的,需要大家做的時候多多關注,特別是架構師設計時候。

從圖中可以看出,首先要設置好數據類型,在設置數據,最后填充到管道數據集,最后做 SQL 的操作。其實這樣寫還是不方便的。有沒有很好的解決方式,有。大家繼續往下看…

Beam SQL 的擴展。Beam SQL 的 CREATE EXTERNAL TABLE 語句注冊一個映射到外部存儲系統的虛擬表 。對于某些存儲系統,CREATE EXTERNAL TABLE 在寫入發生之前不會創建物理表。物理表存在后,您可以使用訪問表 SELECT,JOIN 和 INSERT INTO 語句。通過虛擬表,可以動態的操作數據,最后寫入到數據庫就可以了。這塊可以做成視圖抽象的。
Create 創建一個動態表,tableName 后面是列名。TYPE 是數據來源的類型,限制支持 bigquery,pubsub,kafka,text 等。Location 下面為表的數據類型配置, 這里以 kafka 為例。
▌AloT PB 級實時數據,怎么構建自己的“AI微服務”?
在 AIoT 里面,實時性數據比較大,例如視頻分析,視頻挖掘,合規檢測,語音分析等等。130W 路的攝像頭每秒寫入300多 G 的視頻,一天就是 25PB,有人說可以晚上用批方式上數據,其實 AIoT 場景跟其他的場景是不一樣的,例如做智能兒童手表,我們晚上上報數據的頻度可以變低,白天兒童上學放學路上可以正常上報數據。AIoT 場景下攝像頭24小時監控的,并且寬帶主桿線都換成千兆光線,其實也支持不了每秒 300G 的實時寫入。我們是怎么處理呢?

首先在設計架構方案的時候,相信很多架構師都會這樣想,不想第一個去吃螃蟹,因為穩定性,安全性,及不確定性原因會導致整個項目的成敗。那我們看一下 Beam 有哪些大廠在使用。
知道他們使用 Beam ,咱們了解一下他們用 Beam 做了什么?例如:
- 使用 Apache Beam 進行大規模流分析
- 使用 Apache Beam 運行定量分析
- 使用 Apache Beam 構建大數據管道
- 從遷移到 Apache Beam 進行地理數據可視化
- 使用 Apache Beam & tf.Transform 對 TensorFlow 管道進行預處理
- 衛星圖像的土地利用分類
- 智慧城市大數據集成
- 平安城市及質量實時風控
- 電商平臺雙十一活動實時數據處理
國外的可以從官方網站上找到案例的原文,國內可以從新聞或者官方網站找到相應的案例。
在 AloT 場景下我們為什么會選擇 Beam 呢?
- 數據源可以適配,因為平安城市,雪亮工程數據源千奇百怪。
- 能夠進行數據多樣處理,連接,過濾,合并,拆分。
- 具有清洗臟數據功能,例如警情去重誤報警,合規檢測等。
- 具有大數據集群虛擬化部署功能,可擴展性,伸縮性。
- 具有實時處理和離線處理能力。
1. 案列系統架構圖

這是案例的總架構圖,底層是 Beam SDK,上層是抽象封裝的輸入輸出組件,以及清洗組件,類型管理,第三方 SDK,在往上層是組件配置管理,及版本控制,最上層是 jar 可視化配置,以及 SQL 可視化,最后把 jar 通過運維一體化平臺提交給執行引擎集群,當然這里有個解析器,是我們自己開發的。
2. 示例架構圖

以下為示例架構圖:
① 攝像頭以及 AI 智能設備產生的報警以及抓取的信息上報到后端智能設備。
② 智能設備產生的 AI 分析結果進行通過網關集群進行傳輸,注意網關集群地方要做流控及雪崩控制。
③ 消息通過網關集群發送到消息中間件。注意:這邊這個規則下發是針對前段的數據進行 ETL 清洗的清洗規則的下發。
④ Beam 集群接收下發規則的更新,并且根據規則進行數據清洗。
⑤ 對于文檔性的數據我們實時存儲到實時搜索引擎。
⑥ 需要復雜查詢,統計以及報表的數據存儲到 ClickHouse。
⑦ 進行 BI 套件的展示以及前端大屏幕的展示。
3. 示例代碼

核心示例代碼,首先創建管道工廠,然后顯示設置執行引擎,根據 SDKIO 進行讀取 kafka 的消息。

序列化消息,寫入 es 進行備份,因為 es 數據是 json 的寫入的時候首先要考慮轉換成 json 類型。這個地方我設置了一個編碼,實體類的編碼類型為 AvroCoder ,編碼類型是每個管道都要設置的。

把 kafka 的數據轉換成 row 類型,這里就是運用了管道設計中的流分支處理。

最后一步是寫入咱們的 clickhouse,大家可能對 clickhouse 不是很了解,這是俄羅斯的一家高科技公司研發的。查詢速度非???,比 Hive 快279倍,比 MySQL 快801倍的神器。
4. 示例效果展示
以下為寫入 es 的效果。這個字段寫入時候自動創建。


今天的分享就到這里,謝謝大家。
▌關于持續問題咨詢:
- Apache Beam 官方網站
https://beam.apache.org/
- Apache Beam 開源地址
https://github.com/apache/beam
- Apache Beam Example 地址
https://github.com/xsm110/Apache-Beam-Example

嘉賓介紹
張海濤,??低暯鹑谑聵I部架構師,國際注冊云安全系統認證專家。目前負責全國金融行業AI大數據的基礎架構工作,主導建設過云基礎平臺的架構設計及核心開發,并自研大數據組件獲得過國家發明專利。專注安防及 AloT 云計算大數據方向,是 Apache Beam 中文社區發起人之一及 ClickHouse 開源社區的核心開發人員。