Azkaban 源码分析之作业提交
介绍 :
Azkaban 提交作业有两种方式 :
- 通过人为手动提交一个作业
- 通过调度信息去调度执行一个作业
1. 作业调度信息 :
1.1 通过人为去手动提交一个作业:
用户提交作业首先需要通过 LoginAbstractAzkabanServlet 中的 doGet
方法
用户通过 doGet
进入 handleGet
(azkaban-web-server/ExecutorServlet) 方法,然后进入ajaxAttemptExecuteFlow
.
在 ajaxAttemptExecuteFlow
中需要经过权限检测,并获取 project 对应的 flow
.
- project 权限控制
/**
 * Looks up a project by name and returns it only when the given user holds
 * the requested permission on it.
 *
 * <p>On failure an {@code "error"} entry describing the problem is written
 * into {@code ret} and {@code null} is returned.
 *
 * @param ret         response map that receives the {@code "error"} entry on failure
 * @param projectName name of the project to look up
 * @param user        user whose permission is checked
 * @param type        permission type required on the project
 * @return the project when it exists and the permission check passes,
 *         otherwise {@code null}
 */
protected Project getProjectAjaxByPermission(Map<String, Object> ret,
    String projectName, User user, Permission.Type type) {
  Project project = projectManager.getProject(projectName);

  if (project == null) {
    // Report the name that was requested: the project reference is null in
    // this branch, so concatenating it would always print "null".
    ret.put("error", "Project '" + projectName + "' not found.");
    return null;
  }

  if (!hasPermission(project, user, type)) {
    ret.put("error",
        "User '" + user.getUserId() + "' doesn't have " + type.name()
            + " permissions on " + project.getName());
    return null;
  }

  return project;
}
之后进入 ajaxExecuteFlow
来根据用户页面提交信息,为 flow
初始化, 并且将信息初始化到flow 对应的 ExecutableFlow
信息类中.
信息初始化完毕后, 就通过 ExecutorManager.submitExecutableFlow
将信息放入到 queuedFlows
队列中去.
1.2 通过调度去调起一个作业:
- 提交调度请求
用户提交调度的请求首先需要通过 LoginAbstractAzkabanServlet
的 doGet
的方法.
之后进入 ScheduleServlet
的 handleGet
再到 handleAJAXAction
, ajaxScheduleFlow
来处理调度请求.
/**
 * Handles the AJAX request that registers a schedule for a flow.
 *
 * <p>Reads the project, flow, first schedule date/time, optional recurrence
 * period and execution options from the request, validates them, and then
 * registers the schedule through {@code scheduleManager.scheduleFlow}. The
 * outcome is reported through {@code ret}: validation failures put either
 * {@code "status"="error"} plus {@code "message"}, or an {@code "error"}
 * entry, and return without scheduling; success puts
 * {@code "status"="success"}, {@code "scheduleId"} and {@code "message"}.
 *
 * @param req  HTTP request carrying the scheduling parameters
 * @param ret  response map populated with the result or the error
 * @param user user issuing the request; must hold {@code Type.SCHEDULE}
 *             permission on the project
 * @throws ServletException declared by the signature; presumably raised by
 *         the parameter helpers when a required parameter is missing
 *         (TODO confirm against getParam/getIntParam)
 */
