DDD中domain落地的思考

2023-06-21  本文已影响0人  小鲨鱼2333

定时任务扫描订单数据

任务描述:

设计为主任务和子任务,主任务只处理指定时间片,子任务会拆分主任务的时间片。其目的为拆分后的子任务可以并行执行,不会干扰,提供吞吐量。

主任务状态:新建、执行中、执行完毕
多次触发时,没有主任务则新建,执行中不处理,当前任务执行完毕,则需新建下一个时间段的主任务。

主任务初始化:
触发 job,主任务领域通过【加载最近执行的任务】或【新建主任务】方式,初始化主任务信息。

//获取最近同步的主任务  
OrderSyncMainTask lastSyncMainTask = MainTaskFactory.getOrInitLastMainTask();
public static OrderSyncMainTask getOrInitLastMainTask() {  
    // 任务查询并启动,暂时未考虑连续触发的情况(并发)  
    OrderSyncMainTask orderSyncMainTask = omsOrderSyncTaskRepo.getLastMainTask();  
    if (orderSyncMainTask == null) {  
        orderSyncMainTask = initMainTask(null);  
    }  
    return orderSyncMainTask;  
}

问题描述:
这里【主任务领域】初始化的方式,采用工厂+静态方法,对于外部传入的入参,是很容易加工生成目标类的,但如果需要再次读取 db 库的数据,工厂类就需要引入(注入)相关 service、repository。

@Slf4j  
@Component  
public class MainTaskFactory {
    static OmsOrderSyncTaskRepo omsOrderSyncTaskRepo;
    //从spring容器找到要用的bean
    @Autowired  
    public void setOmsOrderSyncTaskRepo(OmsOrderSyncTaskRepo omsOrderSyncTaskRepo) {  
        MainTaskFactory.omsOrderSyncTaskRepo = omsOrderSyncTaskRepo;  
    }
    //...
}

现在出现一个我们不熟悉的依赖方式,静态工厂的静态属性,需要引入 spring 的 bean 实例。

实际上,让 MainTaskFactory 做了初始化,从持久化数据库读数据,就必然依赖 spring 中的 bean:


image.png

整理下 MainTaskFactory 的工作职责:

总结为:factory 做了查询、构造、保存工作,这里分工混乱。

谁来读取持久化的数据(分工);
如何读取持久化的数据(实现方式);

谁来保存 domain 信息;

repo.save(domain)

domain 转换出 dto1,dto2,dto3
=======以下为同一事务=======
dto1Repo.save(dto1);
dto2Repo.save(dto2);
dto3Repo.save(dto3);

写到这里,发现写和读应该属于同一逻辑:

//repo 加载 domain 的方法:
public domain load(cmd.getId);

按以上想法,重新设计:

repo 层,从持久化服务中,加载需要的 dto,交给 factory 构造 domain。

public SyncOmsOrderTaskDomain loadLast() {  
MainTaskDTO mainTaskDTO = null;//omsOrderSyncTaskRepo.getLastMainTask();  
List<SubTaskDTO> subTaskDTOList = null;//omsOrderSyncTaskRepo.getSubTaskList(mainTaskDTO.getId());  
List<SubTaskRecordDTO> subTaskRecordDTOList = new ArrayList<>();//获取子任务执行记录,存在多个记录,取最后一次,或者设计上,保证每个子任务只有一个有效的记录  
return MainTaskFactory.create(mainTaskDTO, subTaskDTOList, subTaskRecordDTOList);  
}

factory 构造 domain 对象。

    public static SyncOmsOrderTaskDomain create(MainTaskDTO mainTaskDTO, List<SubTaskDTO> subTaskDTOList, List<SubTaskRecordDTO> subTaskRecordDTOList) {
        String taskId = UUIDHexGenerator.generate();
        //对查询的值做一些基本校验
        return new SyncOmsOrderTaskDomain(mainTaskDTO, subTaskDTOList, subTaskRecordDTOList);
    }

domainService 中,操作 domain 的各种方法,包括必要时,会通过入参、配置等信息,通过 factory 构造 domain。

@Service
public class MainTaskDomainService {
    @Resource
    MainTaskRepo mainTaskRepo;
    @Resource
    MainTaskConfig mainTaskConfig;
    @Resource
    SubTaskConfig subTaskConfig;
    @Resource
    ConcurrentReadOrderService concurrentReadOrderService;
    //触发任务调度
    public String triggerTask() {
        SyncOmsOrderTaskDomain syncOmsOrderTaskDomain = mainTaskRepo.loadLast();
        if (syncOmsOrderTaskDomain.ifNotExsit()) {
            syncOmsOrderTaskDomain.initTaskByConfig(mainTaskConfig);
            syncOmsOrderTaskDomain.initSubTaskByConfig(subTaskConfig);
            mainTaskRepo.save(syncOmsOrderTaskDomain);
            //细节1:实时更新子任务发送进度,如何实现?
            // 1。子任务查询并发送成功后,可使用内部通知机制,异步通知、更新。
            // 2。子任务查询发送、直接调用repo更新(有点耦合)
            concurrentReadOrderService.executeRead(syncOmsOrderTaskDomain);
            return "任务首次运行";
        } else {
            if (syncOmsOrderTaskDomain.isRunning()) {
                return "执行中";
            }
            if (syncOmsOrderTaskDomain.needReRun()){
                syncOmsOrderTaskDomain.updateReRunTask();
                mainTaskRepo.updateReRunTask(syncOmsOrderTaskDomain);
                concurrentReadOrderService.executeRead(syncOmsOrderTaskDomain);
                return "重新运行";
            }
            if (syncOmsOrderTaskDomain.isFinished()) {
                syncOmsOrderTaskDomain.createNextTask();
                mainTaskRepo.save(syncOmsOrderTaskDomain);
                concurrentReadOrderService.executeRead(syncOmsOrderTaskDomain);
                return "运行下一周期";
            }
        }
        return "未知状态";
    }
}

更新子任务发送状态:

执行子任务时,会组装成 runable,交给线程池执行查询,每次查询到数据,同时发送内部事件,更新子任务的发送数据数以及更新是否发送完毕状态。
发送消息结构:

class SubTaskSendCommand {
    String taskId;
    String subTaskId;
    int sendCount;
    boolean sendFinished;
}

因为发送的状态数据仅需保存,所以通过 factory 直接构造 domain,通过 repo save。

SyncOrderTaskDomain domain = MainTaskFactory.generate(subTaskSendCommand);

repo.updateSendInfo(domain);

更新子任务消费状态:
扫描的数据被消费处理成功后,需要更新任务记录状态,处理流程和上步类似:

class SubTaskConsumerCommand {
    String taskId;
    String subTaskId;
    int receivedCount;
}
SyncOrderTaskDomain domain = MainTaskFactory.generate(subTaskConsumerCommand);

repo.updateReceiveInfo(domain);

整理流程都收敛在 domainService 层,factory 主要职责是构造 domain 对象,repo 负责加载、保存、更新 domain 信息。

domain 处理各类 command 指定,包括任务触发、更新任务发送记录,更新任务消费记录。

关于 domain 内部模型,如何管理主任务、子任务、任务记录,后序更新。

上一篇下一篇

猜你喜欢

热点阅读