聊聊缓存

2020-03-25  本文已影响0人  红瓦李

本地缓存:mybatis实现:装饰器模式实践

public void setSize(final int size) {
    keyMap = new LinkedHashMap<Object, Object>(size, .75F, true) {
      private static final long serialVersionUID = 4267176411845948333L;

      @Override
      protected boolean removeEldestEntry(Map.Entry<Object, Object> eldest) {
        boolean tooBig = size() > size;
        if (tooBig) {
          eldestKey = eldest.getKey();
        }
        return tooBig;
      }
    };
  }
private void cycleKeyList(Object key) {
    keyList.addLast(key);
    if (keyList.size() > size) {
      Object oldestKey = keyList.removeFirst();
      delegate.removeObject(oldestKey);
    }
  }
public class BlockingCache implements Cache {
  private long timeout;
  private final Cache delegate;
  private final ConcurrentHashMap<Object, ReentrantLock> locks;
}

集中式缓存:Memcached

业务逻辑:查询缓存,若未命中,触发加载db数据进缓存

//查询缓存
Object value = cache.get(key);
if (value != null) return value;
//查询db数据放入缓存中再返回。
value = loadingFromDb(key);
cache.put(key, value)
return value;

考虑有一下几个问题:

//查询缓存
Object value = cache.get(key);
if (value != null) return value;
//多线程环境下加锁防止加载逻辑被触发多次,或者加分布式锁
reentrantLock.tryLock(ms);
//查询db数据放入缓存中再返回。
value = loadingFromDb(key);
cache.put(key, value);
//解锁
reentrantLock.unlock();
return value;
/**
     * 异步缓存重刷
     *
     * @param cachedObject          缓存数据实体
     * @param realKey               真实的缓存KEY
     * @param logicalExpireSeconds  逻辑失效
     * @param physicalExpireSeconds 物理失效
     * @param cacheNullable         是否允许cacheNull
     * @param func                  异步reload业务回调
     * @param clazz                 数据返回类型
     * @param <T>                   数据加载回调返回类型
     */
    private <T> T asyncReloadAndGetCachedResult(CacheObject<T> cachedObject, String realKey, int logicalExpireSeconds,
                                                int physicalExpireSeconds, boolean cacheNullable, Func<T> func,
                                                Class<T> clazz, Lock lock) {
        T result = null;
        if (cachedObject != null) {
            if (cachedObject.value != null) {
                if (clazz == null || clazz.isAssignableFrom(cachedObject.value.getClass())) {
                    //clazz没设定,类型转换异常只能再业务层发现了
                    result = cachedObject.value;
                    if (cachedObject.isLogicExpire()) {
                        //软过期,触发异步重刷机制,返回缓存中数据
                        asyncReload(realKey, logicalExpireSeconds, physicalExpireSeconds, cacheNullable, func, clazz,
                            lock);
                    }
                } else {
                    //如果缓存中的数据会导致类型转换异常
                    LOGGER.error("cached result class type is not match! expect:{}, actual:{}", clazz,
                        cachedObject.value.getClass());
                    result = syncReloadAndGet(realKey, logicalExpireSeconds, physicalExpireSeconds, cacheNullable,
                        func, clazz);
                }
            } else {
                if (cacheNullable) {
                    if (cachedObject.isLogicExpire()) {
                        //软过期,触发异步重刷机制,返回缓存中数据
                        asyncReload(realKey, logicalExpireSeconds, physicalExpireSeconds, true, func, clazz, lock);
                    }
                } else {
                    //不允许cache缓存却拿到了null(可能修改了nullable参数),这里强制进行一次缓存刷新,避免业务层发生错误
                    result = syncReloadAndGet(realKey, logicalExpireSeconds, physicalExpireSeconds, false,
                        func, clazz);
                }
            }
        }
        return result;
    }
/**
     * 异步缓存重刷
     *
     * @param realKey               真实的缓存KEY
     * @param logicalExpireSeconds  逻辑失效
     * @param physicalExpireSeconds 物理失效
     * @param cacheNullable         是否允许cacheNull
     * @param func                  异步reload业务回调
     * @param tClass                缓存的对象类型
     * @param <T>                   数据加载回调返回类型
     */
    private <T> void asyncReload(String realKey, int logicalExpireSeconds, int physicalExpireSeconds,
                                 boolean cacheNullable, Func<T> func, Class<T> tClass, Lock lock) {
        try {
            //if (!LOCAL_QUEUED_WORKING_TASK_SET.contains(realKey)) {
            JedisLock jedisLock = cacheRefreshLockFactory.buildLock(realKey);
            if (jedisLock.tryLock()) {
                try {
                    //更新待处理任务列表,避免重入
                    //LOCAL_QUEUED_WORKING_TASK_SET.add(realKey);
                    CACHE_REFRESH_EXECUTOR.execute(new CacheRefreshTask(realKey, () -> {
                        try {
                            if (lock != null) {
                                //阻塞等待主线程完成剩余操作
                                lock.lock();
                            }
                            try {
                                T value = func.invoke();
                                if (cacheNullable) {
                                    refreshCache(realKey, value, logicalExpireSeconds, physicalExpireSeconds, tClass);
                                } else {
                                    if (value != null) {
                                        refreshCache(realKey, value, logicalExpireSeconds, physicalExpireSeconds,
                                            tClass);
                                    }
                                }
                            } finally {
                                if (lock != null) {
                                    lock.unlock();
                                }
                            }
                        } catch (Exception ex) {
                            LOGGER.error("reload cache of {} failed!", realKey, ex);
                        } finally {
                            //移除任务
                            //LOCAL_QUEUED_WORKING_TASK_SET.remove(realKey);
                        }
                    }));
                } finally {
                    //do nothing
                }
            } else {
                //若任务正在处理中,不做任何处理。
                //此处不能更新LOCAL_QUEUED_WORKING_TASK_SET,因为任务不再当前实例上得到运行,任务不会得到更新。
            }
            //} else {
            //    LOGGER.warn("task is in queue waiting for being executed!");
            //}
        } catch (Exception e) {
            LOGGER.error("asyncReload failed! realKey:{}", realKey, e);
        }
    }