private void ajaxScheduleFlow(HttpServletRequest req,
    HashMap<String, Object> ret, User user) throws ServletException {
  String projectName = getParam(req, "projectName");
  String flowName = getParam(req, "flow");
  int projectId = getIntParam(req, "projectId");

  Project project = projectManager.getProject(projectId);
  if (project == null) {
    ret.put("message", "Project " + projectName + " does not exist");
    ret.put("status", "error");
    return;
  }

  if (!hasPermission(project, user, Type.SCHEDULE)) {
    ret.put("status", "error");
    ret.put("message", "Permission denied. Cannot execute " + flowName);
    return;
  }

  Flow flow = project.getFlow(flowName);
  if (flow == null) {
    ret.put("status", "error");
    // Use the project name rather than the Project object's toString().
    ret.put("message", "Flow " + flowName + " cannot be found in project "
        + project.getName());
    return;
  }

  String scheduleTime = getParam(req, "scheduleTime");
  String scheduleDate = getParam(req, "scheduleDate");
  DateTime firstSchedTime;
  try {
    firstSchedTime = parseDateTime(scheduleDate, scheduleTime);
  } catch (Exception e) {
    ret.put("error", "Invalid date and/or time '" + scheduleDate + " "
        + scheduleTime + "'");
    return;
  }

  // A null period means the schedule is not recurring.
  ReadablePeriod thePeriod = null;
  try {
    if (hasParam(req, "is_recurring")
        && getParam(req, "is_recurring").equals("on")) {
      thePeriod = Schedule.parsePeriodString(getParam(req, "period"));
    }
  } catch (Exception e) {
    // Stop here: falling through would register a schedule without the
    // requested recurrence while also reporting an error.
    ret.put("error", e.getMessage());
    return;
  }

  ExecutionOptions flowOptions = null;
  try {
    flowOptions = HttpRequestUtils.parseFlowOptions(req);
    HttpRequestUtils.filterAdminOnlyFlowParams(userManager, flowOptions, user);
  } catch (Exception e) {
    // Stop here: falling through would register the schedule with options
    // that failed to parse or were not filtered.
    ret.put("error", e.getMessage());
    return;
  }

  List<SlaOption> slaOptions = null;

  // Create the Schedule and register its scheduling information.
  Schedule schedule =
      scheduleManager.scheduleFlow(-1, projectId, projectName, flowName,
          "ready", firstSchedTime.getMillis(), firstSchedTime.getZone(),
          thePeriod, DateTime.now().getMillis(), firstSchedTime.getMillis(),
          firstSchedTime.getMillis(), user.getUserId(), flowOptions,
          slaOptions);
  logger.info("User '" + user.getUserId() + "' has scheduled " + "["
      + projectName + flowName + " (" + projectId + ")" + "].");
  projectManager.postProjectEvent(project, EventType.SCHEDULE,
      user.getUserId(), "Schedule " + schedule.toString()
          + " has been added.");

  ret.put("status", "success");
  ret.put("scheduleId", schedule.getScheduleId());
  ret.put("message", projectName + "." + flowName + " scheduled.");
}
- 执行调度
调度是通过 TriggerManager
来管理的, TriggerManager
中有一个 TriggerScannerThread
线程, 不停地轮询检查 Trigger 集合, 一分钟轮询一次.
在 checkAllTriggers
中检查所有的调度.
之后对已经具备执行条件的 trigger 再调用 onTriggerTrigger
. 之后检查 Action , 通过 Action.doAction()
来调用执行过程.
ExecuteFlowAction implements TriggerAction
/**
 * Trigger action that builds an executable flow for the configured
 * project/flow, submits it through the executor manager and, when SLA options
 * are configured, registers one SLA trigger per option for the new execution.
 *
 * @throws Exception if the project manager or executor manager has not been
 *         set; a {@link RuntimeException} is thrown when the project or flow
 *         cannot be found or when submission fails
 */
