Nifi入门
NiFi基本概念
概述
简单地说,NiFi是为了自动化系统之间的数据流而构建的。虽然术语“数据流”在各种环境中使用,但我们在此处使用它来表示系统之间自动化和管理的信息流。这个问题空间一直存在,因为企业有多个系统,其中一些系统创建数据,一些系统消耗数据。已经讨论并广泛阐述了出现的问题和解决方案模式。企业集成模式 中提供了一个全面且易于使用的表单。
NiFi的诞生,要致力于解决的问题:
- 因为网络故障、磁盘故障、软件崩溃、人们犯错导致的系统错误。
- 数据读写超出了自身系统的处理能力。
- 获取的数据不具有规范性。
- 数据结构的优先级变化很快,启用新流和更改现有流的速度必须非常快。
- 数据结构化管理的可移植性与不同数据格式之间的依赖性。
核心概念
NiFi的基本设计概念与基于流程的编程的主要思想密切相关。以下是一些主要的NiFi概念以及它们如何映射到FBP:
NiFi 术语 | FBP 术语 | 描述 |
---|---|---|
FlowFile | Information Packet | FlowFile表示在系统中移动的每个对象,对于每个对象,NiFi跟踪键/值对属性字符串的映射及其相关的零个或多个字节的内容。 |
FlowFile Processor | Black Box | 处理器实际执行这项工作。在企业集成模式中,处理器正在系统之间进行数据路由、转换或中介的某些组合。处理器可以访问给定流文件及其内容流的属性。处理器可以对给定工作单元中的零或多个流文件进行操作,并提交该工作或回滚该工作。 |
Connection | Bounded Buffer | Connections提供处理器之间的实际链接。它们充当队列并允许各种进程以不同的速率进行交互。这些队列可以动态优先化,并且可以在负载上具有上限,从而实现反压。 |
Flow Controller | Scheduler | 流控制器维护流程如何连接和管理所有流程使用的线程及其分配的知识。Flow Controller充当促进处理器之间FlowFiles交换的代理。 |
Process Group | subnet | Process Group 是一组特定的进程及其连接,可以通过输入端口接收数据并通过输出端口发送数据。以这种方式,Process Group允许仅通过组合其他组件来创建全新组件。 |
NiFi架构原理
image.pngNiFi在主机操作系统上的JVM内执行。JVM上NiFi的主要组件如下:
- 1.Web Server
Web服务器的目的是托管NiFi基于HTTP的命令和控制API。 - 2.Flow Controller
流量控制器是操作的大脑。它为扩展程序提供运行的线程,并管理扩展程序何时接收要执行的资源的计划。 - 3.Extensions
在其他文献中描述了各种类型的NiFi扩展。这里的关键点是扩展在JVM中运行和执行。 - 4.FlowFile Repository
FlowFile存储库是NiFi跟踪其对流中当前活动的给定FlowFile的了解状态的地方。存储库的实现是可插入的。默认方法是位于指定磁盘分区上的持久性预写日志。 - 5.Content Repository
内容存储库是给定FlowFile的实际内容字节。存储库的实现是可插入的。默认方法是一种相当简单的机制,它将数据块存储在文件系统中。可以指定多个文件系统存储位置,以便获得不同的物理分区以减少任何单个卷上的争用。 - 6.Provenance Repository
Provenance Repository是存储所有出处事件数据的地方。存储库构造是可插入的,默认实现是使用一个或多个物理磁盘卷。在每个位置内,事件数据被索引并可搜索。
NiFi运行在集群
image.png从NiFi 1.0版本开始,采用了Zero-Master Clustering范例。NiFi群集中的每个节点对数据执行相同的任务,但每个节点都在不同的数据集上运行。
Apache ZooKeeper选择单个节点作为集群协调器,ZooKeeper自动处理故障转移。
所有群集节点都会向群集协调器报告心跳和状态信息。群集协调器负责断开和连接节点。此外,每个群集都有一个主节点,也由ZooKeeper选举。作为DataFlow管理器,您可以通过任何节点的用户界面(UI)与NiFi群集进行交互。您所做的任何更改都将复制到群集中的所有节点,从而允许多个入口点。
1.对于IO
可以预期的吞吐量或延迟会有很大差异,具体取决于系统的配置方式。鉴于大多数主要NiFi子系统都有可插拔的方法,性能取决于实施。
但是,对于具体且广泛适用的内容,请考虑开箱即用的默认实现。
这些都是持久的保证交付,并使用本地磁盘这样做。因此保守一点,假设典型服务器中的适度磁盘或RAID卷大约每秒50 MB读/写速率。
然后,对于大类数据流的NiFi应该能够有效地达到每秒100 MB或更高的吞吐量。这是因为预期每个物理分区和添加到NiFi的内容存储库都会出现线性增长。
这将在FlowFile存储库和originance存储库的某个点上出现瓶颈。我们计划提供一个基准测试和性能测试模板,以包含在构建中,允许用户轻松测试他们的系统并确定瓶颈在哪里,以及他们可能成为一个因素。此模板还应使系统管理员可以轻松进行更改并验证其影响。
2.对于CPU
流控制器充当引擎,指示特定处理器何时被赋予执行线程。编写处理器以在执行任务后立即返回线程。可以为Flow Controller提供一个配置值,指示它维护的各个线程池的可用线程。
理想的线程数取决于主机系统资源的核心数量,系统是否正在运行其他服务,以及流程中处理的性质。对于典型的IO大流量,可以使许多线程可用。
3.对于RAM
NiFi存在于JVM中,因此仅限于JVM提供的内存空间。JVM垃圾收集成为限制总实际堆大小以及优化应用程序运行时间的一个非常重要的因素。定期阅读相同内容时,NiFi作业可能是I / O密集型的。配置足够大的磁盘以优化性能。
NiFi安装
NiFi安装地址
1.NiFi官网地址
http://nifi.apache.org/
2.文档查看地址
http://nifi.apache.org/docs.html
3.下载地址
http://nifi.apache.org/download.html
安装NiFi
NiFi安装
(1)把nifi-1.8.0-bin.tar.gz上传到linux的/opt/software目录下
(2)解压nifi-1.8.0-bin.tar.gz到/opt/module/目录下面
[atguigu@hadoop102 software]$ tar -zxvf nifi-1.8.0-bin.tar.gz -C /opt/module/
(3)修改apache-NiFi-sources-0.8.4的名称为NiFi
[atguigu@hadoop102 module]$ mv nifi-1.8.0/ nifi
NiFi核心配置
(1)编辑/etc/security/limits.conf来增加NiFi打开大量的文件数量和线程数量的限制。
[atguigu@hadoop102 module]$ vim /etc/security/limits.conf
* hard nofile 50000
* soft nofile 50000
* hard nproc 10000
* soft nproc 10000
[atguigu@hadoop102 module]$ vim /etc/security/limits.d/90-nproc.conf
* soft nproc 10000
(2)增加可用的TCP套接字端口数
[atguigu@hadoop102 module]$ sudo sysctl -w net.ipv4.ip_local_port_range="10000 65000"
[atguigu@hadoop102 module]$ vim /etc/sysctl.conf
vm.swappiness = 0
(3)修改默认端口(可选)
[atguigu@hadoop102 module]$ vim /opt/module/nifi/conf/nifi.properties
image.png
提示:更多属性配置可以参考如下:http://nifi.apache.org/docs.html
启动
(1)NiFi后台启动/关闭命令
[atguigu@hadoop102 nifi]$ bin/nifi.sh start
[atguigu@hadoop102 nifi]$ bin/nifi.sh stop
[atguigu@hadoop102 nifi]$ bin/nifi.sh status
(2)NiFi前台启动/关闭命令
[atguigu@hadoop102 nifi]$ bin/nifi.sh run #Ctrl+c 关闭
(3)web访问
安装NiFi服务作为系统服务(可选)
(1)安装系统服务。
[atguigu@hadoop102 nifi]$ bin/nifi.sh install
(2)服务常用启动/关闭命令
[atguigu@hadoop102 nifi]$ sudo service nifi start
[atguigu@hadoop102 nifi]$ sudo service nifi stop
[atguigu@hadoop102 nifi]$ sudo service nifi status
NiFi 的使用
Web页面简介
- 1.NiFi登陆后界面
- 2.NiFi登陆界面解读
- 3.全局菜单
- 4.NiFi登陆界面解读
我们现在可以通过在画布中添加Processor来开始创建数据流。要执行此操作,请将处理器图标(
image.png
)从屏幕左上方拖动到画布中间(图纸类背景)并将其放在那里。这将为我们提供一个对话框,允许我们选择要添加的处理器:
image.png提示:各个处理器的用途及配置在官网上都有介绍,大约提供了近300个常用处理器。包含但不限于:数据格式转换、数据采集、数据(local/kafka/solr/hdfs/hbase/mysql/hive/http等)的读写等功能,使用方便,如果不能满足需求,还可以自定义处理器。
5.配置处理器(以GetFile为例)
image.png image.png image.png image.png提示:详细properties配置可参考官网:http://nifi.apache.org/docs.html
image.png5.连接处理器(将文件转换为csv格式)
image.png案例一
添加ExecuteSQL
(1)添加ExecuteSQL到面板
image.png(2)配置ExecuteSQL
image.png(3)点击 -> 配置Database Connection Pooling Service
image.png注意:一定要将state改为Enable
image.png image.png添加ConverrtAvroToJson
添加ConverrtAvroToJson到面板,不用修改任何属性配置。
添加PutFile
(1)添加PutFile到面板
image.png连接处理器
(1)拖动箭头指向下一层,并勾选success。
image.png image.png image.png(2)启动任务,选择对应处理器,点击启动按钮
image.png案例二
需求:导出Hive数据转换为csv并保存到HDFS
添加SelectHiveQL
(1)添加SelectHiveQL到面板
image.png(2)配置数据仓库连接池
image.png(3)启动数据仓库连接池
image.png注意:此方案借助hiveserver2进行连接,因此要手动开启和hiveserver2服务。
(4)添加PutHDFS处理器
image.png(5)启动
image.png(6)查看HDFS生成的文件
image.png常见错误
1.每一个开始处理器都要有指定任务失败时的处理方式
image.png image.png2.每一个结束处理器都要有指定任务失败/成功的处理方式
image.png image.png