Airflow scheduler execution process
2020-03-20
三须
0. The `airflow scheduler` command runs, creating a SchedulerJob and starting its execution.
1. The Executor is initialized from conf and started (executor.start());
   the DAG file processor agent (DagFileProcessorAgent) is initialized and started.
2. A new process is created, in which DagFileProcessorManager is initialized and started:
self._process = multiprocessing.Process(
    target=type(self)._run_processor_manager,
    args=(
        self._dag_directory,
        self._file_paths,
        self._max_runs,
        self._processor_factory,
        self._processor_timeout,
        child_signal_conn,
        self._async_mode,
    )
)
self._process.start()
4. Iterate over the DAG files, creating one process per file:
DagFileProcessor.start() → DagFileProcessor._run_file_processor()
5. The scheduler job class (SchedulerJob) processes a single DAG file and checks whether execution conditions are met; it creates the DagRun and TaskInstances and updates the DagRun state.
SchedulerJob.process_file(file_path, pickle_dags)
    self._process_dags(dagbag, dags, ti_keys_to_schedule)
    self._process_task_instances(dag, tis_out)
    -> run.verify_integrity(session=session)  # creates and validates TaskInstances
       run.update_state(session=session)  # updates the DagRun state based on its TIs
Iterate over the TaskInstances in the DagRun, updating the state of each TI whose dependencies are met, and send the fully processed DAGs to the queue.
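The state roll-up that run.update_state performs can be illustrated with a simplified rule. This helper is hypothetical and deliberately cruder than Airflow's real logic (which only finalizes state once the leaf tasks are done):

```python
def dag_run_state(ti_states):
    """Simplified sketch of deriving a DagRun state from its
    TaskInstance states (not Airflow's actual implementation)."""
    if any(s in ("failed", "upstream_failed") for s in ti_states):
        return "failed"
    if all(s == "success" for s in ti_states):
        return "success"
    return "running"


print(dag_run_state(["success", "success"]))  # success
print(dag_run_state(["success", "failed"]))   # failed
print(dag_run_state(["success", "queued"]))   # running
```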
Query the executable TIs:
for ti in tis:
    task = dag.get_task(ti.task_id)
    # fixme: ti.task is transient but needs to be set
    ti.task = task
    if ti.are_dependencies_met(  # check whether the TI's dependencies are met
            dep_context=DepContext(flag_upstream_failed=True),
            session=session):
        self.log.debug('Queuing task: %s', ti)
        task_instances_list.append(ti.key)
Send the DAGs that have finished processing:
result = scheduler_job.process_file(file_path, pickle_dags)
result_channel.send(result)
7. SchedulerJob loops, harvesting the processed DAGs from each DAG file processor process:
simple_dags = self.processor_agent.harvest_simple_dags()
Based on the processed DAGs, the pool size, and each task's priority weight, the executor sends TaskInstances to the queue:
self._execute_task_instances(simple_dag_bag, (State.SCHEDULED,))
Send the TaskInstances to the queue:
# actually enqueue them
for simple_task_instance in simple_task_instances:
    simple_dag = simple_dag_bag.get_dag(simple_task_instance.dag_id)
    command = TI.generate_command(
        simple_task_instance.dag_id,
        simple_task_instance.task_id,
        simple_task_instance.execution_date,
        local=True,
        mark_success=False,
        ignore_all_deps=False,
        ignore_depends_on_past=False,
        ignore_task_deps=False,
        ignore_ti_state=False,
        pool=simple_task_instance.pool,
        file_path=simple_dag.full_filepath,
        pickle_id=simple_dag.pickle_id)
    priority = simple_task_instance.priority_weight
    queue = simple_task_instance.queue
    self.log.info(
        "Sending %s to executor with priority %s and queue %s",
        simple_task_instance.key, priority, queue
    )
    self.executor.queue_command(
        simple_task_instance,
        command,
        priority=priority,
        queue=queue)
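On the executor side, queue_command essentially records each (priority, queue, command) tuple so that the highest-priority commands are dispatched first on the next heartbeat. A toy sketch of that ordering with `heapq` (hypothetical, not Airflow's BaseExecutor):

```python
import heapq


class MiniExecutor:
    """Toy executor illustrating priority-ordered dispatch
    (a sketch, not Airflow's BaseExecutor)."""

    def __init__(self):
        self._heap = []
        self._counter = 0  # tie-breaker: keeps insertion order among equal priorities

    def queue_command(self, key, command, priority=1, queue="default"):
        # heapq is a min-heap, so negate priority: higher weight pops first
        heapq.heappush(self._heap, (-priority, self._counter, key, queue, command))
        self._counter += 1

    def next_command(self):
        _, _, key, queue, command = heapq.heappop(self._heap)
        return key, queue, command


ex = MiniExecutor()
ex.queue_command("ti_low", ["airflow", "run", "low"], priority=1)
ex.queue_command("ti_high", ["airflow", "run", "high"], priority=10)
print(ex.next_command()[0])  # ti_high
```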