@Override
public void doAction() throws Exception {
  if (projectManager == null || executorManager == null) {
    throw new Exception("ExecuteFlowAction not properly initialized!");
  }

  final Project targetProject = projectManager.getProject(projectId);
  if (targetProject == null) {
    logger.error("Project to execute " + projectId + " does not exist!");
    throw new RuntimeException("Error finding the project to execute "
        + projectId);
  }

  final Flow targetFlow = targetProject.getFlow(flowName);
  if (targetFlow == null) {
    logger.error("Flow " + flowName + " cannot be found in project "
        + targetProject.getName());
    throw new RuntimeException("Error finding the flow to execute "
        + flowName);
  }

  // Wrap the flow definition in an ExecutableFlow that tracks this run.
  final ExecutableFlow execution = new ExecutableFlow(targetProject, targetFlow);
  execution.setSubmitUser(submitUser);
  execution.addAllProxyUsers(targetProject.getProxyUsers());

  // Fall back to default options, and to the flow's own notification lists
  // unless the options explicitly override them.
  if (executionOptions == null) {
    executionOptions = new ExecutionOptions();
  }
  if (!executionOptions.isFailureEmailsOverridden()) {
    executionOptions.setFailureEmails(targetFlow.getFailureEmails());
  }
  if (!executionOptions.isSuccessEmailsOverridden()) {
    executionOptions.setSuccessEmails(targetFlow.getSuccessEmails());
  }

  // Re-publish the schedule time from the trigger context, both as a flow
  // parameter and on the execution itself.
  final String scheduledAt = context.get("ScheduleTime").toString();
  executionOptions.getFlowParameters().put("ScheduleTime", scheduledAt);
  execution.setExecutionOptions(executionOptions);
  execution.setScheduleTime(scheduledAt);

  // Hand the execution over to the executor manager.
  try {
    executorManager.submitExecutableFlow(execution, submitUser);
    logger.info("Invoked flow " + targetProject.getName() + "." + flowName);
  } catch (ExecutorManagerException e) {
    throw new RuntimeException(e);
  }

  // Nothing more to do when no SLA has been configured.
  if (slaOptions == null || slaOptions.isEmpty()) {
    return;
  }

  final int execId = execution.getExecutionId();
  for (SlaOption slaOption : slaOptions) {
    logger.info("Adding sla trigger " + slaOption.toString() + " to execution "
        + execId);

    // Fires the trigger when the SLA is violated.
    final SlaChecker failChecker =
        new SlaChecker("slaFailChecker", slaOption, execId);
    final Map<String, ConditionChecker> failCheckers =
        new HashMap<String, ConditionChecker>();
    failCheckers.put(failChecker.getId(), failChecker);
    final Condition fireCondition =
        new Condition(failCheckers, failChecker.getId() + ".isSlaFailed()");

    // Expires the trigger when the flow finishes before violating the SLA.
    final SlaChecker passChecker =
        new SlaChecker("slaPassChecker", slaOption, execId);
    final Map<String, ConditionChecker> passCheckers =
        new HashMap<String, ConditionChecker>();
    passCheckers.put(passChecker.getId(), passChecker);
    final Condition expiryCondition =
        new Condition(passCheckers, passChecker.getId() + ".isSlaPassed()");

    // Translate the configured action names into trigger actions; names
    // other than alert / cancel-flow are ignored.
    final List<TriggerAction> triggerActions = new ArrayList<TriggerAction>();
    for (String actionName : slaOption.getActions()) {
      if (actionName.equals(SlaOption.ACTION_ALERT)) {
        triggerActions.add(new SlaAlertAction("slaAlert", slaOption, execId));
      } else if (actionName.equals(SlaOption.ACTION_CANCEL_FLOW)) {
        triggerActions.add(new KillExecutionAction("killExecution", execId));
      }
    }

    final Trigger slaTrigger =
        new Trigger("azkaban_sla", "azkaban", fireCondition, expiryCondition,
            triggerActions);
    slaTrigger.getInfo().put("monitored.finished.execution",
        String.valueOf(execId));
    slaTrigger.setResetOnTrigger(false);
    slaTrigger.setResetOnExpire(false);

    logger.info("Ready to put in the sla trigger");
    triggerManager.insertTrigger(slaTrigger);
    logger.info("Sla inserted.");
  }
}
通过 doAction
初始化了flow 环境之后, 作业就通过 submitExecutableFlow
提交给了 queuedFlows
队列
/**
 * Submits an executable flow for execution and returns a human-readable
 * status message.
 *
 * <p>The whole submission runs while holding the monitor of {@code exflow}.
 * When {@code queuedFlows} is full, an error message is logged and returned
 * and nothing else happens (no exception is thrown on that path). Otherwise
 * the flow is stamped with the submit user and time, the concurrent-execution
 * option is applied against the executions already running for the same
 * project/flow, the flow is uploaded through {@code executorLoader}, and it is
 * then either enqueued (multi-executor mode) or dispatched directly to the
 * first entry of {@code activeExecutors}.
 *
 * <p>NOTE(review): when the flow has no execution options, a fresh
 * {@code ExecutionOptions} is created locally but this block never sets it
 * back on {@code exflow} - confirm whether that is intended.
 *
 * @param exflow the flow to submit
 * @param userId id of the submitting user, recorded on the flow
 * @return a status message; on success it ends with the new execution id
 * @throws ExecutorManagerException when the flow is already running and the
 *         concurrent option is "skip", or when direct dispatch fails
 */
