LinuxPythonCelery

Python - Airflow任务调度系统初识

2020-11-22  本文已影响0人  红薯爱帅

1. 概述

Apache Airflow

Airflow是airbnb开源的基于DAG(有向无环图)的用Python开发的任务管理系统。最简单的理解就是一个高级版的crontab,它解决了crontab无法解决的任务依赖问题。

项目于2014年启动,于2015年春季开源,于2016年加入Apache软件基金会的孵化计划。

Airflow提供了丰富的命令行工具用于系统管控,而其web管理界面同样也可以方便的管控调度任务,并且对任务运行状态进行实时监控,方便了系统的运维和管理。

有向无环图

1.1. 应用场景

1.2. 优势

1.3. 劣势

2. Concepts

2.1. 术语说明

2.1.1. airflow.DAG,实例化之后,称之为Dag RunDag Instance

DAG(Directed Acyclic Graph)是有向无环图,也称为有向无循环图。在Airflow中,一个DAG定义了一个完整的作业。同一个DAG中的所有Task拥有相同的调度时间。

代码样例
from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    'owner' : 'airflow' ,
    'depends_on_past' : False ,
    'start_date' : datetime ( 2015 , 6 , 1 ),
    'email' : [ 'airflow@example.com' ],
    'email_on_failure' : False ,
    'email_on_retry' : False ,
    'retries' : 1 ,
    'retry_delay' : timedelta( minutes = 5 ),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}
dag = DAG('test_dag', default_args=default_args)
参数说明

2.1.2. airflow.operators,实例化之后,称之为Task

操作器,定义任务以哪种方式执行。airflow有多种operator,如BashOperator、DummyOperator、MySqlOperator、HiveOperator以及社区贡献的operator等,其中BaseOperator是所有operator的基础operator。

Operator Name Description
BaseOperator 基础operator,设置baseoperator会影响所有的operator
BashOperator executes a bash command
DummyOperator 空操作
PythonOperator calls an arbitrary Python function
EmailOperator sends an email
HTTPOperator sends an HTTP request
SqlOperator executes a SQL command
DockerOperator execute a command inside a docker container
Sensor waits for a certain time, file, database row, S3 key, etc…
代码样例
from airflow.operators.bash_operator import BashOperator

t1 = BashOperator (
    task_id = 'print_date' ,
    bash_command = 'date' ,
    dag = dag )

t2 = BashOperator (
    task_id = 'sleep' ,
    bash_command = 'sleep 5' ,
    retries = 3 ,
    dag = dag )

t1 >> t2
# t2.set_upstream(t1)
# t1.set_downstream(t2)

Task为DAG中具体的作业任务,依赖于DAG,也就是必须存在于某个DAG中。

Task在DAG中可以配置依赖关系(当然也可以配置跨DAG依赖,但是并不推荐。跨DAG依赖会导致DAG图的直观性降低,并给依赖管理带来麻烦)。

参数说明:

all_success: (default) all parents have succeeded 父task全success

all_failed: all parents are in a failed or upstream_failed state 父task全failed或者upstream_failed状态

all_done: all parents are done with their execution 父task全执行过,不管success or failed

one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done 当父task中有一个是failed状态时执行,不必等到所有的父task都执行

one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done 当父task中有一个是success状态时执行,不必等到所有的父task都执行

dummy: dependencies are just for show, trigger at will 无条件执行

2.1.3. airflow.executor,即调度方式

在配置文件config/airflow.cfg中,修改executor变量。

2.1.4. 其他

2.2. 服务构成

单节点部署

Webserver

Airflow提供了一个可视化的Web界面。启动 WebServer 后,就可以在 Web 界面上查看定义好的 DAG 并监控及改变运行状况。也可以在 Web 界面中对一些变量进行配置。

image.png

Scheduler

整个 Airflow 的调度由 Scheduler 负责发起,每隔一段时间 Scheduler 就会检查所有定义完成的 DAG 和定义在其中的作业,如果有符合运行条件的作业,Scheduler 就会发起相应的作业任务以供 Worker 接收。

Worker

一般来说我们用 Celery Worker 来执行具体的作业。Worker 可以部署在多台机器上,并可以分别设置接收的队列。当接收的队列中有作业任务时,Worker 就会接收这个作业任务,并开始执行。Airflow 会自动在每个部署 Worker 的机器上同时部署一个 Serve Logs 服务,这样我们就可以在 Web 界面上方便的浏览分散在不同机器上的作业日志了。

Flower

Flower 提供了一个可视化界面以监控所有 Celery Worker 的运行状况。这个服务并不是必要的。

image.png

3. 单机部署与测试

3.1. Install

pip install apache-airflow

如果遇到ImportError: cannot import name 'resolve_types',解决办法:

pip3 install cattrs==1.0.0

参考:https://github.com/apache/airflow/issues/11965

3.2. 启动

# 在airflow目录初始化数据库和airflow配置
airflow initdb
# 启动 airflow web
airflow webserver
# 开始调度
airflow scheduler
# 测试任务,格式:airflow test dag_id task_id execution_time
airflow test test_task test1 2019-09-10
# 查看生效的 DAGs
airflow list_dags -sd $AIRFLOW_HOME/dags
# 开始运行任务(同 web 界面点 trigger 按钮)
airflow trigger_dag test_task    
# 暂停任务
airflow pause dag_id      
# 取消暂停,等同于在 web 管理界面打开 off 按钮
airflow unpause dag_id     
# 查看 task 列表
airflow list_tasks dag_id  查看task列表
# 清空任务状态
airflow clear dag_id       
# 运行task
airflow run dag_id task_id execution_date

3.3. 测试

# [START tutorial]
# [START import_module]
from datetime import timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

# [END import_module]

# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}
# [END default_args]

# [START instantiate_dag]
dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)
# [END instantiate_dag]

# t1, t2 and t3 are examples of tasks created by instantiating operators
# [START basic_task]
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
    dag=dag,
)
# [END basic_task]

# [START documentation]
dag.doc_md = __doc__

t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
# [END documentation]

# [START jinja_template]
templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag,
)
# [END jinja_template]

t1 >> [t2, t3]
# [END tutorial]

4. 分布式部署与测试

分布式部署

采用docker部署airflow分布式调度系统,编排方式可以是k8s、swarm,这里采用docker-compose简单实现:
https://github.com/puckel/docker-airflow

$ docker-compose -f docker-compose-CeleryExecutor.yml up -d
$ docker-compose -f docker-compose-CeleryExecutor.yml scale worker=2

5. 参考资料

上一篇 下一篇

猜你喜欢

热点阅读