spring boot异步调用批处理
2018-06-24 本文已影响189人
周六不算加班
在spring boot项目中实现一个批量调用外部接口功能,由于每次调用接口都会比较浪费时间,
所以打算用异步来实现这个批量的调用的功能。
具体的思路是:
1、建立一个全局的队列来存储数据。
2、在批量调用外部接口的时候,只是往队列中添加相关的数据,添加成功以后就返回。
3、创建aop,在@After中也就是在队列添加完数据以后,调用异步方法。
4、创建异步方法,在异步方法中是真正调用外部接口实现。
在异步方法中注入别的bean方法容易报错,一般是多个线程同时注入同一个bean造成的,
加上@Lazy注解就能解决了
功能实现:
1、建立全局队列
@Component
public class QueueUtils {
private static QueueUtils instance;
static final int FILE_QUEUE_SIZE = 10000;// 阻塞队列大小
private static BlockingQueue<Map<String,Object>> queue = new ArrayBlockingQueue<Map<String,Object>>(FILE_QUEUE_SIZE);
/**
* 构造方法,private不让外界生成队列工具类
*/
private QueueUtils(){
}
/**
* 添加队列元素
* @param map
* @throws InterruptedException
*/
public static void put(Map<String,Object> map) throws InterruptedException {
queue.put(map);
}
/**
* 获取队列
* @return
*/
public static BlockingQueue<Map<String,Object>> getQueue(){
return queue;
}
}
2、添加队列值
@Controller
public class TestController {
/**
* aop测试
*/
@RequestMapping(value = "/aopTest")
@ResponseBody
public Object aopTest() throws InterruptedException {
for (int i = 0; i<11;i++){
Map<String,Object> map = new HashMap<String, Object>();
map.put("userId",i);
map.put("amount",5);
QueueUtils.put(map);
}
String temp = "aopTest";
return temp;
}
}
3、aop实现
@Aspect
@Component
public class testIntercept {
@Autowired
private MyAsyncTask myAsyncTask;
private final static Logger logger = LoggerFactory.getLogger(testIntercept.class);
@Pointcut("execution(public * com.caody.muyi.TestController.aopTest())")
public void testAop(){};
@After("testAop()")
public void after(){
String aaa = "执行完主体方法以后才会执行的方法";
myAsyncTask.refreshMyDbAsync();
logger.info(aaa);
}
}
4、异步方法实现
@Component
public class MyAsyncTask {
private Logger logger = LoggerFactory.getLogger(getClass());
@Async
public void refreshMyDbAsync() {
BlockingQueue<Map<String,Object>> queue = QueueUtils.getQueue();
//队列方式遍历,元素逐个被移除
while (queue.peek() != null) {
Map<String,Object> map = queue.poll();
logger.info("userId:"+map.get("userId")+" amount:"+map.get("amount"));
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
5、主方法上面开始异步
@EnableCaching //开启缓存
@EnableAsync(proxyTargetClass = true)//开启异步处理
@SpringBootApplication
public class MuYiApplication {
public static void main(String[] args) {
SpringApplication.run(MuYiApplication.class, args);
}
}