@Override
public String submitExecutableFlow(ExecutableFlow exflow, String userId) // entry point for submitting a flow for execution
throws ExecutorManagerException {
synchronized (exflow) {
String flowId = exflow.getFlowId();
logger.info("Submitting execution flow " + flowId + " by " + userId);
String message = "";
if (queuedFlows.isFull()) {
// Queue is full: log and return the failure text instead of submitting.
message =
String
.format(
"Failed to submit %s for project %s. Azkaban has overrun its webserver queue capacity",
flowId, exflow.getProjectName());
logger.error(message);
} else {
int projectId = exflow.getProjectId();
exflow.setSubmitUser(userId);
exflow.setSubmitTime(System.currentTimeMillis());
// Executions of the same project/flow that are currently running.
List<Integer> running = getRunningFlows(projectId, flowId);
ExecutionOptions options = exflow.getExecutionOptions();
if (options == null) {
options = new ExecutionOptions();
}
if (options.getDisabledJobs() != null) {
applyDisabledJobs(options.getDisabledJobs(), exflow);
}
if (!running.isEmpty()) {
if (options.getConcurrentOption().equals(
ExecutionOptions.CONCURRENT_OPTION_PIPELINE)) {
Collections.sort(running);
// After sorting, the last element is the highest running execution id.
Integer runningExecId = running.get(running.size() - 1);
options.setPipelineExecutionId(runningExecId); // pipeline option: record the running execution id; presumably this execution then waits on it
message =
"Flow " + flowId + " is already running with exec id "
+ runningExecId + ". Pipelining level "
+ options.getPipelineLevel() + ". \n";
} else if (options.getConcurrentOption().equals( // skip option: an execution is still running, so this submission is skipped by throwing
ExecutionOptions.CONCURRENT_OPTION_SKIP)) {
throw new ExecutorManagerException("Flow " + flowId
+ " is already running. Skipping execution.",
ExecutorManagerException.Reason.SkippedExecution);
} else {
// The settings is to run anyways.
message =
"Flow " + flowId + " is already running with exec id "
+ StringUtils.join(running, ",")
+ ". Will execute concurrently. \n";
}
}
boolean memoryCheck =
!ProjectWhitelist.isProjectWhitelisted(exflow.getProjectId(),
ProjectWhitelist.WhitelistType.MemoryCheck);
options.setMemoryCheck(memoryCheck); // memory check is enabled unless the project is whitelisted for MemoryCheck
// The exflow id is set by the loader. So it's unavailable until after
// this call.
executorLoader.uploadExecutableFlow(exflow); // original author's note: inserts a record into execution_flow for an executor to read later
// We create an active flow reference in the datastore. If the upload
// fails, we remove the reference.
ExecutionReference reference =
new ExecutionReference(exflow.getExecutionId());
if (isMultiExecutorMode()) { // multi-executor mode
//Take MultiExecutor route
executorLoader.addActiveExecutableReference(reference); // original author's note: inserts a record into active_executing_flows
queuedFlows.enqueue(exflow, reference); // put the submitted flow into the waiting queue
} else {
// assign only local executor we have: take the first of the active executors and dispatch to it
Executor choosenExecutor = activeExecutors.iterator().next();
executorLoader.addActiveExecutableReference(reference);
try {
dispatch(reference, exflow, choosenExecutor);
} catch (ExecutorManagerException e) {
// Dispatch failed: remove the active reference added above, then rethrow.
executorLoader.removeActiveExecutableReference(reference
.getExecId());
throw e;
}
}
message +=
"Execution submitted successfully with exec id "
+ exflow.getExecutionId();
}
return message;
}
}
2. ExecutorManager信息:
作业提交后都会统一的交付给 ExecutorManager.submitExecutableFlow
去处理, 最终存储在 queuedFlows
这个 QueuedExecutions
结构(里面有个 List 容器)中.当然这个容器大小也有上限, 当 queuedFlows
里面的元素超过一定数值后(azkaban.webserver.queue.size
,默认 100000),用户作业将提交失败.
作业交付给 queuedFlows
这个容器后, ExecutorManager
将通过内部一个单独线程QueueProcessorThread
去处理容器中的提交的作业.
/**
 * Main loop of the queue processor thread: until shutdown is requested,
 * processes the queued flows while the processor is active and then waits on
 * this object's monitor for {@code QUEUE_PROCESSOR_WAIT_IN_MS} before the
 * next pass. Any exception raised during a pass is logged and the loop
 * continues.
 */
