java 设计

基于Redis集群的通用缓存架构--项目介绍

2020-01-18  本文已影响0人  十六线程序员

写这篇博客的目的呢就是想好好总结并做一个介绍,如果哪里说不好或者有误恳请大家指点,谢谢...
项目地址https://github.com/XMUTLZY/api-cache

Redis环境

这里主要使用cluster模式搭建,楼主去阿里云买了台学生机,搭了个3个主节点和3个从节点的集群环境,这里就不具体介绍了,后续可以考虑好好总结下这个环境搭建~

主项目介绍

项目结构

项目采用springboot+maven搭建


项目结构.png
运行效果

二话不说,先试下运行效果。我们知道Redis支持五大数据类型,我们以String类型为例,执行put操作即增加缓存数据。


运行.png

这里传入三个参数,其中member表示项目的唯一表示,因为既然要实现一个通用的缓存架构,那么就要有个字段来标识不同项目,避免key冲突;再传入一个键值对即可。再去实际环境测试下是否成功,结果如下图。


测试.png

注意到这里实际的key值并不是我们传入的参数key,而是member:key,这样做的目的我们看个图你们就清楚了

redis数据缓存结构.png
这个就是redis缓存架构的思想,我们只需要调用该接口并传入指定参数就可;之后就是在代码复用性和优化上下功夫了。接下来看下具体代码实现和一些细节处理~
代码实现

SpringBoot是传统的MVC模式开发框架,这次使用String类型的获取缓存数据这个方法。
1、Controller

@Controller
@RequestMapping(value = "/cache/string")
public class StringCacheController {
    @Autowired
    private StringCacheService stringCacheService;

    /**
     * 获取缓存数据
     * @Params: member、key
     */
    @RequestMapping(value = "/get", method = RequestMethod.POST)
    @ResponseBody
    @KeyRequired
    public BaseResponse stringGet(@RequestBody CacheRequest cacheRequest) {
        return stringCacheService.get(cacheRequest);
    }
}

这里自定义了@KeyRequired注解,用于判断输入的member(用于区分不同项目)和key字段是否符合要求,如下图。

@Target({ElementType.METHOD})//标识该注解标记在方法上
@Retention(RetentionPolicy.RUNTIME)//运行时注解
public @interface KeyRequired {
}

定义好注解后,需要配合拦截器进行处理,首先添加拦截器。

@Configuration
public class InterceptorConfig implements WebMvcConfigurer {
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(keyCheckInterceptor());
    }

    @Bean
    public KeyCheckInterceptor keyCheckInterceptor() {
        return new KeyCheckInterceptor();
    }
}
/**
 * Created by Jake.lin on 2019/12/09
 * @Tips: 判断key值是否存在  拦截器
 */
public class KeyCheckInterceptor extends HandlerInterceptorAdapter {
    @Resource
    private KeyService keyService;
    public static final Integer MEMBER_BE_NULL = 431;
    public static final Integer KEY_BE_NULL = 432;
    public static final Integer KEY_NO_EXISTS = 433;
    public static final String CHARSET_ENCODING = "UTF-8";

    /*
     * 注意:这里我们使用了拦截器对请求进行处理已经获取到请求体,后续就会出现request body miss的情况
     */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        if (!(handler instanceof HandlerMethod)) { // 如果注解不标记在方法上,则不进行拦截
            return true;
        }
        HandlerMethod handlerMethod = (HandlerMethod) handler;
        Method method = handlerMethod.getMethod();
        if (method.getAnnotation(KeyRequired.class) != null) {//判断方法上是否有该注解
            JSONObject jsonObject = SystemUtils.getRequestBody(request);// 这一步目的就是为了获取请求体
            String member = (String) jsonObject.get("member");
            String key = (String) jsonObject.get("key");
            if (!StringUtils.hasText(member)) {//member为空
                buildHttpServletResponse(response, MEMBER_BE_NULL, "{\"status\":" + MEMBER_BE_NULL + ",\n\"message\":\"the member can't no be null.\"}");
                return false;
            } else if (!StringUtils.hasText(key)) {//key为空
                buildHttpServletResponse(response, KEY_BE_NULL, "{\"status\":" + KEY_BE_NULL + ",\n\"message\":\"the key can't no be null.\"}");
                return false;
            } else {//(member:key)不存在
                List<String> keyList = Arrays.asList(SystemUtils.buildKey(member, key));
                CacheRequest cacheRequest = new CacheRequest();
                cacheRequest.setMemberKeyList(keyList);
                if (!keyService.isExistsByKeyList(cacheRequest).get(keyList.get(0))) {//这里做这么多的目的就是想知道该(member:key)是否存在
                    buildHttpServletResponse(response, KEY_NO_EXISTS, "{\"status\":" + KEY_NO_EXISTS + ",\n\"message\":\"the key(member+key) no exist.\"}");
                    return false;
                }
            }
        }
        return true;
    }

    private void buildHttpServletResponse(HttpServletResponse response, Integer statusCode, String message) throws IOException {
        response.setContentType(MediaType.APPLICATION_JSON_VALUE);
        response.setCharacterEncoding(CHARSET_ENCODING);
        response.setStatus(statusCode);
        response.getWriter().write(message);
        response.getWriter().close();
    }
}

