27 rpc远程调用幂等性的问题

2020-07-23  本文已影响0人  滔滔逐浪
image.png

RPC幂等框架
是远程调用接口Http+JSOn 格式传输
接口的超时就是服务器端没有及时的响应给客户端
什么时候会发出超时策略:
调用接口超时时间设置为5S<业务响应的时间
接口如果超时的情况下会发出什么策略?
重试, 重试的过程中业务逻辑会重复执行。

全部id

image.png

从Redis删除该全局id 能够删除成功表示可以执行业务逻辑,删除失败则表示业务逻辑已经执行,适合于前端页面表单提交。

redis多路复用原则 本身线程安全

image.png

通过ConcurrentHashMap和切面解决幂等性:

1,使用到的jar


 <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aspects</artifactId>
            <version>5.2.7.RELEASE</version>
        </dependency>

2,producer 服务请求接口:

package com.taotao.lizi.mideng.master;

import com.github.kevinsawicki.http.HttpRequest;
import com.taotao.lizi.service.ConsumerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.Console;
import java.util.UUID;

/**
 * @author aping
 * @time 2020/7/23 13:30
 */
@RestController
@Slf4j
public class ProducerController {
    @Autowired
    private ConsumerService consumerService;

    @RequestMapping("/producer")
    public  String producer(Integer id,Integer f){
        //生成token
        String token= UUID.randomUUID().toString();
        String body=null;
        for (int i = 0; i <f ; i++) {
           //rpc调用slave-node 服务
            body = HttpRequest.get("http://127.0.0.1:8082/consumer?id=" + id)
                    .header("idempotent-token", token)
                    .readTimeout(2000)
                    .connectTimeout(2000)
                    .body();
             log.info(body);
        }
        return  body;
    }
}



3,Consumer服务

package com.taotao.droolsdemo;

import com.taotao.droolsdemo.config.Idempotent;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author aping
 * @time 2020/7/23 14:18
 */
@RestController
public class ConsumerController {

    @Idempotent()
    @RequestMapping("/consumer")
    public  String consumer(Integer id){
         System.out.println("执行业务逻辑:{}"+id);
         return  id+"";
    }

}



package com.taotao.droolsdemo.config;

import java.lang.annotation.*;

/**
 * 在rpc被调用方方法上添加@Idempotent注解用于幂等性问题
 * @author aping
 * @time 2020/7/23 14:20
 */
@Target(value = {ElementType.TYPE, ElementType.METHOD})//使用位置(类,方法)
@Retention(RetentionPolicy.RUNTIME)//加载到jvm里运行
public @interface Idempotent {
}


package com.taotao.droolsdemo.config;



import lombok.extern.slf4j.Slf4j;
import net.minidev.json.JSONUtil;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.*;
import org.aspectj.lang.reflect.MethodSignature;
import org.json.JSONObject;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;


/**
 * @author aping
 * @time 2020/7/23 14:26
 */
@Aspect
@Component
@Slf4j
public class IdempotentAspect {
private ConcurrentHashMap concurrentHashMap=new ConcurrentHashMap();
    //指定包下的注解, 2个 .. 表示所有的子目录,最后括号里的2个.. 代表所有的参数
    @Pointcut("@annotation(com.taotao.droolsdemo.config.Idempotent)")
    public  void logPointCut(){

    }
    //指定当前方法在logPointCut之前执行
    @Before("logPointCut()")
    public void doBefore(JoinPoint joinPoint)throws  Throwable{

        //接收请求,记录请求
        ServletRequestAttributes attributes= (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        HttpServletRequest request=attributes.getRequest();
        // 记录下请求内容
        log.info("请求地址 : " + request.getRequestURL().toString());
        log.info("HTTP METHOD : " + request.getMethod());
        // 获取真实的ip地址
        //logger.info("IP : " + IPAddressUtil.getClientIpAddress(request));
        log.info("CLASS_METHOD : " + joinPoint.getSignature().getDeclaringTypeName() + "."
                + joinPoint.getSignature().getName());
        log.info("参数 : " + Arrays.toString(joinPoint.getArgs()));
        //loggger.info("参数 : " + joinPoint.getArgs());
    }


    @AfterReturning(returning = "ret",pointcut = "logPointCut()")
    //returning的值和 doAfterReturning的参数名一致
    public  void doAfterReturning(Object ret)throws  Throwable{
        //处理完请求返回内容(返回值太复杂时,打印的是物理存储空间的地址)
        log.info("返回值:"+ret);
    }

    @Around("@annotation(Idempotent)")
    public Object around(ProceedingJoinPoint  jp) throws Throwable {
        try {
            //Class clazz = jp.getTarget().getClass();
            //String methodName = jp.getSignature().getName();
            Class[] parameterTypes = ((MethodSignature) jp.getSignature()).getMethod().getParameterTypes();
            Method methdo = jp.getTarget().getClass().getMethod(jp.getSignature().getName(), parameterTypes);
            if (methdo.getAnnotation(Idempotent.class) != null) {
                // String value = methdo.getAnnotation(ApiOperation.class).value();

                // 1.获取request对象
                HttpServletRequest request = ((ServletRequestAttributes) (RequestContextHolder.currentRequestAttributes())).getRequest();
                // 获取token
                String token = request.getHeader("idempotent-token");
                //Console.log("idempotent-token:{}", token);

                // 2.获取session或redis
                HttpSession session = request.getSession();

                // 3.判断是否重复提交
                String key = jp.getSignature() + "/" + Arrays.toString(jp.getArgs()); // 定义key名称
                //Console.log("方法key:{}", key);
                if (concurrentHashMap.get(key) == null || !concurrentHashMap.get(key).equals(token)) {
                    concurrentHashMap.put(key,token);
                    log.info("token:"+token);

                } else {
                    System.out.println("重复提交:{}"+ key);
                    JSONObject obj = new JSONObject();
                    obj.put("code", "202");
                    obj.put("msg", "重复提交");
                    return obj.toString();
                }
            }
        } catch (Exception ex) {
            ex.toString();
        }

        // 正常执行目标业务
        return jp.proceed();
    }



}


实现的效果


image.png
image.png
上一篇 下一篇

猜你喜欢

热点阅读