導讀:Apache Flink 是一個分布式大數據處理引擎,可對有限數據流和無限數據流進行有狀態計算。可部署在各種集群環境,對各種大小的數據規模進行快速計算。
滴滴基于 Apache Flink 做了大量的優化,也增加了更多的功能,比如擴展 DDL、內置消息格式解析、擴展 UDX 等,使得 Flink 能夠在滴滴的業務場景中發揮更大的作用。本文中,滴滴出行實時計算負責人、高級技術專家梁李印分享了 Apache Flink 在滴滴的應用與實踐。
主要內容包括:
- 服務化概述
- StreamSQL 實踐
- 平臺化建設
- 挑戰及規則
1. 滴滴大數據服務架構
滴滴基于開源的生態構建了比較完整的大數據體系,包括離線、實時系統,如 HBase 生態、數據檢索 Elastic Search、消息隊列 Kafka 等。在 Flink 基礎上滴滴主要發展 StreamSQL,之后會有詳細介紹。
2. 滴滴流計算發展歷程
在2017年之前,滴滴流計算主要依靠業務方自建小集群的方式,技術選型也多種多樣,包括 Storm、jstrom、Spark、Samza 等。2017年開始進行業務收斂,保留了8個 Spark Streaming 并構建了一個平臺化、服務化的大集群,并且引入了 Flink。引入 Flink 的原因是部分業務對實時性要求較高,Spark Streaming 無法支持。2018年滴滴構建了基于 Flink SQL 的名為 StreamSQL 的 SQL 化服務,并且使用 Flink CEP 解決了一些網約車實時運營問題。2019年,滴滴完成了流計算引擎的統一,絕大部分任務以 Flink 為基礎,通過 StreamSQL 開發流計算任務成為主流開發方式,達到了50%以上。
3. 滴滴流計算業務規模和場景
在業務規模方面,目前滴滴流計算服務業務線達到50多個,集群規模在千級別,流計算任務數達到3000+,每天處理的數據量達到萬億條。
在業務場景上,主要包括以下四類:
實時監控:實時監控包括交易指標監控、導航及 POI 準確率監控、業務健康度監控 ( 例如業務壓測中的水位線、當前水位同水位線的實時差距監控 ) 和車輛網監控等。
實時同步:實時同步主要作用是把數據實時地從一個地方轉移到另一個地方,數據包括業務日志、數據庫日志、軌跡數據、埋點數據。軌跡數據放在 HBase。
實時特征:實時特征是比較關鍵的業務,它會影響派單,例如派單的導航和準確性。這些特征包括司機乘客特征、上下車特征、導航軌跡特征、工單特征。滴滴每天的客戶量在百萬級別,如果檢測到高危,需要立刻觸發報警和客服介入。
實時業務:實時業務會影響業務行為,包括司乘位置語義同步 ( 接單過程中司機可以實時知道乘客位置變化、乘客也可以知道司機位置變化 )、異常停留監測、高危行程監測、個性化發券、路線偏移監測等。
4. 滴滴流計算多集群體系
滴滴隨著業務發展機房越來越多,為了更好地管理,對業務提供統一視圖,滴滴在集群體系做了三方面的改進。
- 在 YARN 的基礎上構建了路由層。路由層的職責是屏蔽多個物理集群,對業務方提供單一的邏輯集群。通過 YARN 上 queue 的劃分來決定業務運行在機房的不同集群上。
- 在物理集群內部劃分 label,通過 label 可以進行隔離,專門服務那些重要的不希望受到其他業務影響的業務。
- 同時定制了 YARN 調度器。由于實時和離線業務調度差異較大,所以兩類業務調度完全分開。對于離線業務,希望盡可能把機器資源全部應用起來,吞吐越大越好。而實時業務對均衡性要求更高,所以將調度改為基于 CPU 調度,并且可以智能過濾繁忙節點 ( 如 CPU 使用較高的節點 ),也做了動態資源推薦,并將推薦值告知用戶。
1. StreamSQL 的優勢
StreamSQL 是在 Flink SQL 基礎上做一些完善后形成的一個產品。使用 StreamSQL 具有多個優勢:
- 描述性語言:業務方不需要關心底層實現,只需要將業務邏輯描述出來即可。
- 接口穩定:Flink 版本迭代過程中只要 SQL 語法不發生變化就非常穩定。
- 問題易排查:邏輯性較強,用戶能看懂語法即可調查出錯位置。
- 批流一體化:批處理主要是 HiveSQL 和 Spark SQL,如果 Flink 任務也使用 SQL 的話,批處理任務和流處理任務在語法等方面可以進行共享,最終實現一體化的效果。
- 入門門檻低:StreamSQL 的學習入門的門檻比較低,因此受到了廣大開發者的歡迎。
2. StreamSQL 相對于 Flink SQL 的完善
完善 DDL:
包括上游的消息隊列、下游的消息隊列和各種存儲如 Druid、HBase 都進行了打通,用戶方只需要構建一個 source 就可以將上游或者下游描述出來。
內置消息格式解析:
用戶消費數據后需要將數據進行提取,但數據格式往往非常復雜,如數據庫日志 binlog,每個用戶單獨實現,難度較大。StreamSQL 將提取庫名、表名、提取列等函數內置,用戶只需創建 binlog 類型 source。并內置了去重能力。
對于 business log 業務日志 StreamSQL 內置了提取日志頭,提取業務字段并組裝成 Map 的功能。對于 json 數據,用戶無需自定義 UDF,只需通過 jsonPath 指定所需字段。
擴展 UDX:
豐富內置 UDX,如對 JSON、MAP 進行了擴展,這些在滴滴業務使用場景中較多。支持自定義 UDX,用戶自定義 UDF 并使用 jar 包即可。兼容 Hive UDX,例如用戶原來是一個 Hive SQL 任務,則轉換成實時任務不需要較多改動,有助于批流一體化。
Join 能力:
① 基于 TTL 的雙流 join:
在滴滴的流計算業務中有的 join 操作數據對應的跨度比較長,例如順風車業務發單到接單的時間跨度可能達到一個星期左右,如果這些數據的 join 基于內存操作并不可行,通常將 join 數據放在狀態中,窗口通過 TTL 實現,過期自動清理。
② 維表 join 能力:
維表支持 HBase、KVStore、MySQL 等,同時支持 inner、left、right、full join 等多種方式。
1. StreamSQL IDE
滴滴對于 StreamSQL 構建了 StreamSQL IDE,除了基本的 StreamSQL editor 外,還主要包含多個其他功能:
- SQL 模板:如果用戶想要開發流式 SQL 時不需要從零開始,只需要選擇一個 SQL 模板,并在這個模板之上進行修修改改即可達到期望的結果。
- UDF 函數說明:StreamSQL IDE 還提供了 UDF 的庫,相當于一個庫如果不知道具有什么含義以及如何使用,用戶只需要在 IDE 上搜索到這個庫,就能夠找到使用說明以及使用案例。
- 語法檢測與智能提示:用戶輸入 DB 名字可以顯示表名,對錯誤語法提示。
- DEBUG:在線 DEBUG 能力,可以上傳本地測試數據或者采樣少量 Kafka 等 source 數據 debug,此功能對流計算任務非常重要。
- 版本管理:因為業務版本需要不斷升級,而升級時也可能需要回退,因此 StreamSQL IDE 也提供了版本管理功能。
2. 任務管控
滴滴的所有流計算全部是通過 Web 化入口進行提交,提供了整個任務生命周期管理,包括任務提交、任務停止、任務升級和回滾。同時只需要在 web 化服務臺進行參數修改即可實現對內置參數 ( 如 task manager memory 等 ) 進行調優。
3. 任務運維
任務運維主要分為四個方面:
日志檢索:Flink UI 上查詢日志體驗非常糟糕,滴滴將 Flink 任務日志進行了采集,存儲在 ES 中,通過 WEB 化的界面進行檢索,方便調查。
指標監控:Flink 指標較多,通過 Flink UI 查看體驗糟糕,因此滴滴構建了一個外部的報表平臺,可以對指標進行監控。
報警:報警需要做一個平衡,如重啟報警有多類如 ( 機器宕機報警、代碼錯誤報警 ),通過設置一天內單個任務報警次數閾值進行平衡,同時也包括存活報警 ( 如 kill、start )、延遲報警、重啟報警和 Checkpoint 頻繁失敗報警 ( 如 checkpoint 周期配置不合理 ) 等。
血緣追蹤:實時計算任務鏈路較長,從采集到消息通道,流計算,再到下游的存儲經常包括4-5個環節,如果無法實現追蹤,容易產生災難性的問題。例如發現某流式任務流量暴漲后,需要先查看其消費的 topic 是否增加,topic 上游采集是否增加,采集的數據庫 DB 是否產生不恰當地批量操作或者某個業務在不斷增加日志。這類問題需要從下游到上游、從上游到下游多方向的血緣追蹤,方便調查原因。
4. Meta 化建設
對比批處理任務,流計算 Flink 任務需要先定義好 Source、Sink,需要先定義好 MetaStore,因此滴滴目前正在做實時 Meta,將實時數據如 Kafka 的數據流定義成實時表,存儲在 MetaStore 中,用戶在 IDE 中只需要寫 DML ( 數據操縱語言 Data Manipulation Language ) 語句,系統在執行時自動填補 DDL ( 數據定義語言 Data Definition Language ) 語句,將完整的 StreamSQL 提交到 Flink 中去,該工作可以極大的降低 Flink 的使用門檻。
5. 批流一體化
雖然 Flink 具備批流一體化能力,但滴滴目前并沒有完全批流一體化,希望先從產品層面實現批流一體化。通過 Meta 化建設,實現整個滴滴只有一個 MetaStore,無論是 Hive、Kafka topic、還是下游的 HBase、ES 都定義到 MetaStore 中,所有的計算引擎包括 Hive、Spark、Presto、Flink 都查詢同一個 MetaStore,實現整個 SQL 開發完全一致的效果。根據 SQL 消費的 Source 是表還是流,來區分批處理任務和流處理任務,從產品層面上實現批流一體化效果。
1. 面臨的挑戰
大狀態管理:
- Flink 作為一個有狀態的計算引擎,狀態有時會非常大,在記錄 checkpoint 過程中需要數據線對齊,磁盤 IO 變大,導致機器負載增大,checkpoint 效率的高低會影響服務穩定性。
- 目前 checkpoint 是一個黑盒,如何做狀態診斷是一個挑戰。
- 通過內置系統解決了上游不重復問題,但 Flink 本身問題沒有解決,希望構建一個端到端的 Exactly Once。
業務高可用:
- 滴滴很多內部業務是通過 golang 或者 JAVA 開發,遷移到 Flink 后,可以解決容錯問題、拓展問題、算法模型問題等。在升級時業務不可停,需要實現透明升級。
- 快速診斷解決問題。
- 資源伸縮,如滴滴的早晚高峰時流量突增情況下如何保持系統穩定。
多語言:
- 雖然今天在滴滴大部分實時任務都是通過 SQL 來開發的,但是依舊不能100%覆蓋全部的場景,有些場景下是需要寫代碼的。Flink 提供了 Java 和 Scala 這兩種 API,但這對于業務人員而言依然是不夠的,因為業務大部分是 Go 語言系或者 Python 語言系的,因此滴滴希望根據社區來提供多語言的開發 Flink 的能力,比如寫 SQL,而 UDF 也可以通過多語言來開發。
2. 未來規劃
- 提供高可用的流計算服務:使 Flink 具備支持完整線上業務能力的機制。
- 探索實時機器學習:借助 Flink 已經具備了10-15分鐘的模型更新能力,接下來希望實現秒級別的模型更新。
- 實時數倉:目前的數倉系統大部分還是 T+1 級別,如何構建實時數倉,得到實時化報表,同時口徑和離線保持一致,實現實時數據和離線數據互補。例如最長保存3個月的實時存儲系統在3個月后將數據搬至離線倉庫時,和離線產生數據保持一致,是一個較大的挑戰和希望。