spring boot异步调用批处理

2018-06-24  本文已影响189人  周六不算加班

在spring boot项目中实现一个批量调用外部接口功能,由于每次调用接口都会比较浪费时间,
所以打算用异步来实现这个批量的调用的功能。
具体的思路是:
1、建立一个全局的队列来存储数据。
2、在批量调用外部接口的时候,只是往队列中添加相关的数据,添加成功以后就返回。
3、创建aop,在@After中也就是在队列添加完数据以后,调用异步方法。
4、创建异步方法,在异步方法中是真正调用外部接口实现。
在异步方法中注入别的bean方法容易报错,一般是多个线程同时注入同一个bean造成的,
加上@Lazy注解就能解决了

功能实现:
1、建立全局队列
@Component
public class QueueUtils {

private static QueueUtils instance;

static final int FILE_QUEUE_SIZE = 10000;// 阻塞队列大小

private static BlockingQueue<Map<String,Object>> queue = new ArrayBlockingQueue<Map<String,Object>>(FILE_QUEUE_SIZE);

/**
 * 构造方法,private不让外界生成队列工具类
 */
private QueueUtils(){

}



/**
 * 添加队列元素
 * @param map
 * @throws InterruptedException
 */
public static void put(Map<String,Object>  map) throws InterruptedException {
    queue.put(map);
}

/**
 * 获取队列
 * @return
 */
public static BlockingQueue<Map<String,Object>> getQueue(){
    return queue;
}

}
2、添加队列值
@Controller
public class TestController {

/**
 * aop测试
 */
@RequestMapping(value = "/aopTest")
@ResponseBody
public Object aopTest() throws InterruptedException {
    for (int i = 0; i<11;i++){
        Map<String,Object> map = new HashMap<String, Object>();
        map.put("userId",i);
        map.put("amount",5);
        QueueUtils.put(map);
    }
    String temp =  "aopTest";
    return temp;
}

}

3、aop实现
@Aspect
@Component
public class testIntercept {

@Autowired
private MyAsyncTask myAsyncTask;

private final static Logger logger = LoggerFactory.getLogger(testIntercept.class);

@Pointcut("execution(public * com.caody.muyi.TestController.aopTest())")
public void testAop(){};


@After("testAop()")
public void after(){

    String aaa = "执行完主体方法以后才会执行的方法";
    myAsyncTask.refreshMyDbAsync();
    logger.info(aaa);
}

}
4、异步方法实现
@Component
public class MyAsyncTask {

private Logger logger = LoggerFactory.getLogger(getClass());

@Async
public void refreshMyDbAsync()  {
    BlockingQueue<Map<String,Object>> queue = QueueUtils.getQueue();
    //队列方式遍历,元素逐个被移除
    while (queue.peek() != null) {
        Map<String,Object> map = queue.poll();

        logger.info("userId:"+map.get("userId")+" amount:"+map.get("amount"));
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

}
5、主方法上面开始异步
@EnableCaching //开启缓存
@EnableAsync(proxyTargetClass = true)//开启异步处理
@SpringBootApplication
public class MuYiApplication {

public static void main(String[] args) {
    SpringApplication.run(MuYiApplication.class, args);
}

}

上一篇下一篇

猜你喜欢

热点阅读