譯者 | 彎月
譯者 | 彎月 責編 | 歐陽姝黎
出品 | CSDN(ID:CSDNnews)
Rust的異步功能很強大,但也以晦澀難懂著稱。在本文中,我將總結之前提過的一些想法,并給出一些新的點子,看看這些想法放在一起能產生什么效果。
本文只是一個思想實驗。對Rust進行大改造很麻煩,因此我們需要一個精確的方法來找出優缺點,并確定某個改動是否值得。我知道一些觀點會產生完全相反的看法,所以我建議你用一種開放的心態閱讀本文。
在對Rust中實現異步的不同方式進行探索之前,我們應該首先了解何時應該使用異步編程。畢竟,異步編程并不像僅僅使用線程那么容易。那么異步的好處是什么?有人會說是性能原因,異步代碼更快,因為線程的開銷太大了。實際情況更復雜。根據具體情況不同,在以I/O為主的應用程序中使用線程有可能更快。例如,一個基于線程的echo服務器在并發數小于100的時候比異步更快。但在并發數超過100之后,線程的性能就會下降,但也不是急劇下降。
我認為,使用異步的更好的理由是可以更有效地針對復雜的流程控制進行建模。例如,如果不適用異步編程,那么暫停或取消一個正在進行的操作就會非常困難。而且,使用線程時,在各個連接之間進行協調需要使用同步原語,這就會導致競爭。使用異步編程,可以在同一個線程中對多個連接進行操作,從而避免了同步原語。
Rust的異步模型能夠非常好地對復雜流程控制進行建模。例如,mini-redis的subscribe命令(
https://github.com/tokio-rs/mini-redis/blob/master/src/cmd/subscribe.rs#L94-L156)就非常精練、非常優雅。但異步也不是萬能靈藥。許多人都認為異步Rust的學習曲線非常復雜。盡管入門很容易,但很快就會遇到陡峭的曲線。很多人付出了很多努力,盡管有幾個方面有待改進,但我相信,異步Rust最大的問題就在于會違反“最小驚訝原則”。
舉個例子。同學A在學習Rust時閱讀了Rust的教科書和Tokio的指南,打算寫一個聊天服務器作為練習。他選了一個基于行的簡單協議,將每一行編碼,添加前綴表示行的長度。解析行的函數如下:
let len = socket.read_u32.await?;
let mut line = vec![0; len];
socket.read_exact(&mut line).await?;
let line = str::from_utf8(line)?;
Ok(line)
}
這段代碼除了async和await關鍵字之外,跟阻塞的Rust代碼沒有什么兩樣。盡管同學A從來沒有寫過Rust,但閱讀并理解這個函數完全沒問題,至少從他自己的角度看如此。在本地測試時,聊天服務器似乎也能正常工作,于是他給同學B發送了一個鏈接。但很不幸,在進行了一些聊天后,服務器崩潰了,并返回了“invalid UTF-8”的錯誤。同學A很迷惑,他檢查了代碼,但并沒有發現什么錯誤。
那么問題在哪兒?似乎該任務在調用棧的更高層的位置使用了一個select!:
loop {
select! {
line_in = parse_line(&socket) => {
if let Some(line_in) = line_in {
broadcast_line(line_in);
} else {
// connection closed, exit loop
break;
}
}
line_out = channel.recv => {
write_line(&socket, line_out).await;
}
}
}
假設channel上收到了一條消息,而此時parse_line在等待更多數據,那么select!就會放棄parse_line操作,從而導致丟失解析中的狀態。在后面的循環迭代中,parse_line再次被調用,從一幀的中間開始,從而導致讀入了錯誤數據。
問題在此:任何Rust異步函數都可能被調用者隨時取消,而且與阻塞Rust不同,這里的取消是一個常見的異步操作。更糟糕的是,沒有任何新手教程提到了這一點。
Future
如果能改變這一點,讓異步Rust每一步的行為符合初學者預期呢?如果行為必須根據預期得到,那么必然有一個能接受的點,為初學者指引正確的方向。此外,我們還希望最大程度地減少學習過程中的意料之外,特別是剛開始的時候。
我們先來改變意料之外的取消問題,即讓異步函數總是能夠完成執行。當future能夠保證完成后,同學A發現異步Rust的行為跟阻塞Rust完全相同,只不過是多了兩個關鍵字async和await而已。生成新任務會增加并發,也會增加任務之間的協調通道數量。select!不再能夠接受任意異步語句,而只能與通道或類似通道的類型(例如JoinHandle)一起使用。
使用能保證完成的future后,同學A的聊天服務器如下:
async fn handle_connection(socket: TcpStream, channel: Channel) {
let reader = Arc::new(socket);
let writer = reader.clone;
let read_task = task::spawn(async move {
while let Some(line_in) in parse_line(&reader).await {
broadcast_line(line_in);
}
});
loop {
// `channel` and JoinHandle are both "channel-like" types.
select! {
res = read_task.join => {
// The connection closed, exit loop
break;
}
line_out = channel.recv => {
write_line(&writer, line_out).await;
}
}
}
}
這段代碼與前面的示例很相似,但由于所有異步語句必然會完成,而且select!只接受類似于通道的類型,因此parse_line的調用被移動到了一個生成的任務中。select要求類似于通道的類型,這能夠保證放棄丟失的分支是安全的。通道可以存儲值,而且接收值是原子操作。丟失select的分支并不會導致取消時丟失數據。
取消
如果寫入時發生錯誤會怎樣?現狀下read_task會繼續執行。然而,同學A希望它能出錯,并優雅地關閉連接和所有任務。不幸的是,這里就會遇到設計上的難題。如果我們能夠隨時放棄任何異步語句,那么取消就非常容易了,只需要放棄future就可以。我們需要一種方法來取消正在執行的操作,因為這是使用異步編程的主要目的之一。為了實現這一點,JoinHandle提供了cancel方法:
async fn handle_connection(socket: TcpStream, channel: Channel) {
let reader = Arc::new(socket);
let writer = reader.clone;
let read_task = task::spawn(async move {
while let Some(line_in) in parse_line(&reader).await? {
broadcast_line(line_in)?;
}
Ok()
});
loop {
// `channel` and JoinHandle are both "channel-like" types.
select! {
_ = read_task.join => {
// The connection closed or we encountered an error,
// exit the loop
break;
}
line_out = channel.recv => {
if write_line(&writer, line_out).await.is_err {
read_task.cancel;
read_task.join;
}
}
}
}
}
但是cancel能做什么呢?它并不能立即終止任務,因為現在異步語句是保證能夠執行完成的。但我們的確需要停止處理并盡快返回。相反,被取消的任務中的所有資源類型都應該停止執行,并返回“被中斷”的錯誤。進一步的嘗試也應該返回錯誤。這種策略與Kotlin很相似,只不過Kotlin會拋出異常而已。如果在任務取消時,read_task正在parse_line中等待socket.read_u32,那么read_u32函數會立即返回Err(
io::ErrorKind::Interrupted)。操作符?會在任務中向上冒泡,導致整個任務中斷。
乍一看,這種行為非常像其他任務停止的行為,但其實不一樣。對于同學A而言,當前的異步Rust的終止行為看起來就像任務不確定地發生掛起一樣。如果能強制資源(例如套接字)在取消時返回錯誤,就能跟蹤取消的流程。同學A可以添加println!語句或使用其他調試策略來調查什么導致了任務中斷。
AsyncDrop
然而,同學A并不知道,他的聊天服務器使用了io-uring來避免了絕大部分系統調用。由于future能保證完成,再加上AsyncDrop,就可以透明底使用io-uring API。當同學A在handle_connection的末尾drop TcpStream時,套接字會異步地關閉。為了實現這一點,TcpStream的AsyncDrop實現如下:
impl AsyncDrop for TcpStream {
async fn drop(&mut self) {
self.uring.close(self.fd).await;
}
}
有人提出了一個絕妙的方法在traits中使用async(
https://hackmd.io/bKfiVPRpTvyX8JK_Ng2EWA?view)。唯一的問題就是如何處理隱含的.await點。目前,異步地等待一個future需要進行一次.await調用。而當一個值離開async上下文的范圍時,編譯器會為AsyncDrop trait添加一個隱藏的yield點。這個行為違反了最少意料之外原則。那么,既然其他的點都是明示的,為何此處需要一個隱含的await點?
解決“有時需要隱含drop”的問題的提議之一就是,要求使用明示的函數調用執行異步的drop:
my_tcp_stream.read(&mut buf).await?;
async_drop(my_tcp_stream).await;
當然,如果用戶忘記調用async drop怎么辦?畢竟,編譯器在阻塞Rust中會自動處理drop,而且這是個非常強大的功能。而且,注意上述代碼有一個小問題:?操作符在讀取錯誤時會跳過async_drop調用。Rust編譯器能對此問題給出警告,但怎么修復呢?有辦法讓?與明示的async_drop兼容嗎?
去掉.await
如果不要求明示的async drop調用,而是去掉await關鍵字怎么樣?同學A就不需要在調用異步函數(如socket.read_u32().await)之后使用.await了。在異步上下文中調用異步函數時,.await就變成了隱含的。
似乎這是如今Rust的一大進步。但我們依然可以對這個假設提出質疑。隱含的.await只能在異步語句中發生,因此它的應用比較有限,而且依賴于上下文。同學A只有通過查看函數定義,才能知道自己位于某個異步上下文中。此外,如果IDE能高亮顯示某個yield點,就會非常方便。
去掉.await還有另一個好處:它能讓異步Rust與阻塞Rust一直。阻塞的概念已經是隱含的了。在阻塞Rust中,我們并不會寫my_socket.read(buffer).block?。當同學A編寫異步聊天服務器時,他注意到的唯一區別就是必須用async關鍵字來標記函數。同學A可以憑直覺想象異步代碼的執行。“懶future”的問題不再出現,而同學A也不能無意間做下面的事,并對先輸出“two”的情況感到困惑。
async fn my_fn_one {
println!("one");
}
async fn my_fn_two {
println!("two");
}
async fn mixup {
let one = my_fn_one;
let two = my_fn_two;
join!(two, one);
}
.await的RFC中的確有一些對于隱含await的討論。當時,反對隱含await的最有力的觀點是,await調用正好標記了async語句可以被中止的點。如果采用保證完成的future,這個觀點就不那么有力了。當然,對于可以安全中止的異步語句,我們還應該保留await關鍵字嗎?這個問題需要一個答案。但無論如何,去掉“.await”是一個非常大的挑戰,必須謹慎行事。需要進行易用性研究,表明其優點大于缺點才行。
帶有作用域的任務
到目前為止,同學A已經可以使用異步Rust構建聊天服務器,而且不需要學習太多新概念,也不會遇到無法預測的行為。他了解了select!,但編譯器會強制在類似于通道的類型中進行選擇。除此之外,同學A還給函數添加了async,而且運行良好。他把代碼展示給同學B看,并詢問是否需要將套接字放在一個Arc中。同學B建議他閱讀一下帶有作用域的任務(scoped tasks),借此避免分配。
帶有作用域的任務等價于crossbeam的“帶有作用域的線程”,只不過是異步的。這個任務可以通過生成者借用數據。同學A可以使用帶有作用域的任務來避免在連接處理函數中使用Arc:
async fn handle_connection(socket: TcpStream, channel: Channel) {
task::scope(async |scope| {
let read_task = scope.spawn(async || {
while let Some(line_in) in parse_line(&socket)? {
broadcast_line(line_in)?;
}
Ok()
});
loop {
// `channel` and JoinHandle are both "channel-like" types.
select! {
_ = read_task.join => {
// The connection closed or we encountered an error,
// exit the loop
break;
}
line_out = channel.recv => {
if write_line(&writer, line_out).is_err {
break;
}
}
}
}
});
}
保證安全的關鍵是要保證,作用域的生存周期要大于在該作用域范圍內生成的所有任務,換句話說,確保異步語句能夠完成。但有一個缺點。啟用帶有作用域的任務會使“Future::poll”變得不安全,因為必須對future的完成情況進行輪詢,以保證內存安全性。這種不安全性會導致Future的實現更難。為了降低難度,我們需要盡可能避免用戶自己實現Future,包括實現類似于AsyncRead、AsyncIterator等traits。我相信這是一個可以達到的目標。
除了帶有作用域的任務之外,保證異步語句的完成,還可以在使用io-uring或與C++ future集成時,讓指針能正確地從任務傳遞到內核。某些情況下,還可能在生成子任務時避免分配,對于某些嵌入式環境非常有用,盡管這種情況需要一個略微不同的API。
通過生成的方式增加并發
利用今天的異步Rust,應用程序可以通過利用select!或FutureUnordered生成新任務的方式增加并發。到目前為止,我們討論了任務生成和select!。我建議去掉FuturesUnordered,因為它經常會導致bug。在使用FutureUnordered時,很容易認為生成的任務會在后臺執行,然后出乎意料地發現這些任務不會有任何進展。
相反,我們可以利用帶有作用域的任務實現類似的方案:
let greeting = "Hello".to_string;
task::scope(async |scope| {
let mut task_set = scope.task_set;
for i in 0..10 {
task_set.spawn(async {
println!("{} from task {}", greeting, i);
i
});
}
async for res in task_set {
println!("task completed {:?}", res);
}
});
每個生成的任務都會并發執行,從生成者那里借用數據,而TaskSet能提供一個類似于FuturesUnordered,但不會導致災難的API。至于緩存流等其他原語也可以在帶有作用域的任務上實現。
還可以在這些原語之上實現一些新的并發原語。例如,可以實現類似于Kotlin的結構化并發。之前有人曾討論過這個問題(
https://github.com/tokio-rs/tokio/issues/1879),但異步Rust的當前模型無法實現這一點。而將異步Rust改為保證完成,就能解鎖這一領域。
select!怎么辦?
本文開頭我說過,使用異步編程可以更有效地對復雜的流程控制進行建模。目前最有效的原語為select!。我還提議,將select!改為只接受類似于通道的類型,這樣可以強制同學A為每個連接生成兩個任務,實現讀寫的并發性。生成任務能防止在取消讀操作的時候出現bug,還能重寫讀操作,以處理意料之外的取消。例如,mini-redis在解析幀的時候,我們首先將接收到的數據保存到緩沖區中。當讀操作被取消時,位于緩沖區中的數據不會丟失。下次調用讀操作會從中斷的地方繼續。因此Mini-redis的讀操作對于中止是安全的(abort-safe)。
如果不將select!限制在類似于通道的類型上,而是將其限制在對于中止是安全的操作上,會怎樣?從通道中接收數據是中止安全的,但從帶有緩沖區的I/O處理函數中讀取也是中止安全的。這里的關鍵是,不應該假設所有異步操作都是中止安全的,而是應該要求開發者向函數定義中添加#[abort_safe](或async(abort))。這種策略有幾個好處。首先,當同學A學習異步Rust時,它不需要知道任何有關安全性的概念。即使不理解這個概念,僅通過生成任務來獲得并發性,也可以實現一切:
#[abort_safe]
async fn read_line(&mut self) -> io::Result<Option<String>> {
loop {
// Consume a full line from the buffer
if let Some(line) = self.parse_line? {
return Ok(line);
}
// Not enough data has been buffered to parse a full line
if 0 == self.socket.read_buf(&mut self.buffer)? {
// The remote closed the connection.
if self.buffer.is_empty {
return Ok(None);
} else {
return Err("connection reset by peer".into);
}
}
}
}
不再默認要求中止安全語句,而是由開發者自行標注。這種自行標注的策略符合撤銷安全性的模式。當新的開發者閱讀代碼時,這個標注會告訴他們該函數必須保證中止安全。rust編譯器甚至可以對于標注了#[abort_safe]的函數提供額外的檢查和警告。
現在同學A可以在select!的循環中使用read_line了:
loop {
select! {
line_in = connection.read_line? => {
if let Some(line_in) = line_in {
broadcast_line(line_in);
} else {
// connection closed, exit loop
break;
}
}
line_out = channel.recv => {
connection.write_line(line_out)?;
}
}
}
混合使用中止安全和非中止安全
#[abort_safe]注釋引入了兩個異步語句的變種。混合使用中止安全和非中止安全需要特別考慮。不論從中止安全還是從非中止安全的上下文中,都可以調用一個中止安全的函數。然而,Rust編譯器會阻止從中止安全的上下文中調用非中止安全的函數,并提供一個有幫助的錯誤信息:
async fn must_complete { ... }
#[abort_safe]
async fn can_abort {
// Invalid call => compiler error
must_complete;
}
async fn must_complete { ... }
#[abort_safe]
async fn can_abort {
// Valid call
spawn(async { must_complete }).join;
}
開發者可以通過生成新任務的方式,從非中止安全函數中獲得中止安全的上下文。
異步語句的兩個新變種會增加語言的復雜性,但這個復雜性僅在學習曲線的后期才出現。在剛開始學習Rust時,默認的異步語句是非中止安全的。從這個上下文中,學習者可以不用關心中止安全性而直接調用異步函數。中止安全會在異步Rust的教程的后期作為一個可選的話題出現。
漫漫長路
從目前的默認要求中止安全的異步模型改變成保證完成的模型,需要一個全新的Rust版本。處于討論的目的,我們假設Rust 2026版引入了該變動。那么該版本中的Future trait將改變為保證完成的future,因此無法與老版本的trait兼容。相反,2026版中的舊trait將改名為AbortSafeFuture(名稱暫定)。
在2026版中,給異步語句添加#[abort_safe]會生成一個AbortSafeFuture實現,而不是Future。2026之前版本中編寫的任何異步函數都實現了AbortSafeFuture trait,因此任何已有的異步代碼都能與新版本兼容(別忘了,中止安全的函數可以從任何上下文中調用)。
一些想法
我討論了Rust可能出現的一些改動。簡單地總結一下:
-
異步函數保證完成
-
去掉await關鍵字
-
引入#[abort_safe]標注,表示異步函數是中止安全的
-
限制select!,僅接受中止安全的分支
-
取消已生成的任務的方式是禁止資源完成
-
支持帶有作用域的任務
我相信,這些改動可以極大地簡化Rust異步,盡管進行這些改動會影響現狀。在進行決定之前,我們還需要更多數據。如今的異步代碼有多少是中止安全的?我們能否進行易用性研究,以評價這些改動的好處?Rust擁有兩種風格的異步語句,會帶來多少認知上的困難?
我也希望本文能拋磚引玉,期待其他人能提出別的觀點。現在Rust需要許多觀點來決定其未來。
原文鏈接:
https://carllerche.com/2021/06/17/six-ways-to-make-async-rust-easier/
聲明:本文由CSDN翻譯,轉載請注明來源。