大家好呀,我是樓仔。
今天第一天開工,收拾心情,又要開始好好學習,好好工作了。
對于使用 JAVA 的小伙伴,其實我們完全不用手動擼一個分布式鎖,直接使用 redisson 就行。
但是因為這些封裝好的組建,讓我們越來越懶。
我們使用一些封裝好的開源組建時,可以了解其中的原理,或者自己動手寫一個,可以更好提升你的技術水平。
今天我就教大家用原生的 Redis,手動擼一個 Redis 分布式鎖,很有意思。
01 問題引入
其實通過 Redis 實現分布式鎖,經常會有面試官會問,很多同學都知道用 SetNx() 去獲取鎖,解決并發問題。
SetNx() 是什么?我簡單解答一下。
Redis Setnx(SET if Not eXists) 命令在指定的 key 不存在時,為 key 設置指定的值。
對于下面 2 種問題,你知道如何解決么?
- 如果獲取鎖的機器掛掉,如何處理?
- 當鎖超時時,A、B 兩個線程同時獲取鎖,可能導致鎖被同時獲取,如何解決?
這個就是我們實現 Redis 分布式鎖時,需要重點解決的 2 個問題。
02 理論知識
剛才說過,通過 SetNx() 去獲取鎖,可以解決并發問題。
當獲取到鎖,處理完業務邏輯后,會將鎖釋放。
圖片
但當機器宕機,或者重啟時,沒有執行 Del() 刪除鎖操作,會導致鎖一直沒有釋放。
所以,我們還需要記錄鎖的超時時間,判斷鎖是否超時。
圖片
這里我們通過 GetKey() 獲取鎖的超時時間 A,通過和當前時間比較,判斷鎖是否超時。
如果鎖未超時,直接返回,如果鎖超時,重新設置鎖的超時時間,成功獲取鎖。
還有其它問題么?當然!
因為在并發場景下,會存在 A、B 兩個線程同時執行 SetNx(),導致兩個線程同時獲取到鎖。
那如何解決呢?將 SetNx() 用 GetSet() 替換。
圖片
GetSet() 是什么?我簡單解答一下。
Redis Getset 命令用于設置指定 key 的值,并返回 key 的舊值。
這里不太好理解,我舉個例子。
假如 A、B 兩個線程,A 先執行,B 后執行:
- 對于線程 A 和 B,通過 GetKey 獲取的超時時間都是 T1 = 100;
- 對于線程 A,將超時時間 Ta = 200 通過 GetSet() 設置,返回 T2 = 100,此時滿足條件 “T1 == T2”,獲取鎖成功;
- 對于線程 B,將超時時間 Tb = 201 通過 GetSet() 設置,由于鎖超時時間已經被 A 重新設置,所以返回 T2 = 200,此時不滿足條件 “T1 == T2”,獲取鎖失敗。
可能有同學會繼續問,之前設置的超時是 Ta = 200,現在變成了 Tb = 201,延長或縮短了鎖的超時時間,不會有問題么?
其實在現實并發場景中,能走到這一步,基本是“同時”進來的,兩者的時間差非常小,可以忽略此影響。
03 代碼實戰
這里給出 Go 代碼,注釋都寫得非常詳細,即使你不會 Go,讀注釋也能讀懂。
// 獲取分布式鎖,需要考慮以下情況:
// 1. 機器A獲取到鎖,但是在未釋放鎖之前,機器掛掉或者重啟,會導致其它機器全部hang住,這時需要根據鎖的超時時間,判斷該鎖是否需要重置;
// 2. 當鎖超時時,需要考慮兩臺機器同時去獲取該鎖,需要通過GETSET方法,讓先執行該方法的機器獲取鎖,另外一臺繼續等待。
func GetDistributeLock(key string, expireTime int64) bool {
currentTime := time.Now().Unix()
expires := currentTime + expireTime
redisAlias := "jointly"
// 1.獲取鎖,并將value值設置為鎖的超時時間
redisRet, err := redis.SetNx(redisAlias, key, expires)
if nil == err && utils.MustInt64(1) == redisRet {
// 成功獲取到鎖
return true
}
// 2.當獲取到鎖的機器突然重啟&掛掉時,就需要判斷鎖的超時時間,如果鎖超時,新的機器可以重新獲取鎖
// 2.1 獲取鎖的超時時間
currentLockTime, err := redis.GetKey(redisAlias, key)
if err != nil {
return false
}
// 2.2 當"鎖的超時時間"大于等于"當前時間",證明鎖未超時,直接返回
if utils.MustInt64(currentLockTime) >= currentTime {
return false
}
// 2.3 將最新的超時時間,更新到鎖的value值,并返回舊的鎖的超時時間
oldLockTime, err := redis.GetSet(redisAlias, key, expires)
if err != nil {
return false
}
// 2.4 當鎖的兩個"舊的超時時間"相等時,證明之前沒有其它機器進行GetSet操作,成功獲取鎖
// 說明:這里存在并發情況,如果有A和B同時競爭,A會先GetSet,當B再去GetSet時,oldLockTime就等于A設置的超時時間
if utils.MustString(oldLockTime) == currentLockTime {
return true
}
return false
}
刪除鎖邏輯:
// 刪除分布式鎖
// @return bool true-刪除成功;false-刪除失敗
func DelDistributeLock(key string) bool {
redisAlias := "jointly"
redisRet := redis.Del(redisAlias, key)
if redisRet != nil {
return false
}
return true
}
業務邏輯:
func DoProcess(processId int) {
fmt.Printf("啟動第%d個線程n", processId)
redisKey := "redis_lock_key"
for {
// 獲取分布式鎖
isGetLock := GetDistributeLock(redisKey, 10)
if isGetLock {
fmt.Printf("Get Redis Key Success, id:%dn", processId)
time.Sleep(time.Second * 3)
// 刪除分布式鎖
DelDistributeLock(redisKey)
} else {
// 如果未獲取到該鎖,為了避免redis負載過高,先睡一會
time.Sleep(time.Second * 1)
}
}
}
最后起個 10 個多線程,去執行這個 DoProcess():
func mAIn() {
// 初始化資源
var group string = "group"
var name string = "name"
var host string
// 初始化資源
host = "http://ip:port"
_, err := xrpc.NewXRpcDefault(group, name, host)
if err != nil {
panic(fmt.Sprintf("initRpc when init rpc failed, err:%v", err))
}
redis.SetRedis("louzai", "redis_louzai")
// 開啟10個線程,去搶Redis分布式鎖
for i := 0; i <= 9; i ++ {
go DoProcess(i)
}
// 避免子線程退出,主線程睡一會
time.Sleep(time.Second * 100)
return
}
程序跑了100 s,我們可以看到,每次都只有 1 個線程獲取到鎖,分別是 2、1、5、9、3,執行結果如下:
啟動第0個線程
啟動第6個線程
啟動第9個線程
啟動第4個線程
啟動第5個線程
啟動第2個線程
啟動第1個線程
啟動第8個線程
啟動第7個線程
啟動第3個線程
Get Redis Key Success, id:2
Get Redis Key Success, id:2
Get Redis Key Success, id:1
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:5
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:9
Get Redis Key Success, id:3
Get Redis Key Success, id:3
Get Redis Key Success, id:3
Get Redis Key Success, id:3
Get Redis Key Success, id:3
04 后記
這個代碼,其實是我很久之前寫的,因為當時 Go 沒有開源的分布式鎖,但是我又需要通過單機去執行某個任務,所以就自己手動擼了一個,后來在線上跑了 2 年,一直都沒有問題。
不過期間也遇到過一個坑,就是我們服務遷移時,忘了將舊機器的分布式鎖停掉,導致鎖經常被舊機器搶占,當時覺得很奇怪,我的鎖呢?
寫這篇文章時,又讓我想到當時工作的場景。
最后再切回正題,本文由淺入深,詳細講解了 Redis 實現的詳細過程,以及鎖超時、并發場景下,如何保證鎖能正常釋放,且只有一個線程去獲取鎖。