Airflow根据执行日期,调用kylin
最近接到一个需求,根据Airflow的执行日期,每次往前推3天重新计算kylin中的指标
(1)首先需要拿到Airflow的执行日期,根据官网可以知道jinja中可以拿到执行日期{{ds}}
(2)然后需要调用kylin的api,很简单查询官网,拿到API请求方式,这里需要注意的是,时间需要做一个转换为时间戳,另外需要注意的是,jinja中需要注意下字符转义的问题。
具体代码如下:
exec_kylinReBuild = """
{% autoescape false %}
p_start_time=$(date -d \"{{ macros.ds_add(ds, -3) }}\" +%s000)
p_end_time=$(date -d \"{{ ds }}\" +%s000)
echo "end_date:"$p_end_time
echo "start_date:"$p_start_time
echo "exec_command:curl --user ADMIN:KYLIN -X PUT -H 'Content-Type: application/json' -d \'{\\\"startTime\\\":$p_start_time,\\\"endTime\\\":$p_end_time,\\\"buildType\\\":\\\"BUILD\\\"}\' http://host:7070/kylin/api/cubes/t_dwd_car_show_city_cube/rebuild"
eval "curl --user ADMIN:KYLIN -X PUT -H 'Content-Type: application/json' -d \'{\\\"startTime\\\":$p_start_time,\\\"endTime\\\":$p_end_time,\\\"buildType\\\":\\\"BUILD\\\"}\' http://host:7070/kylin/api/cubes/t_dwd_car_show_city_cube/rebuild"
{% endautoescape %}
"""
kylinReBuild = BashOperator(
task_id='kylinReBuild',
bash_command=exec_kylinReBuild,
owner='xxxx',
dag=dag)