順序鎖為寫者賦予了較高的優先級,即使在讀者正在讀的時候,也允許寫著繼續運行。這種策略的好處是,寫者永遠不會等待,缺點是有時候讀者不得不反復多次讀相同的數據,直到它獲得有效的副本。
在linux內核代碼中,順序鎖被定義成seqlock_t結構體(代碼位于include/linux/seqlock.h中):
typedef struct {
struct seqcount seqcount;
spinlock_t lock;
} seqlock_t;
所以,包含一個自旋鎖lock和一個表示當前鎖的順序數的seqcount。seqcount結構體被定義為:
typedef struct seqcount {
unsigned sequence;
......
} seqcount_t;
就是包含了一個無符號整型的sequence變量。
注:需要C/C++ Linux高級服務器架構師學習資料私信“資料”(資料包括C/C++,Linux,golang技術,Nginx,ZeroMQ,MySQL,redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協程,DPDK,ffmpeg等),免費分享
初始化
順序鎖在使用之前需要先初始化,一般有兩種方法:
DEFINE_SEQLOCK(lock1);
seqlock_t lock2;
seqlock_init(&lock2);
可以看到,第一種方法是用宏直接定義并且初始化一個順序鎖變量:
#define SEQCNT_ZERO(lockname) { .sequence = 0, ......}
......
#define __SEQLOCK_UNLOCKED(lockname)
{
.seqcount = SEQCNT_ZERO(lockname),
.lock = __SPIN_LOCK_UNLOCKED(lockname)
}
......
#define DEFINE_SEQLOCK(x)
seqlock_t x = __SEQLOCK_UNLOCKED(x)
所以,直接通過宏定義初始化就是定義了一個seqlock_t結構體變量,將其內部的自旋鎖lock初始化為未加鎖,并將表示順序數的seqcount變量初始化為0。
第二種方法是自己定義一個seqlock_t結構體變量,然后調用seqlock_init函數將其初始化:
static inline void __seqcount_init(seqcount_t *s, const char *name,
struct lock_class_key *key)
{
......
s->sequence = 0;
}
......
# define seqcount_init(s) __seqcount_init(s, NULL, NULL)
......
#define seqlock_init(x)
do {
seqcount_init(&(x)->seqcount);
spin_lock_init(&(x)->lock);
} while (0)
也是初始化內部的自旋鎖變量lock,并將表示順序數的seqcount變量初始化為0。
寫操作
順序鎖區分寫者與讀者,對于寫者來說,一般使用下面的用法:
write_seqlock(&seq_lock);
/* 修改數據 */
......
write_sequnlock(&seq_lock);
將對數據的修改代碼夾在write_seqlock和write_sequnlock函數之間就行了。
獲得寫順序鎖的write_seqlock函數定義如下:
static inline void write_seqlock(seqlock_t *sl)
{
/* 獲得自旋鎖 */
spin_lock(&sl->lock);
write_seqcount_begin(&sl->seqcount);
}
首先要獲得順序鎖內部的自旋鎖,然后調用write_seqcount_begin函數:
static inline void write_seqcount_begin(seqcount_t *s)
{
write_seqcount_begin_nested(s, 0);
}
接著調用write_seqcount_begin_nested函數:
static inline void write_seqcount_begin_nested(seqcount_t *s, int subclass)
{
raw_write_seqcount_begin(s);
......
}
最終調用了raw_write_seqcount_begin函數:
static inline void raw_write_seqcount_begin(seqcount_t *s)
{
/* 累加順序數 */
s->sequence++;
/* 寫內存屏障 */
smp_wmb();
}
累加了順序鎖內的順序數,這之后添加了一個寫內存屏障。這是為了保證在還沒有正式執行write_seqlock函數之后的修改數據代碼之前,保證系統中的其它模塊能感知到順序數已經被累加了。也就是保證累加順序數的指令不會被重排序到后面的修改數據代碼中,否則,有可能修改數據代碼的代碼已經執行了一點了,別的CPU還沒感知到順序數被更改了,會造成讀取數據不一致的情況。當然,應該也要在讀取的時候對應的添加上讀內存屏障。
釋放寫順序鎖的write_sequnlock函數的功能基本上就是把獲得鎖的過程倒過來,定義如下:
static inline void write_sequnlock(seqlock_t *sl)
{
write_seqcount_end(&sl->seqcount);
/* 釋放自旋鎖 */
spin_unlock(&sl->lock);
}
先調用write_seqcount_end函數,然后釋放自旋鎖:
static inline void write_seqcount_end(seqcount_t *s)
{
......
raw_write_seqcount_end(s);
}
接著調用raw_write_seqcount_end函數:
static inline void raw_write_seqcount_end(seqcount_t *s)
{
/* 寫內存屏障 */
smp_wmb();
/* 累加順序數 */
s->sequence++;
}
先加寫內存屏障,再累加順序數。這是為了保證修改數據的代碼都執行完畢后才能將順序數累加。所以,在前面write_seqlock的時候使用的寫內存屏障和這里write_sequnlock使用的寫內存屏障是成對的,組成了一個臨界區,用來執行修改數據的操作。
同時,由于順序鎖的順序數被初始化為1,當上寫鎖的時候會將順序數加1,且開寫鎖的時候仍然會加1,所以當讀到順序數是奇數的時候就一定表示有一個寫者獲得了寫順序鎖,而當讀到順序數是偶數的時候就一定表示當前沒有任何寫者獲得了寫順序鎖。
而且,對于不同寫入者來說,序列鎖是有自旋鎖保護的,所以同一時間只能有一個寫入者。
最后,非常關鍵的,寫順序鎖不會造成當前進程休眠。
讀操作
接下來,我們分析一下順序鎖的讀者。對于讀者來說,一般使用下面的用法:
unsigned int seq;
do {
seq = read_seqbegin(&seq_lock);
/* 讀取數據 */
......
} while read_seqretry(&seq_lock, seq);
一般都是先使用read_seqbegin函數讀取出順序鎖的順序數,接著執行實際的讀取數據操作,最后調用read_seqretry函數,看看當前順序鎖的順序數是否和前面讀到的順序鎖一致。如果一致,那證明讀取的過程中沒有寫者在寫入,可以直接退出了;如果不一致,說明在讀取的過程中已經至少有一個寫者修改了數據,那就循環重新執行上面的步驟,直到前后讀到的順序數一致為止。
讀取當前順序鎖順序數的read_seqbegin函數定義如下:
static inline unsigned read_seqbegin(const seqlock_t *sl)
{
return read_seqcount_begin(&sl->seqcount);
}
調用了raw_read_seqcount_begin函數:
static inline unsigned read_seqcount_begin(const seqcount_t *s)
{
......
return raw_read_seqcount_begin(s);
}
接著調用了raw_read_seqcount_begin函數:
static inline unsigned raw_read_seqcount_begin(const seqcount_t *s)
{
/* 讀取順序數 */
unsigned ret = __read_seqcount_begin(s);
/* 讀內存屏障 */
smp_rmb();
return ret;
}
先調用了__read_seqcount_begin函數讀取了當前順序鎖的順序數,然后加上了一個讀內存屏障。
static inline unsigned __read_seqcount_begin(const seqcount_t *s)
{
unsigned ret;
repeat:
/* 讀取順序鎖的順序數 */
ret = READ_ONCE(s->sequence);
/* 如果順序數是奇數表明有寫者正在寫入 */
if (unlikely(ret & 1)) {
/* 循環等待 */
cpu_relax();
goto repeat;
}
/* 返回順序數直到其為偶數 */
return ret;
}
先讀取順序鎖的順序數,加上READ_ONCE是為了防止編譯器將其和后面的條件判斷一起優化,打亂了執行次序。然后,判斷順序數是否是奇數,前面提過,如果是奇數的話說明有一個寫者正在持有寫順序鎖,這時候就調用cpu_relax函數,讓出CPU的控制權,并且再次從頭循環讀取順序數,直到其為偶數為止。
cpu_relax函數由各個架構自己實現,Arm64架構的實現如下(代碼位于arch/arm64/include/asm/processor.h中):
static inline void cpu_relax(void)
{
asm volatile("yield" ::: "memory");
}
在大多數的Arm64的實現中,yield指令等同于nop空指令。它只是告訴當前CPU核,目前執行的線程無事可做,當前CPU核可以去做點別的。通常,這種指令只對支持超線程的CPU核有用,但是目前所有Arm64的實現都不支持超線程技術,所以只是作為空指令來處理。
接著我們來看看判斷當前順序鎖的順序數是否和前面讀到的順序鎖一致的read_seqretry函數的實現:
static inline unsigned read_seqretry(const seqlock_t *sl, unsigned start)
{
return read_seqcount_retry(&sl->seqcount, start);
}
調用了read_seqcount_retry函數:
static inline int read_seqcount_retry(const seqcount_t *s, unsigned start)
{
/* 讀內存屏障 */
smp_rmb();
return __read_seqcount_retry(s, start);
}
先加上了一個讀內存屏障,和在前面read_seqbegin的時候使用的讀內存屏障是成對的,組成了一個臨界區,用來執行讀取數據的操作。接著,調用了__read_seqcount_retry函數:
static inline int __read_seqcount_retry(const seqcount_t *s, unsigned start)
{
return unlikely(s->sequence != start);
}
就是簡單判斷了一下當前順序鎖的順序數是否和傳入的start參數的值一致。
讀者是沒有自旋鎖保護的,所以可以多個讀者同時讀取數據,并且讀順序鎖也不會造成當前進程休眠。
使用場景
順序鎖并不是萬能的,適合它的使用場景要滿足下面的條件:
- 比較適合讀多寫少的場景。前面分析代碼的時候看到了,寫者是有自旋鎖保護的,因此一次只能有一個寫者寫入數據,而讀者沒有任何其它鎖保護,是并發讀取的。所以,本來寫的性能就不高,而讀者要保證在讀數據的整個期間不會有寫者寫入,如果寫者有很多的話,就會不停的重新嘗試讀取,也會嚴重影響性能。
- 被保護的數據一般不會太大太多,否則也會影響性能。
- 被保護的數據結構不包括被寫者修改和被讀者間接引用的指針。否則,寫者可能會在讀者正在讀指針指向的數據的時候就將該指針變失效了。
- 讀者的臨界區代碼除了讀數據外沒有別的會引起其它副作用的操作。否則,多個讀者的操作會互相競爭。這是因為順序鎖的讀者并沒有任何其它鎖來保護,大家是并發讀取的,只是簡單的用了一對讀內存屏障來保護。
- 順序鎖不會造成讀者和寫者休眠。
最常見的,在Linux內核中,更新系統jiffies就是使用的順序鎖。