上面代码就通过注解实现了对member和key进行处理的逻辑,其中getRequestBody()和buildKey()是自己定义的方法

public class SystemUtils {
    /**
     * @Tips: convert key to standard format
     */
    public static String buildKey(String member, String key) {
        return member + ":" + key;
    }

    /**
     * @Iips: build error response
     */
    public static void buildErrorResponse(BaseResponse baseResponse) {
        baseResponse.setStatus(BaseResponse.FAILD_STATUS);
        baseResponse.setStatusCode(BaseResponse.FAILD_CODE);
        baseResponse.setMessage("system error." + SystemUtils.dateToFormat(new Date()));
    }

    /**
     * @Tips: get the request body by HttpServletRequest
     */
    public static JSONObject getRequestBody(HttpServletRequest request) throws IOException {
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(request.getInputStream()));
        String str = "";
        String wholeStr = "";
        while ((str=bufferedReader.readLine()) != null) {
            wholeStr += str;
        }
        return JSONObject.parseObject(wholeStr);
    }
}

2、Service

@Service
public class StringCacheService {
    @Autowired
    private StringCacheRepository stringCacheRepository;

    @Override
    public CacheResponse get(CacheRequest cacheRequest) {
        CacheResponse cacheResponse = stringCacheRepository.get(cacheRequest);
        if (!StringUtils.hasText(cacheResponse.getValue())) {
            cacheResponse.setMessage("no find value.");
        }
        return cacheResponse;
    }
}

3、Repository
这里直接和Redis操作,来看代码。

@Repository
public class StringCacheRepository {
    @Autowired
    private JedisCluster jedisCluster;

    public CacheResponse get(CacheRequest cacheRequest) {
        CacheResponse cacheResponse = new CacheResponse();
        try {
            Object value = jedisCluster.get(SystemUtils.buildKey(cacheRequest.getMember(), cacheRequest.getKey()));
            cacheResponse.setValue(value.toString());
        } catch (Exception e) {
            SystemUtils.buildErrorResponse(cacheResponse);
        }
        return cacheResponse;
    }
}

这里引入了JedisCluster类,那肯定要先配置和加载该类

@Configuration
@ConditionalOnClass({JedisCluster.class})
public class RedisConfig {
    @Value("${spring.redis.cluster.nodes}")
    private String clusterNodes;
    @Value("${spring.redis.timeout}")
    private int timeout;
    @Value("${spring.redis.jedis.pool.max-idle}")
    private int maxIdle;
    @Value("${spring.redis.jedis.pool.max-wait}")
    private int maxWaitMillis;

    @Bean
    public JedisCluster getJedisCluster() {
        String[] cNodes = clusterNodes.split(",");
        Set<HostAndPort> nodes = new HashSet<>();
        for(String node : cNodes) {
            String[] hp = node.split(":");
            nodes.add(new HostAndPort(hp[0], Integer.parseInt(hp[1])));
        }
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxIdle(maxIdle);
        jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);
        return new JedisCluster(nodes, jedisPoolConfig);
    }
}

上面就是Redis支持的String类型的其中一个操作,Redis支持五大数据类型(String、List、Set、Hash和ZSort(有序集合)),对应我们就得有五个Controller进行处理,还有对Key值操作也需要新建一个Controller,更多操作可以看Jedis的APl https://blog.csdn.net/zhangguanghui002/article/details/78770071

功能优化

1、piplined管道功能
Redis cluster集群模式并不支持管道模式

image.png
个人理解:从代码角度来说,Piplined对象是从JedisPool中获取的,Cluster模式多少个主节点就有多少个JedisPool对象,存入的key值并不能保证都是存在于同一个节点中,因此无法实现管道功能。
针对这个原因,思考下,能否对属于同一个节点的key使用同一个Piplined对象,三个节点我们就需要获取三个Piplined对象。但是由于JedisCluster并没有把每个主节点对应的JedisPool对象暴露给我们,我们也就不能获取到Piplined对象了。这里的解决方案我参考了https://www.jianshu.com/p/54a754c85f81这位大神的写法,讲得很清楚。
2、Mongdb+AOP监控数据请求
先写个Service来做记录服务
@Service
public class RecordService {
    @Autowired
    private MongoTemplate mongoTemplate;

