任务系统重构的一些思考
2019-10-14 本文已影响0人
卫渐行
需求背景
本文主要介绍自己在项目中设计任务系统的一些思考;当前常见的任务 是利用一些大数据的工具,比如说 hadoop 以及 spark,flink 等大数据处理框架, 来处理一些数据分析的工作,数据来源主要包括有边界的离线数据和无边界的流式的数据;但是对于业务性很强任务系统,需要结合项目特点,尽可能的实现任务与业务之间的联动以及之间的解耦,并且能够满足业务持续增长的需要;并且保证数据的准确性,实时性;
下面主要介绍自己在业务中任务设计实践的一些思考,分别包括 用户目标 GMV 的月度任务,以及新用户的分享任务;
用户目标 GMV 的月度任务
该任务是在一个月内,卖出累计达到目标 GMV 的用户,平台会对该用户进行对应比例 bonus 奖励, 任务特点是:周期性任务,并且任务之间没有关联关系;
新用户的三天分享的一次性关联任务
该任务是对于新加入平台的用户;分享平台的商品,可以获取到一定额度的奖励;该任务特点是;一次性任务、任务之间有依赖(第二天的任务,必须第一天的任务完成之后开启)
设计思路
最初的解决方案
- 建立任务池;建立任务的数据模型(包括任务类型;trigger 事件;重复周期;任务目标值,pre_task 等)
- 任务处理: 开始处理任务逻辑是非常粗暴的;比如说利用业务逻辑和任务逻辑同步的方式;代码耦和在一起,一旦业务发生变化,导致改动的代码比较多;而且对于不同任务的变种处理,增加了难度;
优化的解决方案
c168b4b826774bd697499be582b697e6_.png
- 利用 aws 的 sqs 实现任务与业务之间的解耦;
- 优化任务生成器的整个过程: 生成任务模型 -> 任务调度器 -> 任务处理器 的任务管理模式;
- 建立 uid+taskId+expired_at(一次性任务,取一个固定的大 date)的唯一索引;保证某一个时间段内,同一个用户的同一种任务的唯一性
业务实践
用户目标 GMV 的月度任务的解决方案(即用户卖出目标的 GMV,如一个月 10000 卢比,给予 GMV*3% 的 bonus 奖励)
- 接收到订单中心确认收货的队列消息;处理确认收货消息,并分发任务消息
- 接受到用户 GMV 增加(减少)订单的 GMV 消息,建立任务的数据模型
- 提交任务到任务管理器,任务调度分发任务到具体任务处理器
- 处理任务(完成任务之后,可根据任务类型,发 bonus 或产生子任务消息等其他操作)
任务调度与业务解耦(部分代码)
if (orderService.orderExists(orderName)) {
log.info("sync
order status start, event:{}, msgContent:{}", event, msgContent);
switch (event) {
case "order.confirm_received":
OrderDTO confirmOrderDTO = orderService.orderCenterDetail(orderName);
mqService.publishTask(confirmOrderDTO.getSellerId(), </br>confirmOrderDTO.getGmv().intValue(), EventEnum.ORDER_PAID.getEve
nt(), confirmOrderDTO);
jobService.afterOrderConfirm(confirmOrderDTO);
break;
case "fo.delivered":
case "fo.rejected":
Integer uid = obj.getInteger("user_id");
String fulfillmentOrderName = obj.get("fo_name").toString();
jobManageService.syncFoOrderState(orderName, uid, fulfillmentOrderName, event);
break;
}
}
生成任务模型
public List<UserTaskDO> listSellerSalesTaskByUid(Integer uid, Date taskAt) {</br>
return listTasksByUid(uid, sellerSalesTask, UserLevelEnum.TEAM_LEAD_SELLER.getLevel(), taskAt);</br>
}
提交任务 (部分代码)
public void submitOrderPaidTask(Integer uid, Integer increValue, Date taskAt) {
List<UserTaskDO> userTaskDOs = taskService.listSellerSalesTaskByUid(uid, taskAt);
UserDTO userDTO = userService.userDetail(uid);
jobService.progressSellerSalesTask(userTaskDO, increValue.doubleValue(), userDTO);
}
任务处理 (部分代码)
public TaskProgressRecordDO progressSellerSalesTask(UserTaskDO userTaskDO, Double orderGmv, UserDTO user) {
Boolean isSeller = user.getLevel() >= UserLevelEnum.SELLER.getLevel();
if (isSeller) {
bonusService.processTaskStateChanged( taskService.increUserTaskProgress(userTaskDO, orderGmv));
}
log.info("任务触发条件不符合, uid={}, taskId={}, isSeller={}",
user.getUid(), userTaskDO.getTaskId(), isSeller);
return null;
}
总结:
新用户的三天分享任务,也是基于上述的任务处理模型;稍微有点不同的是,通过任务模型中任务中 pre_task,parent_task 建立之间的关系,形成任务 DAG 模型;然后也是通过 sqs 解耦任务之间的关系;下面就不一一赘述;
开发人员在规定的时间和有限的资源下,需要完成产品的一些想法和功能时,可能在代码书写上,用了一些“捷径”;但是在业务以及功能上不断迭代的时候,会遇到一些问题,代码也会越来越臃肿;所以需要开发人员经常去审慎之前的代码;不断进行优化,满足未来的业务发展;要在重要的事情上,多花时间;