如何给 Airflow 的 DAG 文件添加 CI
我们使用 Airflow 作为任务调度引擎, 那么就需要有一个 DAG 的定义文件, 每次修改 DAG 定义, 提交 code review 我都在想, 如何给这个流程添加一个 CI, 确保修改的 DAG 文件正确并且方便 reviewer 做 code review?
0x00 Airflow DAG 介绍
DAG 的全称是 Directed acyclic graph(有向无环图), 在 ETL 系统中, 由于很多的任务相互依赖, 因此构成的就是 DAG, 确保前置任务完成后, 对应的计算任务才会执行.
Airflow 中的任务 DAG 定义实现非常有意思, 假设让我去设计一个 DAG 定义语法, 估计我一定就会循规蹈矩的使用 JSON/YAML 等配置文件, 通过各种依赖语法实现. 但 Airflow 最大的创新是使用 Python 文件定义 DAG. 例如如下官网文档中的 DAG 示例:
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('tutorial', default_args=default_args)
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
那么使用 Python 文件定义 DAG 的好处是什么呢:
- Python 是代码, 因此可以实现逻辑. 比如计算日期, 获取 IP, 调用其他系统的 API, 凡是 Python 能干的事情, DAG 中都能干. 因此你的 DAG 规则可以任你发挥. 甚至可以直接作为代码被其他代码
import
- 自带语法检查. DAG 语法是不是正确, 直接调用
python dag.py
就可以做基础的语法检查
0x01 Airflow DAG 接入 CI
那么实际应用过程中, 随着业务的复杂, DAG 也会越来越复杂, 修改了几行代码, 如何方便的 Code Review?
例如, 在 t0.py
的基础上加了一个新的 task: t4
作为 t3
的 downstream, 代码 diff 仅仅能显示如下信息:
CI 中简单的做法可以通过执行 python t1.py
来做 Python 语法级别的检测, 没有问题, 剩下的就靠 reviewer 脑补了.
如果想进一步优化 CI 的输出呢?
0x02 code as configuration file 的好处
琢磨了一下, 发现每个 Airflow DAG 文件都会有一个 airflow.DAG
的实例, 每个 Task 都会注册到这个 dag 的实例中, 猜测 Airflow framework 是通过每个 DAG 文件的 dag
实例获取 DAG 的信息的.
启动了有个 Python shell, help 一下 import 进来的 dag 实例, 发现有 tree_view(self)
方法:
Python 2.7.10 (default, Oct 23 2015, 19:19:21)
[GCC 4.2.1 Compatible Apple LLVM 7.0.0 (clang-700.0.59.5)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from t1 import dag
[2017-08-21 23:05:48,021] {__init__.py:36} INFO - Using executor SequentialExecutor
>>> help(dag.tree_view)
Help on method tree_view in module airflow.models:
tree_view(self) method of airflow.models.DAG instance
Shows an ascii tree representation of the DAG
>>> dag.tree_view()
<Task(BashOperator): sleep>
<Task(BashOperator): print_date>
<Task(BashOperator): t4>
<Task(BashOperator): templated>
<Task(BashOperator): print_date>
那岂不是直接 diff 一下代码修改前后的 tree_view(self)
方法的输出, 就可以在 code review 的时候更方便的显示修改过的 DAG 了?
那么 CI 步骤就很明确了:
- checkout code, 执行
python dag.py
确保 DAG 语法正确 - 使用 Python 的 imp 模块动态 import DAG 文件并调用
dag.tree_view()
方法, 打印 DAG 日志到current.log
- 切换分支到 master, 重复第二步, 将 DAG 日志打印到
master.log
- 为了提升可读性, 使用 icdiff 对比
current.log
和masater.log
需要注意的是, gitlab CI 中获取当前 git 分支名称有些 tricky, 需要从环境变量 $CI_COMMIT_REF_NAME
中获取.
if [ -z "$CI_COMMIT_REF_NAME" ]
then
current_branch=`git rev-parse --abbrev-ref HEAD`
else
# get the branch name from gitlab ci, see http://docs.gitlab.com/ce/ci/variables/README.html
current_branch="$CI_COMMIT_REF_NAME"
fi
echo "`date +'%F %T'` working on branch: $current_branch"
总结
Airflow 的设计中, 我最喜欢的就是使用 Python 代码定义 DAG 这点, 扩展性太强了. 从这点受到启发, 我在另一个系统中放弃了自己造轮子定义 DSL, 也同样使用了 Python 作为 DSL, 效果非常好.
-- EOF --