【Ovirt 笔记】任务机制分析与整理
2018-01-26 本文已影响21人
58bc06151329
文前说明
作为码农中的一员,需要不断的学习,我工作之余将一些分析总结和学习笔记写成博客与大家一起交流,也希望采用这种方式记录自己的学习之旅。
本文仅供学习交流使用,侵权必删。
不用于商业目的,转载请注明出处。
分析整理的版本为 Ovirt 3.4.5 版本。
基础说明
- 作为执行命令 Command 行为的一种监控。
- 在引擎管理门户的任务执行列表中可以进行查看。
数据库相关
- 表 job 中存放了执行的任务(行为)信息。
字段名称 | 字段说明 | 字段类型 | 其它 |
---|---|---|---|
job_id | 任务 ID | uuid | 不能为空 |
action_type | 操作类型 | character varying(50) | 不能为空 VdcActionType |
description | 显示任务内容 | text | 不能为空 |
status | 任务执行状态 | character varying(32) | 不能为空 JobExecutionStatus |
owner_id | 任务执行者 | uuid | \ |
visible | 任务是否显示 | boolean | 默认为 true |
start_time | 任务开始时间 | timestamp with time zone | 不能为空 |
end_time | 任务结束时间 | timestamp with time zone | \ |
last_update_time | 任务信息最后修改时间 | timestamp with time zone | \ |
correlation_id | 相关操作(Command)的线程 ID | character varying(50) | 不能为空 |
is_external | 是否外部任务 | boolean | 默认为 false |
is_auto_cleared | 任务是否定时自动清除 | boolean | 默认为 true |
- 操作类型枚举类 VdcActionType。通过配置 ExecutionMessages.properties 资源文件实现国际化。
- 任务执行状态枚举类 JobExecutionStatus。通过配置 Enums.properties 资源文件实现国际化。
任务执行状态 | 说明 | 场景 |
---|---|---|
STARTED | 任务启动 | 任务开始启动时 |
FINISHED | 任务完成 | 任务正常结束时 |
FAILED | 任务失败 | 任务执行失败时 |
ABORTED | 任务终止 | 分布式存储执行任务状态返回的任务终止状态 |
UNKNOWN | 任务未知 | 初始化默认状态 |
-
定时自动清除
- 后台创建调度 JobRepositoryCleanupManager 实现。
- 调度中执行存储过程 DeleteCompletedJobsOlderThanDate。
- 调度时间可配置
- SucceededJobCleanupTimeInMinutes 执行成功的任务结束时间后多久被删除,结束 10 分钟后被删除。
- FailedJobCleanupTimeInMinutes 执行失败的任务结束后多久被删除,结束 1 个小时后被删除。
- JobCleanupRateInMinutes 执行任务的频率,默认每 10 分钟执行一次。
-
表 step 中存放了执行任务的所有步骤信息。
字段名称 | 字段说明 | 字段类型 | 其它 |
---|---|---|---|
step_id | 步骤 ID | uuid | 不能为空 |
parent_step_id | 父类步骤 ID | uuid | \ |
job_id | 所属任务 ID | uuid | 不能为空 |
step_type | 步骤类型 | character varying(32) | 不能为空 StepEnum |
description | 步骤描述信息 | text | 不能为空 |
step_number | 步骤序号 | integer | 不能为空,从 0 开始 |
status | 步骤状态 | character varying(32) | 不能为空 JobExecutionStatus |
start_time | 步骤开始时间 | timestamp with time zone | 不能为空 |
end_time | 步骤结束时间 | timestamp with time zone | \ |
correlation_id | 相关操作(Command)的线程 ID | character varying(50) | 不能为空 |
external_system_type | 外部系统类型 | character varying(32) | \ ExternalSystemType |
external_id | 外部系统 ID | uuid | \ |
is_external | 是否外部步骤 | boolean | 默认为 false |
- 步骤类型枚举类 StepEnum。通过配置 ExecutionMessages.properties 资源文件实现国际化。
步骤类型 | 说明 |
---|---|
VALIDATING | 验证 |
EXECUTING | 执行 |
FINALIZING | 结束 |
- 步骤类型枚举类 StepEnum 中可以设置了 AsyncTaskType 异步类型。
- 设置异步类型的步骤,可以添加异步任务,表 async_tasks 中保存了异步任务信息。
字段名称 | 字段说明 | 字段类型 | 其它 |
---|---|---|---|
task_id | 异步任务 ID | uuid | 不能为空 |
action_type | 操作类型 ID | integer | 不能为空 |
status | 任务状态 | integer | 不能为空 AsyncTaskStatusEnum |
result | 异步任务返回 | integer | 不能为空 AsyncTaskResultEnum |
action_parameters | 执行操作 Command 参数序列化 | text | \ |
action_params_class | 执行操作 Command 参数类名称 | character varying(256) | \ |
step_id | 步骤 ID | uuid | \ |
command_id | 命令 ID | uuid | 不能为空 |
started_at | 任务开始时间 | timestamp with time zone | \ |
storage_pool_id | 存储域 ID | uuid | \ |
task_type | 任务类型 | integer | 不能为空,默认为 0,AsyncTaskType |
task_parameters | 异步任务参数序列化 | text | \ |
task_params_class | 异步任务参数类 | character varying(256) | \ |
vdsm_task_id | 返回的 vdsm 运行异步任务 ID | uuid | 通过返回 uuidReturn.mUuid 中获取 |
root_command_id | 父类 Command 的 ID | uuid | \ |
- 异步任务执行状态枚举类 AsyncTaskStatusEnum。
异步状态 | 状态码 | 状态说明 |
---|---|---|
unknown | 0 | 任务未知状态。 |
init | 1 | 任务还没有开始前的初始化状态。 |
running | 2 | 任务正在运行状态。 |
finished | 3 | 任务完成状态。 |
aborting | 4 | 任务异常终止状态。 |
cleaning | 5 | 终止请求失败的任务状态,等待清理。 |
- 异步任务执行返回枚举类 AsyncTaskResultEnum。
异步任务返回 | 返回说明 |
---|---|
success | 成功 |
failure | 失败 |
cleanSuccess | 清除成功 |
cleanFailure | 清除失败 |
- 异步任务实际上是通过异步任务工厂(AsyncTaskFactory),构建了 SPMAsyncTask SPM 异步任务。任务状态为 AsyncTaskState。
SPM 异步任务状态 | 说明 |
---|---|
Initializing | 正在初始化状态 |
Polling | 轮询状态 |
Ended | 结束状态 |
AttemptingEndAction | 尝试结束操作状态 |
ClearFailed | 清除错误状态 |
Cleared | 清除干净状态 |
- 外部系统类型枚举类 ExternalSystemType
- VDSM
- GLUSTER
功能模块详细
- Backend 执行 Command 命令,创建默认的 Command 命令执行上下文对象。
@Override
public VdcReturnValueBase runAction(CommandBase<?> action, ExecutionContext executionContext) {
return runAction(action, true, ExecutionHandler.createDefaultContexForTasks(executionContext));
}
public static CommandContext createDefaultContexForTasks(ExecutionContext parentContext, EngineLock lock) {
ExecutionContext executionContext = new ExecutionContext();
if (parentContext != null) {
if (parentContext.getJob() != null) {
Step parentStep = parentContext.getParentTasksStep();
if (parentStep != null) {
executionContext.setParentTasksStep(parentStep);
}
} else {
executionContext.setParentTasksStep(parentContext.getParentTasksStep());
}
}
return new CommandContext(executionContext, lock);
}
-
命令上下文对象 CommandContext 包含
-
创建执行任务上下文对象,设置默认步骤。
- 默认包含 VALIDATING(验证) 和 EXECUTING(执行) 两个步骤。
inal static List<StepEnum> DEFAULT_STEPS_LIST = Arrays.asList(StepEnum.VALIDATING, StepEnum.EXECUTING);
public ExecutionContext() {
stepsList = DEFAULT_STEPS_LIST;
}
- 一个 Command 命令对应一个执行任务上下文 ExecutionContext。
- 属性 job 中包含执行任务上下文的任务信息。
- 属性 step 中包含执行任务上下文的步骤信息。
- 属性 ExecutionMethod 标明执行的 Command 命令是一个任务还是一个步骤。
- 属性 isMonitored 定义监管记录该任务或步骤。
- 属性 shouldEndJob 定义该任务或步骤完成的时候同时结束所在任务。
- 属性 parentTasksStep 定义当前处于 EXECUTING(执行) 状态的步骤。
- 属性 isTasksMonitored 定义监管记录当前处于 EXECUTING(执行) 状态的步骤。
- 属性 isCompleted 定义任务上下文是否完成。
- 属性 isJobRequired 定义是否为必须的任务。
任务步骤的操作相关。
- ExecutionHandler 类中任务操作相关实现。
创建任务
public static Job createJob(VdcActionType actionType, CommandBase<?> command) {
Job job = new Job();
job.setId(Guid.newGuid());
job.setActionType(actionType);
job.setDescription(ExecutionMessageDirector.resolveJobMessage(actionType, command.getJobMessageProperties()));
job.setJobSubjectEntities(getSubjectEntities(command.getPermissionCheckSubjects()));
job.setOwnerId(command.getUserId());
job.setStatus(JobExecutionStatus.STARTED);
job.setStartTime(new Date());
job.setCorrelationId(command.getCorrelationId());
return job;
}
- 任务描述的生成。通过配置 ExecutionMessages.properties 资源文件实现国际化。
- 关键字 job. 开头为任务描述。
- 关键字 step. 开头为步骤描述。
ExecutionMessageDirector.resolveJobMessage(actionType, command.getJobMessageProperties())
- 执行任务时相关的实体对象保存。
job.setJobSubjectEntities(getSubjectEntities(command.getPermissionCheckSubjects()))
- 设置任务的开始时间和开始状态。
job.setStartTime(new Date());
job.setCorrelationId(command.getCorrelationId());
获取任务对象
private static Job getJob(CommandBase<?> command, VdcActionType actionType) {
VdcActionParametersBase params = command.getParameters();
Job job;
// if Job is external, we had already created the Job by AddExternalJobCommand, so just get it from DB
if (params.getJobId() != null) {
job = DbFacade.getInstance().getJobDao().get((Guid)params.getJobId());
}
else {
job = createJob(actionType, command);
JobRepositoryFactory.getJobRepository().saveJob(job);
}
return job;
}
- 数据中存在则直接从数据库中获取。
- 不存在则创建新的任务对象。
获取与执行任务相关的实体类
private static Map<Guid, VdcObjectType> getSubjectEntities(List<PermissionSubject> permSubjectList) {
Map<Guid, VdcObjectType> entities = new HashMap<Guid, VdcObjectType>();
for (PermissionSubject permSubj : permSubjectList) {
if (permSubj.getObjectId() != null && permSubj.getObjectType() != null) {
entities.put(permSubj.getObjectId(), permSubj.getObjectType());
}
}
return entities;
}
结束任务(在任务上下文中生效)
public static void endJob(ExecutionContext context, boolean exitStatus) {
if (context == null) {
return;
}
Job job = context.getJob();
try {
if (context.isMonitored()) {
if (context.getExecutionMethod() == ExecutionMethod.AsJob && job != null) {
if (context.shouldEndJob() || !(job.isAsyncJob() && exitStatus)) {
context.setCompleted(true);
endJob(exitStatus, job);
}
} else {
Step step = context.getStep();
if (context.getExecutionMethod() == ExecutionMethod.AsStep && step != null) {
if (context.shouldEndJob()) {
if (job == null) {
job = JobRepositoryFactory.getJobRepository().getJob(step.getJobId());
}
if (job != null) {
context.setCompleted(true);
endJob(exitStatus, job);
}
}
}
}
}
} catch (Exception e) {
log.error(e);
}
}
private static void endJob(boolean exitStatus, Job job) {
job.markJobEnded(exitStatus);
try {
JobRepositoryFactory.getJobRepository().updateCompletedJobAndSteps(job);
} catch (Exception e) {
log.errorFormat("Failed to end Job {0}, {1}", job.getId(), job.getActionType().name(), e);
}
}
public void updateCompletedJobAndSteps(final Job job) {
TransactionSupport.executeInNewTransaction(new TransactionMethod<Void>() {
@Override
public Void runInTransaction() {
jobDao.update(job);
stepDao.updateJobStepsCompleted(job.getId(), job.getStatus(), job.getEndTime());
return null;
}
});
}
设置任务为异步任务(在任务上下文中生效)
public static void setAsyncJob(ExecutionContext executionContext, boolean isAsync) {
if (executionContext == null) {
return;
}
Job job = executionContext.getJob();
if (executionContext.getExecutionMethod() == ExecutionMethod.AsJob && job != null) {
job.setIsAsyncJob(isAsync);
}
}
根据设置的步骤状态添加步骤(在任务上下文中生效)
public static Step addTaskStep(ExecutionContext context, StepEnum stepName, String description) {
if (context == null) {
return null;
}
Step step = null;
if (context.isTasksMonitored()) {
Step parentTaskStep = context.getParentTasksStep();
if (parentTaskStep != null) {
step = addSubStep(parentTaskStep, stepName, description);
}
}
return step;
}
根据设置的步骤状态结束步骤
public static void endTaskStep(Guid stepId, JobExecutionStatus exitStatus) {
try {
if (stepId != null) {
Step step = JobRepositoryFactory.getJobRepository().getStep(stepId);
if (step != null) {
step.markStepEnded(exitStatus);
JobRepositoryFactory.getJobRepository().updateStep(step);
}
}
} catch (Exception e) {
log.errorFormat("Failed to terminate step {0} with status {1}", stepId, exitStatus, e);
}
}
根据设置的步骤状态结束步骤(在任务上下文中生效)
public static void endTaskJob(ExecutionContext context, boolean exitStatus) {
if (context == null) {
return;
}
try {
if (context.getExecutionMethod() == ExecutionMethod.AsJob && context.getJob() != null) {
endJob(context, exitStatus);
} else {
Step parentStep = context.getStep();
if (context.getExecutionMethod() == ExecutionMethod.AsStep && parentStep != null) {
Step finalizingStep = parentStep.getStep(StepEnum.FINALIZING);
if (finalizingStep != null) {
finalizingStep.markStepEnded(exitStatus);
JobRepositoryFactory.getJobRepository().updateStep(finalizingStep);
}
parentStep.markStepEnded(exitStatus);
JobRepositoryFactory.getJobRepository().updateStep(parentStep);
List<Step> steps = DbFacade.getInstance().getStepDao().getStepsByJobId(parentStep.getJobId());
boolean hasChildStepsRunning = false;
for (Step step : steps) {
if (step.getStatus() == JobExecutionStatus.STARTED && step.getParentStepId() != null) {
hasChildStepsRunning = true;
break;
}
}
if (!hasChildStepsRunning) {
endJob(exitStatus, JobRepositoryFactory.getJobRepository().getJob(parentStep.getJobId()));
}
}
}
} catch (RuntimeException e) {
log.error(e);
}
}
检测任务或步骤中是否包含外部系统步骤
public static boolean checkIfJobHasTasks(ExecutionContext context) {
if (context == null || !context.isMonitored()) {
return false;
}
try {
Guid jobId = null;
if (context.getExecutionMethod() == ExecutionMethod.AsJob && context.getJob() != null) {
jobId = context.getJob().getId();
} else if (context.getExecutionMethod() == ExecutionMethod.AsStep && context.getStep() != null) {
jobId = context.getStep().getId();
}
if (jobId != null) {
return DbFacade.getInstance().getJobDao().checkIfJobHasTasks(jobId);
}
} catch (RuntimeException e) {
log.error(e);
}
return false;
}
- 通过存储过程 CheckIfJobHasTasks 进行了查询。
SELECT EXISTS(
SELECT *
FROM step
WHERE job_id = v_job_id
AND external_id is not null
AND external_system_type in ('VDSM','GLUSTER'));
通过默认的行为监控准备运行的命令
public static void prepareCommandForMonitoring(CommandBase<?> command,
VdcActionType actionType,
boolean runAsInternal) {
ExecutionContext context = command.getExecutionContext();
if (context == null) {
context = new ExecutionContext();
}
try {
boolean isMonitored = shouldMonitorCommand(actionType, runAsInternal);
// A monitored job is created for monitored external flows
if (isMonitored || context.isJobRequired()) {
Job job = getJob(command, actionType);
context.setExecutionMethod(ExecutionMethod.AsJob);
context.setJob(job);
command.setExecutionContext(context);
command.setJobId(job.getId());
context.setMonitored(true);
}
} catch (Exception e) {
log.errorFormat("Failed to prepare command of type {0} for monitoring due to error {1}",
actionType.name(),
ExceptionUtils.getMessage(e),
e);
}
}
创建步骤(在任务上下文中生效)
public static Step addStep(ExecutionContext context, StepEnum stepName, String description, boolean isExternal) {
if (context == null) {
return null;
}
Step step = null;
if (context.isMonitored()) {
if (description == null) {
description = ExecutionMessageDirector.getInstance().getStepMessage(stepName);
}
try {
Job job = context.getJob();
if (context.getExecutionMethod() == ExecutionMethod.AsJob && job != null) {
step = job.addStep(stepName, description);
try {
step.setExternal(isExternal);
JobRepositoryFactory.getJobRepository().saveStep(step);
} catch (Exception e) {
log.errorFormat("Failed to save new step {0} for job {1}, {2}.", stepName.name(),
job.getId(), job.getActionType().name(), e);
job.getSteps().remove(step);
step = null;
}
} else {
Step contextStep = context.getStep();
if (context.getExecutionMethod() == ExecutionMethod.AsStep && contextStep != null) {
step = addSubStep(contextStep, stepName, description);
step.setExternal(isExternal);
}
}
} catch (Exception e) {
log.error(e);
}
}
return step;
}
- Command 命令为一个任务(context.getExecutionMethod() == ExecutionMethod.AsJob),添加步骤到任务中。
step = job.addStep(stepName, description);
try {
step.setExternal(isExternal);
JobRepositoryFactory.getJobRepository().saveStep(step);
} catch (Exception e) {
log.errorFormat("Failed to save new step {0} for job {1}, {2}.", stepName.name(),
job.getId(), job.getActionType().name(), e);
job.getSteps().remove(step);
step = null;
}
- Command 命令为一个任务(context.getExecutionMethod() == ExecutionMethod.AsStep),添加步骤到父类步骤中。
if (context.getExecutionMethod() == ExecutionMethod.AsStep && contextStep != null) {
step = addSubStep(contextStep, stepName, description);
step.setExternal(isExternal);
结束步骤(在任务上下文中生效)
public static void endStep(ExecutionContext context, Step step,
boolean exitStatus) {
if (context == null) {
return;
}
if (context.isMonitored()) {
Job job = context.getJob();
try {
if (step != null) {
step.markStepEnded(exitStatus);
JobRepositoryFactory.getJobRepository().updateStep(step);
}
if (context.getExecutionMethod() == ExecutionMethod.AsJob
&& job != null && !exitStatus) {
// step failure will cause the job to be marked as failed
context.setCompleted(true);
job.markJobEnded(false);
JobRepositoryFactory.getJobRepository()
.updateCompletedJobAndSteps(job);
} else {
Step parentStep = context.getStep();
if (context.getExecutionMethod() == ExecutionMethod.AsStep
&& parentStep != null) {
context.setCompleted(true);
if (!exitStatus) {
job.markJobEnded(false);
JobRepositoryFactory.getJobRepository()
.updateCompletedJobAndSteps(job);
}
}
}
} catch (Exception e) {
log.error(e);
}
}
}
- 设置该步骤最终的结束状态为失败状态(exitStatus = false),Command 命令为一个任务(context.getExecutionMethod() == ExecutionMethod.AsJob),设置任务结束并且执行失败。
- Command 命令为一个步骤(context.getExecutionMethod() == ExecutionMethod.AsStep),设置所属父步骤结束并且所属任务执行失败。
step.markStepEnded(exitStatus);
JobRepositoryFactory.getJobRepository().updateStep(step);
context.setCompleted(true);
job.markJobEnded(false);
JobRepositoryFactory.getJobRepository().updateCompletedJobAndSteps(job);
- 设置该步骤最终的结束状态为成功状态(exitStatus = true),Command 命令为一个任务(context.getExecutionMethod() == ExecutionMethod.AsStep),设置父类步骤结束并且执行成功。
context.setCompleted(true);
创建子步骤
private static Step addSubStep(Step parentStep, StepEnum stepName, String description) {
Step step = null;
if (parentStep != null) {
if (description == null) {
description = ExecutionMessageDirector.getInstance().getStepMessage(stepName);
}
step = parentStep.addStep(stepName, description);
try {
JobRepositoryFactory.getJobRepository().saveStep(step);
} catch (Exception e) {
log.errorFormat("Failed to save new step {0} for step {1}, {2}.", stepName.name(),
parentStep.getId(), parentStep.getStepType().name(), e);
parentStep.getSteps().remove(step);
step = null;
}
}
return step;
}
- 为一个步骤添加子步骤。
创建子步骤(在任务上下文中生效)
public static Step addSubStep(ExecutionContext context, Step parentStep, StepEnum newStepName, String description, boolean isExternal) {
Step step = null;
if (context == null || parentStep == null) {
return null;
}
try {
if (context.isMonitored()) {
if (description == null) {
description = ExecutionMessageDirector.getInstance().getStepMessage(newStepName);
}
if (context.getExecutionMethod() == ExecutionMethod.AsJob) {
if (DbFacade.getInstance().getStepDao().exists(parentStep.getId())) {
if (parentStep.getJobId().equals(context.getJob().getId())) {
step = parentStep.addStep(newStepName, description);
}
}
} else if (context.getExecutionMethod() == ExecutionMethod.AsStep) {
step = parentStep.addStep(newStepName, description);
}
}
if (step != null) {
step.setExternal(isExternal);
JobRepositoryFactory.getJobRepository().saveStep(step);
}
} catch (Exception e) {
log.error(e);
}
return step;
}
- Command 命令为一个任务(context.getExecutionMethod() == ExecutionMethod.AsJob),父步骤所属该任务,将子步骤创建至父步骤中。
if (parentStep.getJobId().equals(context.getJob().getId())) {
step = parentStep.addStep(newStepName, description);
}
- Command 命令为一个步骤(context.getExecutionMethod() == ExecutionMethod.AsStep),将子步骤创建至该步骤中。
step = parentStep.addStep(newStepName, description);
开始执行最终的完成步骤(在任务上下文中生效)
public static Step startFinalizingStep(ExecutionContext executionContext) {
if (executionContext == null) {
return null;
}
Step step = null;
try {
if (executionContext.getExecutionMethod() == ExecutionMethod.AsJob) {
Job job = executionContext.getJob();
if (job != null) {
Step executingStep = job.getStep(StepEnum.EXECUTING);
Step finalizingStep =
job.addStep(StepEnum.FINALIZING,
ExecutionMessageDirector.getInstance().getStepMessage(StepEnum.FINALIZING));
if (executingStep != null) {
executingStep.markStepEnded(true);
JobRepositoryFactory.getJobRepository().updateExistingStepAndSaveNewStep(executingStep,
finalizingStep);
} else {
JobRepositoryFactory.getJobRepository().saveStep(finalizingStep);
}
}
} else if (executionContext.getExecutionMethod() == ExecutionMethod.AsStep) {
Step parentStep = executionContext.getStep();
if (parentStep != null) {
Step executingStep = parentStep.getStep(StepEnum.EXECUTING);
Step finalizingStep =
parentStep.addStep(StepEnum.FINALIZING, ExecutionMessageDirector.getInstance()
.getStepMessage(StepEnum.FINALIZING));
if (executingStep != null) {
executingStep.markStepEnded(true);
JobRepositoryFactory.getJobRepository().updateExistingStepAndSaveNewStep(executingStep,
finalizingStep);
} else {
JobRepositoryFactory.getJobRepository().saveStep(finalizingStep);
}
}
}
} catch (Exception e) {
log.error(e);
}
return step;
}
- Command 命令为一个任务(context.getExecutionMethod() == ExecutionMethod.AsJob),任务中创建 FINALIZING(结束)步骤。
Step executingStep = job.getStep(StepEnum.EXECUTING);
Step finalizingStep =
job.addStep(StepEnum.FINALIZING,
ExecutionMessageDirector.getInstance().getStepMessage(StepEnum.FINALIZING));
if (executingStep != null) {
executingStep.markStepEnded(true);
JobRepositoryFactory.getJobRepository().updateExistingStepAndSaveNewStep(executingStep,
finalizingStep);
} else {
JobRepositoryFactory.getJobRepository().saveStep(finalizingStep);
}
- Command 命令为一个步骤(context.getExecutionMethod() == ExecutionMethod.AsStep),创建 FINALIZING(结束)子步骤。
Step executingStep = parentStep.getStep(StepEnum.EXECUTING);
Step finalizingStep =
parentStep.addStep(StepEnum.FINALIZING, ExecutionMessageDirector.getInstance()
.getStepMessage(StepEnum.FINALIZING));
if (executingStep != null) {
executingStep.markStepEnded(true);
JobRepositoryFactory.getJobRepository().updateExistingStepAndSaveNewStep(executingStep,
finalizingStep);
} else {
JobRepositoryFactory.getJobRepository().saveStep(finalizingStep);
}
public void updateExistingStepAndSaveNewStep(final Step existingStep, final Step newStep) {
TransactionSupport.executeInNewTransaction(new TransactionMethod<Void>() {
@Override
public Void runInTransaction() {
jobDao.updateJobLastUpdateTime(existingStep.getJobId(), new Date());
stepDao.update(existingStep);
stepDao.save(newStep);
return null;
}
});
}
更新外部步骤信息
public static void updateStepExternalId(Step step, Guid externalId, ExternalSystemType systemType) {
if (step != null) {
step.getExternalSystem().setId(externalId);
step.getExternalSystem().setType(systemType);
try {
JobRepositoryFactory.getJobRepository().updateStep(step);
} catch (Exception e) {
log.errorFormat("Failed to save step {0}, {1} for system-type {2} with id {3}",
step.getId(),
step.getStepType().name(),
systemType.name(),
externalId,
e);
}
}
}
创建内部任务上下文
public static CommandContext createInternalJobContext(EngineLock lock) {
ExecutionContext executionContext = new ExecutionContext();
executionContext.setJobRequired(true);
executionContext.setMonitored(true);
return new CommandContext(executionContext, lock);
}
- 任务为必须执行任务
executionContext.setJobRequired(true);
- 任务为必须监控的任务
executionContext.setMonitored(true);
- 追加资源对象锁
return new CommandContext(executionContext, lock);
创建任务上下文子项(以当前任务上下文为父项)
public static CommandContext createDefaultContexForTasks(ExecutionContext parentContext, EngineLock lock) {
ExecutionContext executionContext = new ExecutionContext();
if (parentContext != null) {
if (parentContext.getJob() != null) {
Step parentStep = parentContext.getParentTasksStep();
if (parentStep != null) {
executionContext.setParentTasksStep(parentStep);
}
} else {
executionContext.setParentTasksStep(parentContext.getParentTasksStep());
}
}
return new CommandContext(executionContext, lock);
}
将给定实体的所有任务状态更新为一致
public static void updateSpecificActionJobCompleted(Guid entityId, VdcActionType actionType, boolean status) {
try {
List<Job> jobs = JobRepositoryFactory.getJobRepository().getJobsByEntityAndAction(entityId, actionType);
for (Job job : jobs) {
if (job.getStatus() == JobExecutionStatus.STARTED)
job.markJobEnded(status);
JobRepositoryFactory.getJobRepository().updateCompletedJobAndSteps(job);
}
} catch (RuntimeException e) {
log.error(e);
}
}
public List<Job> getJobsByEntityAndAction(Guid entityId, VdcActionType actionType) {
List<Job> jobList = new ArrayList<Job>();
List<Guid> jobIdsList = jobSubjectEntityDao.getJobIdByEntityId(entityId);
for (Guid jobId : jobIdsList) {
Job job = jobDao.get(jobId);
if (job != null && job.getActionType() == actionType) {
jobList.add(job);
}
}
return jobList;
}
Command 命令执行流程
Command 命令执行流程- 橘黄色方法中,包含有 Command 命令任务相关的操作。
- MultipleActionsRunner.executeValidatedCommand 方法中创建了 Command 命令的任务记录。
ExecutionHandler.prepareCommandForMonitoring(command, command.getActionType(), command.isInternalExecution());
- CommandBase.executeAction 方法中记录了 VALIDATING(验证) 的步骤。
validatingStep = ExecutionHandler.addStep(getExecutionContext(), StepEnum.VALIDATING, null);
actionAllowed = getReturnValue().getCanDoAction() || internalCanDoAction();
if (!isExternal) {
ExecutionHandler.endStep(getExecutionContext(), validatingStep, actionAllowed);
}
- CommandBase.execute 方法中记录了 EXECUTING(执行) 的步骤。
ExecutionHandler.addStep(getExecutionContext(), StepEnum.EXECUTING, null);
if (!hasTasks() && !ExecutionHandler.checkIfJobHasTasks(getExecutionContext())) {
ExecutionHandler.endJob(getExecutionContext(), getSucceeded());
}
- Command 命令执行类中,通过 getExecutionContext() 就能获取当前 Command 命令的任务上下文,从而获取该命令相关的任务和步骤。
异步任务执行流程
异步任务执行流程- 添加异步任务在执行命令 Command 任务行为中的步骤。
Step taskStep = ExecutionHandler.addTaskStep(getExecutionContext(), StepEnum.getStepNameByTaskType(asyncTaskCreationInfo.getTaskType()), description);
- 建立异步任务连接
SPMAsyncTask task = concreteCreateTask(taskId, asyncTaskCreationInfo, parentCommand)
- 保存异步任务数据到 async_tasks 表中。
AsyncTaskUtils.addOrUpdateTaskInDB(task);
- 更新异步任务外部系统类型和 vdsm 任务 ID 等。
ExecutionHandler.updateStepExternalId(taskStep, vdsmTaskId, ExternalSystemType.VDSM);
- CommandBase.executeAction 方法执行完成后,清除异步任务表 async_tasks 中数据。
if (!getReturnValue().getSucceeded()) {
clearAsyncTasksWithOutVdsmId();
}
private void clearAsyncTasksWithOutVdsmId() {
if (!getReturnValue().getTaskPlaceHolderIdList().isEmpty()) {
TransactionSupport.executeInNewTransaction(new TransactionMethod<Void>() {
@Override
public Void runInTransaction() {
for (Guid asyncTaskId : getReturnValue().getTaskPlaceHolderIdList()) {
AsyncTasks task = getAsyncTaskDao().get(asyncTaskId);
if (task != null && Guid.isNullOrEmpty(task.getVdsmTaskId())) {
AsyncTaskManager.removeTaskFromDbByTaskId(task.getTaskId());
}
}
return null;
}
});
}
}