27 rpc远程调用幂等性的问题
2020-07-23 本文已影响0人
滔滔逐浪
image.png
image.png
image.png
RPC幂等框架
是远程调用接口Http+JSOn 格式传输
接口的超时就是服务器端没有及时的响应给客户端
什么时候会发出超时策略:
调用接口超时时间设置为5S<业务响应的时间
接口如果超时的情况下会发出什么策略?
重试, 重试的过程中业务逻辑会重复执行。
全部id
image.png从Redis删除该全局id 能够删除成功表示可以执行业务逻辑,删除失败则表示业务逻辑已经执行,适合于前端页面表单提交。
redis多路复用原则 本身线程安全
image.png通过ConcurrentHashMap和切面解决幂等性:
1,使用到的jar
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aspects</artifactId>
<version>5.2.7.RELEASE</version>
</dependency>
2,producer 服务请求接口:
package com.taotao.lizi.mideng.master;
import com.github.kevinsawicki.http.HttpRequest;
import com.taotao.lizi.service.ConsumerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.Console;
import java.util.UUID;
/**
* @author aping
* @time 2020/7/23 13:30
*/
@RestController
@Slf4j
public class ProducerController {
@Autowired
private ConsumerService consumerService;
@RequestMapping("/producer")
public String producer(Integer id,Integer f){
//生成token
String token= UUID.randomUUID().toString();
String body=null;
for (int i = 0; i <f ; i++) {
//rpc调用slave-node 服务
body = HttpRequest.get("http://127.0.0.1:8082/consumer?id=" + id)
.header("idempotent-token", token)
.readTimeout(2000)
.connectTimeout(2000)
.body();
log.info(body);
}
return body;
}
}
3,Consumer服务
package com.taotao.droolsdemo;
import com.taotao.droolsdemo.config.Idempotent;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author aping
* @time 2020/7/23 14:18
*/
@RestController
public class ConsumerController {
@Idempotent()
@RequestMapping("/consumer")
public String consumer(Integer id){
System.out.println("执行业务逻辑:{}"+id);
return id+"";
}
}
package com.taotao.droolsdemo.config;
import java.lang.annotation.*;
/**
* 在rpc被调用方方法上添加@Idempotent注解用于幂等性问题
* @author aping
* @time 2020/7/23 14:20
*/
@Target(value = {ElementType.TYPE, ElementType.METHOD})//使用位置(类,方法)
@Retention(RetentionPolicy.RUNTIME)//加载到jvm里运行
public @interface Idempotent {
}
package com.taotao.droolsdemo.config;
import lombok.extern.slf4j.Slf4j;
import net.minidev.json.JSONUtil;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.*;
import org.aspectj.lang.reflect.MethodSignature;
import org.json.JSONObject;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author aping
* @time 2020/7/23 14:26
*/
@Aspect
@Component
@Slf4j
public class IdempotentAspect {
private ConcurrentHashMap concurrentHashMap=new ConcurrentHashMap();
//指定包下的注解, 2个 .. 表示所有的子目录,最后括号里的2个.. 代表所有的参数
@Pointcut("@annotation(com.taotao.droolsdemo.config.Idempotent)")
public void logPointCut(){
}
//指定当前方法在logPointCut之前执行
@Before("logPointCut()")
public void doBefore(JoinPoint joinPoint)throws Throwable{
//接收请求,记录请求
ServletRequestAttributes attributes= (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
HttpServletRequest request=attributes.getRequest();
// 记录下请求内容
log.info("请求地址 : " + request.getRequestURL().toString());
log.info("HTTP METHOD : " + request.getMethod());
// 获取真实的ip地址
//logger.info("IP : " + IPAddressUtil.getClientIpAddress(request));
log.info("CLASS_METHOD : " + joinPoint.getSignature().getDeclaringTypeName() + "."
+ joinPoint.getSignature().getName());
log.info("参数 : " + Arrays.toString(joinPoint.getArgs()));
//loggger.info("参数 : " + joinPoint.getArgs());
}
@AfterReturning(returning = "ret",pointcut = "logPointCut()")
//returning的值和 doAfterReturning的参数名一致
public void doAfterReturning(Object ret)throws Throwable{
//处理完请求返回内容(返回值太复杂时,打印的是物理存储空间的地址)
log.info("返回值:"+ret);
}
@Around("@annotation(Idempotent)")
public Object around(ProceedingJoinPoint jp) throws Throwable {
try {
//Class clazz = jp.getTarget().getClass();
//String methodName = jp.getSignature().getName();
Class[] parameterTypes = ((MethodSignature) jp.getSignature()).getMethod().getParameterTypes();
Method methdo = jp.getTarget().getClass().getMethod(jp.getSignature().getName(), parameterTypes);
if (methdo.getAnnotation(Idempotent.class) != null) {
// String value = methdo.getAnnotation(ApiOperation.class).value();
// 1.获取request对象
HttpServletRequest request = ((ServletRequestAttributes) (RequestContextHolder.currentRequestAttributes())).getRequest();
// 获取token
String token = request.getHeader("idempotent-token");
//Console.log("idempotent-token:{}", token);
// 2.获取session或redis
HttpSession session = request.getSession();
// 3.判断是否重复提交
String key = jp.getSignature() + "/" + Arrays.toString(jp.getArgs()); // 定义key名称
//Console.log("方法key:{}", key);
if (concurrentHashMap.get(key) == null || !concurrentHashMap.get(key).equals(token)) {
concurrentHashMap.put(key,token);
log.info("token:"+token);
} else {
System.out.println("重复提交:{}"+ key);
JSONObject obj = new JSONObject();
obj.put("code", "202");
obj.put("msg", "重复提交");
return obj.toString();
}
}
} catch (Exception ex) {
ex.toString();
}
// 正常执行目标业务
return jp.proceed();
}
}
实现的效果
image.png
image.png