分布式方案

一致性Hash算法

2021-05-19  本文已影响0人  肥兔子爱豆畜子

引入问题

我们要存储“图片名-图片url”,这样一个k-v对,我们使用N个缓存节点来存储。
如何决定给定的一个key,即图片名,在N个节点中的哪个存储value也就是url呢?

传统哈希取模:存储节点ID = hash(key) % N

首先,随机分配肯定不行,这样查找的时候,只能遍历N个节点,然后逐个判断key在不在这个节点上。
那么,hash再取模算法呢?也就是: 存储节点ID = hash(key) % N
这样在客户端就可以马上根据key经过计算就知道数据存在哪个节点上了,看起来这是个不错的方法。
但是,这个方法的问题在于,一旦缓存集群中的节点增加或者是减少,对应实际情况可能是扩容、或者是节点宕机。当客户端被通知到N变化为N1之后,计算存储节点ID的方法变为 节点ID = hash(key) % N1,我们分存和取两个方面来看会发生什么,首先新数据按照新的N1计算应该把数据存放在哪个节点,而取,新的数据因为是按照N1规则可以取到,但是原来按照N规则存放的老数据按照新的N1规则来取,是取不到的。所以必须将原来的旧数据重新按照N1规则来存放,势必要来一波数据的复制,而且涉及到所有的节点。
相当于对于缓存来说,占缓存中大部分的旧数据都发生了缓存失效,也就是出现了缓存雪崩。
这就是问题的所在。传统的对数据key做哈希取模来确定存放节点ID的方法没法应对节点变化的情况

解决节点变化问题的一致性hash算法

一致性hash算法是MIT的Karger等人为了解决数据如何在分布式缓存的多个节点中平均的、分片存储提出的算法。

算法思路

这个算法的思路是:
想象一个由0到2^32-1 数值组成的圆环,从0点位按照顺时针顺序在圆环上排列,对于缓存集群中的N个节点,根据Hash(集群标识,IP或者是节点名)计算出hash值,然后标注到哈希圆环上。
存放和查找数据使用的时候使用如下规则:计算出数据的hash值,比如hash(key),然后将hash值定位到哈希圆环上的一个点,从此点沿着圆环按照顺时针方向找到的第一个缓存节点就是存放或取数的节点。


一致性Hash算法

节点变化的场景

一致性Hash算法对于节点的增减都只需重定位环空间中的一小部分数据,有很好的容错性和可扩展性。
只有一少部分数据发生了缓存失效,在查找的时候这部分数据可能在新定位的节点中找不到。那么重新从后面一层比如数据库或者文件系统加载一次然后旧到缓存里了。原定位的节点中可能会有一部分冗余数据,不过缓存数据一般会设置过期策略,比如最少使用原则,这些冗余数据因为按照新的定位规则是被客户端访问不到的,所以会很快被从缓存中去除掉。
所以,笔者理解,一致性Hash算法中的“一致”,准确来说应该叫做“大部分一致”或“尽量一致”,指的是在节点数量变化的时候,大部分的数据按照哈希环算法定位到的存取节点,跟节点变化之前是一致的。只有一少部分发生了变化。这减少了缓存失效的现象,提升了整个分布式缓存集群的命中率。

NodeC宕机、节点减少 新扩容了服务器X、节点增加

虚拟节点

如果节点过少,那么有可能出现一种情况:数据根据hash(key) % (2^32-1)计算出的值再按照顺时针原则,都集中的落在了哈希环上的某一个节点上了,而我们期望的是数据能够平均的分布在各个节点上。这个时候一致性hash算法中引入了虚拟节点机制。即将当前较少数据的节点虚拟出几个分身,散落在哈希环上,用来增加数据定位到自己的概率。


虚拟节点促进平均分布

代码模拟

hash算法

/**
 * 用来计算节点和数据key对应hash值的hash算法
 */
public class HashUtil {

    /**
     * google的MurMur哈希算法 性能高、碰撞率低
     */
    public static Long MurMurHash(String key) {
        ByteBuffer buf = ByteBuffer.wrap(key.getBytes());
        int seed = 0x1234ABCD;

        ByteOrder byteOrder = buf.order();
        buf.order(ByteOrder.LITTLE_ENDIAN);

        long m = 0xc6a4a7935bd1e995L;
        int r = 47;
        long h = seed ^ (buf.remaining() * m);
        long k;
        while (buf.remaining() >= 8) {
            k = buf.getLong();
            k *= m;
            k ^= k >>> r;
            k *= m;
            h ^= k;
            h *= m;
        }

        if (buf.remaining() > 0) {
            ByteBuffer finish = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
            finish.put(buf).rewind();
            h ^= finish.getLong();
            h *= m;
        }

        h ^= h >>> r;
        h *= m;
        h ^= h >>> r;

        buf.order(byteOrder);
        return h;
    }
}

哈希环:

/**
 * 哈希环
 * 有2^32个slot, 顺时针分别存放0到2^32-1的hash值
 * 用来根据node的key存放node,以及根据缓存对象的key按顺时针规则判定其存放在哪个节点
 * */
public class HashRing<T>{
    private TreeMap<Long, T> nodes = new TreeMap<Long, T>(); //缓存节点,包括虚拟节点
    private final long hashSlot = 1L<<32; //哈希环上的slot个数 2^32
    
    //向哈希环存放节点
    public void addNode(String nodeKey, T node) {
        nodes.put(slotOffset(nodeKey), node);
    }
    
    //从哈希环中删除节点
    public void removeNode(String nodeKey) {
        nodes.remove(slotOffset(nodeKey));
    }
    
