Activiti工作流引擎的使用、思考与总结
从业务角度来说,至少需要满足以下功能:
1.查询待办事项列表
2.待办事项的办理
3.查看已办历史
从技术角度来说:
1.activiti引擎提供的api功能过于分散,对于开发人员来说,没有一个统一的接口。最终导致开发效率不高,代码维护困难。
2.画流程图不确定性比较多,节点名称和监听器设置没有统一规范。同样也会导致效率问题。
用请假的流程来举例,首先,看一下流程图怎么画?
针对于画图,我们先定一个小规范,要求所有的节点上的Listeners全部指定为一个我们的自定义listener,无论是ExecutionListener还是TaskListener,全部指定为一个固定的自定义listener。
// 全部统一设置成ActivitiListener监听器
public class ActivitiListener implements TaskListener, ExecutionListener {
private static final long serialVersionUID = 6071133014325140738L;
// 用户任务监听器被触发的话,就会调用下面的notify方法
@Override
public void notify(DelegateTask delegateTask) {
// 获取当前任务的taskDefinitionKey,其实就是节点id
String taskDefinitionKey = delegateTask.getTaskDefinitionKey();
// 通过taskDefinitionKey和目标类型去IOC容器中查找指定的Bean
UserTaskListener userTaskHandler =
ApplicationContextUtil.getBean(taskDefinitionKey, UserTaskListener.class);
if (Objects.nonNull(userTaskHandler)) {
// 执行目标Bean中的方法
userTaskHandler.notify(delegateTask);
}
}
// ExecutionListener回调方法
@Override
public void notify(DelegateExecution execution) {
// 获取IOC容器
ApplicationContext webApplicationContext = ApplicationContextUtil.getApplicationContext();
// 从容器中查找目标类型Bean列表
Map<String, AbstractElement> beans =
webApplicationContext.getBeansOfType(AbstractElement.class);
if (CollectionUtils.isEmpty(beans)) {
return;
}
// 通过activityId查找符合要求的Bean实例。在bpmn中,其实就是元素的id。
String activityId = execution.getCurrentActivityId();
AbstractElement abstractElement = beans.get(activityId);
if (Objects.nonNull(abstractElement)) {
// 执行目标Bean的notify方法
abstractElement.notify(execution);
return;
}
// 如果通过activityId找不到,就通过processDefinitionKey查找目标Bean
String processDefinitionKey =
CastUtil.castString(execution.getVariable(Const.PROCESS_DEFINITION_KEY));
abstractElement = beans.get(processDefinitionKey);
if (Objects.nonNull(abstractElement)) {
// 执行目标Bean的notify方法
abstractElement.notify(execution);
}
}
}
按照上述设定,那么一旦触发ExecutionListener就会去IOC容器中找AbstractElement类,我们来看一下这个类:
// 这个接口是需要所有UserTask来实现的
public interface UserTaskListener {
/** @param delegateTask */
void notify(DelegateTask delegateTask);
}
// 把所有的节点都抽象成AbstractElement,主要在于需要让每一个节点都是ExecutionListener的子类
// 这个设计的灵感来自于bpmn流程图里面任何组件元素都可以指定ExecutionListener,那意味着我如果想要统一规范所有的组件元素的Listener为ActivitiListener的话,那么我就需要把所有的组件都抽象成一个目标,这样我在ActivitiListener的实现里面就可以通过一定的规则找到目标组件的实现了。
// 最终,通过总结,要求所有元素的id都作为Bean的名字,也就是说,通过这个元素的id可以在IOC容器中找到对应的Bean实例
public abstract class AbstractElement extends AbstractActivitiServiceImpl
implements ExecutionListener {
private static final long serialVersionUID = 4836235578847862052L;
@Autowired private ActivitiBusinessDataRepository activitiBusinessDataRepository;
@Transactional(rollbackFor = Exception.class)
@Override
public void notify(DelegateExecution execution) {
// 如果当前对象不支持,就直接结束
if (!support(execution)) {
return;
}
Map<String, Object> variables = Maps.newHashMap();
Map<String, Object> data = Maps.newHashMap();
// 执行前
beforeExecute(execution, variables, data);
// 真正的执行目标代码
execute(execution, variables, data);
// 执行后
afterExecute(execution, data);
}
/**
* 执行业务
*
* @param execution
* @param variables
* @param data
*/
protected abstract void execute(
DelegateExecution execution, Map<String, Object> variables, Map<String, Object> data);
/**
* 后置处理
*
* @param execution
* @param data
*/
protected abstract void afterExecute(DelegateExecution execution, Map<String, Object> data);
/**
* @param execution
* @return
*/
protected boolean support(DelegateExecution execution) {
return false;
}
/**
* 读取前置系统参数
*
* @param execution
* @param variables
* @param data
*/
protected void beforeExecute(
DelegateExecution execution, Map<String, Object> variables, Map<String, Object> data) {
String taskDefinitionKey = execution.getCurrentActivityId();
String processDefinitionKey =
CastUtil.castString(execution.getVariable(Const.PROCESS_DEFINITION_KEY));
// 从数据库里面把自定义配置读取到variables里面。如果没有这种需要的话,可以去掉。
activitiBusinessDataRepository.readProcessInstanceVariable(
processDefinitionKey, taskDefinitionKey, variables);
if (!CollectionUtils.isEmpty(variables)) {
for (Map.Entry<String, Object> e : variables.entrySet()) {
String name = e.getKey();
Object value = e.getValue();
execution.setVariable(e.getKey(), e.getValue());
data.put(name, value);
}
}
}
protected void saveData(DelegateExecution execution, Map<String, Object> data) {
Timestamp now = new Timestamp(System.currentTimeMillis());
String id = execution.getId();
String taskDefinitionKey = execution.getCurrentActivityId();
String name = execution.getCurrentFlowElement().getName();
String processInstanceId = execution.getProcessInstanceId();
String businessKey = execution.getProcessInstanceBusinessKey();
TaskNodeInfo taskNodeInfo = TaskNodeInfo.newInstance(id, name, taskDefinitionKey);
// 自动保存的流程节点数据;把一些我认为一定要保存的数据自动保存下来,也就是说,不依赖于开发人员,我自己认为一定要保存的数据,比如businessKey一定要保存
List<FieldInfo> autoSaveTaskNodeVariables =
activitiBusinessDataRepository.getAutoSaveTaskNodeVariables(
Executor.defaultExecutor(), businessKey, data);
// processInstanceData是抽象方法,需要每一个子类实现;返回的数据是当前流程实例需要保存的信息,以便后续节点可以使用
List<FieldInfo> processInstanceDataList = processInstanceData(execution, data);
// taskNodeData是抽象方法,子类需要实现;返回的数据是当前节点的快照信息
List<FieldInfo> taskNodeDataList = taskNodeData(execution, data);
// 记录流程实例数据;保存当前流程实例数据
activitiBusinessDataRepository.saveProcessInstanceData(
processInstanceDataList, null, processInstanceId, businessKey, now);
// 记录节点数据;把当前节点的相关信息保存起来
activitiBusinessDataRepository.saveTaskNodeData(
autoSaveTaskNodeVariables, taskNodeDataList, processInstanceId, taskNodeInfo, now);
// 自动记录节点历史,不需要开发者操心
// 一般历史数据大概可以归纳为时间、节点、任务、流程实例ID,附加信息等等,所以花点心思总结一下,应该是可以固定下来的,所以就不需要子类去实现
activitiBusinessDataRepository.saveHistory(
businessKey,
processInstanceId,
Executor.defaultExecutor(),
taskNodeInfo,
data,
now);
}
// 返回的数据是当前流程实例需要保存的信息
protected abstract List<FieldInfo> processInstanceData(
DelegateExecution execution, Map<String, Object> data);
// 返回的数据是当前节点的快照信息
protected abstract List<FieldInfo> taskNodeData(
DelegateExecution execution, Map<String, Object> data);
/**
* 读取变量参数
*
* @param processDefinitionKey
* @param taskDefinitionKey
* @param variables
*/
protected void readTransactorAndSendNoticeVariable(
String processDefinitionKey, String taskDefinitionKey, Map<String, Object> variables) {
activitiBusinessDataRepository.readTransactorAndSendNoticeVariable(
processDefinitionKey, taskDefinitionKey, variables);
}
}
综上所述,我们总结两点规范:
-
1.所有的Listener都是ActivitiListener
-
2.所有的组件id都作为Bean的名字,通过组件id去IOC容器中查找目标Bean;如果找不到,就通过processDefinitionKey查找目标Bean
关于AbstractActivitiServiceImpl这个类,其实就是集成了activiti里面一些常用的api,我认为这是大多数组件都会调用的,所以特地放在这个抽象类里面,以便所有AbstractElement的子类都可以使用里面的方法。只和activiti引擎相关数据表打交道的逻辑可以放在这个类里面实现。
public abstract class AbstractActivitiServiceImpl
implements IActivitiService, ApplicationContextAware {
@Autowired protected TaskService taskService;
@Autowired protected FormService formService;
@Autowired protected HistoryService historyService;
@Autowired protected IdentityService identityService;
@Autowired protected ManagementService managementService;
@Autowired protected RepositoryService repositoryService;
@Autowired protected RuntimeService runtimeService;
@Autowired private IActRunTaskRepository actRunTaskRepository;
private static ApplicationContext APPLICATION_CONTEXT;
protected ApplicationContext applicationContext() {
return APPLICATION_CONTEXT;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
APPLICATION_CONTEXT = applicationContext;
}
/**
* 统计指定用户的待办事项数量
*
* @param userIds
* @return
*/
public List<ActivitiCountGroupByResult> countUnassignedTaskGroupByUserId(List<String> userIds) {
List<Map<String, Object>> list = actRunTaskRepository.countRunTaskForEachOne(userIds);
if (CollectionUtils.isEmpty(list)) {
return null;
}
Map<String, Integer> userTaskMap = Maps.newHashMap();
for (Map<String, Object> e : list) {
String userId = CastUtil.castString(e.get("userId"));
Integer taskNum = CastUtil.castInt(e.get("taskNum"));
userTaskMap.put(userId, taskNum);
}
List<ActivitiCountGroupByResult> result = Lists.newArrayList();
for (String userId : userIds) {
Integer taskNum = userTaskMap.getOrDefault(userId, 0);
result.add(new ActivitiCountGroupByResult(userId, taskNum));
}
return result;
}
/**
* 待办事项最少的人
*
* @param userIds
* @return
*/
public ActivitiCountGroupByResult minUnassignedTask(List<String> userIds) {
return countUnassignedTask(userIds, false);
}
/**
* 待办事项最多的人
*
* @param userIds
* @return
*/
public ActivitiCountGroupByResult maxUnassignedTask(List<String> userIds) {
return countUnassignedTask(userIds, true);
}
/**
* 待办事项最少或最多的人
*
* @param userIds
* @return
*/
private ActivitiCountGroupByResult countUnassignedTask(List<String> userIds, boolean isMax) {
List<ActivitiCountGroupByResult> list = countUnassignedTaskGroupByUserId(userIds);
if (CollectionUtils.isEmpty(list)) {
// 都没有待办事项
return null;
}
if (isMax) {
return Collections.max(
list, Comparator.comparingInt(ActivitiCountGroupByResult::getNum));
} else {
return Collections.min(
list, Comparator.comparingInt(ActivitiCountGroupByResult::getNum));
}
}
@Override
public Task queryUnassignedTask(String taskId) {
return taskService.createTaskQuery().taskId(taskId).singleResult();
}
/**
* 通过taskId查询task
*
* @param taskId
* @return
*/
@Override
public Task queryUnassignedTask(String taskId, String userId) {
return taskService
.createTaskQuery()
.taskId(taskId)
.taskCandidateOrAssigned(userId)
.singleResult();
}
/**
* 查询待办任务
*
* @param userId
* @return
*/
@Override
public List<Task> queryUnassignedTaskByUser(
String userId, List<String> processInstaceIds, final int page, int size) {
int size_val = resetSize(size);
int firstResult = countFirstResult(page, size_val);
TaskQuery taskQuery = taskService.createTaskQuery().taskCandidateOrAssigned(userId);
if (!CollectionUtils.isEmpty(processInstaceIds)) {
taskQuery = taskQuery.processInstanceIdIn(processInstaceIds);
}
return taskQuery.orderByTaskCreateTime().desc().listPage(firstResult, size_val);
}
/**
* 计算第一条结果的索引
*
* @param page
* @param size
* @return
*/
private int countFirstResult(final int page, final int size) {
int startIndex = 1;
// 如果减1后还小于等于0,就重置为0(因为达成统一规定:分页的第一页page指定从1开始,但是jpa中是从0开始)
startIndex = page - startIndex;
int page_val = startIndex <= 0 ? 0 : startIndex;
return page_val * size;
}
/**
* 计算每页大小
*
* @param size
* @return
*/
private int resetSize(final int size) {
return size <= 0 ? 10 : size;
}
/**
* 查询已办任务
*
* @param userId
* @param page
* @param size
* @return
*/
@Override
public List<HistoricTaskInstance> queryDoneTasks(String userId, int page, int size) {
int size_val = resetSize(size);
int firstResult = countFirstResult(page, size_val);
return historyService
.createHistoricTaskInstanceQuery()
.taskAssignee(userId)
.finished()
.orderByTaskCreateTime()
.desc()
.listPage(firstResult, size_val);
}
/**
* 统计待办事项数量
*
* @param userId
* @return
*/
@Override
public long countUnassignedTask(String userId) {
return taskService.createTaskQuery().taskCandidateOrAssigned(userId).count();
}
/**
* 统计已办事项数量
*
* @param userId
* @return
*/
public long countDoneTask(String userId) {
return historyService.createHistoricTaskInstanceQuery().taskAssignee(userId).count();
}
/**
* 查询当前流程没有办理人的任务
*
* @param processInstanceId
* @return
*/
@Override
public List<Task> currentUnassignedTasks(String processInstanceId) {
return taskService
.createTaskQuery()
.processInstanceId(processInstanceId)
.taskUnassigned()
.list();
}
/**
* 查询当前没有被办理的任务
*
* @param processInstanceId
* @return
*/
@Override
public Task currentUndoneTask(String processInstanceId) {
return taskService
.createTaskQuery()
.processInstanceId(processInstanceId)
.singleResult();
}
}
从下面开始,我们就要开始做具体的区分了,开始区分流程实例和任务处理器了
-
流程实例
// 考虑到每一个流程都有一个开始节点,那么我就抓住它们的共同点,做了一个抽象类,供所有的实例类来继承 public abstract class BaseProcessServiceImpl extends AbstractElement { private ProcessDefinition processDefinition; private BpmnModel bpmnModel; private StartEvent startEvent; private Collection<FlowElement> xmlNodeElements; private static final String START_EVENT_KEY = "0"; // 通过IOC容器获取所有的用户任务处理器对象;后续会提到UserTaskHandler @Autowired private Map<String, UserTaskHandler> userTaskHandlers; // 持久化工具;专门用来存储数据 @Autowired private ActivitiBusinessDataRepository activitiBusinessDataRepository; // 初始化工作;这些解析出来的属性都是为了方便以后扩展使用。 @PostConstruct private void init() { // 把流程定义解析出来 processDefinition = repositoryService .createProcessDefinitionQuery() .processDefinitionKey(processDefinitionKey()) .latestVersion() .singleResult(); // 解析出bpmn模型 bpmnModel = repositoryService.getBpmnModel(processDefinition.getId()); Process process = bpmnModel.getProcessById(processDefinitionKey()); // 解析出所有的xml节点元素 this.xmlNodeElements = process.getFlowElements(); // 解析出启动节点 this.startEvent = (StartEvent) process.getInitialFlowElement(); } /** * 启动流程实例 * * @param businessKey * @param executor 执行人 * @param variables 需要放进流程实例中的变量值 * @param data 用来贯穿整个方法,可以把数据放在里面,最后在调用的方法中取出来使用 * @return * @throws GlobalException */ @Transactional(rollbackFor = Exception.class) public ProcessInstance startProcessInstance( String businessKey, Executor executor, Map<String, Object> variables, Map<String, Object> data) throws GlobalException { Preconditions.checkState(StringUtils.isNotBlank(businessKey), "businessKey不能为空"); // 特地创建一个map对象,用作启动流程实例的参数;注意要与data区分开,data是不会与流程实例打交道的,但是data的作用是用来贯穿整个方法的,专门用于数据传递;也就是说,data里面的数据不会弄脏流程实例。 Map<String, Object> allVariables = Maps.newHashMap(); if (!CollectionUtils.isEmpty(variables)) { allVariables.putAll(variables); } // processDefinitionKey是一个抽象方法,需要子类实现,不同的流程定义不一样 String processDefinitionKey = processDefinitionKey(); allVariables.put(Const.PROCESS_DEFINITION_KEY, processDefinitionKey); // 前置操作 beforeStartProcessInstance(processDefinitionKey, businessKey, executor, allVariables, data); // 启动流程实例 ProcessInstance processInstance; if (CollectionUtils.isEmpty(allVariables)) { processInstance = runtimeService.startProcessInstanceByKey(processDefinitionKey, businessKey); } else { processInstance = runtimeService.startProcessInstanceByKey( processDefinitionKey, businessKey, allVariables); } // 后置操作 afterStartProcessInstance(processInstance, businessKey, executor, variables, data); return processInstance; } // 前置操作 protected void beforeStartProcessInstance( String processDefinitionKey, String businessKey, Executor executor, Map<String, Object> variables, Map<String, Object> data) throws GlobalException { // 通过startEventKey获取启动节点 UserTaskHandler userTaskHandler = userTaskHandlers.get(startEventKey()); if (Objects.nonNull(userTaskHandler)) { // 设置启动节点的执行人 identityService.setAuthenticatedUserId(executor.getId()); // 同样的引入任务处理前置操作 userTaskHandler.beforeTask(null, businessKey, executor, variables, data); } // 读取一些自定义配置,把配置信息存储进variables;这些配置将被用在启动流程实例,这样就可以在这个流程实例存活的任意时刻和节点获取相关配置,方便后续扩展;比如后续可以扩展动态指派任务、待办任务消息提示等业务功能 activitiBusinessDataRepository.readProcessInstanceVariable( processDefinitionKey(), startEventKey(), variables); } // 后置操作 protected void afterStartProcessInstance( ProcessInstance processInstance, String businessKey, Executor executor, Map<String, Object> variables, Map<String, Object> data) { Timestamp now = new Timestamp(System.currentTimeMillis()); String processInstanceId = processInstance.getId(); String processDefinitionKey = processInstance.getProcessDefinitionKey(); String processDefinitionName = processInstance.getProcessDefinitionName(); String startUserId = processInstance.getStartUserId(); long startTime = processInstance.getStartTime().getTime(); TaskNodeInfo taskNodeInfo = TaskNodeInfo.newInstance(START_EVENT_KEY, startEventName(), startEventKey()); // 根据任务节点taskDefinitionKey获取指定的UserTaskHandler UserTaskHandler userTaskHandler = userTaskHandlers.get(taskNodeInfo.getDefinitionKey()); List<FieldInfo> processInstanceDataList = null; List<FieldInfo> taskNodeDataList = null; if (Objects.nonNull(userTaskHandler)) { // 获取审核结果和建议;默认用户任务都会有审核结果和建议。 Object result = userTaskHandler.approvalResult(variables, data); if (Objects.nonNull(result)) { data.put(Approval.APPROVAL_RESULT.name(), result); } Object opinion = userTaskHandler.approvalOpinion(variables, data); if (Objects.nonNull(opinion)) { data.put(Approval.APPROVAL_OPINION.name(), opinion); } // 用户任务中,需要保存到流程实例中的数据 processInstanceDataList = userTaskHandler.processInstanceData( processInstance, executor, businessKey, variables, data); // 用户任务中,需要保存到节点快照的数据 taskNodeDataList = userTaskHandler.taskNodeData( processInstance, executor, businessKey, variables, data); } // 和上面两个对应,下面是程序自动保存的数据,不依赖于开发者 // 自动保存的流程实例数据 List<FieldInfo> autoSaveProcessInstanceData = activitiBusinessDataRepository.getAutoSaveBusinessInstanceVariables( processDefinitionKey, processDefinitionName, startUserId, startTime); // 自动保存的流程节点数据 List<FieldInfo> autoSaveTaskNodeVariables = activitiBusinessDataRepository.getAutoSaveTaskNodeVariables( executor, businessKey, data); // 保存该流程实例数据 activitiBusinessDataRepository.saveProcessInstanceData( processInstanceDataList, autoSaveProcessInstanceData, processInstanceId, businessKey, now); // 保存节点快照数据 activitiBusinessDataRepository.saveTaskNodeData( autoSaveTaskNodeVariables, taskNodeDataList, processInstanceId, taskNodeInfo, now); // 保存节点历史数据;历史数据属于自动保存的,不需要开发者操心。 activitiBusinessDataRepository.saveHistory( businessKey, processInstanceId, executor, taskNodeInfo, data, now); } /** * 所有任务办理入口 * @description */ @Transactional(rollbackFor = GlobalException.class) public Task completeTask( String taskId, String businessKey, Executor executor, Map<String, Object> variables, Map<String, Object> data) throws GlobalException { // 通过taskId作为查询参数,还有一个就是当前办理人的id Task task = queryUnassignedTask(taskId, executor.getId()); if (Objects.isNull(task)) { throw GlobalException.newInstance("TASK_NOT_EXIST", "任务不存在"); } // 同上面的流程启动;作为办理任务的参数容器 Map<String, Object> allVariables = Maps.newHashMap(); if (!CollectionUtils.isEmpty(variables)) { allVariables.putAll(variables); } // 前置操作 beforeCompleteTask(task, businessKey, executor, allVariables, data); // 任务办理 if (CollectionUtils.isEmpty(allVariables)) { taskService.complete(task.getId()); } else { taskService.complete(task.getId(), allVariables); } // 后置操作 afterCompleteTask(task, businessKey, executor, allVariables, data); return task; } // 前置操作 protected void beforeCompleteTask( Task task, String businessKey, Executor executor, Map<String, Object> variables, Map<String, Object> data) throws GlobalException { // 获取当前任务的taskDefinitionKey String taskDefinitionKey = task.getTaskDefinitionKey(); // 查询目标UserTaskHandler UserTaskHandler userTaskHandler = userTaskHandlers.get(taskDefinitionKey); if (Objects.nonNull(userTaskHandler)) { // 执行任务处理的前置操作 userTaskHandler.beforeTask(task, businessKey, executor, variables, data); } // 读取该节点相关配置;只有执行到该节点,才会读取该节点的配置,而不是一次性读取流程的所有配置;注意按需加载 activitiBusinessDataRepository.readProcessInstanceVariable( processDefinitionKey(), taskDefinitionKey, variables); } // 后置操作 protected void afterCompleteTask( Task task, String businessKey, Executor executor, Map<String, Object> variables, Map<String, Object> data) { Timestamp now = new Timestamp(System.currentTimeMillis()); String processInstanceId = task.getProcessInstanceId(); TaskNodeInfo taskNodeInfo = TaskNodeInfo.newInstance(task.getId(), task.getName(), task.getTaskDefinitionKey()); // 拿到UserTaskHandler UserTaskHandler userTaskHandler = userTaskHandlers.get(taskNodeInfo.getDefinitionKey()); // 说白了,下面还是准备数据,因为后面需要把数据作持久化 List<FieldInfo> processInstanceDataList = null; List<FieldInfo> taskNodeDataList = null; if (Objects.nonNull(userTaskHandler)) { // 审核意见和建议 Object result = userTaskHandler.approvalResult(variables, data); if (Objects.nonNull(result)) { data.put(Approval.APPROVAL_RESULT.name(), result); } Object opinion = userTaskHandler.approvalOpinion(variables, data); if (Objects.nonNull(opinion)) { data.put(Approval.APPROVAL_OPINION.name(), opinion); } // 流程实例数据 processInstanceDataList = userTaskHandler.processInstanceData( task, executor, businessKey, variables, data); // 节点数据 taskNodeDataList = userTaskHandler.taskNodeData(task, executor, businessKey, variables, data); } // 自动保存的流程节点数据 List<FieldInfo> autoSaveTaskNodeVariables = activitiBusinessDataRepository.getAutoSaveTaskNodeVariables( executor, businessKey, data); // 数据准备好后,下面开始持久化 // 保存流程实例数据 activitiBusinessDataRepository.saveProcessInstanceData( processInstanceDataList, null, processInstanceId, businessKey, now); // 保存节点快照数据 activitiBusinessDataRepository.saveTaskNodeData( autoSaveTaskNodeVariables, taskNodeDataList, processInstanceId, taskNodeInfo, now); // 保存节点历史数据;其实是当前节点的时间、节点、办理人等相关信息,结合上面的快照数据,就可以完全恢复当前节点办理的任务 activitiBusinessDataRepository.saveHistory( businessKey, processInstanceId, executor, taskNodeInfo, data, now); } /** * 指定processDefinitionKey,需要子类实现 * * @return */ protected abstract String processDefinitionKey(); /** * 指定startEventKey * * @return */ protected String startEventKey() { return getStartEvent().getId(); } /** * 指定startEventName * * @return */ protected String startEventName() { return getStartEvent().getName(); } protected ProcessDefinition getProcessDefinition() { return this.processDefinition; } protected BpmnModel getBpmnModel() { return this.bpmnModel; } protected StartEvent getStartEvent() { return this.startEvent; } protected Collection<FlowElement> getXmlNodeElements() { return this.xmlNodeElements; } // 实现ExecutionListener前置操作,避免子类不需要实现的时候也要写一个空实现;子类如果真的需要实现,可以重写该方法 @Override protected void beforeExecute( DelegateExecution execution, Map<String, Object> variables, Map<String, Object> data) {} // 实现ExecutionListener的notify操作,避免子类不需要实现的时候也要写一个空实现;子类如果真的需要实现,可以重写该方法 @Override protected void execute( DelegateExecution execution, Map<String, Object> variables, Map<String, Object> data) { } // 实现ExecutionListener后置操作,避免子类不需要实现的时候也要写一个空实现;子类如果真的需要实现,可以重写该方法 @Override protected void afterExecute(DelegateExecution execution, Map<String, Object> data) { } /** * 不能重写;在流程实例的实现中,不需要实现这个方法,只有任务处理器需要实现 * * @param execution * @param data * @return */ @Override protected final List<FieldInfo> processInstanceData( DelegateExecution execution, Map<String, Object> data) { return null; } /** * 不能重写;在流程实例的实现中,不需要实现这个方法,只有任务处理器需要实现 * * @param execution * @param data * @return */ @Override protected final List<FieldInfo> taskNodeData( DelegateExecution execution, Map<String, Object> data) { return null; } }
大家一定要仔细看上面这段代码的注释,里面阐述了这样设计的原因和思路
-
任务处理器
// 实现AbstractElement,开始分支出任务处理器 public abstract class AbstractTaskHandler extends AbstractElement { private static final long serialVersionUID = -1470919793703094859L; @Autowired private IBusinessNodeVariablesRepository businessNodeVariablesRepository; // 通过任务ID查询历史任务 public Object historyTaskInfo(String taskId){ List<TaskNodeVariables> historyDataList = businessNodeVariablesRepository.findByTaskId(taskId); if (CollectionUtils.isEmpty(historyDataList)) { return null; } Map<String, Object> map = Maps.newHashMap(); for (TaskNodeVariables nodeVariables : historyDataList) { map.put(nodeVariables.getFieldName(), nodeVariables); } return map; } } // 注意UserTaskHandler实现了UserTaskListener,一定要回头去看ActivitiListener public abstract class UserTaskHandler extends AbstractTaskHandler implements UserTaskListener { /** * 办理任务之前准备variables;同样的空实现,是为了避免子类一定要去写这个实现 * * @param task * @param businessKey * @param executor * @param variables * @param data */ public void beforeTask( Task task, String businessKey, Executor executor, Map<String, Object> variables, Map<String, Object> data) throws GlobalException { } /** * 这个是启动流程实例和完成待办任务的实现中都会调用的函数,自定义保存节点数据;同样的空实现,是为了避免子类一定要去写这个实现 * * @param obj * @param executor * @param businessKey * @param variables * @return */ public List<FieldInfo> taskNodeData( Object obj, Executor executor, String businessKey, Map<String, Object> variables, Map<String, Object> data) { return null; } /** * 这个是启动流程实例和完成待办任务的实现中都会调用的函数,自定义保存实例数据;同样的空实现,是为了避免子类一定要去写这个实现 * * @param obj * @param executor * @param businessKey * @param variables * @return */ public List<FieldInfo> processInstanceData( Object obj, Executor executor, String businessKey, Map<String, Object> variables, Map<String, Object> data) { return null; } /** * 审批结果;这个就是要求所有的用户任务必须要有审核结果 * * @return */ public abstract Object approvalResult(Map<String, Object> variables, Map<String, Object> data); /** * 审批意见;要求所有的用户任务必须要有审核意见 * * @return */ public abstract Object approvalOpinion(Map<String, Object> variables, Map<String, Object> data); /** * Execute前置处理;同样的空实现,是为了避免子类一定要去写这个实现 * * @param execution * @param variables * @param data */ @Override protected void beforeExecute( DelegateExecution execution, Map<String, Object> variables, Map<String, Object> data) { } /** * 后置处理;同样的空实现,是为了避免子类一定要去写这个实现 * * @param execution * @param data */ @Override protected final void afterExecute(DelegateExecution execution, Map<String, Object> data) { } // 这个基本没用,设计上还需要调整;同样的空实现,是为了避免子类一定要去写这个实现 @Override protected final List<FieldInfo> processInstanceData( DelegateExecution execution, Map<String, Object> data) { return null; } // 这个基本没用,设计上还需要调整;同样的空实现,是为了避免子类一定要去写这个实现 @Override protected final List<FieldInfo> taskNodeData( DelegateExecution execution, Map<String, Object> data) { return null; } // 这个就是实现ExecutionListener的空实现 @Override protected void execute( DelegateExecution execution, Map<String, Object> variables, Map<String, Object> data) { } // 如果是任务被创建的事件的话,那么先设置办理人,这就是动态实现设置办理人的思路;有需要可以扩展其他功能。 @Override public void notify(DelegateTask delegateTask) { if (StringUtils.equalsIgnoreCase(delegateTask.getEventName(), "create")) { setAssignee(delegateTask); } } /** * 设置办理人 * * @param delegateTask */ protected void setAssignee(DelegateTask delegateTask) { // 获取processDefinitionKey String processDefinitionKey = CastUtil.castString(delegateTask.getVariable(Const.PROCESS_DEFINITION_KEY)); // 获取taskDefinitionKey String taskDefinitionKey = delegateTask.getTaskDefinitionKey(); Map<String, Object> variables = Maps.newHashMap(); // 查询节点配置 readTransactorAndSendNoticeVariable(processDefinitionKey, taskDefinitionKey, variables); // 根据配置查询办理人;TODO String assignee = ""; delegateTask.setAssignee(assignee); } // 需要特殊处理的启动节点;我把它当用户任务处理掉 public abstract static class StartEventTaskHandler extends UserTaskHandler { // 把不可能调用的直接final处理掉 @Override public final void notify(DelegateTask delegateTask) { } @Override public List<FieldInfo> taskNodeData( Object obj, Executor executor, String businessKey, Map<String, Object> variables, Map<String, Object> data) { return null; } @Override public List<FieldInfo> processInstanceData( Object obj, Executor executor, String businessKey, Map<String, Object> variables, Map<String, Object> data) { return null; } // 把不可能调用的直接final处理掉 @Override public final Object approvalResult(Map<String, Object> variables, Map<String, Object> data) { return null; } // 把不可能调用的直接final处理掉 @Override public final Object approvalOpinion(Map<String, Object> variables, Map<String, Object> data) { return null; } } // 需要特殊处理的结束节点;我把它当用户任务处理掉 public abstract static class EndEventTaskHandler extends UserTaskHandler { // 把不可能调用的直接final处理掉 @Override public final List<FieldInfo> taskNodeData( Object obj, Executor executor, String businessKey, Map<String, Object> variables, Map<String, Object> data) { return null; } // 把不可能调用的直接final处理掉 @Override public final List<FieldInfo> processInstanceData( Object obj, Executor executor, String businessKey, Map<String, Object> variables, Map<String, Object> data) { return null; } // 把不可能调用的直接final处理掉 @Override public final Object approvalResult(Map<String, Object> variables, Map<String, Object> data) { return null; } // 把不可能调用的直接final处理掉 @Override public final Object approvalOpinion(Map<String, Object> variables, Map<String, Object> data) { return null; } // 把不可能调用的直接final处理掉 @Override public final void notify(DelegateTask delegateTask) { } } } // 这个类和ActivitiListener类似的原理,但是是针对ServiceTask的,因为ServiceTask组件需要指定一个JavaDelegate的实现 public class ServiceTaskDelegate implements JavaDelegate { @Override public void execute(DelegateExecution execution) { // 获取组件id String taskDefinitionKey = execution.getCurrentActivityId(); // 查找ServiceTaskHandler对象 ServiceTaskHandler serviceTaskHandler = ApplicationContextUtil.getBean(taskDefinitionKey, ServiceTaskHandler.class); if (Objects.nonNull(serviceTaskHandler)) { // 调用目标方法 serviceTaskHandler.notify(execution); } } } // 这个是系统任务处理器,activiti里面还有一个ServiceTask的组件 public abstract class ServiceTaskHandler extends AbstractTaskHandler { // 重写ExecutionListener后置操作,保存数据 @Override protected void afterExecute(DelegateExecution execution, Map<String, Object> data) { saveData(execution, data); } // 空实现 @Override protected List<FieldInfo> processInstanceData( DelegateExecution execution, Map<String, Object> data) { return null; } // 空实现 @Override protected List<FieldInfo> taskNodeData(DelegateExecution execution, Map<String, Object> data) { return null; } // 实现ExecutionListener的操作 @Override protected void execute( DelegateExecution execution, Map<String, Object> variables, Map<String, Object> data) { Object result = approvalResult(execution, variables, data); Object opinion = approvalOpinion(execution, variables, data); if (Objects.nonNull(result)) { data.put(Approval.APPROVAL_RESULT.name(), result); } if (Objects.nonNull(opinion)) { data.put(Approval.APPROVAL_OPINION.name(), opinion); } } /** * 审批结果 * * @return */ public abstract Object approvalResult( DelegateExecution execution, Map<String, Object> variables, Map<String, Object> data); /** * 审批意见 * * @return */ public abstract Object approvalOpinion( DelegateExecution execution, Map<String, Object> variables, Map<String, Object> data); } // 网关任务处理器 public abstract class GateTaskHandler extends AbstractElement { private static final long serialVersionUID = -1425610003326612058L; // 空实现 @Override protected void afterExecute(DelegateExecution execution, Map<String, Object> data) {} // 把不可能调用的直接final处理掉 @Override protected final List<FieldInfo> processInstanceData( DelegateExecution execution, Map<String, Object> data) { return null; } // 把不可能调用的直接final处理掉 @Override protected final List<FieldInfo> taskNodeData( DelegateExecution execution, Map<String, Object> data) { return null; } }
按照这个,我把最主要的UserTask、ServiceTask等几个常见的任务给处理掉了。下面来看一下这些抽象类都怎么使用。
-
具体实现
// "leave"是流程图的id
@Service("leave")
public class LeaveServiceImpl extends BaseProcessServiceImpl
implements ILeaveService {
/**
* 流程定义key
*/
private static final String LEAVE_PROCESS_DEFINITION_KEY = "leave";
@Override
protected String processDefinitionKey() {
return LEAVE_PROCESS_DEFINITION_KEY;
}
// 接口方法,需要实现,申请假期
@Transactional(rollbackFor = Exception.class)
@Override
public String apply(Object data, String userId) throws GlobalException {
// 准备数据TODO
// 调用BaseProcessServiceImpl启动方法启动流程实例
ProcessInstance processInstance =
startProcessInstance(businessKey, Executor.defaultExecutor(userId),
variables, data);
return processInstance.getId();
}
// 接口方法,需要实现,审批业务
@Transactional(rollbackFor = Exception.class)
@Override
public String approval(Object approval, String userId)
throws GlobalException {
// 准备数据
Map<String, Object> variables = handleApprovalVariables(approval, userId);
Map<String, Object> data = handleApprovalData(approval);
// 调用父类的办理方法
Task task =
completeTask(
approval.getTaskId(), approval.getBusinessKey(),
Executor.defaultExecutor(userId), variables, data);
return task.getId();
}
}
// 启动节点ID
@Component("leave_apply")
public class ApplyTask extends UserTaskHandler.StartEventTaskHandler {
// 任务前置操作
@Override
public void beforeTask(
Task task,
String businessKey,
Executor executor,
Map<String, Object> variables,
Map<String, Object> data)
throws GlobalException {
}
// 任务数据快照
@Override
public List<FieldInfo> taskNodeData(
Object obj,
Executor executor,
String businessKey,
Map<String, Object> variables,
Map<String, Object> data) {
if (CollectionUtils.isEmpty(data)) {
return null;
}
FieldInfo.Builder builder = new FieldInfo.Builder();
for (Map.Entry<String, Object> entry : data.entrySet()) {
String value = CastUtil.castString(entry.getValue(), StringUtils.EMPTY);
builder.normalFieldInfo(entry.getKey(), value, "");
}
return builder.build();
}
// 保存在流程实例中的数据
@Override
public List<FieldInfo> processInstanceData(
Object obj,
Executor executor,
String businessKey,
Map<String, Object> variables,
Map<String, Object> data) {
String leave_reason = CastUtil.castString(variables.get("leave_reason"), StringUtils.EMPTY);
String leave_date = CastUtil.castString(variables.get("leave_date"), StringUtils.EMPTY);
return new FieldInfo.Builder()
.normalFieldInfo(
"leave_reason", leave_reason, "请假原因")
.normalFieldInfo("leave_date", leave_date, "请假日期")
.build();
}
@Override
protected void execute(
DelegateExecution execution, Map<String, Object> variables, Map<String, Object> data) {
}
}
// 审批节点id
@Component("leave_approval")
public class UserApprovalTask extends UserTaskHandler {
@Override
public void beforeTask(
Task task,
String businessKey,
Executor executor,
Map<String, Object> variables,
Map<String, Object> data)
throws GlobalException {
}
// 节点快照数据;这里我存储的是审批数据
@Override
public List<FieldInfo> taskNodeData(
Object obj,
Executor executor,
String businessKey,
Map<String, Object> variables,
Map<String, Object> data) {
String approvalUserId = CastUtil.castString(variables.get("approvalUserId"), StringUtils.EMPTY);
String approvalResult = CastUtil.castString(data.get("approvalResult"), StringUtils.EMPTY);
String approvalOpinion = CastUtil.castString(data.get("approvalOpinion"), StringUtils.EMPTY);
FieldInfo.Builder builder = new FieldInfo.Builder();
builder.normalFieldInfo("approvalUserId", approvalUserId, "审批人ID")
.normalFieldInfo("approvalResult", approvalResult, "审批结果")
.normalFieldInfo("approvalOpinion", approvalOpinion, "审批意见");
return builder.build();
}
// 同样的,我把需要的数据存储进流程实例中
@Override
public List<FieldInfo> processInstanceData(
Object obj,
Executor executor,
String businessKey,
Map<String, Object> variables,
Map<String, Object> data) {
String approvalUserId = CastUtil.castString(variables.get("approvalUserId"), StringUtils.EMPTY);
String approvalResult = CastUtil.castString(variables.get("approvalResult"), StringUtils.EMPTY);
String approvalOpinion = CastUtil.castString(variables.get("approvalOpinion"), StringUtils.EMPTY);
return new FieldInfo.Builder()
.normalFieldInfo(
"approvalUserId",
approvalUserId,
"审批人ID")
.normalFieldInfo("approvalResult",
approvalResult,
"审批结果")
.normalFieldInfo(
"approvalOpinion",
approvalOpinion,
"审批意见")
.build();
}
// 审批结果
@Override
public Object approvalResult(Map<String, Object> variables, Map<String, Object> data) {
return data.getOrDefault("approvalResult", StrUtil.EMPTY);
}
// 审批意见
@Override
public Object approvalOpinion(Map<String, Object> variables, Map<String, Object> data) {
return data.getOrDefault("approvalOpinion", StrUtil.EMPTY);
}
// 自定义设置办理人
@Override
protected void setAssignee(DelegateTask delegateTask) {
Object approvalUserId = delegateTask.getVariable("approvalUserId");
if (Objects.isNull(approvalUserId)) {
super.setAssignee(delegateTask);
} else {
String userId = CastUtil.castString(approvalUserId);
delegateTask.setAssignee(userId);
}
}
}
通过以上代码及相关注释,我已经把我对于activiti工作量引擎的使用与思考总结起来了,感兴趣的小伙伴可以参与一起讨论。