    @Transactional
    public RedisLogDoc insert(RecordRequest recordRequest) {
        RedisLogDoc redisLogDoc = new RedisLogDoc(); 
        BeanUtils.copyProperties(recordRequest, redisLogDoc);
        return mongoTemplate.save(redisLogDoc);
    }
}

再就是定义aop实现请求监控了

@Component
@Aspect
public class RecordControllerRequestAop {
    @Autowired
    private RecordService recordService;
    private Logger logger = LoggerFactory.getLogger(getClass());

    @Before("pointCut()")//切入点之前
    public void Before() {
        logger.info("before controller init");
    }

    @Pointcut("execution(* sch.xmut.jake.cache.apicache.web.controller..*.*(..))")//切入点
    public void pointCut() {
        logger.info("pointCut controller init");
    }

    @AfterReturning(returning = "response", value = "pointCut()")//执行完切入点之后
    public void afterRunning(JoinPoint joinPoint, Object response) {
        logger.info("afterRunning init");
        BaseResponse baseResponse = (BaseResponse) response; // 切入点方法的返回值
        CacheRequest cacheRequest = (CacheRequest) joinPoint.getArgs()[0];// request参数,根据实际需求写
        HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
        RecordRequest recordRequest = buildRecordReuqest(request, cacheRequest);// build一个参数传入记录的service
        recordRequest.setMethod(joinPoint.getSignature().getName());
        if (BaseResponse.SUCCESS_CODE == baseResponse.getStatusCode()) {
            recordService.insert(recordRequest);
        } else {
            logger.warn("response error, skip controller record");
        }
        logger.info("complete controller record");
    }

    private RecordRequest buildRecordReuqest(HttpServletRequest request, CacheRequest cacheRequest) {
        RecordRequest recordRequest = new RecordRequest();
        recordRequest.setUrl(request.getRequestURL().toString());
        recordRequest.setContentType(request.getContentType());
        recordRequest.setMethodType(request.getMethod());
        recordRequest.setParams(JSONObject.toJSONString(cacheRequest));
        recordRequest.setProjectMember(cacheRequest.getMember());
        recordRequest.setRecordType(CacheConstans.RECORD_TYPE_CONTROLLER);
        recordRequest.setCreateTime(SystemUtils.dateToFormat(new Date()));
        return recordRequest;
    }
}

效果如下


Mongodb.png

其中record_type字段是后续加上去的,因为当我使用Dubbo+Zookeeper作为调用框架之后,需要把service暴露出来,这时候对该项目的调用就只是实现对service的调用了,并不会走接口。传统的Rest或者httpClient调用还是从Controller作为入口。

3、Dubbo+Zookeeper集成
环境搭建:我也只是简单的在服务器上搭建了单机的环境,先将Zookeeper部署下来,再去获取Dubbon-admin的war包,记得修改Dubbo的配置,把注册中心改为Zookeeper的服务器地址,在把war包放入tomcat,启动之后就可以了。在这个过程中,楼主遇到了很多问题...比如zookeeper启动成功,但是状态显示缺提示没启动,后来发现是8080端口被占用了...具体搭建楼主也没有做研究,就只是了解~~后续还需加强学习...

搭建好了先引入两个包

<!-- zookeeper客户端 -->
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.7</version>
            <exclusions><!-- 包冲突 -->
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.gitee.reger/spring-boot-starter-dubbo -->
        <dependency>
            <groupId>com.gitee.reger</groupId>
            <artifactId>spring-boot-starter-dubbo</artifactId>
            <version>1.1.3</version>
        </dependency>

dubbo和zookeeper配置

#dubbo+zookeeper
spring.dubbo.application.name=api-cache-provider
spring.dubbo.base-package=sch.xmut.jake.cache.apicache.service //需要暴露的服务所在的包
spring.dubbo.registry.address=your ip
spring.dubbo.registry.port=2181
spring.dubbo.protocol.name=dubbo
spring.dubbo.protocol.port=20890
spring.dubbo.provider.timeout=5000

最后再把service包下的@Service注解改成Dubbo包下的注解

import com.alibaba.dubbo.config.annotation.Service;

总结

楼主是个小白实习生...写这篇博客只是做个总结,你们可以以怀疑的态度来看这篇博客,中间可能会有哪里说法不对或者写的有误,我不一定都是对的,请大家指教... 后续我会对这次用到的技术进行系统的学习,谢谢~

上一篇下一篇

猜你喜欢

热点阅读