作者 | Noz
編譯 | 王瑞平
本篇文章主要介紹了Rust中流處理的概念、方法和優化。作者不僅介紹了流處理的基本概念以及Rust中常用的流處理庫,還使用這些庫實現了一個流處理程序。
最后,作者介紹了如何通過測量空閑和阻塞時間來優化流處理程序的性能,并將這些內容同步至Twitter和blog。
圖片
此外,作者還提供了一些其它方面的優化建議,例如:
- 在實際系統中,應考慮將線程固定至CPU內核上或使用一種版本的綠色線程減少上下文切換。
- 在處理流時,通常需要為結果分配內存。內存分配是昂貴的,所以,在以后的文章中,作者將會介紹一些優化內存分配的好方法。
首先,分別介紹下在同步和異步Rust中的流特質。
一、同步和異步Rust中的流特質
在同步Rust中,流核心抽象是Iterator。它提供了在序列中產生項的方法并在它們之間進行阻塞,然后,通過將迭代器傳遞給其它迭代器的構造函數完成組合。這使我們可以毫不費力地將事物連接在一起。
在異步Rust中,流核心抽象是Stream。它的行為與Iterator非常相似;但是,它并不是在每個項之間產生的阻塞,而是允許其它任務在阻塞等待時運行。
在異步Rust與同步Rust中,Read和Write分別對應AsyncRead和AsyncWrite。這些特質表明:未解析的字節通常直接來自10層(例如,來自套接字或文件)。
圖片
Rust流吸收了其它語言所具備的最佳功能;例如,它們能通過利用Rust特質系統回避Node.js的Duplex流中出現的遺留問題,也能同時實施背壓和惰性迭代,大大提升了效率。最重要的是,Rust流允許使用相同類型的異步迭代。
未來,關于Rust流還有很多值得關注之處,盡管仍有一些問題亟待解決。
二、總體概括:什么是流處理?
現在,也許你已經了解到了同步和異步Rust中的流特質,下面再來介紹下什么是“流處理”。
“流處理”是一種重要的大數據處理手段,其主要特點是處理的數據是源源不斷且實時到來的。
在不同規模的科技公司中,流處理通常被用于分析和處理具體事件,且常被應用于分布式系統。
有些領域確實會大量使用“流處理”手段,包括:視頻處理和高頻交易。我們也能夠借此尋找到新型區塊鏈之中的架構靈感。因為,區塊鏈需要處理交易和元數據流等。
如今,你可以租用具有100多個CPU的內核、100GB內存、多個GPU和100Gbps帶寬的AWS實例,還無需擁有一個節點的分布式系統。
現在,讓我們了解下流處理在Rust編程中的應用:
三、舉個例子:計算10億個數字的哈希程序
現在,讓我們寫一個用來計算10億個數字的SHA512和BLAKE3哈希程序吧!你可以想象:數字代表交易、分析事件或價格信號。散列法可用來表示對這些輸入的任意轉換。
如下是單線程解決方案程序:
圖片
當我在帶有專用CPU和16核的Digital Ocean上用發布模式運行此程序時,只需6分鐘多一點。
圖片
1.通道
現在,讓我們用“流處理”來重寫這個程序。與在單個循環中執行散列不同,我們將設置一個線程管道并行執行散列,然后收集結果。
在兩個線程之間發送數據的本地流被稱為通道。我們的新程序將生成四個線程。生成器線程將生成數字并同時將它們發送至兩個不同的哈希線程。散列線程將讀取這些數字,分別對它們進行散列,然后將它們的輸出發送給結果線程,下圖是它的架構:
圖片
我們也將使用標準庫中的mpsc通道發送和接收數據。mpsc可用來表示“多生產者-單消費者”,代表你可以從多個線程向通道發送數據,但是,只有一個管道能夠輸出數據。雖然我們不會使用這個多制作人功能,但是了解這一點很重要。
它仍是一個相當簡單的程序:
圖片
輸出結果如下:
圖片
哦!帶通道的新版本花費了兩倍時間,這是怎么了?
2.環形緩沖器
你可以用火焰圖進行測試,但還是省省時間吧!
無論多小,所有通道庫的構建都會產生額外的費用,并行化所帶來的好處必須大于此種開銷,才能保證系統正常運作。這種情況下的瓶頸是通道send()和recv()。由于Rust中的標準庫mpsc通道相對緩慢,但仍有其它替代方案,比如,crossbeam-channel。
為此,我們分析了4個不同的通道庫,結果如下:
圖片
顯然,ringbuf和rtrb速度最快。因為它們的環形緩沖區無鎖,扮演著“單個生產者-單個消費者”的角色。單個生產者意味著只有一個管道將數據放入隊列,另一個管道將負責數據輸出,這比“多生產者隊列”開銷小。
此外,這些程序庫也是非阻塞式的。當隊列已滿時,如果嘗試推送,它將提示“error”而不是“block”,“空隊列”亦是如此。
為使用這些環形緩沖區庫,我添加了自旋鎖,以便在通道阻塞時繼續重試。事實證明,這也是高頻交易架構中所使用的方法。
我還發現,在等待時增加非常短的“休眠”時間整體性能就能提高。這可能是由于當核心使用率達到100%或高于某些溫度時,啟動CPU就會發生節流的現象。
如下是新的pop()和push(value)幫助器:
圖片
我們將用新方法展示:
圖片
速度確實比以前快了,但也快不了多少,現在,就讓我們把并行化提升至另一個層次。
3.更多的并行化
目前,我們為哈希創建了兩個線程,一個用于SHA512,另一個用于BLAKE3。兩者中較慢的那個將成為我們技術發展的瓶頸。為證明這一點,我重新運行了原始的單線程示例,僅使用SHA512哈希,結果如下:
圖片
這與并行哈希示例中的性能非常接近,意味著,總體上花在哈希上的大部分時間都是由SHA512產生。
那么,如果我們同時創建更多的線程并將多個數字進行散列排列呢?讓我們試一試。我們將創建2個SHA512哈希線程和2個BLAKE3哈希線程來啟動。
4.可視化
每個線程都擁有自己的輸入和輸出隊列。我們將用循環順序將生成的數字循環發送至每個線程并用相同的順序讀取結果。
圖片
這確保了流的順序能夠在結果線程中維持不變;如果排序不重要或消息處理時間多變,那么,其它的調度機制可能會更好。
如下是循環調度代碼:
圖片
新的代碼更復雜,部分如下:
圖片
一起來看看,現在表現如何?輸出結果如下:
圖片
確實好多了!
5.測量“閑置”和“阻塞”時間
每個哈希函數應該有多少個線程?在更復雜的系統中,這很難確定,甚至可能是動態的。
實際上,有一種技術對“流處理”很有幫助,即,在某個時間窗口內測量空閑和阻塞時間。
- 空閑時間
等待空隊列接收消息所花的時間
- 全程時間
等待滿隊列發送輸出所花費的時間
空閑時間是pop()期間旋轉的時間,阻塞時間是push()期間旋轉的時間。我修改了這兩個函數,用來跟蹤花費時間。這段代碼使用了開銷很小的單元:
圖片
我還創建了一個新的線程統計這些時間,輸出結果如下:
圖片
我們可以看到,sha512線程既沒有“空閑”也沒有“阻塞”,而是100%處于活躍狀態;此外,我們還能通過增加sha512線程數量為系統提速。
注:當用測量系統的行為改變其性能時,可能會出現像“海森伯測不準原理”這樣的問題。如果遇到此種情況,請查看“粗時間庫”;通常,定時測量取近似值就足夠了。
我們在Digital Ocean實例中,經過試驗和錯誤數據總結出:最佳數量是8個SHA512線程和4個BLAKE3線程。
圖片
結果:小于初始時間的1/6。
四、下一步:為不同的流處理結果分配內存
在這篇文章中,我們用具體實例介紹了Rust中流處理的概念、方法和優化,但是還有很多細節沒有討論。在實際系統中,我們應該考慮將“線程”固定到CPU內核上,用來減少上下文切換。
此外,在流處理時,你通常需要為不同的結果分配內存。這是昂貴的,所以,在今后的文章中,我們還將討論這方面的一些策略。
參考資料:
1.https://noz.ai/hash-pipeline/
2.https://zhuanlan.zhihu.com/p/70247995?utm_id=0