Download:https://www.itwangzi.cn/2262.html
本文基于Go版本:1.17.8
go version go1.17.8 darwin/amd64
原生Map 并發(fā)場景
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
m := make(map[int]struct{}, 0)
wg.Add(2)
go func() {
wg.Done()
for i := 0; i < 100000; i++ {
m[i] = struct{}{}
}
}()
go func() {
wg.Done()
for i := 0; i < 100000; i++ {
fmt.Println(m[i])
}
}()
wg.Wait()
time.Sleep(time.Second * 20)
}
多個協(xié)程同時寫入出現(xiàn)fatal error: concurrent map read and map write的錯誤。如何解決該問題呢?那就需要給Map加上一把讀寫互斥鎖sync.RWMutex。
測試代碼如下:
package main
import (
"fmt"
"sync"
"time"
)
type Demo struct {
Data map[int]struct{}
Lock *sync.RWMutex
}
func (d Demo) Get(k int) {
d.Lock.RLock()
fmt.Println(d.Data[k])
d.Lock.RUnlock()
}
func (d Demo) Set(k int) {
d.Lock.Lock()
defer d.Lock.Unlock()
d.Data[k] = struct{}{}
}
func main() {
d := Demo{
Data: make(map[int]struct{}, 0),
Lock: new(sync.RWMutex),
}
for i := 0; i < 2000; i++ {
go d.Set(i)
go d.Get(i)
}
time.Sleep(time.Second * 20)
}
Sync.Map 概述
map是一種特殊的數(shù)據(jù)結(jié)構(gòu); 是一種元素對的無序集合,一個元素是key而對應(yīng)的另一個元素是value,所以這個結(jié)構(gòu)也稱為關(guān)聯(lián)數(shù)組或字典,是一種快速尋找值的理想結(jié)構(gòu);給定key可快速定位到value。
Go語言中Map(映射、字典)是一種內(nèi)置的數(shù)據(jù)結(jié)構(gòu),它是一個無序的(key-value)對的集合。
*map初始化與存儲的結(jié)構(gòu)化*
Go語言原生Map并不是線程安全的,在解決并發(fā)讀寫Map的思路需要使用讀寫互斥鎖(sync.RWMutex),這種方案簡約直接,但是缺點也很明顯,性能不會太高。而sync.Map在Go 1.9的版本中引入,它是一種并發(fā)安全的map,它的設(shè)計非常巧妙,充分利用原子操作(atomic)和互斥鎖(mutex)的配合, 在使用sync.Map之后,對map的讀寫,不需要加入一大把鎖,它通過空間換時間的方式,使用read(atomic.Value)和dirty(map[interface{}]*entry)兩個原生map來進行讀寫分離,降低鎖時間來提升效率。
核心思想
sync.Map核心原則就是盡量使用原子操作,最大程度上減少了鎖的使用,從而接近lock free的效果。
- 使用了兩個原生的map作為存儲介質(zhì),分別為: 只讀字典read(atomic.Value)和 臟字典dirty(map[interface{}]*entry)。
- 只讀字典使用atomic.Value來承載,保證原子性與高性能;臟字典則使用互斥鎖sync.Mutex來進行保護,保證讀寫之間的互斥關(guān)系。
- 只讀字典和臟字典中的鍵值對集合并不是實時同步的,它們在某些時間段內(nèi)可能會有不同。
- 只讀字典和臟字典其實在本質(zhì)都是map[interface{}]*entry類型, entry就是Map的value容器。
- entry是表示具體值的指針類型,也可以表示key已刪除的狀態(tài)。
通過這種設(shè)計,規(guī)避了原生map無法并發(fā)安全刪除的問題,同時在變更某個鍵對應(yīng)的value時也可以使用原子操作。
如何使用
package main
import (
"fmt"
"sync"
)
func main() {
var sm sync.Map
//1. 寫入
sm.Store("張三", 18)
sm.Store("李四", 20)
//2. 讀取
age, _ := sm.Load("張三")
fmt.Println(age.(int))
//3. 遍歷
sm.Range(func(key, value interface{}) bool {
name := key.(string)
age := value.(int)
fmt.Println(name, age)
return true
})
//4. 刪除
sm.Delete("李四")
age, ok := sm.Load("李四")
fmt.Println(age, ok)
//5. 讀取或?qū)懭?br />sm.LoadOrStore("王二麻子", 100)
age, ok = sm.Load("王二麻子")
fmt.Println(age, ok)
}
sync.Map適用于讀多寫少的場景,
- 對于寫多的場景,會導(dǎo)致read map緩存失效,需要加鎖、導(dǎo)致沖突變多;
- 由于未命中read map次數(shù)過多,導(dǎo)致dirty map提升為read map,這是一個O(N)時間復(fù)雜度的操作,會進一步降低性能。
源碼分析
先看一下sync.Map的數(shù)據(jù)結(jié)構(gòu):
type Map struct {
// 互斥鎖 保護read 與 dirty
mu Mutex
// read map的 k,v(dirty) 是不變的, 刪除只是打標(biāo)記,插入新key會加鎖寫到dirty map 中
// 因此對read map的讀取無需加鎖。
read atomic.Value
// dirty map 對dirty map的操作需要持有互斥鎖
dirty map[interface{}]*entry
// 當(dāng)Load操作 read map中未找到,就會嘗試從dirty中進行加載(不管是否存在), misses+1
// 當(dāng)misses 達到dirty map 長度時,dirty被提升為read,并且重新分配dirty。
misses int
}
type readOnly struct {
m map[interface{}]*entry
// 為true時代表 dirty map中包含m中沒有的元素
amended bool
}
type entry struct {
p unsafe.Pointer // *interface{}
}
- read是atomic.Value類型,可以并發(fā)的讀,但是如果需要更新read,則需要加入互斥鎖保護。
- read中存儲的entry指針,可能會被并發(fā)的CAS(比較并交換)更新。但是如果更新一個之前已被刪除entry,則需要先將其狀態(tài)從刪除狀態(tài)改為nil,再拷貝到dirty map中去,然后再執(zhí)行更新。
- dirty 是一個非線程安全的原始map。包含新寫入的key,并且包含read中所有未被刪除的key。這樣可以快速的將dirty提升為read對外提供服務(wù)。如果dirty為nil,那么下一次寫入時,會新建一個新的dirty,這個初始的dirty是read的一個拷貝,但除掉了其中已經(jīng)被刪除的key。
- 每當(dāng)從read中讀取失敗,都會將 misses的計數(shù)值+1,dirty被提升為read`。
*entry.p的三種狀態(tài)*
- read和dirty 原生map都存儲都包含entry,它是一個指針,指向value read和dirty 各自維護一套key,而它指向都同一個value,只要修改了這個entry,對read和dirty都是可見的。而這個指針的狀態(tài)有三個狀態(tài):
- 當(dāng)p == nil時,說明這個鍵值對已被刪除,并且m.dirty==nil 或 m.dirty[k]指向該entry。
- 當(dāng)p == expunged時,說明這條鍵值對已被,并且m.dirty !=nil且m.dirty中沒有這個key。
- 其它情況,p指向一個正常值,表示實際interface{}的地址,并且被記錄在m.read.m[key]中、如果這時m.dirty不為nil,那么它也被記錄在m.dirty[key]中。兩者實際上指向的是同一個值.
- 當(dāng)刪除key時,并不實際刪除。一個entry可以通過原子地設(shè)置p為nil刪除。如果之后創(chuàng)建m.dirty,nil又會被原子地設(shè)置expunged,且不會拷貝dirty中。
- 如果p不為expunged和entry相關(guān)聯(lián)的這個value可以被原子地更新;p == expunged那么當(dāng)它初次設(shè)置到m.entry之后才可以被更新。
sync.Map的數(shù)據(jù)結(jié)構(gòu)
Load 讀取
Load 函數(shù)具體實現(xiàn)方式
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
// 優(yōu)先從read map中讀取數(shù)據(jù)(無鎖)
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
// 如果不存在并且 read.amended 字段指明 dirty map中有read map中不存在的字段,則加鎖嘗試
// 從dirty map中加載
if !ok && read.amended {
// dirty map 不是線程安全的,所以需要加上互斥鎖
m.mu.Lock()
// double check 避免在加鎖的時候dirty map提升為read map
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
// 任然沒有在 read中找到這個key ,并且 amended 為true
if !ok && read.amended {
e, ok = m.dirty[key]
// 不管dirty中有沒有找到,都增加misses 計數(shù),該函數(shù)可能將dirty map提升為readmap
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}
// 從entry中原子地操作 load 實際interface{}
func (e *entry) load() (value interface{}, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
return *(*interface{})(p), true
}
// 增加misses計數(shù),并在必要的時候提升 dirty map
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
// dirty map 晉升
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
- 如果read 中 沒有這個key,且amended為false,說明dirty為空,那就直接返回空或false。
- 如果read中沒有這個key,且amended為true,說明dirty中可能存在我們要找的key,先上互斥鎖在嘗試去dirty中查找,在此之前,仍然有一個double check的操作,若還是沒有在read中找到,那么就從dirty中找,不管dirty中有沒有找到,都需要"記錄一筆",因為在dirty被提升為read之前,都會進入這條路徑。
Store 存儲
Store 函數(shù)實現(xiàn)
func (m *Map) Store(key, value interface{}) {
// 如果read map 中存在該key 則嘗試直接更改(由于修改的是 entry
//內(nèi)部的pointer, 因此 dirty map 也可見)
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
//如果 read map 中存在該key , 當(dāng)p == expunged,則說明m.dirty != nil
// 并且 m.dirty 中不存在該key 值 此時:
// a. 將p的狀態(tài) 由expunged 更改為nil
// b. dirty map 插入key
// c. 更新 entry.p = value (read map 和 dirty map 指向同一個entry )
m.dirty[key] = e
}
//如果read map中存在該key,且 p != expunged,直接更新該entry
//(此時m.dirty==nil或m.dirty[key]==e)
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
// 如果 read map 中不存在該key, 但是dirty map 中存在該key,
//直接寫入更新 entry(read map 中仍然沒有這個key)
e.storeLocked(&value)
} else {
if !read.amended {
// 如果 read map 和dirty map 中都不存在該key,則:
// a. 如果dirty map 為空,則需要創(chuàng)建 dirty map,并從read map 中拷貝未刪除的元素
// b. 更新amended 為true,并標(biāo)記dirty map中存在read map中沒有的key
// c. 將k ,v 寫入dirty map中, read map不做改變
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
Delete 刪除
Delete 函數(shù)實現(xiàn)
func (m *Map) Delete(key interface{}) {
m.LoadAndDelete(key)
}
func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
// 從read map 中查找, 如果存在,則設(shè)置為nil
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
// double check
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
// 如果 read map 中不存在,但dirty map中存在, 則直接從dirty map 刪除
if !ok && read.amended {
e, ok = m.dirty[key]
delete(m.dirty, key)
m.missLocked()
}
m.mu.Unlock()
}
if ok {
//將 entry.p 設(shè)置為nil
return e.delete()
}
return nil, false
}
func (e *entry) delete() (value interface{}, ok bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
//CAS操作
if atomic.CompareAndSwAppointer(&e.p, p, nil) {
return *(*interface{})(p), true
}
}
}
總結(jié)
- sync.Map是線程安全的,讀取,插入,刪除也都是保持著常數(shù)級O(1)的時間復(fù)雜度。
- 通過讀寫分離,降低鎖時間來提高效率,適用于讀多寫少的場景。
測試性能代碼
package _map
import (
"sync"
)
var (
myMap *MyMap
syncMap *sync.Map
)
type MyMap struct {
sync.RWMutex
m map[int]struct{}
}
func init() {
myMap = &MyMap{
m: make(map[int]struct{}, 0),
}
syncMap = new(sync.Map)
}
func mutexMapStore(k int) {
myMap.Lock()
myMap.m[k] = struct{}{}
myMap.Unlock()
}
func mutexMapLoad(k int) int {
myMap.RLock()
defer myMap.RUnlock()
if _, ok := myMap.m[k]; ok {
return 1
}
return 0
}
func mutexMapDelete(k int) {
myMap.Lock()
delete(myMap.m, k)
myMap.Unlock()
}
func syncMapStore(k, v int) {
syncMap.Store(k, v)
}
func syncMapLoad(k int) int {
if _, ok := syncMap.Load(k); ok {
return 1
}
return 0
}
func syncMapDelete(k int) {
syncMap.Delete(k)
}
Benchmark Test
package _map
import (
"math/rand"
"testing"
"time"
)
func BenchmarkMutexStoreParallel(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
r := rand.New(rand.NewSource(time.Now().Unix()))
for pb.Next() {
k := r.Intn(1000000)
mutexMapStore(k)
}
})
}
func BenchmarkMutexMapStoreParalell(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
r := rand.New(rand.NewSource(time.Now().Unix()))
for pb.Next() {
k := r.Intn(1000000)
mutexMapStore(k)
}
})
}
func BenchmarkSyncMapStoreParalell(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
r := rand.New(rand.NewSource(time.Now().Unix()))
for pb.Next() {
// The loop body is executed b.N times total across all goroutines.
k := r.Intn(100000000)
syncMapStore(k, k)
}
})
}
func BenchmarkMutexMapLoadParalell(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
r := rand.New(rand.NewSource(time.Now().Unix()))
for pb.Next() {
k := r.Intn(100000000)
mutexMapLoad(k)
}
})
}
func BenchmarkSyncMapLoadParalell(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
r := rand.New(rand.NewSource(time.Now().Unix()))
for pb.Next() {
k := r.Intn(100000000)
syncMapLoad(k)
}
})
}
func BenchmarkMutexMapDeleteParalell(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
r := rand.New(rand.NewSource(time.Now().Unix()))
for pb.Next() {
// The loop body is executed b.N times total across all goroutines.
k := r.Intn(100000000)
mutexMapDelete(k)
}
})
}
func BenchmarkSyncMapDeleteParalell(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
r := rand.New(rand.NewSource(time.Now().Unix()))
for pb.Next() {
k := r.Intn(100000000)
syncMapDelete(k)
}
})
}
測試結(jié)果
goos: darwin
goarch: amd64
pkg: lib/sync/map
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkMutexStoreParallel-12 9638028 136.0 ns/op 2 B/op 0 allocs/op
BenchmarkMutexMapStoreParalell-12 9125176 148.9 ns/op 0 B/op 0 allocs/op
BenchmarkSyncMapStoreParalell-12 4548318 294.9 ns/op 44 B/op 3 allocs/op
BenchmarkMutexMapLoadParalell-12 20580038 51.57 ns/op 0 B/op 0 allocs/op
BenchmarkSyncMapLoadParalell-12 91573840 12.51 ns/op 0 B/op 0 allocs/op
BenchmarkMutexMapDeleteParalell-12 9240199 152.4 ns/op 0 B/op 0 allocs/op
BenchmarkSyncMapDeleteParalell-12 99910771 12.19 ns/op 0 B/op 0 allocs/op
PASS