Airflow 动态指定DAG运行队列
2021-03-25 本文已影响0人
默默无痕
Airflow版本 2.0.1
根据官网提示,airflow 支持hook函数,可以在DAG运行开始之前 根据我们触发的参数 指定队列
https://github.com/apache/airflow/discussions/14987
新建 airflow_local_settings.py 文件, 放在${AIRFLOW_HOME}/config/airflow_local_settings.py
无需重启scheduler 即可加载。
def task_instance_mutation_hook(task_instance):
dag_run = task_instance.get_dagrun()
conf = dag_run.conf
# print(conf)
# conf为 trigger_dag 的 --conf 参数
if conf['key4']:
if 'change_queue' in conf:
return
else:
task_instance.queue = conf['key4']
conf["change_queue"] = 'true'
上面代码表示,在DAG 刚刚要运行的时候,会执行上面的hook函数 拿到conf 根据conf中的 key4 重新执行DAG的queue。
DAG触发参数为
airflow dags trigger 'dag_test_bash' -r `uuidgen` --conf '{"key4":"test_param"}'
传入的参数 key4 的value 会赋值到 task_instance.queue 也就是在DAG运行前 为其动态分配了队列,实际生产中 可以用该种方式去路由DAG到指定ip的机器去运行,我们可以在每台机器起自己IP+业务前缀的worker,然后在这里指定,如指定queue为 prod_10.0.0.1的queue。