基于Redis集群的通用缓存架构--项目介绍
写这篇博客的目的呢就是想好好总结并做一个介绍,如果哪里说不好或者有误恳请大家指点,谢谢...
项目地址:https://github.com/XMUTLZY/api-cache
Redis环境
这里主要使用cluster模式搭建,楼主去阿里云买了台学生机,搭了个3个主节点和3个从节点的集群环境,这里就不具体介绍了,后续可以考虑好好总结下这个环境搭建~
主项目介绍
项目结构
项目采用springboot+maven搭建
项目结构.png
- constants包:存放一些项目用到的常量
- document包:存放和Mongodb映射起来的实体类
-
http包:
http包.png
如图,包括request、response和vo实体
- repository包:存放直接和Redis进行操作的类
- service包:业务层,具体业务都在这里处理了
- web包:
annotation包——>存放自定义的注解
aop包——>利用Spring AOP特性,定义具体的aop实现类
config包——>系统用到的一些配置,比如Redis、Mongodb等
interceptor包——>拦截器配置
utils包——>存放一些方便开发的方法,一般就是一些经常要用到的方法给它抽出来
运行效果
二话不说,先试下运行效果。我们知道Redis支持五大数据类型,我们以String类型为例,执行put操作即增加缓存数据。
运行.png
这里传入三个参数,其中member表示项目的唯一表示,因为既然要实现一个通用的缓存架构,那么就要有个字段来标识不同项目,避免key冲突;再传入一个键值对即可。再去实际环境测试下是否成功,结果如下图。
测试.png
注意到这里实际的key值并不是我们传入的参数key,而是member:key,这样做的目的我们看个图你们就清楚了
这个就是redis缓存架构的思想,我们只需要调用该接口并传入指定参数就可;之后就是在代码复用性和优化上下功夫了。接下来看下具体代码实现和一些细节处理~
代码实现
SpringBoot是传统的MVC模式开发框架,这次使用String类型的获取缓存数据这个方法。
1、Controller
@Controller
@RequestMapping(value = "/cache/string")
public class StringCacheController {
@Autowired
private StringCacheService stringCacheService;
/**
* 获取缓存数据
* @Params: member、key
*/
@RequestMapping(value = "/get", method = RequestMethod.POST)
@ResponseBody
@KeyRequired
public BaseResponse stringGet(@RequestBody CacheRequest cacheRequest) {
return stringCacheService.get(cacheRequest);
}
}
这里自定义了@KeyRequired注解,用于判断输入的member(用于区分不同项目)和key字段是否符合要求,如下图。
@Target({ElementType.METHOD})//标识该注解标记在方法上
@Retention(RetentionPolicy.RUNTIME)//运行时注解
public @interface KeyRequired {
}
定义好注解后,需要配合拦截器进行处理,首先添加拦截器。
@Configuration
public class InterceptorConfig implements WebMvcConfigurer {
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(keyCheckInterceptor());
}
@Bean
public KeyCheckInterceptor keyCheckInterceptor() {
return new KeyCheckInterceptor();
}
}
/**
* Created by Jake.lin on 2019/12/09
* @Tips: 判断key值是否存在 拦截器
*/
public class KeyCheckInterceptor extends HandlerInterceptorAdapter {
@Resource
private KeyService keyService;
public static final Integer MEMBER_BE_NULL = 431;
public static final Integer KEY_BE_NULL = 432;
public static final Integer KEY_NO_EXISTS = 433;
public static final String CHARSET_ENCODING = "UTF-8";
/*
* 注意:这里我们使用了拦截器对请求进行处理已经获取到请求体,后续就会出现request body miss的情况
*/
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
if (!(handler instanceof HandlerMethod)) { // 如果注解不标记在方法上,则不进行拦截
return true;
}
HandlerMethod handlerMethod = (HandlerMethod) handler;
Method method = handlerMethod.getMethod();
if (method.getAnnotation(KeyRequired.class) != null) {//判断方法上是否有该注解
JSONObject jsonObject = SystemUtils.getRequestBody(request);// 这一步目的就是为了获取请求体
String member = (String) jsonObject.get("member");
String key = (String) jsonObject.get("key");
if (!StringUtils.hasText(member)) {//member为空
buildHttpServletResponse(response, MEMBER_BE_NULL, "{\"status\":" + MEMBER_BE_NULL + ",\n\"message\":\"the member can't no be null.\"}");
return false;
} else if (!StringUtils.hasText(key)) {//key为空
buildHttpServletResponse(response, KEY_BE_NULL, "{\"status\":" + KEY_BE_NULL + ",\n\"message\":\"the key can't no be null.\"}");
return false;
} else {//(member:key)不存在
List<String> keyList = Arrays.asList(SystemUtils.buildKey(member, key));
CacheRequest cacheRequest = new CacheRequest();
cacheRequest.setMemberKeyList(keyList);
if (!keyService.isExistsByKeyList(cacheRequest).get(keyList.get(0))) {//这里做这么多的目的就是想知道该(member:key)是否存在
buildHttpServletResponse(response, KEY_NO_EXISTS, "{\"status\":" + KEY_NO_EXISTS + ",\n\"message\":\"the key(member+key) no exist.\"}");
return false;
}
}
}
return true;
}
private void buildHttpServletResponse(HttpServletResponse response, Integer statusCode, String message) throws IOException {
response.setContentType(MediaType.APPLICATION_JSON_VALUE);
response.setCharacterEncoding(CHARSET_ENCODING);
response.setStatus(statusCode);
response.getWriter().write(message);
response.getWriter().close();
}
}
上面代码就通过注解实现了对member和key进行处理的逻辑,其中getRequestBody()和buildKey()是自己定义的方法
public class SystemUtils {
/**
* @Tips: convert key to standard format
*/
public static String buildKey(String member, String key) {
return member + ":" + key;
}
/**
* @Iips: build error response
*/
public static void buildErrorResponse(BaseResponse baseResponse) {
baseResponse.setStatus(BaseResponse.FAILD_STATUS);
baseResponse.setStatusCode(BaseResponse.FAILD_CODE);
baseResponse.setMessage("system error." + SystemUtils.dateToFormat(new Date()));
}
/**
* @Tips: get the request body by HttpServletRequest
*/
public static JSONObject getRequestBody(HttpServletRequest request) throws IOException {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(request.getInputStream()));
String str = "";
String wholeStr = "";
while ((str=bufferedReader.readLine()) != null) {
wholeStr += str;
}
return JSONObject.parseObject(wholeStr);
}
}
2、Service
@Service
public class StringCacheService {
@Autowired
private StringCacheRepository stringCacheRepository;
@Override
public CacheResponse get(CacheRequest cacheRequest) {
CacheResponse cacheResponse = stringCacheRepository.get(cacheRequest);
if (!StringUtils.hasText(cacheResponse.getValue())) {
cacheResponse.setMessage("no find value.");
}
return cacheResponse;
}
}
3、Repository
这里直接和Redis操作,来看代码。
@Repository
public class StringCacheRepository {
@Autowired
private JedisCluster jedisCluster;
public CacheResponse get(CacheRequest cacheRequest) {
CacheResponse cacheResponse = new CacheResponse();
try {
Object value = jedisCluster.get(SystemUtils.buildKey(cacheRequest.getMember(), cacheRequest.getKey()));
cacheResponse.setValue(value.toString());
} catch (Exception e) {
SystemUtils.buildErrorResponse(cacheResponse);
}
return cacheResponse;
}
}
这里引入了JedisCluster类,那肯定要先配置和加载该类
@Configuration
@ConditionalOnClass({JedisCluster.class})
public class RedisConfig {
@Value("${spring.redis.cluster.nodes}")
private String clusterNodes;
@Value("${spring.redis.timeout}")
private int timeout;
@Value("${spring.redis.jedis.pool.max-idle}")
private int maxIdle;
@Value("${spring.redis.jedis.pool.max-wait}")
private int maxWaitMillis;
@Bean
public JedisCluster getJedisCluster() {
String[] cNodes = clusterNodes.split(",");
Set<HostAndPort> nodes = new HashSet<>();
for(String node : cNodes) {
String[] hp = node.split(":");
nodes.add(new HostAndPort(hp[0], Integer.parseInt(hp[1])));
}
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxIdle(maxIdle);
jedisPoolConfig.setMaxWaitMillis(maxWaitMillis);
return new JedisCluster(nodes, jedisPoolConfig);
}
}
上面就是Redis支持的String类型的其中一个操作,Redis支持五大数据类型(String、List、Set、Hash和ZSort(有序集合)),对应我们就得有五个Controller进行处理,还有对Key值操作也需要新建一个Controller,更多操作可以看Jedis的APl https://blog.csdn.net/zhangguanghui002/article/details/78770071
功能优化
1、piplined管道功能
Redis cluster集群模式并不支持管道模式
个人理解:从代码角度来说,Piplined对象是从JedisPool中获取的,Cluster模式多少个主节点就有多少个JedisPool对象,存入的key值并不能保证都是存在于同一个节点中,因此无法实现管道功能。
针对这个原因,思考下,能否对属于同一个节点的key使用同一个Piplined对象,三个节点我们就需要获取三个Piplined对象。但是由于JedisCluster并没有把每个主节点对应的JedisPool对象暴露给我们,我们也就不能获取到Piplined对象了。这里的解决方案我参考了https://www.jianshu.com/p/54a754c85f81这位大神的写法,讲得很清楚。
2、Mongdb+AOP监控数据请求
先写个Service来做记录服务
@Service
public class RecordService {
@Autowired
private MongoTemplate mongoTemplate;
@Transactional
public RedisLogDoc insert(RecordRequest recordRequest) {
RedisLogDoc redisLogDoc = new RedisLogDoc();
BeanUtils.copyProperties(recordRequest, redisLogDoc);
return mongoTemplate.save(redisLogDoc);
}
}
再就是定义aop实现请求监控了
@Component
@Aspect
public class RecordControllerRequestAop {
@Autowired
private RecordService recordService;
private Logger logger = LoggerFactory.getLogger(getClass());
@Before("pointCut()")//切入点之前
public void Before() {
logger.info("before controller init");
}
@Pointcut("execution(* sch.xmut.jake.cache.apicache.web.controller..*.*(..))")//切入点
public void pointCut() {
logger.info("pointCut controller init");
}
@AfterReturning(returning = "response", value = "pointCut()")//执行完切入点之后
public void afterRunning(JoinPoint joinPoint, Object response) {
logger.info("afterRunning init");
BaseResponse baseResponse = (BaseResponse) response; // 切入点方法的返回值
CacheRequest cacheRequest = (CacheRequest) joinPoint.getArgs()[0];// request参数,根据实际需求写
HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
RecordRequest recordRequest = buildRecordReuqest(request, cacheRequest);// build一个参数传入记录的service
recordRequest.setMethod(joinPoint.getSignature().getName());
if (BaseResponse.SUCCESS_CODE == baseResponse.getStatusCode()) {
recordService.insert(recordRequest);
} else {
logger.warn("response error, skip controller record");
}
logger.info("complete controller record");
}
private RecordRequest buildRecordReuqest(HttpServletRequest request, CacheRequest cacheRequest) {
RecordRequest recordRequest = new RecordRequest();
recordRequest.setUrl(request.getRequestURL().toString());
recordRequest.setContentType(request.getContentType());
recordRequest.setMethodType(request.getMethod());
recordRequest.setParams(JSONObject.toJSONString(cacheRequest));
recordRequest.setProjectMember(cacheRequest.getMember());
recordRequest.setRecordType(CacheConstans.RECORD_TYPE_CONTROLLER);
recordRequest.setCreateTime(SystemUtils.dateToFormat(new Date()));
return recordRequest;
}
}
效果如下
Mongodb.png
其中record_type字段是后续加上去的,因为当我使用Dubbo+Zookeeper作为调用框架之后,需要把service暴露出来,这时候对该项目的调用就只是实现对service的调用了,并不会走接口。传统的Rest或者httpClient调用还是从Controller作为入口。
3、Dubbo+Zookeeper集成
环境搭建:我也只是简单的在服务器上搭建了单机的环境,先将Zookeeper部署下来,再去获取Dubbon-admin的war包,记得修改Dubbo的配置,把注册中心改为Zookeeper的服务器地址,在把war包放入tomcat,启动之后就可以了。在这个过程中,楼主遇到了很多问题...比如zookeeper启动成功,但是状态显示缺提示没启动,后来发现是8080端口被占用了...具体搭建楼主也没有做研究,就只是了解~~后续还需加强学习...
搭建好了先引入两个包
<!-- zookeeper客户端 -->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.7</version>
<exclusions><!-- 包冲突 -->
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/com.gitee.reger/spring-boot-starter-dubbo -->
<dependency>
<groupId>com.gitee.reger</groupId>
<artifactId>spring-boot-starter-dubbo</artifactId>
<version>1.1.3</version>
</dependency>
dubbo和zookeeper配置
#dubbo+zookeeper
spring.dubbo.application.name=api-cache-provider
spring.dubbo.base-package=sch.xmut.jake.cache.apicache.service //需要暴露的服务所在的包
spring.dubbo.registry.address=your ip
spring.dubbo.registry.port=2181
spring.dubbo.protocol.name=dubbo
spring.dubbo.protocol.port=20890
spring.dubbo.provider.timeout=5000
最后再把service包下的@Service注解改成Dubbo包下的注解
import com.alibaba.dubbo.config.annotation.Service;
总结
楼主是个小白实习生...写这篇博客只是做个总结,你们可以以怀疑的态度来看这篇博客,中间可能会有哪里说法不对或者写的有误,我不一定都是对的,请大家指教... 后续我会对这次用到的技术进行系统的学习,谢谢~