一致性哈希

2019-05-21  本文已影响0人  小怪兽狂殴奥特曼

背景:单调哈希的问题是一旦添加或者删除节点(节点挂掉),需要对所有节点上的key重新哈希并迁移数据。此间会导致服务不可用。
实节点:真正提供服务的节点。
虚拟槽:圆环上的一个位置。当有实节点占据这个位置的槽,则该槽的状态为可用。否则为不可以用状态。
原理:固定数量N个槽均匀沿圆环分布,一个key经过hash后落到对应的槽,每个实节点负责处理落到槽中的key。当节点失效时,可用的槽变为不可以用。这个时候沿着圆环顺时针找到一个能用的槽。这样子避免了单调哈希需要重新哈希的问题。
要点:
1.给实节点分配虚拟槽时,一定要避免发生碰撞。一旦发生碰撞,会导致同一个槽对应不同的实节点,此时添加实节点失败。
2.每个实节点占用的虚拟槽的数量并不是越多越好。实节点占据的虚拟槽越多,添加新节点的失败概率越大。理论上,只要哈希后key能均匀分布即可满足要求。测试用例中,50个复制数已经可以实现比较好的分布效果。
3.为什么采用crc32哈希?因为crc32对随机的key进行哈希得到值分布比较均匀。

package main

import (
    "crypto/md5"
    "hash/crc32"
    "strconv"
    "fmt"
    "errors"
)


type Node struct {
    id string
    ip string
}

const SLOT_MAX int = 0xffff

type CHashing struct {
    replicas int
    count   int
    nodes [SLOT_MAX]*Node
}

func (s *CHashing)Init(rel int, nodes []*Node) error {
    s.replicas = rel
    s.count = 0

    for _,nodes := range nodes {
        err := s.Add(nodes)
        if err!=nil {
            return err
        }
        s.count++;
    }
    return nil
}

func (s *CHashing)hash_func(key string) int {
    md5Chan := make(chan []byte, 1)
    md5Sum := md5.Sum([]byte(key))
    md5Chan <- md5Sum[:]
    return int(crc32.ChecksumIEEE(<-md5Chan))%SLOT_MAX
}

func (s *CHashing)SetReplicas(repl int) {
    s.replicas = repl
}

func (s *CHashing)Add(n *Node) error {
    for i:=0;i<s.replicas;i++ {
        slot := s.hash_func(n.id+strconv.Itoa(i*11))
        // 可能会有碰撞情况
        // 解决方法:id加一个新的盐,遇到碰撞则不许添加节点
        if s.nodes[slot] != nil {
            return errors.New("collision on allocating slot.")
        }
        s.nodes[slot] = n
    }
    s.count++;

    return nil
}

func (s *CHashing)Del(n *Node) {
    for i:=0;i<s.replicas;i++ {
        slot := s.hash_func(n.id+strconv.Itoa(i*11))
        s.nodes[slot] = nil
    }
    s.count--;
    if(s.count<0) {
        s.count=0
    }
}

func (s *CHashing)Clear() {
    for idx := range s.nodes {
        s.nodes[idx] = nil
    }
    s.replicas = 0
    s.count = 0
}

func (s *CHashing)Get(key string) *Node {
    slot := s.hash_func(key)

    if(s.nodes[slot] != nil) {
        return s.nodes[slot]
    }

    for i:=slot+1;i<SLOT_MAX;i++ {
        if(s.nodes[i]!=nil) {
            return s.nodes[i]
        }
    }

    for i:=0;i<slot;i++ {
        if(s.nodes[i]!=nil) {
            return s.nodes[i]
        }       
    }

    //this will never happen
    return nil
}

func test(ch *CHashing, key_num int) {
    fmt.Printf("%d个实节点,每个实节点使用%d个虚拟槽,%d个测试key 访问分布结果 :\n",ch.count,ch.replicas,key_num)
    stat := make(map[string]int)
    for i:=0;i<key_num;i++ {
        nodes := ch.Get(strconv.Itoa(i))
        count := stat[nodes.id]
        stat[nodes.id] = count+1
    }

    // 统计各个key的分布情况
    for k,v := range stat {
        fmt.Printf("%s\t%d\n",k,v)
    }
    fmt.Printf("========================\n")
}

func main() {

    var ch CHashing

    nodes := [5]Node{
        Node{"0", "192.168.0.0.0"},
        Node{"1", "192.168.0.0.1"},
        Node{"2", "192.168.0.0.2"},
        Node{"3", "192.168.0.0.3"},
        Node{"4", "192.168.0.0.4"},
    }

    ch.SetReplicas(2)

    // 先测试2个实节点,2个虚拟槽的情况
    err := ch.Add(&nodes[0])
    if err!=nil {
        fmt.Println("add nodes fails:",err)
        return      
    }
    err = ch.Add(&nodes[1])
    if err!=nil {
        fmt.Println("add nodes fails:",err)
        return      
    }
    test(&ch, SLOT_MAX)
    test(&ch, SLOT_MAX*2)
    test(&ch, SLOT_MAX*3)

    // 先测试3个实节点,2个虚拟槽的情况
    err = ch.Add(&nodes[2])
    if err!=nil {
        fmt.Println("add nodes fails:",err)
        return      
    }   
    test(&ch, SLOT_MAX)
    test(&ch, SLOT_MAX*2)
    test(&ch, SLOT_MAX*3)

    // 先测试4个实节点,2个虚拟槽的情况
    err = ch.Add(&nodes[3])
    if err!=nil {
        fmt.Println("add nodes fails:",err)
        return      
    }   
    test(&ch, SLOT_MAX)
    test(&ch, SLOT_MAX*2)
    test(&ch, SLOT_MAX*3)

    
    // 2个实节点,50个虚拟槽的情况
    ch.Clear()
    ch.SetReplicas(50)
    err = ch.Add(&nodes[0])
    if err!=nil {
        fmt.Println("add nodes fails:",err)
        return      
    }
    err = ch.Add(&nodes[1])
    if err!=nil {
        fmt.Println("add nodes fails:",err)
        return      
    }
    test(&ch, SLOT_MAX)
    test(&ch, SLOT_MAX*2)
    test(&ch, SLOT_MAX*3)   

    // 3个实节点,50个虚拟槽的情况
    err = ch.Add(&nodes[2])
    if err!=nil {
        fmt.Println("add nodes fails:",err)
        return      
    }

    test(&ch, SLOT_MAX)
    test(&ch, SLOT_MAX*2)
    test(&ch, SLOT_MAX*3)   
    // 4个实节点,50个虚拟槽的情况
    err = ch.Add(&nodes[3])
    if err!=nil {
        fmt.Println("add nodes fails:",err)
        return      
    }

    test(&ch, SLOT_MAX)
    test(&ch, SLOT_MAX*2)
    test(&ch, SLOT_MAX*3)
}
上一篇下一篇

猜你喜欢

热点阅读