代码库

Azkaban 源码分析之作业执行篇

2018-08-24  本文已影响0人  陌城小川

Executor 执行信息分析

当 Web 服务器通过 ExecutorManager 中的 dispatch 方法将 flow 的提交信息分发给 Executor 之后，Executor 这边通过 Azkaban-exec-server/ExecutorServlet 的 doGet 方法接收请求信息，再通过 handleAjaxExecute 交给 FlowRunnerManager 的 submitFlow 来处理。

/**
 * Submits the execution identified by {@code execId} to this executor.
 *
 * <p>Loads the executable flow through {@code executorLoader}, prepares it with
 * {@code setupFlow}, builds and configures a {@code FlowRunner}, registers the runner in
 * {@code runningFlows} and finally hands it to {@code executorService}.
 *
 * @param execId id of the execution to load and run
 * @throws ExecutorManagerException if the execution is already registered in
 *     {@code runningFlows}, the flow cannot be loaded, the requested number of job threads
 *     cannot be applied, or the executor service rejects the runner
 */
public void submitFlow(int execId) throws ExecutorManagerException {

    // Refuse to start an execution that is already registered as running.
    if (runningFlows.containsKey(execId)) {
        throw new ExecutorManagerException("Execution " + execId
            + " is already running.");
    }

    final ExecutableFlow flow = executorLoader.fetchExecutableFlow(execId);

    // Validate before touching the flow: logging flow.getFlowId() first would turn a
    // missing flow into a NullPointerException instead of the intended error below.
    if (flow == null) {
        throw new ExecutorManagerException("Error loading flow with exec "
            + execId);
    }

    logger.info("get flow : " + flow.getFlowId());

    // Sets up the project files and execution directory.
    setupFlow(flow);

    // Read the execution options attached to the flow.
    final ExecutionOptions options = flow.getExecutionOptions();

    // When the flow is pipelined behind another execution, watch that execution:
    // locally if its runner is registered in this manager, otherwise through the loader.
    FlowWatcher watcher = null;
    if (options.getPipelineExecutionId() != null) {
        final Integer pipelineExecId = options.getPipelineExecutionId();
        final FlowRunner pipelinedRunner = runningFlows.get(pipelineExecId);

        if (pipelinedRunner != null) {
            watcher = new LocalFlowWatcher(pipelinedRunner);
        } else {
            watcher = new RemoteFlowWatcher(pipelineExecId, executorLoader);
        }
    }

    // Number of job threads for this flow; starts from the manager-wide default.
    int numJobThreads = numJobThreadPerFlow;

    if (options.getFlowParameters().containsKey(FLOW_NUM_JOB_THREADS)) {
        try {
            final int numJobs =
                Integer.parseInt(options.getFlowParameters().get(
                    FLOW_NUM_JOB_THREADS));
            // A positive override is accepted when it does not exceed the default,
            // or unconditionally when the project is whitelisted for NumJobPerFlow.
            if (numJobs > 0 && (numJobs <= numJobThreads || ProjectWhitelist
                    .isProjectWhitelisted(flow.getProjectId(),
                        WhitelistType.NumJobPerFlow))) {
                numJobThreads = numJobs;
            }
        } catch (Exception e) {
            throw new ExecutorManagerException(
                "Failed to set the number of job threads "
                    + options.getFlowParameters().get(FLOW_NUM_JOB_THREADS)
                    + " for flow " + execId, e);
        }
    }

    final FlowRunner runner =
        new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager);

    // Apply the runner configuration and subscribe this manager as a listener.
    runner.setFlowWatcher(watcher)
        .setJobLogSettings(jobLogChunkSize, jobLogNumFiles)
        .setValidateProxyUser(validateProxyUser)
        .setNumJobThreads(numJobThreads).addListener(this);

    configureFlowLevelMetrics(runner);

    // Check again: the setup above takes time and the execution may have been
    // registered in the meantime.
    if (runningFlows.containsKey(execId)) {
        throw new ExecutorManagerException("Execution " + execId
            + " is already running.");
    }

    // Finally, register the runner and queue it.
    runningFlows.put(execId, runner);

    try {
        // The executorService already has a queue.
        // The submit method below actually returns an instance of FutureTask,
        // which implements interface RunnableFuture, which extends both
        // Runnable and Future interfaces.
        final Future<?> future = executorService.submit(runner);
        // keep track of this future
        submittedFlows.put(future, runner.getExecutionId());
        // update the last submitted time.
        this.lastFlowSubmittedDate = System.currentTimeMillis();
    } catch (RejectedExecutionException re) {
        // The runner never started: undo the registration so the execution is not
        // reported as "already running" forever and can be submitted again.
        runningFlows.remove(execId);
        throw new ExecutorManagerException(
            "Azkaban server can't execute any more flows. "
                + "The number of running flows has reached the system configured limit."
                + "Please notify Azkaban administrators", re);
    }
}
上一篇下一篇

猜你喜欢

热点阅读