异常补偿机制(队列)

2019-11-06  本文已影响0人  zianL

前言

项目中常常有与第三方系统交互的场景,但我们往往不能保证每一次网络请求都是正常的。常规的做法是对发起 HTTP 请求的方法进行异常捕获,在异常处理中进行一次重试,若还是失败就丢弃该请求,很明显这样不能满足我们的业务场景;

方案

我们基于异常捕获重试的方案做进一步改进:获取请求参数、执行的方法、该方法所属的类,封装成一个对象,放入队列。由异步线程去消费这个对象,从这个对象中拿到类和方法名,通过反射获取 Method,构造请求参数,重新执行该方法直至成功为止;

package com.gzcb.creditcard.quartz.utils;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

/**
 * @description: 异常补偿队列
 * @author: libingdao
 * @create: 2019-11-04 15:06
 **/
@Component
@Slf4j
public class ExceptionQueue {
    private static final PriorityBlockingQueue<ExceptionCompensateDTO> message = new PriorityBlockingQueue();
    private final ExecutorService pools =  Executors.newCachedThreadPool();
    private final ExecutorService subpools =  Executors.newFixedThreadPool(2);
    /**
     * 是否启动消费者
     */
    private volatile boolean enable = true;

    @Async
    public void consumer(){
        while (!message.isEmpty()) {
            ExceptionCompensateDTO poll = message.poll();
            pools.submit(()->{
                //出队并删除,假如执行失败重新入队
                if(!StringUtils.isEmpty(poll)){
                    log.error("业务执行: {}",poll);
                    ClassMethod.invokeMethod(poll.getObj(),poll.getMethodName(),poll.getArgs());
                }
            });
        }
        enable = true;
        log.error("子线程执行结束");
    }

    public void  add(ExceptionCompensateDTO exceptionCompensateDTO){
        log.info("exceptionCompensateDTO: {}",exceptionCompensateDTO);
        if(!StringUtils.isEmpty(exceptionCompensateDTO)){
            message.add(exceptionCompensateDTO);
        }
        log.debug("enable:{}",enable);
        if(enable){
            enable = false;
            subpools.submit(()->{
                log.debug("启动子线程");
                consumer();
            });
        }
    }
}

ExceptionQueue 控制消息的出入队列

package com.gzcb.creditcard.quartz.utils;

import org.springframework.util.ReflectionUtils;

import java.lang.reflect.Method;

/**
 * @description:
 * @author: libingdao
 * @create: 2019-11-07 11:06
 **/
public class ClassMethod {
    /**
     * @param obj        调用此方法的对象
     * @param methodName 方法名称
     * @param args       调用的这个方法的参数参数列表
     */
    public static void invokeMethod(Object obj, String methodName, Object[] args) {
        Class argsClass[] = null;
        //1.参数存在
        if (args != null) {
            int len = args.length;
            argsClass = new Class[len];
            //2.根据参数得到相应的 Class的类对象
            for (int i = 0; i < len; ++i) {
                argsClass [i] = args[i].getClass();
            }
        }
        // 找到方法
        Method method = ReflectionUtils.findMethod(obj.getClass(), methodName, argsClass);
        // 执行方法
        ReflectionUtils.invokeMethod(method, obj, args);
    }
}

ClassMethod 反射获取被执行者对象和方法、参数然后执行;

