簡介
原子類型在構建無鎖數據結構,跨線程共享數據,線程間同步等多線程并發編程場景中起到至關重要的作用。本文將從Rust提供的原子類型和原子類型的內存排序問題兩方面來介紹。
Rust原子類型
Rust標準庫提供的原子類型在std::sync::atomic模塊下。Rust提供了AtomicBool, AtomicU8, AtomicU16, AtomicUsize等原子類型。下面我們以AtomicUsize為例介紹原子類型提供的原子操作。基本的load,store, swap原子操作就不過多介紹了。第一個要介紹的就是重要的compare-and-swap(CAS)原子操作,絕大部分無鎖數據結構都或多或少依賴CAS原子操作。Rust提供的compare_and_swap接口如下:
pub fn compare_and_swap(&self,
current: usize,
new: usize,
order: Ordering
) -> usize
compare_and_swap接受一個期望的值和一個新的值,這里我們先忽略掉Ordering,后面會詳細介紹,如果變量的值等于期望的值,就將變量值替換成新的值返回成功,否則不做任何修改并返回失敗。compare_and_swap從語義上包含了讀(load)語義和寫(store)語義,先讀出變量的值,和期望值比較,然后寫入內存。原子操作保證了這三個步驟是原子的,在三個步驟之間不會插入其他指令從而導致變量值被修改。從1.50.0開始compare_and_swap被deprecated了,現在需要使用compare_exchange和compare_exchange_weak接口來實現CAS原子操作。
pub fn compare_exchange(
&self,
current: usize,
new: usize,
success: Ordering,
failure: Ordering
) -> Result<usize, usize>
pub fn compare_exchange_weak(
&self,
current: usize,
new: usize,
success: Ordering,
failure: Ordering
) -> Result<usize, usize>
compare_exchange比compare_and_swap多了一個Ordering,兩個Ordering分別作為CAS成功的Ordering和失敗的Ordering,后面會有講解,這里先跳過。從源代碼可以看出compare_and_swap就是用compare_exchange實現的,只是compare_and_swap直接用成功情況下的Ordering生成在失敗情況下的Ordering,compare_exchange則有更高的靈活性。
pub fn compare_and_swap(&self, current: $int_type, new: $int_type, order: Ordering) -> $int_type {
match self.compare_exchange(current,
new,
order,
strongest_failure_ordering(order)) {
Ok(x) => x,
Err(x) => x,
}
}
既然有了compare_exchange,那compare_exchange_weak是做什么用的呢?從官方文檔中可以看出兩個API的唯一區別是compare_exchange_weak允許spuriously fail。那么什么是spuriously fail,在x86平臺上CAS是一條指令完成的,這兩個API在x86平臺上效果沒有區別,但是在arm平臺上,CAS是由兩條指令完成的LL/SC(
Load-link/store-conditional),在arm平臺下會發生spuriously fail,來自Wikipedia的解釋
Real implementations of LL/SC do not always succeed even if there are no concurrent updates to the memory location in question. Any exceptional events between the two operations, such as a context switch, another load-link, or even (on many platforms) another load or store operation, will cause the store-conditional to spuriously fail.
簡單的翻譯就是,即使變量的值沒有被更新LL/SC也不是100%成功,在LL/SC之間的異常事件如上下文切換,另外的LL,甚至load或者store都會導致spuriously fail。由于spuriously fail的存在,arm平臺上compare_exchange是compare_exchange_weak加上一個loop實現的。通常我們在使用CAS的時候會把它放在一個loop中,反復重試直到成功,在這種情況下用compare_exchange_weak會獲得一定的性能提升,如果用compare_exchange則會導致循環套循環。那我們該如何選擇compare_change和compare_exchange_weak呢?如果你想在loop中使用CAS,絕大部分情況下使用compare_exchange_weak,除非你在每一次loop中做的事情很多,spuriously fail會導致很大的overhead即使它很少發生,這種情況下使用compare_exchange。再或者你使用loop就是為了避免spuriously fail,那直接使用compare_exchange就可以達到你的目的。
接下來介紹另外一個重要的原子操作fetch-and-add。
pub fn fetch_add(&self, val: usize, order: Ordering) -> usize
fetch_add也包含了讀寫兩層語義,只是和CAS比起來它不關心變量當前的值,所以它一定成功。fetch_add一般用來做全局計數器。Rust提供了一系列的fetch_and_xxx操作,其中比較有趣的是fetch_update:
pub fn fetch_update<F>(
&self,
set_order: Ordering,
fetch_order: Ordering,
f: F
) -> Result<usize, usize>
where
F: FnMut(usize) -> Option<usize>,
它會接受一個函數,并將函數應用到變量上,把生成的值寫回變量中,因為CPU不支持類似的指令,所以其實fetch_update是使用CAS來實現原子性的。源代碼如下,我們可以看這里使用的是compare_exchange_weak,因為它在一個loop中。
pub fn fetch_update<F>(&self,
set_order: Ordering,
fetch_order: Ordering,
mut f: F) -> Result<$int_type, $int_type>
where F: FnMut($int_type) -> Option<$int_type> {
let mut prev = self.load(fetch_order);
while let Some(next) = f(prev) {
match self.compare_exchange_weak(prev, next, set_order, fetch_order) {
x @ Ok(_) => return x,
Err(next_prev) => prev = next_prev
}
}
Err(prev)
}
內存排序
Rust提供了五種內存排序,由弱到強如下,并且內存排序被標記為#[non_exhaustive]表示未來可能會加入新的類型。
#[non_exhaustive]
pub enum Ordering {
Relaxed,
Release,
Acquire,
AcqRel,
SeqCst,
}
Rust的內存排序和C++20保持一致。內存排序作用是通過限制編譯器和CPU的reorder,來使得多個線程看到的內存順序和我們程序所期望的一樣,所以內存排序主要針對的是內存的讀(load)寫(store)操作。編譯器和CPU會在編譯時和運行時來reorder指令來達到提升性能的目的,從而導致程序中代碼順序會和真正執行的順序可能會不一樣,但是reorder的前提是不會影響程序的最終結果,也就是說編譯器和CPU不會reorder相互有依賴的指令從而破壞程序原本的語義。比方說兩條CPU指令,指令A讀取一塊內存,指令B寫一塊內存,如果CPU發現指令A要讀取的內容在cache中沒有命中需要去內存中讀取,需要花額外的CPU cycle,如果指令B要操作的內存已經在cache中命中了,它可以選擇先執后面的指令B。這時候內存排序的作用就體現出來了,內存排序告訴編譯器和CPU哪些指令可以reorder哪些不可以。接下來分別介紹每一種內存排序的意義。
- Relaxed:Relaxed Ordering不施加任何限制,CPU和編譯器可以自由reorder,使用了Relaxed Ordering的原子操作只保證原子性。
// Global varible
static x: AtomicU32 = AtomicU32::new(0);
static y: AtomicU32 = AtomicU32::new(0);
// Thread 1
let r1 = y.load(Ordering::Relaxed);
x.store(r1, Ordering::Relaxed);
// Thread 2
let r2 = x.load(Ordering::Relaxed); // A
y.store(42, Ordering::Relaxed); // B
這段程序是允許產生r1 == r2 == 42。按照正常的程序執行,這個結果看似不合理,但是因為使用了Relaxed Ordering,CPU和編譯器可以自由reorder指令,指令B被reorder到指令A之前,那就會產生r1 == r2 == 42
- Release:Release Ordering是針對寫操作(store)的,一個使用了Release Ordering的寫操作,任何讀和寫操作(不限于對當前原子變量)都不能被reorder到該寫操作之后。并且所有當前線程中在該原子操作之前的所有寫操作(不限于對當前原子變量)都對另一個對同一個原子變量使用Acquire Ordering讀操作的線程可見。Release Ordering寫和Acquire Ordering讀要配對使用從而在兩個或多個線程間建立一種同步關系。具體例子在介紹完Acquire之后一起給出。
- Acquire:Acquire Ordering是針對讀操作(load)的,一個使用了Acquire Ordering的讀操作,任何讀和寫操作(不限于對當前原子變量)都不能被reorder到該讀操作之前。并且當前線程可以看到另一個線程對同一個原子變量使用Release Ordering寫操作之前的所有寫操作(不限于對當前原子變量)。
如果前面的例子中load使用Acquire Ordering,store使用Release Ordering,那么reorder就不會發生,r1 == r2 == 42的結果就不會產生。Acquire和Release動作特別像鎖的加鎖和釋放鎖的操作,因此Acquire Ordering和Release Ordering常被用在實現鎖的場景。看下面的例子
// Global varible
static DATA: AtomicU32 = AtomicU32::new(0);
static FLAG: AtomicBool = AtomicBool::new(false);
// Thread 1
DATA.store(10, Ordering::Relaxed); // A
FLAG.store(true, Ordering::Release); // B
// Thread 2
while !FLAG.load(Ordering::Acquire) {} // C
assert!(DATA.load(Ordering::Relaxed) == 10); // D
這段程序展示了兩個線程之間同步的一種方式,在線程1中我們在共享內存中寫入數據,然后把FLAG置成true,表明數據寫入完成,在線程2中,我們用一個while循環等待FLAG被置成true,當FLAG被置成true之后,線程2一定會讀到共享內存中的數據。線程1中的Release Ordering寫和線程2中的Acquire Ordering讀建立了順序。當線程2跳出C行的循環表明它可以讀到線程1在B行對FLAG的寫入,按照Release-Acquire Ordering的保證,A行對DATA的寫入不會被reorder到把FLAG置成true之后,并且對DATA的寫入也會對線程2可見。假如這里沒有使用Release-Acquire Ordering,那么線程對DATA的寫入用可能會被reorder到寫FLAG之后,那么線程2就會出現讀到了FLAG但是讀不到DATA的情況。
- AcqRel:AcqRel Ordering主要用于read-modify-write類型的操作,比如compare_and_swap,表明了它同時具有Acquire和Release的語義,讀的部分用Acquire Ordering,寫的部分用Release Ordering。
- SeqCst:SeqCst是Sequential Consistent的縮寫,是一種最強的Ordering,在對讀使用Acquire Ordering,對寫使用Release Ordering,對read-modify-write使用AcqRel Ordering的基礎上再保證所有線程看到所有使用了SeqCst Ordering的操作是同一個順序,不論操作的是不是同一個變量。
這里包含了兩層意思,第一層意思SeqCst禁止了所有的reorder,針對內存讀(load)寫(store)的reorder一共有四種情況:
- loadload reorder:Arquire Ordering保證
- loadstore reorder:Arquire Ordering和Release Ordering保證
- storestore reorder:Release Ordering保證
- storeload reorder:SeqCst Ordering保證
看下面的例子
// Global varible
static x: AtomicU32 = AtomicU32::new(0);
static y: AtomicU32 = AtomicU32::new(0);
// Thread 1
x.store(1, Ordering::SeqCst); // A
let r1 = y.load(Ordering::SeqCst); // B
// Thread 2
y.store(1, Ordering::SeqCst); // C
let r2 = x.load(Ordering::SeqCst); // D
這里如果不使用SeqCst Ordering就會出現r1 == r2 == 0的結果,原因是每一個線程中的load可以被reorder到store之前,即使我們分別對load和store使用Acquire Ordering和Release Ordering,因為它們都不保證storeload的reorder。
SeqCst Ordering的第二層意思是所有使用了SeqCst Ordering的操作在全局有一個順序,并且所有的線程都看到同樣的順序。比如說全局的順序是A->B->C->D,那么r1 == 0 && r2 == 1,并且第三個線程如果看到了y == 1,那么它一定能看到x == 1,這就是SeqCst Ordering全局唯一順序代表的意義。雖然使用SeqCst Ordering可以保證更嚴格的全局一致性,但是它也會帶來性能損失,使用正確并且合適的內存排序才能獲得最優的性能。
最后解釋一下compare_exchange兩個Ordering的含義,CAS包含1.讀變量,2.和期望值比較,3.寫變量三個步驟,第一個Ordering表示CAS成功下即變量當前的值等于期望值時候,整個操作的Ordering,第二個Ordering表示如果當前比較失敗了情況下,第一步讀操作的Ordering。看一個用CAS實現自旋鎖的例子
// Global lock
static LOCK: AtomicBool = AtomicBool::new(false);
// Thread 1
// Get lock
while(LOCK.compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed).is_err()) {}
do_something();
// Unlock
LOCK.store(false, Ordering::Release);
// Thread 2
// Get lock
while(LOCK.compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed).is_err()) {}
do_something();
// Unlock
LOCK.store(false, Ordering::Release);
總結
本文介紹了Rust提供的原子類型,原子操作以及和原子操作配合使用的內存排序。深入地理解內存排序才能寫出正確并且性能最優的程序。內存排序是一個很深的話題,如有錯誤,歡迎指正,歡迎在評論區留言交流。