Data Collection with Flume + Kafka
2017-12-05
吃橘子的冬天
Introduction to Flume
1. Flume Features
Flume is one of the open-source solutions for log collection; compared with similar tools it is highly available, highly reliable, and distributed. Flume's data flow is driven end to end by events. An Event is Flume's basic unit of data: it carries the log payload as a byte array together with header information. Events are generated by a Source from data arriving from outside the Agent. When a Source captures an event it applies a specific format, then pushes the event into one or more Channels. You can think of a Channel as a buffer that holds each event until a Sink has finished processing it; the Sink is responsible for persisting the log or forwarding the event on to another Source.
2. Flume Core Concepts
- Agent: a JVM process running Flume. Each machine runs one agent, but a single agent can contain multiple sources and sinks.
- Client: produces the data; runs in an independent thread.
- Source: collects data from the Client and passes it to a Channel.
- Sink: collects data from a Channel; runs in an independent thread.
- Channel: connects sources and sinks; it works somewhat like a queue.
- Events: can be log records, Avro objects, and so on.
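These components are wired together declaratively in an agent's properties file. A minimal wiring skeleton (the names a1/r1/c1/k1 are conventional placeholders; the full working example appears in the Quick Start section below):

```properties
# One source (r1) feeds one channel (c1), which one sink (k1) drains
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```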
Flume Quick Start
1. Installation
- Install via yum:
[mis-ecif@hadoop10-4-0-226 ~]$ yum install flume
- Or download and install manually: http://archive.apache.org/dist/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz
After downloading, upload the archive to the server and extract it under /usr/local/ (any directory works); a sketch of these steps follows.
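A sketch of the manual route (the /usr/local/flume symlink is my addition, so that the shorter path used later in this post resolves):

```bash
cd /usr/local
wget http://archive.apache.org/dist/flume/1.6.0/apache-flume-1.6.0-bin.tar.gz
tar -zxvf apache-flume-1.6.0-bin.tar.gz
# Symlink so the short path /usr/local/flume used below works
ln -s /usr/local/apache-flume-1.6.0-bin /usr/local/flume
```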
If extraction fails with the following output, the archive was probably not downloaded completely; download it again and retry:
[mis-ecif@hadoop10-4-0-226 ~]$ tar -zxvf apache-flume-1.6.0-bin.tar.gz
gzip: stdin: unexpected end of file
tar: Unexpected EOF in archive
tar: Unexpected EOF in archive
tar: Error is not recoverable: exiting now
If extraction succeeds, verify the installation: /usr/local/flume/bin/flume-ng version
Output like the following means the installation succeeded:
[mis-ecif@hadoop10-4-0-226 ~]$ flume-ng version
Flume 1.6.0-transwarp-tdh480
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: Unknown
Compiled by root on Fri Apr 7 07:52:45 UTC 2017
From source with checksum 4031fa0e0507f30090f954451ab3a164
If it prints the following instead, the likely cause is an HBase installation on the same machine; comment out the HBASE_CLASSPATH line in HBase's hbase-env.sh, as sketched after the sample output:
[mis-ecif@hadoop10-4-0-226 ~]$ flume-ng version
Could not find or load main class org.apache.flume.tools.GetJavaProperty  # this class cannot be loaded
Flume 1.6.0-transwarp-tdh480
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: Unknown
Compiled by root on Fri Apr 7 07:52:45 UTC 2017
From source with checksum 4031fa0e0507f30090f954451ab3a164
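A sketch of that fix, assuming HBase is installed under /usr/local/hbase (adjust the path to your layout):

```bash
# Comment out the HBASE_CLASSPATH export that confuses flume-ng's classpath setup
sed -i 's/^export HBASE_CLASSPATH/#export HBASE_CLASSPATH/' /usr/local/hbase/conf/hbase-env.sh
# Verify the fix
/usr/local/flume/bin/flume-ng version
```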
2. Development
- Update the Flume environment file:
cd /usr/local/flume/conf/
cp flume-env.sh.template flume-env.sh
vi flume-env.sh  # set the JAVA_HOME variable in flume-env.sh
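The edit itself is one line; for example (the JDK path below is an assumption, point it at your own installation):

```bash
# In flume-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_121
```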
- Create the configuration file exec_tail.conf that the Flume agent will start with:
[root@hadoop10-1-0-144 conf]# vi /usr/local/flume/conf/exec_tail.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/mis-ecif/flume_logs/phoneinfo-20171204.log
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
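Before moving on to the test, you can create the monitored file up front so the first test lines land in an existing file (tail -F will also pick the file up if it appears later):

```bash
mkdir -p /opt/mis-ecif/flume_logs
touch /opt/mis-ecif/flume_logs/phoneinfo-20171204.log
```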
3. Testing
- Start Flume:
flume-ng agent -c /usr/local/flume/conf -f /usr/local/flume/conf/exec_tail.conf -n a1 -Dflume.root.logger=INFO,console
- Append data to the log file Flume is tailing:
echo 'phoneinfo||223.104.7.66||OPPO R9sk||6.0.1||天津市||2017-07-25 06:53:23||中国移动||yingyongbao' >> /opt/mis-ecif/flume_logs/phoneinfo-20171204.log
echo 'phoneinfo||101.38.64.172||iPhone 6 Plus||10.3.2||北京市||2017-07-25 07:11:40||中国联通' >> /opt/mis-ecif/flume_logs/phoneinfo-20171204.log
If the events are printed on the console, the test succeeded.
4. Changing the Configuration to Integrate with Kafka
- Deliver messages to Kafka. Replace the logger sink with a Kafka sink:
# Describe the sink
#a1.sinks.k1.type = logger
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = flume_demo
#a1.sinks.k1.brokerList = 10.1.0.141:9092,10.1.0.142:9092,10.1.0.143:9092,10.1.0.144:9092
# Kafka cluster broker list
a1.sinks.k1.brokerList = 10.1.0.144:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 100
- Buffer messages on the local file system (recommended). Switch the memory channel to a file channel:
# Use a channel which buffers events on the local file system
a1.channels.c1.type = file
# Checkpoint file directory
a1.channels.c1.checkpointDir = /mnt/disk1/flume/checkpoint
# Message data directories
a1.channels.c1.dataDirs = /mnt/disk1/flume/data
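Putting the fragments together, the complete Kafka-enabled agent file looks like this (a sketch; the file name exec_kafka.conf is my choice):

```properties
# /usr/local/flume/conf/exec_kafka.conf: exec source -> file channel -> Kafka sink
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/mis-ecif/flume_logs/phoneinfo-20171204.log

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = flume_demo
a1.sinks.k1.brokerList = 10.1.0.144:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 100

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/disk1/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/disk1/flume/data

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```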
- Create the Kafka topic:
./kafka-topics.sh --zookeeper 10.1.0.144:2181 --create --topic flume_demo --partitions 3 --replication-factor 1
- List topics to verify:
./kafka-topics.sh --list --zookeeper 10.1.0.144:2181
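kafka-topics.sh can also describe a single topic, which is handy for confirming the partition count and replica placement:

```bash
./kafka-topics.sh --describe --topic flume_demo --zookeeper 10.1.0.144:2181
```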
- Start a Kafka consumer to receive Flume's messages:
./kafka-console-consumer.sh --topic flume_demo --bootstrap-server 10.1.0.144:9092
- Restart Flume:
flume-ng agent -c /usr/local/flume/conf -f /usr/local/flume/conf/exec_tail.conf -n a1 -Dflume.root.logger=INFO,console
- Append a log line to the monitored file:
echo 'phoneinfo||116.227.248.47||HUAWEI MT7-CL00||6.0||||2017-07-25 07:21:36||中国移动||yingyongbao' >> /opt/mis-ecif/flume_logs/phoneinfo-20171204.log
Check the Kafka consumer window; if the message arrives, the Kafka integration is working.
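For repeated end-to-end checks, a small smoke-test loop works well (a sketch; the payloads just reuse the delimiter format above):

```bash
# Append a few records; each should show up in the console consumer within seconds
for i in 1 2 3; do
  echo "phoneinfo||10.0.0.$i||TestDevice||7.0||test-city||2017-12-05 12:00:0$i||test-carrier" \
    >> /opt/mis-ecif/flume_logs/phoneinfo-20171204.log
done
```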