Spring-Boot程序员分布式架构

基于redis的三级分布式缓存实现实例

2020-04-27  本文已影响0人  简单是美美

1. 分布式系统中的领域信息模型与缓存机制

  在领域驱动设计的方法中,确定领域的信息模型(知识模型)是系统设计的重要工作。在我们识别了领域模型中的实体,值对象和聚合对象之后,需要在面向对象的系统中将其实例化。
  在分布式系统中,不同的服务(子系统)可能关注于不同的实体对象,但也有一些公共的实体对象可能会被多个服务所关注。在分布式系统中,一些实体对象会被某些服务频繁使用,因而缓存机制在系统实现中必不可少。
  在实际应用中,我们通常使用三级缓存机制,即内存(一级)、redis(二级)、数据库(三级)三级数据缓存机制。在读取数据时依次从内存、redis、数据库中读取数据。在写入数据从依次从数据库、redis、内存中写入。
  根据spring的设计惯例,我们常常将一个实体对象的访问类用一个DAO对象来表示,在DAO对象中实现分布式的三级缓存机制。则使用DAO对象对应用系统屏蔽了缓存的实现细节。应用系统则只关注如何使用DAO与领域实体进行交互。
  分布式系统中常常使用一个共享的缓存中间件实现各服务(子系统)之间的缓存的交互,这个缓存中间件我们常常使用redis集群。对于分布式系统而言,在三级缓存体系中,各服务(子系统)拥有自己的内存缓存(一级)和独立的数据库(三级),共享一个二级缓存redis集群。
  在这样一个分布式的缓存体系中,缓存数据的同步是必须要考虑的。考虑这样一个场景:A服务修改了某个实体对象,同时更新到二级缓存,B服务再次使用该实体对象时,应该使用更新后的实体对象。如果使用redis集群来实现二级缓存,应该使用订阅发布机制来实现各服务(子系统)之间的缓存同步。
  同时,在一级缓存中,缓存的过期策略通常也需要设置。设置的目的也是为了确保数据不同步引发的错误是可自愈的,参数常用的是写过期时间和读过期时间两类。
  整个分布式应用系统中的三级缓存架构如图1所示。


图1.png

2 spring提供的缓存机制

  spring框架中提供对缓存的支持,支持如图2所示的缓存类型。


图2.png

  spring框架中对缓存的支持主要有两个接口的实现来支持,一个为CacheManager接口(org.springframework.cache.CacheManager),一个为Cache接口(org.springframework.cache.Cache)。
  spring框架中对缓存的应用主要使用:@Cacheable @CachePut @CacheEvict @Caching等注解实现。
  相关的资料可从网上获取,在此不再赘诉。

3 分布式缓存实现实例

  在我们的实际应用项目中,我们设计了一种分布式的三级缓存机制,这里使用了redis作为二级缓存,caffeine作为一级缓存,数据库作为三级存储介质。

3.1 实现类的层次

  在实际应用中,针对redis的各数据类型设计了对不同数据类型的抽象操作子类,对于领域内各实体对象的操作DAO类则继承自这些redis的抽象操作子类。其类层次结构图如图3所示:


图3.png

3.2 一级缓存的实现与策略配置

  在实现中,我们使用caffeine作为一级缓存,实现了自定义的CacheManager,可实现动态与指定名称cache的生成,并可针对指定名字的cache配置不同的缓存策略。其部分实现代码如下:

/**
     * 自定义cacheManager,实现动态生成的cache使用缺省的配置
     * 
     * @author zhang.kai
     *
     */
    public class VlineCacheManager extends SimpleCacheManager {
        @Override
        protected Cache getMissingCache(String cacheName) {
            return createCaffeineCache(cacheName, cacheProperties.getDefaultspec());
        }
    }

    /**
     * CacheManager对象注入
     * 
     * @return
     */
    @Bean
    public CacheManager getCacheManager() {
        if ((null == cacheProperties) || (null == cacheProperties.getDefaultspec())
                || (null == cacheProperties.getCachespecs())) {
            log.error("cacheProperties is invalid:{}", cacheProperties);
            return null;
        }
        VlineCacheManager cm = new VlineCacheManager();
        List<Cache> caches = new ArrayList<>();
        cacheProperties.getCachespecs().keySet().forEach(cacheName -> {
            String cacheSpec = cacheProperties.getCachespecs().get(cacheName);
            if (StringUtils.isEmpty(cacheName)) {
                log.error("XXX no cacheSpec for cacheName{}", cacheName);
                return;
            }
            CaffeineCache cache = createCaffeineCache(cacheName, cacheSpec);
            caches.add(cache);
        });
        if (caches.size() > 0) {
            cm.setCaches(caches);
        } else {
            log.error("XXX no cache inited!");
            return null;
        }

        // 设置redis的发布ID
        redisPublisherId = getRedisPublishId();
        log.info("%%%%%get redisPublisherId:{}",redisPublisherId);
        return cm;
    }

  对应的配置文件中,对cache的读写策略配置如下:

vline:
  cache:
    defaultspec: initialCapacity=50,maximumSize=500,expireAfterWrite=5s,expireAfterAccess=500s
    cachespecs:
      CONFIG_INFO: initialCapacity=50,maximumSize=500,expireAfterWrite=5s,expireAfterAccess=500s
      USER_LOGIN_DEVICE_INFO: initialCapacity=60,maximumSize=500,expireAfterWrite=5s,expireAfterAccess=7s