    //根据节点的key查询节点
    public T getNodeByKey(String key) {
        Long slot = slotOffset(key);
        return nodes.get(slot);
    }
    
    //根据缓存对象的key,按顺时针规则判定其应该存放在哪个节点
    public T findNodeForObjectKey(String objectKey) {
        Long slot = slotOffset(objectKey);
        Entry<Long,T> entry = nodes.ceilingEntry(slot);//按顺时针原则找到第一个节点
        if(null==entry) //转了一圈没找到,那就是环上的第一个节点
            return nodes.firstEntry().getValue();
        else
            return entry.getValue();
    }
    
    //给定一个key,返回其映射到哈希环上的一个slot编号,映射方法是hash再对slot个数取模
    private Long slotOffset(String key) {
        return HashUtil.MurMurHash(key) % hashSlot;
    }
}

有了hash算法和哈希环,就可以构建我们的模拟缓存集群了:

/**
 * 缓存集群中的节点
 * */
public class CacheNode {
    private String ip;  //节点ip
    private String nodeName; //节点名
    private Map cache;
    private boolean dummyNode = false; //是否是虚拟节点
    
    public CacheNode(String ip, String nodeName) {
        this.ip = ip;
        this.nodeName = nodeName;
        cache = new HashMap();
    }
    
    public String getIp() {
        return ip;
    }

    public String getNodeName() {
        return nodeName;
    }

    public boolean isDummyNode() {
        return dummyNode;
    }

    public void setDummyNode(boolean dummyNode) {
        this.dummyNode = dummyNode;
    }
    
    public int getCacheObjectCount() {
        return this.cache.size();
    }

    public void put(String key, Object value) {
        cache.put(key, value);
    }
    
    public Object get(String key) {
        return cache.get(key);
    }
}
/**
 * 分布式缓存集群
 * */
public class DistributedCacheCluster {
    private HashRing<CacheNode> hashRing;
    private int dummyNodePerNode = 300; //集群中每个节点需要额外分出来多少个虚拟节点
    
    public DistributedCacheCluster() {
        hashRing = new HashRing<CacheNode>();
    }
    
    //增加节点及其虚拟节点到集群
    public void addCacheNode(CacheNode cacheNode) {
        String nodeIp = cacheNode.getIp();
        //添加真实节点
        hashRing.addNode(nodeIp, cacheNode);
        //添加虚拟节点
        for(int i=1; i<=dummyNodePerNode; i++) {
            CacheNode vnode = new CacheNode(nodeIp, nodeIp+"虚拟节点-"+i);
            vnode.setDummyNode(true);
            hashRing.addNode(nodeIp+"-"+i, vnode);
        }
    }
    
    //从集群中删除指定ip的节点及其虚拟节点
    public void deleteCacheNode(String ip) {
        hashRing.removeNode(ip);
        for(int i=1; i<=dummyNodePerNode; i++) {
            hashRing.removeNode(ip+"-"+i);
        }
    }
        
    //根据ip查询真实缓存节点
    public CacheNode getCacheNode(String ip) {
        return hashRing.getNodeByKey(ip);
    }
    
    //根据缓存对象的key来定位节点(到哪个节点进行存取)
    public CacheNode findNodeForCacheObjectKey(String objectKey) {
        CacheNode node = hashRing.findNodeForObjectKey(objectKey);
        if(node.isDummyNode()) { //如果是虚拟节点,要么找到对应的真实节点返回
            return getCacheNode(node.getIp());
        }else {
            return node;
        }
    }

}

测试验证:

int nodeCount = 8; //节点数
        int cacheObjectCount = 10000; //需要缓存的对象个数
        String ip_prefix = "192.168.0.";
        
        //缓存集群初始化
        DistributedCacheCluster cluster = new DistributedCacheCluster();
        for(int i=1; i<=nodeCount; i++) {
            CacheNode node = new CacheNode(ip_prefix+i, ip_prefix+i+"缓存节点");
            cluster.addCacheNode(node);
        }
        
        //向缓存集群添加10000个缓存对象
        for(int i=0; i<cacheObjectCount; i++) {
            String key = UUID.randomUUID().toString(); //缓存对象的key
            Object value = "valueOf-"+key;  //缓存对象的value
            CacheNode node = cluster.findNodeForCacheObjectKey(key); //根据key定位到集群中的一个节点
            node.put(key, value); //把缓存对象存入这个节点
        }
        
        //看一下缓存对象在集群中各(真实)节点中的分布情况
        int total = 0;
        for(int i=1; i<=nodeCount; i++) {
            CacheNode node = cluster.getCacheNode(ip_prefix+i);
            logger.info("节点" + node.getIp() + "缓存了" + node.getCacheObjectCount() + "个数据");
            total = total + node.getCacheObjectCount();
        }
        logger.info("集群中一共缓存了" + total + "个数据");

运行结果:

节点192.168.0.1缓存了1166个数据
节点192.168.0.2缓存了1285个数据
节点192.168.0.3缓存了1194个数据
节点192.168.0.4缓存了1108个数据
节点192.168.0.5缓存了1258个数据
节点192.168.0.6缓存了1268个数据
节点192.168.0.7缓存了1414个数据
节点192.168.0.8缓存了1307个数据
集群中一共缓存了10000个数据

本文是自己的理解的笔记性质,一些个内容和图片并非原创而是来自于网络,笔者只是看过之后试着凭自己的理解和印象再复述和实现一遍,向原作者们表示感谢。

上一篇下一篇

猜你喜欢

热点阅读