分布式缓存

参考:https://www.cnblogs.com/moonandstar08/p/5405991.html
当缓存的数据量超过单机缓存的上线,就需要引入多台缓存服务器 组成缓存集群,对缓存请求做负载均衡,每台机器负责 1/n的 请求,常用算法有:轮循算法(Round Robin)、哈希算法(HASH)、最少连接算法(Least Connection)、响应速度算法(Response Time)、加权法(Weighted ),但考虑到缓存服务的特殊性,他需要在机器上存储数据,常见的这些算法(除hash算法外)会造成缓存数据冗余,并且命中率不高,造成集群利用率降低,而且随着系统访问量上升,压力越来越大,实际压力超过服务器的能承受的上限,出现节点宕机的情况,后续会通过加节点机器的形式来缓解压力,在分布式缓存中,经常会出现增删机器的情况,引发缓存的重分布,在重分布过程中,大量请求到达db,极端情况下引起数据库宕机

哈希算法:i = hash(key) mod N,对将要存入的key做hash计算,然后对可用的机器总数取模,得出的编号i,就是实际缓存需要存放的目标机器编号,在增删节点机器的情况下,N发生变化,导致现有缓存失效,引发大量的缓存重分布,造成db压力过大,极端情况下引起数据库宕机

一致性哈希算法:Consistent Hashing,思路是将所有缓存机器编号,计算hash值,放入一个拥有2的31次方的hash环中,对将要存入的缓存key做hash计算,得出hash值,取hash环按顺时针寻找,找到的第一个可用的hash值,它所对应的机器,就是目标缓存机器。

分析:

缺点:一致性哈希算法的使用,因为集群机器的hash值在hash环中的位置问题,可能会引发数据倾斜,出现集群中某些节点量很大,其他节点存储量很小,可以引入虚拟节点(即为集群中每台机器计算多次hash值)hash环中放置虚拟节点,多个虚拟节点和机器之间维护一份映射,存储数据时,通过虚拟节点找到机器,通过这总方式,让集群机器在hash环中的分布更分散,已达到均匀分布的效率。

举例:dubbo消费方对提供方的调用,采用一致性hash算法来做的话,会为每个提供方Invoker生成160份虚拟节点,以此让请求的分发更加均匀。

实现:TreeMap(存储hash环) + hashMap(维护虚拟节点和机器的映射),参考dubbo的一致性hash算法实现

private static final class ConsistentHashSelector<T> {
        private final TreeMap<Long, Invoker<T>> virtualInvokers;
        private final int                       replicaNumber;
        private final int                       identityHashCode;  
        private final int[]                     argumentIndex;
        //初始化hash环
        public ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
            this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
            this.identityHashCode = System.identityHashCode(invokers);
            URL url = invokers.get(0).getUrl();
            this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
            String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
            argumentIndex = new int[index.length];
            for (int i = 0; i < index.length; i ++) {
                argumentIndex[i] = Integer.parseInt(index[i]);
            }
            for (Invoker<T> invoker : invokers) {
                for (int i = 0; i < replicaNumber / 4; i++) {
                    byte[] digest = md5(invoker.getUrl().toFullString() + i);
                    for (int h = 0; h < 4; h++) {
                        long m = hash(digest, h);
                        virtualInvokers.put(m, invoker);
                    }
                }
            }
        }
        //顺时针查找满足条件的机器
        private Invoker<T> sekectForKey(long hash) {
            Invoker<T> invoker;
            Long key = hash;
            if (!virtualInvokers.containsKey(key)) {
                SortedMap<Long, Invoker<T>> tailMap = virtualInvokers.tailMap(key);
                if (tailMap.isEmpty()) {
                    key = virtualInvokers.firstKey();
                } else {
                    key = tailMap.firstKey();
                }
            }
            invoker = virtualInvokers.get(key);
            return invoker;
        }
}
上一篇 下一篇

猜你喜欢

热点阅读