如果你用谷歌或者百度進行搜索就會發現,當你在這些搜索引擎的框內鍵入某些內容時,它們可以根據輸入的內容智能展現輸入提示建議。本文作者正是帶著這樣的想法實現了一個具備類似功能的系統。
本文將展現如何設計一個大規模的自動完成輸入提示建議的系統,就像 google 搜索一樣,整個設計是用 Docker Compose 實現的,可以在這里找到源代碼:https://github.com/lopespm/autocomplete
系統要求
最終的系統需要適應類似 Google 的搜索規模,即每天約 50 億次搜索,也就是每秒鐘約 5.8 萬次查詢。我們可以預期這些搜索中有 20% ,也就是每天有 10 億次查詢。
如果我們選擇為這 10 億條查詢建立索引的話,平均每個查詢有 15 個字符【2】,每個字符有 2 個字節(我們將只支持英語設置),這反映在托管這些查詢所需的存儲空間大約為 30 GB。
功能要求
- 根據用戶輸入(前綴)獲取熱門的短語建議列表。
- 通過加權按給定短語 / 查詢的頻率和相似度對建議進行排序【3】。
主要的兩個 API 是:
- top-phrases(prefix) :返回給定前綴的熱門短語列表。
- collect-phrase(phrase) :將搜索到的短語提交給系統。稍后,匯編器將使用這個短語來構建數據結構,這個數據結構將前綴映射到熱門短語列表。
非功能性需求
- 高可用;
- 性能:熱門短語的響應時間應快于用戶的輸入速度(<200ms);
- 可擴展性 :系統應該能夠適應大量請求,同時保持性能;
- 持久性 :即使存在硬件故障或發生系統崩潰,先前搜索的短語(對于給定的時間跨度)也應該可用。
設計與實現
高級設計
兩個主要的子系統是:
- 分發服務器:負責處理用戶對給定前綴的熱門短語的請求。
- 匯編器:負責收集用戶搜索并將它們匯編成數據結構,稍后由分發服務器使用。
詳細設計
這個實現使用了現成的組件,如 Kafka(消息代理)、Hadoop(MapReduce 和分布式文件系統)、redis(分布式緩存)和 Nginx(負載平衡、網關、反向代理)等,但是也有用 Python 構建的定制服務,即 Trie 分發和構建服務,Trie 數據結構也是定制的。
這個實現中的后端服務被構建為可持續使用,不需要太多編排。例如,如果一個活動后端主機停止響應,則它對應的臨時節點 znode 注冊表最終會消失,而另一個備用后端節點將嘗試通過 zookeeper 上的 臨時節點 znode 聲明該位置來取代它的位置。
Trie:基礎數據結構
分發服務器使用并提供給分發服務器的數據結構是 Trie ( 譯注 :又稱前綴樹、字典樹,是一種有序樹,用于保存關聯數據),其每個前綴節點都有一個熱門短語列表。熱門短語是使用 享元模式(flyweight pattern)進行引用的,這意味著短語的字符串文字僅存儲一次。每個前綴節點都有一個熱門短語列表,這是對字符串文本的引用列表。
正如我們之前看到的,我們將需要大約 30 GB 來索引 10 億個查詢,這大約是上述 Trie 存儲 10 億個查詢所需的內存。由于我們希望將 Trie 保存在內存中,以便為給定的查詢啟用快速查找時間,因此,我們將 Trie 劃分為多個 Trie,每個 Trie 在不同的機器上進行。這一做法減輕了任何給定機器上的內存負載。
為了提高可用性,托管這些 Trie 的服務也將具有多個副本。為了提高持久性,Trie 的序列化版本將在分布式文件系統(HDFS)中可用,并且可以通過 MapReduce 作業以一種可預測的、確定性的方式重新構建。
信息流
匯編器:收集數據并匯編 Trie
1、客戶端通過 http://localhost:80/search?phrase="a user query" 將搜索到的短語提交到網關:
2、由于搜索服務器不在此實現的范圍內,網關通過 http://assembler.collector-load-balancer:6000/collect-phrase?phrase="a user query" 直接將搜索短語發送到收集器的負載均衡器:
3、收集器的負載均衡器通過 http://assembler.collector:7000/collect-phrase?phrase="a user query" 將請求轉發到其中一個收集器后端:
4、收集器后端向消息代理(Kafka)發送短語主題的消息。關鍵和價值在于短語本身【4】。
5、Kafka Connect HDFS Connector 匯編器。kafka-connect 將短語主題中的消息轉儲到 /phrases/1_sink/phrases/{30 minute window timestamp} 【5】文件夾【6】中。
6、 觸發 MapReduce 作業【7】:通過加權每個短語的新近度和頻率,它們將搜索的短語減少到一個單獨的 HDFS 文件中【8】。
- 根據當前時間生成一個 TARGET_ID ,例如: TARGET_ID=20200807_1517 。
- 第一個 MapReduce 作業針對 K【9】 最近的 /phrases/1_sink/phrases/{30 minute window timestamp 文件夾執行,并為這些文件夾中的每一個賦予一個基本權重(越近,則基本權重越高)。這個作業還將計算給定文件夾中相同短語的權重。生成的文件將存儲在 /phrases/2_with_weight/2_with_weight/{TARGET_ID} 文件夾中。
- 第二個 MapReduce 作業將把給定短語的所有權重從 /phrases/2_with_weight/2_with_weight/{TARGET_ID} 匯總到 /phrases/3_with_weight_merged/{TARGET_ID} 。
- 第三個 MapReduce 作業將通過遞減權重對條目進行排序,并將它們通過單個 Reducer,以生成單個文件。此文件放在中 /phrases/4_with_weight_ordered/{TARGET_ID} 。
- zookeper znode /phrases/assembler/last_built_target 被設置為 TARGET_ID 。
7、Trie Builder 服務正在監聽 / phrases/assembler/last_built_target zonde 中的更改,它基于 /phrases/4_with_weight_ordered/{TARGET_ID} 文件為每個分區【10】構建 Trie。例如,一個 Trie 可以覆蓋前綴直到 mod,另一個從 mod 到 racke,還有一個從 racke 開始。
- 每個 Trie 被序列化并寫入 /phrases/5_tries/{TARGET_ID}/{PARTITION} HDFS 文件(例如, /phrases/5_tries/20200807_1517/mod|racke ),而 zookeeper znode /phrases/distributor/{TARGET_ID}/partitions/{PARTITION}/trie_data_hdfs_path 被設置為前面提到的 HDFS 文件路徑。
- 該服務將 zookeper znode /phrases/distributor/next_target 設置為 TARGET_ID 。
將 Trie 轉移到分發服務器子系統
1、分發服務器后端可以處于活動模式(服務請求)或備用模式。處于備用模式的節點將獲取最近的 Trie,將它們加載到內存中,并將自己標記為準備接管活動位置。具體如下:
a. 備用節點在監聽對 znode /phrases/distributor/next_target 的更改時,檢測其修改并為每個每個分區創建一個 臨時的順序節點 znode,一次一個,位于 /phrases/distributor/{TARGET_ID}/partitions/{PARTITION}/nodes/ znode。如果創建的 znode 是第一個 R znode 之一(R 是每個分區的副本節點數【11】),繼續執行下一步。否則,從這個分區移除 znode 并嘗試加入下一個分區。
b. 備用后端節點從 /phrases/5_tries/{TARGET_ID}/{PARTITION} 獲取序列話的 Trie 文件,并開始將 Trie 加載到內存中。
c. 當 Trie 加載到內存時,備用后端節點通過將 /phrases/distributor/{TARGET_ID}/partitions/{PARTITION}/nodes/{CREATED_ZNODE} znode 設置為后端主機名,將自己標記為就緒。
2、Trie 后端應用程序服務輪詢 /phrases/distributor/{TARGET_ID}/ sub-znode ( TARGET_ID 是 /phrases/distributor/next_target 中定義的節點),并檢查是否所有分區的所有節點都標記為就緒。
a. 如果它們都為下一個 TARGET_ID 做好了準備,那么服務將在單個事務中將 /phrases/distributor/next_target znode 的值更改為空,并將 /phrases/distributor/current_target znode 設置為新的 TARGET_ID 。通過這一步驟,所有標記為就緒的備用后端節點現在都將處于活動狀態,并將用于以下分發服務器請求。
分發服務器:處理熱門短語的請求
當分發服務器的后端節點處于活動狀態并加載了它們各自的嘗試后,我們就可以開始為給定的前綴提供熱門短語請求:
1、客戶端通過 http://localhost:80/top-phrases?prefix="some prefix" 向網關請求給定前綴的熱門短語。
2、網關通過 http://distributor.load-balancer:5000/top-phrases?prefix="some prefix" 將此請求發送到分發服務器的負載均衡器。
、3 負載均衡器通過 http://distributor.frontend:8000/top-phrases?prefix="some prefix" 將請求轉發到其中一個前端。
4、前端服務器處理請求:
a. 前端服務檢查分布式緩存(redis)是否有這個前綴的條目【12】。如果是,則返回這些緩存的熱門短語,否則,繼續執行下一步。
b. 前端服務從 zookeeper( /phrases/distributor/{TARGET_ID}/partitions/ znode)獲取當前 TARGET_ID 的分區,并選擇與提供的前綴匹配的分區。
c. 前端服務從 /phrases/distributor/{TARGET_ID}/partitions/{PARTITION}/nodes/ znode 中隨機選擇一個 znode,并獲取其主機名。
d. 前端服務通過 http://{BACKEND_HOSTNAME}:8001/top-phrases="some prefix" 從選定的后端請求熱門短語。
e. 后端使用其相應的加載 Trie 返回給定前綴的熱門短語列表。
f. 前端服務將熱門短語列表插入到分布式緩存(緩存模式)中,并返回熱門短語。
- 熱門短語“響應”向用戶提供。
Zookeeper Znode 結構
注意:當系統運行時,請使用 shell 命令 docker exec -it zookeeper ./bin/zkCli.sh 查看當前 Zookeeper 的 znode。
- phrasesdistributorassemblerlast_built_target - 設置為 TARGET_IDdistributorcurrent_target - 設置為 TARGET_IDnext_target - 設置為 TARGET_ID{TARGET_ID} - 例如,20200728_2241partitions|{partition 1 end}trie_data_hdfs_path - 保存序列化的 Trie 的 HDFS 路徑nodes000000000000000000010000000002…{partition 2 start}|{partition 2 end} * …{partition 3 start}| * …
HDFS 文件夾結構
注意:當系統運行時,在瀏覽器中訪問 http://localhost:9870/explorer.html 來瀏覽當前的 HDFS 文件和文件夾。
- phrases1_sink - 搜索到的短語被轉儲到此處,分成 30 分鐘的時間塊。{e.g 20200728_2230}{e.g 20200728_2300}2_with_weight - 應用初始權重的短語,除以時間塊。{TARGET_ID}3_with_weight_merged - 所有時間塊的合并:具有最終權重的短語。{TARGET_ID}4_with_weight_ordered - 按權重遞減順序排列的單個短語文件。{TARGET_ID}5_tries - 序列化 Trie 的存儲。{TARGET_ID}|{partition 1 end}{partition 2 start}|{partition 2 end}{partition 3 start}|
客戶端交互
你可以通過在瀏覽器中訪問 http://localhost 與系統進行交互。當你輸入一個查詢時,系統會提供搜索建議,可以通過提交更多搜索來輸入更多查詢或短語到系統中。
源代碼
你可以在 https://github.com/lopespm/autocomplete 上獲得完整的源代碼。
尾注
【1】 因為這個實現的主要目標是以簡單的方式構建和共享系統,所以使用 Docker compose 代替了像 Kubernetes 或 Docker Swarm 這樣的容器編排工具。
【2】 搜索查詢的平均長度為 2.4 個詞,英語中的平均詞長為 4.7 個字符。
【3】 在本文中, 短語 和 查詢 是可以互換使用。不過,在系統內部中,只使用 短語 這一術語。
【4】 在這個實現中,為清楚起見,只使用了代理的一個實例。但是,對于大量的傳入請求,最好將該主題分為多個實例(消息將根據 短語 鍵進行分區),以便分配負載。
【5】 ** /phrases/1_sink/phrases/{30 minute window timestamp} 文件夾 :例如,假設消息 A[time: 19h02m] [time: 19h25m],C[time: 19h40m],消息 A 和 B 將放入文件夾 /phrases/1_sink/phrases/20200807_1900,而消息 C 將被放入文件夾 /phrases/1_sink/phrases/20200807_1930。
【6】 在將這些消息傳遞給 Hadoop 之前,我們還可以將它們預先聚合到另一個主題中(使用 Kafka Streams)。
【7】 為清楚起見,在這個實現中,MapReduce 作業是通過 make do_mapreduce_tasks 手動觸發的,但是在生產環境中,它們可以通過 cron job 每 30 分鐘觸發一次。
【8】 可以添加一個額外的 MapReduce 來將 /phrases/1_sink/phrases/ 文件夾聚合為更大的時間 timespan 聚合(如 1 天,5 周,10 天等)。
【9】 可在 assembler/hadoop/mapreduce-tasks/do_tasks.sh 中通過變量 MAX_NUMBER_OF_INPUT_FOLDERS 進行配置。
【10】 分區在 assembler/trie-builder/triebuilder.py 中定義。
【11】 每個分區的副本節點數是通過 docker-compose.yml 中的環境變量 NUMBER_NODES_PER_PARTITION 配置的。
【12】 在這個實現中,默認情況下分布式緩存是禁用的,因此,對于首次使用這個代碼庫的人來說,可以更清楚地了解每個更新 / 步驟中發生了什么。分布式緩存可以通過 docker-compose.yml 中的環境變量 DISTRIBUTED_CACHE_ENABLED 來啟用。
原文鏈接:
https://lopespm.github.io/2020/08/03/implementation-autocomplete-system-design.html