public void run() {
  for (;;) {
    // Leave the loop as soon as a shutdown has been requested.
    if (shutdown) {
      return;
    }

    synchronized (this) {
      try {
        // Only drain the queue while active; either way, wait afterwards.
        if (isActive) {
          processQueuedFlows(activeExecutorRefreshWindowInMilisec,
              activeExecutorRefreshWindowInFlows);
        }
        wait(QUEUE_PROCESSOR_WAIT_IN_MS);
      } catch (Exception e) {
        logger.error(
            "QueueProcessorThread Interrupted. Probably to shut down.", e);
      }
    }
  }
}
processQueuedFlows
会不停的轮询查看 queuedFlows
容器,查看有没有等待提交的作业. 然后从queuedFlows
中拿到作业,去选择一个 executor, 再将这个作业提交给 executor, 并将信息放入到 runningFlows
中.
/**
 * Drains {@code queuedFlows}: repeatedly takes the head of the queue and
 * either dispatches it to a selected executor or puts it back and sleeps
 * until the next executor refresh.
 *
 * <p>The loop runs while {@code isActive()} is true and
 * {@code queuedFlows.fetchHead()} returns a non-null entry.
 *
 * @param activeExecutorsRefreshWindow maximum time, compared against
 *        {@code System.currentTimeMillis()} differences, between two executor
 *        refreshes
 * @param maxContinuousFlowProcessed number of flows that may be processed in
 *        a row before an executor refresh is forced
 * @throws InterruptedException declared by the signature; presumably raised
 *         by the queue fetch or the sleep call (TODO confirm)
 * @throws ExecutorManagerException declared by the signature; presumably
 *         propagated from executor selection/dispatch (TODO confirm)
 */
