使用DataWorks调度DLA循环任务
DataWorks是阿里云上的一款热门产品,可以为用户提供大数据开发调度服务。它支持了Data Lake Analytics(后文简称DLA)以后,DLA用户可以通过它进行定时任务调度,非常方便。本文将主要介绍如何使用DataWorks调度DLA的循环任务。
场景
使用DLA对历史数据按天做清洗。数据清洗的SQL是固定的,只是每次执行的时候需要传入不同的日期。
对于这个场景,我们需要:
- 部署一个赋值节点。该节点负责输出日期值,并作为下游循环节点的输入。
- 部署一个循环节点。该节点包含用来做数据清洗的一个或者一组SQL,其中关于日期取值是一个变量。每次循环输入值由赋值节点提供。
DataWorks操作
步骤一:新建业务流程和节点
登录DataWorks的控制台,并创建一个业务流程或使用原有的业务流程。
在新建的业务流程下,创建一个赋值节点和一个循环节点。
步骤二:配置赋值节点
打开节点“日期集合”的编辑页面。这里我们选择SHELL语言,将要执行的日期值写在一个数组里。
打开节点“日期集合”的调度配置页面。
在这里需要给赋值节点设置一个上游节点,这里可以设置为当前工作空间的root。比如我的工作空间名字叫jinluo_poc,则该节点为jinluo_poc_root。
步骤三:配置循环节点
双击循环节点进入编辑页面。可以看到三个节点,分别是start, sql和end。这里我们需要新建一个DLA的任务节点,并把sql替换为一个DLA的任务节点。
在调度配置页面设置依赖关系和节点上下文。上游节点设置为赋值节点“日期集合”,本节点的输入为赋值节点的输出。
设置DLA_SQL节点
选择一个DLA的数据源,并填写SQL。
这里面的pure_date的值是从赋值节点读入的。每次读取赋值节点的输出结果数组中的一个值。写法是固定的,如下所示。
h.`pure_date`=${dag.input[${dag.offset}]}
设置end节点
该节点的作用是控制循环的结束。
end节点的结束条件:是把dag.loopTimes进行比较,小于则输出True继续循环;不小于则输出False退出循环。dag.input.length变量,标识上下文参数input数组的行数。是系统自动根据节点配置的上下文下发的变量。
if ${dag.loopTimes} < ${dag.input.length}:
print True
else:
print False
在调度配置页面,需要设置上游节点。
设置完成,保存后,可以看到循环节点变更为
步骤四:发布
目前在DataWorks的开发界面暂不支持循环节点的运行,需要提交后在运维中心测试运行。
分别点击 “日期集合”和“数据清洗SQL”页面上的“提交按钮”进行提交。
在提交循环节点时,注意要勾选上所有的节点。
步骤五:运行
进入运维中心页面,在周期任务的列表里面可以看到我们刚刚提交的两个作业。
右键“日期集合” -> 补数据 -> 当前节点及下游节点 可以手动执行该组任务。
提交后可以看到每个节点的运行状态。
参考
- DataWorks官方文档:
http://help.aliyun-inc.com/internaldoc/detail/102311.html?spm=a2c1f.8259796.2.25.24fa96d5a5twQO
本文作者:金络
本文为云栖社区原创内容,未经允许不得转载。