Airflow 动态创建Task
2020-11-21 本文已影响0人
灯火gg
import airflow
import MySQLdb
from impala.util import as_pandas
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
# TODO: optimize this script
def get_datas():
    """Run ``select * from xxx`` against MySQL and return the rows as dicts.

    The cursor is handed to ``impala.util.as_pandas``; the result is treated
    as a pandas DataFrame: missing values are replaced with ``0`` via
    ``fillna(0)`` and the frame is converted with
    ``to_dict(orient='records')``.

    Returns:
        The value of ``DataFrame.to_dict(orient='records')`` -- one dict per
        row, keyed by column name.

    Raises:
        Whatever ``MySQLdb`` raises on connection or query failure; nothing
        is caught here. The cursor and connection are closed in every case.
    """
    # NOTE(review): host and credentials are hard-coded; consider moving them
    # into configuration (e.g. an Airflow Connection).
    db = MySQLdb.connect(host="xxx.xx.xx.xx", port=4313, user="xxxx", password="xxxx",
                         database="xxxx", charset='utf8')
    try:
        cur = db.cursor()
        try:
            cur.execute(
                "select * from xxx ")
            # The return expression is fully evaluated before the ``finally``
            # clauses run, so the rows are already materialised when the
            # cursor and connection are closed.
            return as_pandas(cur).fillna(0).to_dict(orient='records')
        finally:
            cur.close()
    finally:
        # Previously the connection was never closed, leaking one MySQL
        # connection per call.
        db.close()
def multitasking_task(xx, dag):
    """Create the BashOperator that processes one record.

    Args:
        xx: Mapping that provides an ``'id'`` entry; its string form is used
            both in the task id and as the last command-line argument.
        dag: DAG object passed through to ``BashOperator``.

    Returns:
        A ``BashOperator`` with task id ``handle_task_with_id_<id>`` whose
        command is ``python3 /data/airflow/dag_scripts/xxxx.py {{ ds }} <id>``.
    """
    record_id = str(xx['id'])
    # "{{ ds }}" is kept as literal text in the command (presumably for
    # Airflow's templating to fill in at run time -- confirm).
    command_parts = [
        "python3",
        "/data/airflow/dag_scripts/xxxx.py",
        "{{ ds }}",
        record_id,
    ]
    operator = BashOperator(
        task_id='handle_task_with_id_' + record_id,
        bash_command=" ".join(command_parts),
        dag=dag,
    )
    return operator
def running():
    """Build DAG ``xx``: ``start`` -> one task per record -> ``end``.

    One task is created with ``multitasking_task`` for every record returned
    by ``get_datas()`` and wired between the ``start`` and ``end`` dummy
    tasks. When there are no records, ``start`` is linked directly to ``end``
    so the two markers keep their ordering instead of being left unconnected.

    Returns:
        The constructed ``DAG`` object.
    """
    default_args = {
        'owner': 'xx',
        'start_date': airflow.utils.dates.days_ago(1),
        'email': ['xx@mail.jj.cn'],
        'email_on_failure': False,
        'email_on_retry': False,
    }
    dag = DAG(
        dag_id='xx',
        default_args=default_args,
        schedule_interval='10 05 * * *',
    )
    start = DummyOperator(
        task_id="start",
        dag=dag
    )
    end = DummyOperator(
        task_id="end",
        dag=dag
    )

    # TODO: handle the case of too many tasks; DAGs could be created
    # dynamically instead.
    # Materialise the records so the empty case can be detected even if
    # get_datas() ever returns a one-shot iterable.
    records = list(get_datas())
    for data in records:
        start >> multitasking_task(data, dag) >> end
    if not records:
        # No per-record tasks were created; keep start ahead of end.
        start >> end
    return dag
# Build the DAG when this module is imported and bind it to the module-level
# name ``dag``.
# NOTE(review): Airflow is expected to discover DAG objects through
# module-level names, so this binding should stay at top level -- confirm
# against the Airflow DAG-loading docs.
dag = running()