Apache NiFi初体验
-
安装与使用
从官网下载zip包解压,执行./bin/run-nifi.bat即可启动,默认端口8080,打开浏览器http://localhost:8080/nifi即可使用。参考 http://nifi.apache.org/quickstart.html -
保存和导入模板
保存自己的process group,参考 https://community.hortonworks.com/questions/198963/how-to-export-processor-group-from-nifi-to-local-p.html
导入模板,在左边菜单旁边的upload Template,选定文件即可。 -
build项目
下载release-source,从github上拉代码应该也可以,按quick start上面的提示执行mvn clean install。
如果提示maven dependency找不到,是因为nifi用到的很多jar不在maven中央仓库里,需要到apache repo或jcenter里才有。这个时候可以把本地maven的mirror指向阿里仓库(http://maven.aliyun.com/nexus/content/groups/public/)。
build结束后会在nifi-assembly下面生成.zip,解压即可。build过程有点长,一般要10分钟以上,第一次要下载jar可能更慢。
提示:不建议自己build 整个项目,耗时很长,可以从github或官网下载release.zip,解压即可使用。 -
改代码后重新build部署
自己改java代码后,如果改动比较少,可以mvn clean compile后把.class覆盖掉对应nar包里的jar包里相同文件。如果改动大, 找到代码所在bundle,执行mvn clean package会生成nar包,拷贝过去替换即可。
package的时候建议跳过test, -Dmaven.test.skip=true; 如果改了其他包代码,需要先install到本地,否则打出来的nar包里的依赖不是最新的
替换后要重启NiFi -
调试
image.png
NiFi不支持直接在IDEA里运行(或者支持但我没找到方法)。可以在IDEA里开启远程调用。
1、在NiFi运行目录/conf/bootstrap里把remote debug那行取消注释,然后在IDEA里设置一个Remote debug,端口默认8000,指定log文件位置为NiFi/logs/nifi-app.log就可以了。
参考:https://community.hortonworks.com/articles/106931/nifi-debugging-tutorial.html
上面红框里不用写,直接<no module>即可。
-
Process配置
https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#Configuring_a_Processor -
Jolt Transform
NiFi里会对数据做转换,用到了Jolt Transform,语法不清楚,看下面demo。
demo: https://jolt-demo.appspot.com/#mapToList -
性能
参考:https://blog.csdn.net/weixin_39445556/article/details/84339249(原文 https://community.hortonworks.com/articles/7882/hdfnifi-best-practices-for-setting-up-a-high-perfo.html)
开发时建议调大-Xms -Xmx, flow + content + prevenance的存储位置在生产环境最好配置在别的目录。 -
自定义Processor
有时候内置的Processor不能满足我们的要求,看呀自定义Processor。参考 https://community.hortonworks.com/articles/4318/build-custom-nifi-processor.html
1、在nifi-nar-bundles中建子module如nifi-xyz,package是pom
2、在nfi-xyz中建子module,如nifi-xyz-bundle,package是pom
3、在nifi-xyz-bundle中建nifi-xyz-processors模块,package是jar,添加相关依赖,写具体的Processor类,在META-INF/services里建一个名为org.apache.nifi.processor.Processor的文件,里面加上你的Processor类全名。 这个就是SPI思想
4、在nifi-xyz-bundle里建nifi-xyz-nar,package是nar,依赖nifi-xyz-processors
5、在nifi-xyz-bundle里执行mvn clean package即可打出来nar包。
在queue里面可以设置背压,objects threshold和size threshold哪个先达到就会把压力传导到upstream queue,最终达到CaptureChangeMySQL,停止消费binlog。
PutSQL的rollback on false=true,这样flow file就会停留在input queue里面,这样才会让背压起作用。
https://stackoverflow.com/questions/56374764/stop-capturechangemysql-when-putsql-failed/56374948#56374948
NIFI的坑:
- 问题1:表结构发生变化,需要重启nifi,刷掉redis
我们用到了CaptchureMySQLChange和ConvertJsonToSql这2个Processor,CaptchureMySQLChange会把表结构信息缓存到redis,ConvertJsonToSql就可以根据表结构把json转换成sql了,同时ConvertJsonToSql在本地还缓存了表结构,这样在CaptchureMySQLChange接受到DDL的时候,只是把redis清了,而ConvertJsonToSql还是从本地的schemaCache里取表结构,造成转换出来的sql少字段。 - 解决方法:可以在每次表结构变化的时候,重启nifi,刷掉redis。或者CaptchureMySQLChange设置解析DDL,同时用RouteOnAttributeProcessor根据statementType来对DDL和DML进行不同的处理,在DDL的时候把ConvertJsonToSql中的schemaCache清掉。
- 注意:CaptchureMySQLChange里是直接根据sql.startsWith("alter table")来判断是不是DDL的,如果sql是注释开头的,如/*xxx*/这种就会有问题。
-问题2:MergeContentProcessor会有一定的几率不能保证前后sql的顺序。
- 补充:源码中用的HashMap,需要改成LinkedHashMap,这样就能按照日志的消费顺序来合并了
IDEA里运行和调试项目
1.运行NIFI
image.png
把nifi/lib/bootstrap下的jar添加到dependencies.
image.png其中VM options指定conf和log目录在哪.
2.调试NIFI
上面运行了NIFI即使打上断点也不能调试, 需要通过Remote Debug
image.png
未解决的问题
上面在IDEA里调试,如果改了代码,不知道怎么打包让代码生效?
方法:没有方法直接在IDEA里修改代码即生效,因为nifi启动时候会从bootstrap.conf的上级目录下的lib文件夹里解压nar包,获取到.jar组装到classpath, 代码在RunNiFi.start(). 因此改完代码后需要package,拷贝到nifi/lib目录下替换.
可以在IDEA里启动RunNifi和Remote debug, 只是每次改动代码后要重新打nar包放到nifi/lib下面即可.
通过.getClass().getProtectionDomain().getCodeSource()发现加载的是.work/下面的jar,不知道为什么从这里加载? 有时间再看一下.
看上面的解释.