The airflow scheduler execution flow

0. The airflow scheduler command is executed, which creates a SchedulerJob and starts running it.
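For reference, the CLI entry point does little more than build the job and run it. A condensed sketch of airflow/bin/cli.py from the 1.10.x line (argument parsing and daemon handling omitted):

def scheduler(args):
    # Build the SchedulerJob from the CLI arguments; run() records the job in
    # the metadata DB and then calls the job's _execute() method.
    job = jobs.SchedulerJob(
        dag_id=args.dag_id,                  # optionally restrict to one DAG
        subdir=process_subdir(args.subdir),  # the DAGs folder to scan
        num_runs=args.num_runs,              # -1 means loop forever
        do_pickle=args.do_pickle)            # pickle DAGs for remote executors
    job.run()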

1. The Executor is initialized from the configuration and started (executor.start()),
and the DAG file processor agent (DagFileProcessorAgent) is initialized and started.
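Inside SchedulerJob._execute this looks roughly as follows (condensed from the 1.10.x source; logging and error handling omitted):

# Start the executor configured in airflow.cfg (LocalExecutor, CeleryExecutor, ...)
self.executor.start()

# Create and start the agent that owns the DagFileProcessorManager child process
self.processor_agent = DagFileProcessorAgent(
    self.subdir,          # the DAGs folder
    known_file_paths,     # the .py files discovered under it
    self.num_runs,
    processor_factory,    # callable that builds a DagFileProcessor per file
    processor_timeout,
    async_mode)
self.processor_agent.start()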
2. A new process is created, in which the DagFileProcessorManager is initialized and started:

self._process = multiprocessing.Process(
    target=type(self)._run_processor_manager,
    args=(
        self._dag_directory,
        self._file_paths,
        self._max_runs,
        self._processor_factory,
        self._processor_timeout,
        child_signal_conn,
        self._async_mode,
    )
)
self._process.start()
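On the other side of that Process call, _run_processor_manager executes in the child: it builds the manager and enters its scan loop. A condensed sketch of the 1.10.x code (logging and signal setup omitted):

@staticmethod
def _run_processor_manager(dag_directory, file_paths, max_runs,
                           processor_factory, processor_timeout,
                           signal_conn, async_mode):
    # Runs in the freshly created process: the manager loops over the DAGs
    # folder and fans out one DagFileProcessor process per file.
    processor_manager = DagFileProcessorManager(dag_directory,
                                                file_paths,
                                                max_runs,
                                                processor_factory,
                                                processor_timeout,
                                                signal_conn,
                                                async_mode)
    processor_manager.start()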

3. The manager iterates over the DAG files and creates one process per file (sketched below):
DagFileProcessor.start() → DagFileProcessor._run_file_processor()
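DagFileProcessor.start() follows the same pattern as the manager: spawn a process whose target parses a single file and reports the result over a pipe. A simplified sketch of the 1.10.x code (process naming and logging arguments omitted):

self._parent_channel, _child_channel = multiprocessing.Pipe()
self._process = multiprocessing.Process(
    target=type(self)._run_file_processor,
    args=(_child_channel,
          self.file_path,           # the one DAG file this processor owns
          self._pickle_dags,
          self._dag_id_white_list))
self._process.start()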
4. The SchedulerJob processes a single DAG file: it decides whether the DAG is due to run, creates the DagRun and its TaskInstances, and updates the DagRun state.

SchedulerJob.process_file(file_path, pickle_dags)
    self._process_dags(dagbag, dags, ti_keys_to_schedule)
        self._process_task_instances(dag, tis_out)
            -> run.verify_integrity(session=session)  # create and validate the TaskInstances
               run.update_state(session=session)      # derive the DagRun state from its TIs

It then iterates over the DagRun's TaskInstances, moves those whose dependencies are met into the SCHEDULED state, and sends the processed DAGs back to the scheduler, as shown below.
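Per DAG, _process_dags amounts to the following (condensed from the 1.10.x source; paused-DAG checks omitted):

for dag in dags:
    # Create a new DagRun if the schedule says one is due
    dag_run = self.create_dag_run(dag)
    if dag_run:
        self.log.info("Created %s", dag_run)
    # Walk the DAG's active runs and collect the schedulable TI keys
    self._process_task_instances(dag, tis_out)
    # Record any SLA misses for this DAG
    self.manage_slas(dag)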

Querying for executable TaskInstances:

for ti in tis:
    task = dag.get_task(ti.task_id)

    # fixme: ti.task is transient but needs to be set
    ti.task = task

    if ti.are_dependencies_met(  # check whether the TI's dependencies are met
            dep_context=DepContext(flag_upstream_failed=True),
            session=session):
        self.log.debug('Queuing task: %s', ti)
        task_instances_list.append(ti.key)

Sending the processed DAGs back to the scheduler:

result = scheduler_job.process_file(file_path, pickle_dags)
result_channel.send(result)

5. In its main loop, the SchedulerJob collects the DAGs that the individual DAG file processor processes have finished processing:
simple_dags = self.processor_agent.harvest_simple_dags()
Based on the processed DAGs, the pool sizes, and each task's priority weight, it hands TaskInstances to the executor's queue:
self._execute_task_instances(simple_dag_bag, (State.SCHEDULED,))
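Putting the loop together, condensed from SchedulerJob._execute_helper in 1.10.x (timing, the run_duration exit condition, and SQLite-specific synchronization omitted):

while True:
    # Collect the SimpleDags finished by the file processor processes
    simple_dags = self.processor_agent.harvest_simple_dags()
    simple_dag_bag = SimpleDagBag(simple_dags)

    if len(simple_dags) > 0:
        # Queue SCHEDULED task instances of the harvested DAGs onto the executor
        self._execute_task_instances(simple_dag_bag, (State.SCHEDULED,))

    # Let the executor dispatch whatever has been queued
    self.executor.heartbeat()
    # Reconcile TI states with the events the executor reports back
    self._process_executor_events(simple_dag_bag)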
Enqueuing the TaskInstances:

# actually enqueue them
for simple_task_instance in simple_task_instances:
    simple_dag = simple_dag_bag.get_dag(simple_task_instance.dag_id)
    command = TI.generate_command(
        simple_task_instance.dag_id,
        simple_task_instance.task_id,
        simple_task_instance.execution_date,
        local=True,
        mark_success=False,
        ignore_all_deps=False,
        ignore_depends_on_past=False,
        ignore_task_deps=False,
        ignore_ti_state=False,
        pool=simple_task_instance.pool,
        file_path=simple_dag.full_filepath,
        pickle_id=simple_dag.pickle_id)

    priority = simple_task_instance.priority_weight
    queue = simple_task_instance.queue
    self.log.info(
        "Sending %s to executor with priority %s and queue %s",
        simple_task_instance.key, priority, queue
    )

    self.executor.queue_command(
        simple_task_instance,
        command,
        priority=priority,
        queue=queue)
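From here the executor takes over: queue_command only records the command in BaseExecutor.queued_tasks, and the next executor.heartbeat() drains that dict in priority order. A condensed sketch of the 1.10.x BaseExecutor:

def heartbeat(self):
    # How many more tasks this executor can accept right now
    open_slots = self.parallelism - len(self.running)
    self.trigger_tasks(open_slots)
    self.sync()  # concrete executors update the state of running tasks here

def trigger_tasks(self, open_slots):
    # Pop the highest-priority queued commands and hand them to execute_async
    sorted_queue = sorted(self.queued_tasks.items(),
                          key=lambda x: x[1][1],  # value = (command, priority, queue, simple_ti)
                          reverse=True)
    for _ in range(min(open_slots, len(self.queued_tasks))):
        key, (command, _, queue, simple_ti) = sorted_queue.pop(0)
        self.queued_tasks.pop(key)
        self.running[key] = command
        self.execute_async(key=key, command=command, queue=queue)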