package com.gzcb.creditcard.quartz.utils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Sample Spring bean used as a replay target: {@link #showA} prints its arguments and then
 * calls the injected {@code Test} bean.
 *
 * @author: libingdao
 * @create: 2019-11-07 11:04
 **/
@Component
public class DeclareMethodA {
    /** Collaborator injected by Spring; called at the end of {@link #showA}. */
    @Autowired
    Test test;

    public DeclareMethodA() {
    }

    /**
     * Prints the three arguments on one line, then delegates to {@code test.xx()}.
     *
     * @param a    number to print
     * @param name text to print
     * @param nc   marker object to print
     */
    public void showA(Integer a, String name, NewClass nc) {
        StringBuilder line = new StringBuilder();
        line.append("Integer=").append(a);
        line.append(" name=").append(name);
        line.append(" NewClass=").append(nc);
        System.out.println(line.toString());
        test.xx();
    }

    /** Empty marker type used as the third parameter of {@link #showA}. */
    public static class NewClass {
    }
}

DeclareMethodA 被执行对象

package com.gzcb.creditcard.quartz.utils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Sample Spring bean used as a replay target: {@link #show11} prints its arguments and then
 * calls the injected {@code Test} bean.
 *
 * @author: libingdao
 * @create: 2019-11-07 11:04
 **/
@Component
public class DeclareMethodB {
    /** Collaborator injected by Spring; called at the end of {@link #show11}. */
    @Autowired
    Test test;

    public DeclareMethodB() {
    }

    /**
     * Prints both arguments on one line, then delegates to {@code test.xx()}.
     *
     * @param a    number to print
     * @param name text to print
     */
    public void show11(Integer a, String name) {
        StringBuilder line = new StringBuilder();
        line.append("Integer11=").append(a);
        line.append(" name11=").append(name);
        System.out.println(line.toString());
        test.xx();
    }

    /** Empty nested type; not referenced by any method of this class. */
    public static class NewClass {
    }
}

DeclareMethodB被执行对象

package com.gzcb.creditcard.quartz.utils;
import org.springframework.stereotype.Component;
/**
 * Minimal Spring bean whose only method prints a fixed marker line.
 *
 * @author: libingdao
 * @create: 2019-11-07 11:11
 **/
@Component
public class Test {
    /** Writes the marker text {@code test.xx} to standard output. */
    public void xx() {
        final String marker = "test.xx";
        System.out.println(marker);
    }
}

Test 对象主要是验证能否调通spring bean

package com.gzcb.creditcard.quartz.utils;
import lombok.Data;
/**
 * One call to be replayed: the target object, the method name and the arguments.
 *
 * <p>Implements {@link Comparable} only so that instances can be stored in a priority queue.
 * Every instance compares as equal to every other, so such a queue imposes no meaningful order
 * on them (in particular, FIFO order is not guaranteed).
 *
 * @author: libingdao
 * @create: 2019-11-04 15:13
 **/
@Data
public class ExceptionCompensateDTO implements Comparable<ExceptionCompensateDTO> {
    /** Object on which the method is invoked. */
    public Object obj;
    /** Name of the method to invoke on {@link #obj}. */
    public String methodName;
    /** Arguments passed to the method. */
    public Object[] args;

    /**
     * @param obj        object on which the method is invoked
     * @param methodName name of the method to invoke
     * @param args       arguments passed to the method
     */
    public ExceptionCompensateDTO(Object obj, String methodName, Object[] args) {
        this.obj = obj;
        this.methodName = methodName;
        this.args = args;
    }

    /**
     * Treats all tasks as having the same priority.
     *
     * @param o the task to compare with
     * @return always {@code 0}
     */
    @Override
    public int compareTo(ExceptionCompensateDTO o) {
        return 0;
    }
}
package com.gzcb.creditcard.quartz.sms;

import com.alibaba.fastjson.JSONObject;
import com.gzcb.creditcard.QuartzApplication;
import com.gzcb.creditcard.config.sms.bean.SmsTemplateBody;
import com.gzcb.creditcard.config.sms.bean.SmsTemplateQuery;
import com.gzcb.creditcard.quartz.utils.*;
import com.gzcb.creditcard.service.SMSService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = QuartzApplication.class)
public class SmsTemplateServiceImplTest {
    @Autowired
    DeclareMethodA declareMethod;
    @Autowired
    DeclareMethodB declareMethod11;
    @Autowired ExceptionQueueTesy exceptionQueueTesy;

static ExecutorService mainpools =  Executors.newCachedThreadPool();
    @Test
    public void exceptionQueueTesy(){

        for (Integer i = 0;i<1000;i++) {
            Integer ss = i;
            mainpools.submit(()->{
                for (Integer j = 0;j<100;j++) {
                    ExceptionCompensateDTO dto =
                            new ExceptionCompensateDTO(declareMethod,"show",new Object[]{ss+j,"今天天气不错"+ss+j,new DeclareMethod.NewClass()});
                    exceptionQueueTesy.add(dto);
                }
            });
            mainpools.submit(()->{
                for (Integer j = 0;j<100;j++) {
                    ExceptionCompensateDTO dto =
                            new ExceptionCompensateDTO(declareMethod11,"show11",new Object[]{ss+j,"今天天气很糟糕"+ss+j});
                    exceptionQueueTesy.add(dto);
                }
            });
        }
        try {
            Thread.sleep(2000000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("主线程执行结束");
    }
}

测试模拟多线程请求

上一篇 下一篇

猜你喜欢

热点阅读