目錄
- 導(dǎo)語
- 作者簡(jiǎn)介
- 實(shí)踐 1:大流量場(chǎng)景下的 K8s 部署實(shí)踐
- 實(shí)踐 2:非持久化 Topic 的應(yīng)用
- 實(shí)踐 3:負(fù)載均衡與 Broker 緩存優(yōu)化
- 實(shí)踐 4:COS Offloader 開發(fā)與應(yīng)用
- 未來展望與計(jì)劃
導(dǎo)語
本文整理自 8 月 Apache Pulsar Meetup 上,劉燊題為《Apache Pulsar 在微信的大流量實(shí)時(shí)推薦場(chǎng)景實(shí)踐》的分享。本文介紹了微信團(tuán)隊(duì)在大流量場(chǎng)景下將 Pulsar 部署在 K8s 上的實(shí)踐與優(yōu)化、非持久化 Topic 的應(yīng)用、負(fù)載均衡與 Broker 緩存優(yōu)化實(shí)踐與COS Offloader 開發(fā)與應(yīng)用。
作者簡(jiǎn)介
劉燊,騰訊微信高級(jí)研發(fā)工程師,Apache Pulsar Contributor。
在通信社交領(lǐng)域,微信已經(jīng)成為國內(nèi)當(dāng)之無愧的社交霸主。用戶人數(shù)在 2018 年突破了 10 億,截至 2021 年第三季度末,微信每月活動(dòng)賬戶總數(shù)已達(dá)到 12.6 億人,可以說,微信已經(jīng)成為國人生活的一部分。
微信的業(yè)務(wù)場(chǎng)景包括推薦業(yè)務(wù)、風(fēng)控、監(jiān)控系統(tǒng)、AI 平臺(tái)等。數(shù)據(jù)通過 SDK 和數(shù)據(jù)采集方式接入,經(jīng)由 MQ、Kafka、Pulsar 消息中間件,其中 Pulsar 發(fā)揮了很大的作用。中間件下游接入數(shù)據(jù)計(jì)算層 Hadoop、Spark、Flink、ClickHouse、TensorFlow 等計(jì)算平臺(tái),由于本次介紹實(shí)時(shí)推薦場(chǎng)景,因此較多使用 Flink 和 TensorFlow。落地存儲(chǔ)平臺(tái)則包括 HDFS、HBase、Redis 以及各類自研 KV。
團(tuán)隊(duì)選型 Pulsar 的初期目標(biāo)是獲得一個(gè)滿足大數(shù)據(jù)流量場(chǎng)景并且運(yùn)維管理便捷的消息隊(duì)列系統(tǒng)。最終選擇 Pulsar 的主要原因有五點(diǎn):
- 在騰訊自研上云的大背景下,團(tuán)隊(duì)非常看重云原生特性。Pulsar 的云原生特性,包括分布式、彈性伸縮、讀寫分離等都體現(xiàn)出優(yōu)勢(shì)。Pulsar 邏輯層 Broker 無狀態(tài),直接提供服務(wù)。存儲(chǔ)層 Bookie 有狀態(tài),但是節(jié)點(diǎn)對(duì)等,且 Bookie 自帶多副本容災(zāi);
- Pulsar 支持資源隔離,可以軟隔離或硬隔離,避免不同業(yè)務(wù)之間互相影響;
- Pulsar 支持靈活的 Namespace/Topic 策略管控,對(duì)集群的管理和維護(hù)有很大幫助;
- Pulsar 能夠便捷擴(kuò)容,邏輯層 Broker 的無狀態(tài)和負(fù)載均衡策略允許快速擴(kuò)容,存儲(chǔ)層 Bookie 節(jié)點(diǎn)之間互相對(duì)等也便于快速擴(kuò)容,可以輕松應(yīng)對(duì)流量暴漲場(chǎng)景;
- Pulsar 具備多語言客戶端能力,微信的業(yè)務(wù)場(chǎng)景中涉及 C/C++、TensorFlow、Python 等語言,Pulsar 可以滿足需求。
實(shí)踐 1:大流量場(chǎng)景下的 K8s 部署實(shí)踐
微信團(tuán)隊(duì)使用了 Pulsar 官網(wǎng)提供的 K8s Helm chart 部署方式。
原生部署架構(gòu)中,流量從 Proxy 代理層進(jìn)入,經(jīng)過 Broker 邏輯服務(wù)層寫入 Bookie 存儲(chǔ)層。Proxy 代理層代理客戶端和 Broker 之間的連接,Broker 層管理 Topic,Bookie 層負(fù)責(zé)持久化消息存儲(chǔ)。在上圖中,入流量和出流量分別用 In 和 Out 進(jìn)行標(biāo)記,Replica 是配置的副本。
在應(yīng)用的過程中團(tuán)隊(duì)發(fā)現(xiàn)了兩個(gè)問題:首先 Proxy 代理了 Pulsar 客戶端的請(qǐng)求,導(dǎo)致 Broker 無法獲取客戶端 IP,增加了運(yùn)維難度;其次,當(dāng)集群流量較大時(shí),集群內(nèi)部帶寬會(huì)成為瓶頸。上圖架構(gòu)內(nèi),集群入流量為 (2+ 副本數(shù))倍;出流量最大為 3 倍,Consumer、Proxy、Broker 和 Bookie 間分別有一倍流量,但是僅極端情況下流量會(huì)全量從 Bookie 流出。假設(shè)出入流量都是 10 GBps,副本數(shù)為 3,集群內(nèi)入流量會(huì)放大為 50 GBps,出流量會(huì)放大為 30 GBps。另外默認(rèn)情況下 Proxy 服務(wù)只有一個(gè)負(fù)載均衡器承載所有流量,壓力巨大。
這里可以看出瓶頸主要出現(xiàn)在 Proxy 層,該層造成了很大流量浪費(fèi)。而 Pulsar 實(shí)際上支持 Broker 直連,因此團(tuán)隊(duì)在此基礎(chǔ)上進(jìn)行了一些優(yōu)化:
團(tuán)隊(duì)利用了騰訊云 K8s 集群的能力,給 Broker 配置了彈性網(wǎng)卡,并使 Broker 的 IP 直接暴露在集群外,可以被外部客戶端直接訪問。Broker 服務(wù)也配置了負(fù)載均衡器。這樣客戶端可以直接訪問負(fù)載均衡器 IP,再經(jīng)過 Pulsar 內(nèi)部協(xié)議的 Lookup 操作找到要訪問的 Topic 所處的 Broker。由此節(jié)省了 Proxy 帶來的額外帶寬消耗。
團(tuán)隊(duì)在 K8s 部署方面還做了以下優(yōu)化工作:
- 如上文所述去 Proxy;
- Bookie 使用多盤多目錄 + 本地 SSD 提升性能,由于原社區(qū)版本 Pulsar 不支持多盤多目錄,這里團(tuán)隊(duì)做了改進(jìn)支持并合并入社區(qū)(github.com/apache/puls…);
- 日志采集使用騰訊云 CLS(日志服務(wù)),統(tǒng)一的日志服務(wù)可以簡(jiǎn)化分布式多節(jié)點(diǎn)系統(tǒng)的運(yùn)維、問題查詢操作;
- 指標(biāo)采集使用 Grafana + Kvass + Thanos,默認(rèn)指標(biāo)采集使用了單機(jī)服務(wù),很快出現(xiàn)了性能瓶頸,優(yōu)化后問題解決且支持水平擴(kuò)容。
實(shí)踐 2:非持久化 Topic 的應(yīng)用
生產(chǎn)者和消費(fèi)者是同 Broker 中的 Dispatcher 模塊交互的,而持久化 Topic 中生產(chǎn)者數(shù)據(jù)會(huì)通過 Dispatcher 進(jìn)入 Managed Ledger 模塊,再調(diào)用 Bookie 客戶端與 Bookie 交互。非持久化 Topic 中數(shù)據(jù)不會(huì)進(jìn)入 Managed Ledger,而是直接發(fā)送給消費(fèi)者。在大流量場(chǎng)景中,非持久化 Topic 由于不需要與 Bookie 交互,對(duì)集群的帶寬壓力會(huì)明顯降低。
非持久化 Topic 在大流量實(shí)時(shí)推薦場(chǎng)景中有應(yīng)用,但具體的應(yīng)用場(chǎng)景必須滿足“可容忍少量數(shù)據(jù)丟失”的要求。實(shí)踐中有三種場(chǎng)景滿足這一要求:
- 大流量 + 消費(fèi)端處理能力不足的實(shí)時(shí)訓(xùn)練任務(wù);
- 時(shí)效性敏感的實(shí)時(shí)訓(xùn)練任務(wù);
- 抽樣評(píng)測(cè)任務(wù)。
實(shí)踐 3:負(fù)載均衡與 Broker 緩存優(yōu)化
以上是一個(gè)線上真實(shí)的場(chǎng)景。生產(chǎn)環(huán)境中出現(xiàn)了反復(fù) bundle unload 的問題,導(dǎo)致 Broker 負(fù)載反復(fù)波動(dòng)。
該場(chǎng)景中使用了以下負(fù)載均衡配置:
loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder loadBalancerBrokerThresholdShedderPercentage=10 loadBalancerBrokerOverloadedThresholdPercentage=70 Load bundle處理類(select for broker):org.apache.pulsar.broker.loadbalance.impl.LeastLongTermMessageRate
如上圖,假設(shè)三個(gè) Broker 平均負(fù)載是 50%,則閾值就是 60%,超出 60% 的部分需要均衡。但實(shí)際應(yīng)用中發(fā)現(xiàn) Broker 1 的多余 20% 負(fù)載會(huì)卸載到 Broker 2 上,之后由于 Broker 2 超載所以又會(huì)卸載下來,還會(huì)回到 Broker 1 上。結(jié)果流量就在 Broker 1 和 Broker 2 上反復(fù)橫跳。
跟蹤代碼發(fā)現(xiàn),Load Bundle 處理類是根據(jù) Broker 的消息量判斷該承載多余流量的 Broker,但生產(chǎn)中消息量與機(jī)器負(fù)載并不完全正相關(guān),且 Threshold shedder 是根據(jù) CPU、出入流量、內(nèi)存等多種指標(biāo)平均加權(quán)得出 Broker 負(fù)載,所以 bundle 的加載和卸載邏輯并不一致。
對(duì)此團(tuán)隊(duì)進(jìn)行了代碼優(yōu)化改進(jìn):
loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder loadBalancerBrokerThresholdShedderPercentage=10 loadBalancerBrokerOverloadedThresholdPercentage=70 Load bundle處理類(select for broker):在低于平均負(fù)載的broker中隨機(jī)選擇 loadBalancerDistributeBundlesEvenlyEnabled=false (相同的代碼實(shí)現(xiàn):PR-16059)
優(yōu)化后的效果如下,可以看到集群流量穩(wěn)定許多:
團(tuán)隊(duì)還在實(shí)時(shí)推薦場(chǎng)景下優(yōu)化了 Broker 緩存。這種場(chǎng)景有以下特征:
- 消費(fèi)任務(wù)數(shù)量眾多;
- 消費(fèi)速度參差不齊;
- 消費(fèi)任務(wù)經(jīng)常重啟。
對(duì)此,社區(qū)原有的 Broker 緩存邏輯效果不佳。以下是 Broker 緩存的原有驅(qū)逐邏輯:
void doCacheEviction(long maxTimestamp) { if (entryCache.getsize() <= 0) { return; } // Always remove all entries already read by active cursors PositionImpl slowestReaderPos = getEarlierReadPositionForActiveCursors); if (slowestReaderPos != null) { entryCache.invalidateEntries(slowestReaderPos): } // Remove entries older than the cutoff threshold entryCache.invalidateEntriesBeforeTimestamp(maxTimestamp); }
默認(rèn)策略會(huì)找出當(dāng)前消費(fèi)不活躍(由閾值控制,Cursor 消費(fèi)的 entry 超過閾值即被認(rèn)為是不活躍)的 Cursor,對(duì) Cursor 之前的數(shù)據(jù)做驅(qū)逐。對(duì)此,騰訊工程師向社區(qū)提交了代碼改進(jìn):
void doCacheEviction (long maxTimestamp){ if (entryCache.getSize() (= 0) { return; ) PositionImpl evictionPos; if (config.isCacheEvictionByMarkDeletedPosition()){ evictionPos=getEarlierMarkDeletedPositionForActiveCursors().getNext(); } else { // Always remove all entries already read by active cursors evictionPos=getEarlierReadPositionForActiveCursors(); } if (evictionPos != null) { entryCache.invalidateEntries(evictionPos); } // Remove entries older than the cutoff threshold entryCache.invalidateEntriesBeforeTimestamp(maxTimestamp); }
這里將選擇非活躍 Cursor 的邏輯改成了尋找需要?jiǎng)h除的數(shù)據(jù)位置。這樣消費(fèi)速度相對(duì)較慢的數(shù)據(jù)就不會(huì)穿越到 Bookie 中增加集群壓力,只要數(shù)據(jù)有 Backlog 就會(huì)被緩存。但這種方法會(huì)導(dǎo)致緩存空間吃緊,因?yàn)橄M(fèi)任務(wù)重啟期間仍舊要無意義地保留緩存,占用緩存空間。
對(duì)此微信團(tuán)隊(duì)在社區(qū)改進(jìn)的基礎(chǔ)上又做了調(diào)整:
void doCacheEviction(long maxTimestamp){ if (entryCache.getSize() <= 0) { return; } if (factory.getConfig().isRemoveReadEntriesInCache()){ PositionImpl evictionPos; if (config.isCacheEvictionByMarkDeletedPosition()){ PositionImplearlierMarkDeletedPosition=getEarlierMarkDeletedPositionForActiveCursors(); evictionPos = earlierMarkDeletedPosition != null? earlierMarkDeletedPosition.getNext() : null; } else { // Always remove all entries already read by active cursors evictionPos=getEarlierReadPositionForActiveCursors(); } if (evictionPos != null) { entryCache.invalidateEntries(evictionPos); } } //Remove entries older than the cutoff threshold entryCache.invalidateEntriesBeforeTimestamp(maxTimestamp); }
這里簡(jiǎn)單地將一定時(shí)間內(nèi)的數(shù)據(jù)緩存到 Broker 中,有效提升了場(chǎng)景中的緩存效率:
實(shí)踐 4:COS Offloader 開發(fā)與應(yīng)用
Pulsar 提供了分層存儲(chǔ)能力,可以將存儲(chǔ)轉(zhuǎn)移到廉價(jià)的存儲(chǔ)層。Pulsar Offloader 可以將超過一定時(shí)長(zhǎng)的 Ledger 搬運(yùn)到遠(yuǎn)端存儲(chǔ),不再停留在 Bookie 層,由 Broker 接管這部分的數(shù)據(jù)管理。
團(tuán)隊(duì)使用 Pulsar Offloader 的原因有:
- Bookie Journal/Ledger 盤都使用 SSD,成本較高;
- 業(yè)務(wù)需求存儲(chǔ)時(shí)間長(zhǎng)、數(shù)據(jù)存儲(chǔ)量大;
- 數(shù)據(jù)消費(fèi)任務(wù)異常,需要容忍較長(zhǎng)時(shí)間的數(shù)據(jù) Backlog;
- 數(shù)據(jù)回放需求。
Pulsar 社區(qū)版本并不支持騰訊云對(duì)象存儲(chǔ)(COS),所以團(tuán)隊(duì)開發(fā)了內(nèi)部云上 COS Offloader 插件并應(yīng)用于線上。
未來展望與計(jì)劃
團(tuán)隊(duì)在部署與使用過程中一直和社區(qū)密切溝通,團(tuán)隊(duì)未來計(jì)劃跟進(jìn)社區(qū)版本升級(jí)與 bug 修復(fù)。微信團(tuán)隊(duì)將著重參與一些特性,比如 PIP 192 Broker 負(fù)載均衡與緩存優(yōu)化,計(jì)劃重構(gòu)負(fù)載均衡器;PIP 180 通過影子 Topic 解決讀放大問題,幫助精細(xì)化管理 Topic。微信團(tuán)隊(duì)也在關(guān)注 Pulsar 生態(tài)進(jìn)展,如 Flink、Pulsar、數(shù)據(jù)湖全鏈路打通。
以上就是Apache Pulsar 微信大流量實(shí)時(shí)推薦場(chǎng)景下實(shí)踐詳解的詳細(xì)內(nèi)容,更多關(guān)于Apache Pulsar微信大流量推薦的資料請(qǐng)關(guān)注其它相關(guān)文章!