private void processQueuedFlows(long activeExecutorsRefreshWindow,
int maxContinuousFlowProcessed) throws InterruptedException,
ExecutorManagerException {
// 0 makes the first iteration refresh executor info as long as the current
// time exceeds the refresh window.
long lastExecutorRefreshTime = 0;
Pair<ExecutionReference, ExecutableFlow> runningCandidate;
int currentContinuousFlowProcessed = 0;
while (isActive() && (runningCandidate = queuedFlows.fetchHead()) != null) {
ExecutionReference reference = runningCandidate.getFirst();
ExecutableFlow exflow = runningCandidate.getSecond();
long currentTime = System.currentTimeMillis();
synchronized (ExecutorManager.this){
// Refresh when the window has elapsed or enough flows were processed in a row.
if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow
|| currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {
// Refresh executorInfo for all activeExecutors
refreshExecutors();
lastExecutorRefreshTime = currentTime;
currentContinuousFlowProcessed = 0;
}
// The flow was updated after the last refresh (its update time is set just
// before each dispatch attempt below): put it back and wait for the next
// refresh rather than retrying against the same executor info.
if(exflow.getUpdateTime() > lastExecutorRefreshTime) {
queuedFlows.enqueue(exflow, reference);
long sleepInterval =
activeExecutorsRefreshWindow
- (currentTime - lastExecutorRefreshTime);
// wait till next executor refresh
// NOTE(review): this sleep happens inside the synchronized block on
// ExecutorManager.this - confirm the lock is meant to be held meanwhile.
sleep(sleepInterval);
} else {
exflow.setUpdateTime(currentTime);
// Selection works on a copy of the active executor set.
selectExecutorAndDispatchFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
}
}
// do not count failed flow processsing (flows still in queue)
if(queuedFlows.getFlow(exflow.getExecutionId()) == null) {
currentContinuousFlowProcessed++;
}
}
}
提交之前有一个检查 executor
存活的过程.
/**
 * Refreshes the cached executor info of every active executor.
 *
 * <p>While holding the {@code activeExecutors} monitor, one
 * {@code /serverStatistics} request per executor is submitted to the refresh
 * service, and then each reply is awaited for at most five seconds. An
 * executor whose reply arrives and parses is marked active with fresh info;
 * one that times out or fails is marked inactive with its info cleared.
 * {@code lastSuccessfulExecutorInfoRefresh} is updated only when every
 * executor was refreshed successfully.
 */
private void refreshExecutors() {
  synchronized (activeExecutors) {
    // Phase 1: submit all statistics requests before waiting on any of them.
    final List<Pair<Executor, Future<String>>> pending =
        new ArrayList<Pair<Executor, Future<String>>>();
    for (final Executor target : activeExecutors) {
      final Callable<String> statsCall = new Callable<String>() {
        @Override
        public String call() throws Exception {
          return callExecutorForJsonString(target.getHost(),
              target.getPort(), "/serverStatistics", null);
        }
      };
      final Future<String> reply =
          executorInforRefresherService.submit(statsCall);
      pending.add(new Pair<Executor, Future<String>>(target, reply));
    }

    // Phase 2: collect the replies and update each executor accordingly.
    boolean allRefreshed = true;
    for (Pair<Executor, Future<String>> entry : pending) {
      final Executor target = entry.getFirst();
      // Drop the cached info first so a failed refresh leaves nothing stale.
      target.setExecutorInfo(null);
      try {
        // Wait at most five seconds for this executor's reply.
        final String statsJson = entry.getSecond().get(5, TimeUnit.SECONDS);
        target.setExecutorInfo(ExecutorInfo.fromJSONString(statsJson));
        logger.info(String.format(
            "Successfully refreshed executor: %s with executor info : %s",
            target, statsJson));
        target.setActive(true);
      } catch (TimeoutException e) {
        allRefreshed = false;
        target.setActive(false);
        logger.error("Timed out while waiting for ExecutorInfo refresh"
            + target, e);
      } catch (Exception e) {
        allRefreshed = false;
        target.setActive(false);
        logger.error("Failed to update ExecutorInfo for executor : "
            + target, e);
      }
    }

    if (allRefreshed) {
      lastSuccessfulExecutorInfoRefresh = System.currentTimeMillis();
    }
  }
}
在多个 executor 的模式下, 首先需要挑选一个 executor . 我这里因为公司业务重新写了这个过程 :
/**
 * Chooses the executor that should run the given flow.
 *
 * <p>The active executors mapped (via {@code userExecutor}) to the user
 * returned by {@code exflow.getLastModifiedByUser()} are tried first. When
 * that yields nothing, the active executors in {@code backupExecutors} are
 * tried instead. In both cases the final pick among the candidates is made by
 * an {@code ExecutorSelector} built from {@code filterList} and
 * {@code comparatorWeightsMap}.
 *
 * @param exflow          flow for which an executor is needed
 * @param activeExecutors not read by this method
 * @return the chosen executor, or {@code null} when neither candidate set
 *         produced one
 */
