ADF realtime Scenario

2024-01-03  本文已影响0人  山猪打不过家猪

0.项目地址

Azure Data Factory Real Time Scenarios

1. Mapping DataFlow处理错误的数据

文件:department_error_date.csv

1.1 重点

1.2 例子

image.png

2. 动态获取folder的所有文件名称

2.1 实现

  1. 使用getMetadata 获取文件夹的所有childItems


    image.png
  2. ForEach循环childItems
    3.在ForEach循环里获取每个childItem的name(之后就可以用来复制)

3. 增量复制最新的编辑的表/删除60天之前的表

Incrementallly copy new and changed files bases on last modified

3.1重点

copy

3.2例子

pass

5. 使用dataflow修复一列都为字符串的csv文件

5.1 重点

5.2 使用derived修改数据都在一行的表

  1. 读取source,注意不要选择first row as header,之后使用skip line跳过这个表头,就得到了纯净的数据


    image.png

    2.derived column,这里我们添加新的列,内容是substring 字符串


    image.png
  2. 使用select 删除之前错误的列,保留新的2列数据
  3. 设置sink,注意如果只想输出一个文件,选择single partition
    image.png

7.使用dataflow删除重复的行

文件:employee_duplicated.csv

7.1 去重方法一:

1.先根据name和country对employee进行分组group by


image.png
  1. 然后对剩下不是country 和name的列,只取他们的第一列


    image.png

    3.结果


    image.png

7.1 去重方法二:使用sha2创建finger print

8. 使用dataflow合并一个无ID的表

文件:employee_key0,employee_key1
思路:首先计算出来原来有Id的文件的最大文件id
1.derivedColumn:在有ID的文件中,创建一个虚拟列,dummpy
2.groupby dummpy然后计算出最大的max id
3.join:然后将计算好的于没有ID的key0文件cross join ,条件是1==1
4.surrogate key:然后添加一个自增的surrogate key
5.derivedColumn:添加一个新的id列,用最大id+surrogate key,就得到了新的id
6.select:选择需要的列
7.new branch: key1添加新的分支
8.union: 新表和Key1进行union
9.sink:完成

9.滚动 和 running total

使用window

10. log ADF pipeline

image.png

需要两个data flow,判断今天是否存在了log,有就append,无就create

  1. clone上面的data flow1

11. 慢修改

读取新的数据,如果有更新且新增了数据,更改原表将新数据合并 alter row,目标数据库是sql.

12. 获取文件夹的所有文件数

注意:.childItmes返回的是array.length可以读取array的长度,返回 是一个object,需要转为string


image.png

13. 在copy过程中,添加新的列

直接在设置里,选择


image.png

14-15. 使用concat和join将array转为string

16. 验证文件得格式

image.png

17. 慢修改2

image.png

每次有新的数据,将旧的数据的isActive改为0,新的为1,并且新表的添加一个Surrkey来区分

19.执行带参数和返回值的procedure

直接使用pipleline里的procedure是无法执行带返回值的,需要使用loop up直接


image.png

20. 获取blob里最新的文件get latest file in blob

21. 动态的Mapping

就是将mapping的json,写去表里,或者文件中,使用getmeta读出来


image.png

22. 将多行合并成一行

image.png

23. 当报错时候,发送提醒 send email when pipeline fails

没看

24. 一行拆成多行

image.png

28. 时区转换

用转为时间戳然后进行加减法

29. 执行一个活动,如果任何一个分支发生了错误

Run an activity if any one of set of activities fail

30. Get Error message of Failed activities in Pipeline

36. 在look up中执行创建表的sql query

create table employee (id int)
select 1 as abc

37. 查看某天是否在某周之内

39. 查看数据更改的列

使用sha2和exists来判断

上一篇 下一篇

猜你喜欢

热点阅读