Go语言高并发Map解决方案

2021-07-11  本文已影响0人  突击手平头哥

Go语言高并发Map解决方案

Go语言基础库中的map不是并发安全的,不过基于读写锁可以实现线程安全;不过在Go1.9版本时,官方提供了sync.Map的线程安全Map

读写锁实现安全map

package main

import (
    "fmt"
    "sync"
)

type SafeMap struct {
    data map[int]string
    mutex sync.Mutex
}

func (m *SafeMap) Get(key int) string {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    if v, ok := m.data[key]; ok {
        return v
    }
    return ""
}

func (m *SafeMap) Put(key int, value string) {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    m.data[key] = value
}


func  main() {

    m := SafeMap{
        data : make(map[int]string),
    }

    m.Put(1, "1")
    fmt.Println(m.Get(1))
}

这种实现的map适用于读多写少的情况,一般场景下足以满足

sync.Map

示例代码

package main

import (
    "fmt"
    "sync"
)



func  main() {

    m := sync.Map{}
    m.Store(1, "1")
    fmt.Println(m.Load(1))
}

sync.Map的借口和普通的Map是不一样的

sync.Map的原理

读写分离

type readOnly struct {
    m       map[interface{}]*entry
    amended bool                    // 如果dirty和read数据不一致则此字段为true
}
type Map struct {
    mu Mutex
    read atomic.Value // readOnly
    dirty map[interface{}]*entry
    misses int
}

添加操作

func (m *Map) Store(key, value interface{}) {
    read, _ := m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok && e.tryStore(&value) {
        return
    }

    m.mu.Lock()
    read, _ = m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok {
        if e.unexpungeLocked() {
            // The entry was previously expunged, which implies that there is a
            // non-nil dirty map and this entry is not in it.
            m.dirty[key] = e
        }
        e.storeLocked(&value)
    } else if e, ok := m.dirty[key]; ok {
        e.storeLocked(&value)
    } else {
        if !read.amended {
            // We're adding the first new key to the dirty map.
            // Make sure it is allocated and mark the read-only map as incomplete.
            m.dirtyLocked()
            m.read.Store(readOnly{m: read.m, amended: true})
        }
        m.dirty[key] = newEntry(value)
    }
    m.mu.Unlock()
}

删除操作

读写分离的操作可以解决写入、读取的冲突问题,以上的机制是处理不了删除的

func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    if !ok && read.amended {
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            e, ok = m.dirty[key]
            delete(m.dirty, key)
            // Regardless of whether the entry was present, record a miss: this key
            // will take the slow path until the dirty map is promoted to the read
            // map.
            m.missLocked()
        }
        m.mu.Unlock()
    }
    if ok {
        return e.delete()
    }
    return nil, false
}
func (e *entry) delete() (value interface{}, ok bool) {
    for {
        p := atomic.LoadPointer(&e.p)
        if p == nil || p == expunged {
            return nil, false
        }
        if atomic.CompareAndSwapPointer(&e.p, p, nil) {
            return *(*interface{})(p), true
        }
    }
}

  在进行删除操作时并未直接从map中删除而是通过标记的方式删除,即将read值设置为nil来实现

//统计查询失败次数
func (m *Map) missLocked() {
    m.misses++
    if m.misses < len(m.dirty) {
        return
    }
    m.read.Store(readOnly{m: m.dirty})
    m.dirty = nil
    m.misses = 0
}

//存储数据时重新生成dirty
func (m *Map) dirtyLocked() {
    if m.dirty != nil {
        return
    }

    read, _ := m.read.Load().(readOnly)
    m.dirty = make(map[interface{}]*entry, len(read.m))
    for k, e := range read.m {
        if !e.tryExpungeLocked() {
            m.dirty[k] = e
        }
    }
}

在没次查询次数超过限制时就会将dirty提升为read,但是却把dirty设置为nil

在存储、修改数据时会再次从read中复制一份数据为dirty这个时候才是真正的将数据删除掉了

总结

  对于sync.Map的代码细节就不过多分析了,内部有很多双重功能判断需要结合业务来分析;这个读写锁更加的高级,但是实际上面对写并发较多的情况下依然存在不足。

分段锁Map

并发Map更常见的解决方案是:分段锁

分段锁Map的实现思路

  创建多个Map,对于读写操作的key先进行一次HASH操作,将对此key的操作放到都放到一个Map上,这样读写操作的锁只需要针对这一个Map来做,不同Map之间互不影响。

示例代码

package concurrentmap

import "sync"

type shard struct {
    data map[string]interface{}
    mutex   sync.Mutex
}

type Map struct {
    table []*shard
    count uint32
}

//可以额外实现指定count和hash函数的接口
func NewMap() *Map {
    m := Map {}
    var count uint32 = 32
    for i := uint32(0); i < count; i++ {
        m.table = append(m.table, &shard{
            data : make(map[string]interface{}),
        })
    }
    m.count = count
    return &m
}


const prime32 = uint32(16777619)
func fnv32(key string) uint32 {
    hash := uint32(2166136261)
    for i := 0; i < len(key); i++ {
        hash *= prime32
        hash ^= uint32(key[i])
    }
    return hash
}

func (m *Map) getShared(u uint32) *shard {
    return m.table[int(u % m.count)]
}

func (m *Map) Put(key string, value interface{}) {
    shard := m.getShared(fnv32(key))
    shard.mutex.Lock()
    defer shard.mutex.Unlock()
    shard.data[key] = value
}

func (m *Map) Get(key string) (interface{}, bool) {
    shard := m.getShared(fnv32(key))
    shard.mutex.Lock()
    if v, ok := shard.data[key]; ok {
        return v, true
    }
    return nil, false
}
上一篇 下一篇

猜你喜欢

热点阅读