private Executor selectExecutor(ExecutableFlow exflow,
    Set<Executor> activeExecutors) {
  final Set<Executor> candidates = new HashSet<Executor>();
  final ExecutorSelector picker =
      new ExecutorSelector(filterList, comparatorWeightsMap);
  Executor chosen = null;

  // First attempt: the executors mapped to the flow's user, if any.
  final Set<Executor> mappedExecutors =
      userExecutor.get(exflow.getLastModifiedByUser());
  if (mappedExecutors != null) {
    for (Executor mapped : mappedExecutors) {
      if (mapped.isActive()) {
        candidates.add(mapped);
      }
    }
    if (!candidates.isEmpty()) {
      chosen = picker.getBest(candidates, exflow);
    }
  }
  if (chosen != null) {
    return chosen;
  }

  // Second attempt: fall back to the backup executors.
  logger.info("========== Choosen failth and now useing dispatcher");
  logger.info("Using dispatcher for execution id :"
      + exflow.getExecutionId());
  candidates.clear();
  for (Executor backup : backupExecutors) {
    if (backup.isActive()) {
      candidates.add(backup);
    }
  }
  if (!candidates.isEmpty()) {
    chosen = picker.getBest(candidates, exflow);
  }
  return chosen;
}
挑选完成 executor 之后就可以通过 dispatch
函数将请求发送给 executor 节点来处理.
/**
 * Sends a flow to the chosen executor and records it as running.
 *
 * <p>The executor assignment is stored through {@code executorLoader} before
 * the executor is called. If the call fails, the assignment is removed again
 * and the failure is rethrown wrapped in a new
 * {@code ExecutorManagerException}. On success the executor is set on the
 * reference and the flow is added to {@code runningFlows} under its
 * execution id.
 *
 * @param reference       execution reference that receives the chosen executor
 * @param exflow          flow to dispatch
 * @param choosenExecutor executor that should run the flow
 * @throws ExecutorManagerException when the call to the executor fails
 */
private void dispatch(ExecutionReference reference, ExecutableFlow exflow,
    Executor choosenExecutor) throws ExecutorManagerException {
  final long dispatchTime = System.currentTimeMillis();
  exflow.setUpdateTime(dispatchTime);

  // Record the assignment first, then ask the executor to start the flow.
  executorLoader.assignExecutor(choosenExecutor.getId(),
      exflow.getExecutionId());
  try {
    callExecutorServer(exflow, choosenExecutor,
        ConnectorParams.EXECUTE_ACTION);
  } catch (ExecutorManagerException dispatchFailure) {
    final String rollbackNote =
        "Rolling back executor assignment for execution id:"
            + exflow.getExecutionId();
    logger.error(rollbackNote, dispatchFailure);
    executorLoader.unassignExecutor(exflow.getExecutionId());
    throw new ExecutorManagerException(dispatchFailure);
  }

  reference.setExecutor(choosenExecutor);

  // The flow is now tracked as running under its execution id.
  final Pair<ExecutionReference, ExecutableFlow> runningEntry =
      new Pair<ExecutionReference, ExecutableFlow>(reference, exflow);
  runningFlows.put(exflow.getExecutionId(), runningEntry);

  final String summary = String.format(
      "Successfully dispatched exec %d with error count %d",
      exflow.getExecutionId(), reference.getNumErrors());
  logger.info(summary);
}