3.3 缓存读取方式

  对于三级缓存依次从内存,redis,数据库中获取,其代码片段如下所示:

/**
     * 从内存,redis,DB中依次获取实体
     * 
     * @param key
     * @return value
     */
    public T get(String key) {
        T value = null;

        // 1.如果在内存缓存中,则获取内存缓存
        if (enableMemoryCached) {
            value = getFromCacheManager(key);
            if (null != value) {
                log.debug("***get from memory***{}->{}", getRedisKey(key), value);
                return value;
            }
        }
        // 2.从redis中获取,对于键不存在的hash,redis返回empty map
        value = getFromRedis(key);
        if (null != value) {
            // 从redis中获取并回写到内存
            log.debug("***get from redis***{}->{}", getRedisKey(key), value);
            putToCacheManager(key, value);
        } else {
            if (enableDbLoad) {
                // 3.从数据库中获取并回写到redis和内存,可能存在竞争,重试一定次数
                RedisLock redisLock = new RedisLock(srt, getRedisKey(key) + "_sync_lock");
                int retryCount = 0;
                do {
                    try {
                        // 如果获取到分布式锁,则从数据库中获取,取完后返回
                        if (redisLock.lock()) {
                            value = loadValueFromDb(key);
                            log.debug("***get from db***{}->{}", getRedisKey(key), value);
                            if (null != value) {
                                set(key, value); // 回写入redis与内存
                            }
                            threadAwait.signalAll(getRedisKey(key)); // 唤醒所有等待该key的线程
                            return value;
                        }
                        // 如果未获取到分布式锁,说明别的进程正在进行查询数据库,等待一段时间后查询redis
                        threadAwait.await(getRedisKey(key), CacheConsts.LOAD_FROM_DB_WAIT_TIME);
                        value = getFromRedis(key);
                        if (null != value) {
                            if (enableMemoryCached) {
                                putToCacheManager(key, value);
                            }
                            return value;
                        }
                    } catch (Exception e) {
                        log.error("get" + getRedisKey(key) + "fail!", e);
                    } finally {
                        redisLock.unlock();
                    }
                    retryCount++;
                } while (retryCount < this.retryTimes);
            }
        }
        return value;
    }  

  这里考虑当多个进程(线程)读取同一key时,使用分布式锁来实现。使得只有第一个获取锁的进程(线程),会去三级存储数据库中获取并存入redis中。其它读取该key的进程(线程)将在redis中获取。这里代码中使用的分布式锁和线程唤醒对象参考了来自https://github.com/wyh-spring-ecosystem-student/spring-boot-student处的代码。

3.4 缓存的失效与同步

  在一级缓存caffeine中,我们设置了缓存的失效策略。同时在分布式系统中,不同的进程可能都会在一级缓存中缓存相同的key,因此在对应的redis key发生变化时(修改、删除),应该通知倒各进程,这个通过redis的订阅发布机制实现。
  一个进程可能是某个key发生改变的发起者,则它应该在订阅到该key发生变化时不做任何处理,这里我们通过在发布信息中增加一个发布者的唯一标识来实现,发布者的唯一标识在public CacheManager getCacheManager()函数中实现。在redis中发布的信息形式为:{redisPublishId}${keyPrefix}:{key}。
  每个进程的监听处理函数是这样的:

@Override
    public void onMessage(Message message, byte[] pattern) {
        if (null != message.getBody()) {
            String key = new String(message.getBody(), Charset.forName("UTF8"));
            log.debug("+++reveive msg:{} which topic is:{}", key,
                    new String(message.getChannel(), Charset.forName("UTF8")));
            String publisherId = findRedisPublisherId(key);
            // 如果是本进程发布的,不处理
            if (StringUtils.isEmpty(publisherId) || publisherId.equals(CacheConfiguration.redisPublisherId)) {
                return;
            }
            // 不是本进程发布的,在一级缓存中失效.发布的key形式为:{redisPublishId}${keyPrefix}:{key}
            String redisKey = key.substring(key.indexOf(CacheConsts.REDIS_PUBLISHER_ID_SEPARATE) + 1);
            String keyPrefix = findKeyprefixFromRedisKey(redisKey);
            Cache cache = cm.getCache(keyPrefix);
            if (null != cache) {
                // 从一级缓存中删除
                log.debug("---evict key:{} from cache:{}", redisKey, cache.getName());
                cache.evict(redisKey);
            }
        } else {
            log.error("RedisMsgListener onMessage null!");
        }
    }

4 小结

  本文所描述的示例代码可从:https://github.com/solarkai/distributedcache
处获取。
  对于三级缓存方式,网上有很多不同的实现方式。在我们的实现中,针对redis的不同数据类型衍生出不同的针对领域实体的DAO对象,这些对象屏蔽了缓存使用的细节;同时,在我们的实现中,实现了动态生成与指定名称的缓存生成方式,可针对不同的缓存配置不同的策略;针对一级缓存的同步,我们使用redis的订阅发布机制实现,并在发布内容中增加发布者的标识,避免重复的同步操作。

上一篇下一篇

猜你喜欢

热点阅读