Orchestration and Scheduling

Airflow

2019-08-22 · eleven_a1dc

Download and Install Airflow

Install with pip:

pip install apache-airflow

Changing the Default Database from SQLite to MySQL

To use MySQL as the metadata database, follow these steps:

  1. pip install 'apache-airflow[mysql]'
  2. Install MySQL (skip this step if it is already installed).
  3. Edit airflow.cfg and change the following two settings, switching the executor to LocalExecutor:
executor = LocalExecutor
sql_alchemy_conn = mysql://root:PASSWORD@localhost/Airflow

Here Airflow is the name of a database that must be created in MySQL ahead of time.
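
As a sketch of that step (assuming the SQLAlchemy 1.x that Airflow already depends on, and the placeholder root/PASSWORD credentials from the connection string above), the database can be created programmatically:

from sqlalchemy import create_engine

# Connect to the MySQL server itself (no database selected) and create the
# schema that sql_alchemy_conn points at; credentials are placeholders.
engine = create_engine('mysql://root:PASSWORD@localhost')
with engine.connect() as conn:
    conn.execute('CREATE DATABASE IF NOT EXISTS Airflow')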

  4. Run airflow initdb. If it fails with the error Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql, add explicit_defaults_for_timestamp = 1 under the [mysqld] section of MySQL's my.cnf (a fragment is shown below) and restart mysqld. If MySQL was installed with brew, restart it with:
$ brew services list
$ brew services restart SERVICE_NAME
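
For reference, the my.cnf fragment would look like this (the file's location varies by installation; /usr/local/etc/my.cnf is typical for Homebrew MySQL):

[mysqld]
explicit_defaults_for_timestamp = 1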

Quick Start

# airflow needs a home, ~/airflow is the default,
# but you can lay foundation somewhere else if you prefer
# (optional)
export AIRFLOW_HOME=~/airflow

# install from pypi using pip
pip install apache-airflow

# initialize the database
airflow initdb

# start the web server, default port is 8080
airflow webserver -p 8080

# start the scheduler
airflow scheduler

# visit localhost:8080 in the browser and enable the example dag in the home page

Example

The example from the official documentation:

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(days=1))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)

Instantiating a DAG

tutorial is the dag_id, which uniquely identifies your DAG.

dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))
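
schedule_interval also accepts a cron expression or a preset string; as a sketch, the following is roughly equivalent to the timedelta form above:

# '@daily' is a built-in preset; a cron string such as '0 0 * * *' also works.
dag = DAG('tutorial', default_args=default_args, schedule_interval='@daily')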

Tasks

Tasks are generated when operators are instantiated. An object instantiated from an operator is called a task. The task_id parameter serves as the unique identifier for the task.

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

A task must include or inherit the arguments task_id and owner, otherwise Airflow will raise an exception; inherited values come from default_args, as sketched below.
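
A minimal sketch of that inheritance (t4 and its task_id are hypothetical; default_args is the dictionary defined above):

t4 = BashOperator(
    task_id='inherits_owner',
    bash_command='echo hello',
    # owner is inherited from default_args ('airflow'); an explicitly passed
    # argument, like retries=3 on t2, overrides the inherited value.
    dag=dag)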

Templating with Jinja

Airflow provides a number of built-in parameters and macros for use in templated fields, such as {{ ds }} (the execution date) and the helpers under macros.
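
A short sketch using two of the built-ins in a templated field (t5 and its task_id are hypothetical; ds and macros.ds_add are documented built-ins):

t5 = BashOperator(
    task_id='echo_macros',
    # {{ ds }} renders as the execution date (YYYY-MM-DD);
    # macros.ds_add shifts it by the given number of days.
    bash_command='echo "{{ ds }} -> {{ macros.ds_add(ds, 7) }}"',
    dag=dag)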

Setting Up Dependencies

t1.set_downstream(t2)

# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)

# The bit shift operator can also be
# used to chain operations:
t1 >> t2

# And the upstream dependency with the
# bit shift operator:
t2 << t1

# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3

# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1

Airflow raises an exception when it detects a cycle in your DAG or when a dependency is referenced more than once.

Testing

Running the Script

Save the code above as tutorial.py in the DAG folder configured in airflow.cfg (the default is ~/airflow/dags), then run it:

python ~/airflow/dags/tutorial.py

If the script exits without raising an exception, the DAG parses cleanly; note that running it does not execute any tasks.

Command-Line Metadata Validation

# print the list of active DAGs
airflow list_dags

# prints the list of tasks in the "tutorial" DAG
airflow list_tasks tutorial

# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks tutorial --tree

Testing

airflow test tutorial print_date 2015-06-01

airflow test runs a single task instance locally, printing its log to stdout; it does not check dependencies or record state in the database.

Backfill

Running a backfill actually executes the DAG over the given date range; its status can be followed in the UI.

# optional, start a web server in debug mode in the background
# airflow webserver --debug &

# start your backfill on a date range
airflow backfill tutorial -s 2015-06-01 -e 2015-06-07

Airflow Guides

Config

Configuration lives in airflow.cfg. Options can also be set through environment variables of the form $AIRFLOW__{SECTION}__{KEY}.
For example, the database connection appears in airflow.cfg as:

[core]
sql_alchemy_conn = my_conn_string

The same option can instead be set as an environment variable:

AIRFLOW__CORE__SQL_ALCHEMY_CONN=my_conn_string

You can also append _cmd to a key and supply a bash command whose output is used as the option's value, for example:

[core]
sql_alchemy_conn_cmd = bash_command_to_run

Only a subset of options support the _cmd form; sql_alchemy_conn is one of them.

Configuration options are resolved with the following order of precedence (a quick way to verify this is sketched after the list):

  1. Environment variables
  2. Settings in airflow.cfg
  3. Commands in airflow.cfg (the _cmd form)
  4. Airflow's built-in defaults
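
A minimal sketch to verify that precedence (assumes a local Airflow installation; the connection string is a placeholder):

import os

# Set the environment variable before the lookup; it wins over both
# airflow.cfg and the built-in default.
os.environ['AIRFLOW__CORE__SQL_ALCHEMY_CONN'] = 'mysql://root:PASSWORD@localhost/Airflow'

from airflow.configuration import conf

print(conf.get('core', 'sql_alchemy_conn'))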