我爱编程

数据采集之Flume+Kafka

2017-12-05  本文已影响243人  吃橘子的冬天

Flume简介

1. Flume特点

flume是收集日志的开源软件解决方案之一,相对于其他同类软件他具有高可用的,高可靠的,分布式等特性。flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。你可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source

2. Flume核心概念

  • Agent 使用JVM运行Flume 每台机器运行一个agent , 但是可以在一个agent中包含多个sources和sinks
  • Client 生产数据 , 运行在一个独立的线程
  • Source 从Client收集数据 , 传递给Channel
  • Sink 从Channel收集数据 , 运行在一个独立线程
  • Channel 连接 sources 和 sinks ,这个有点像一个队列
  • Events 可以是日志记录、 avro 对象等

Flume快速开发

1. 安装

[mis-ecif@hadoop10-4-0-226 ~]$ yum install flume 

解压文件,若打印如下信息,解压缩报错 ,可能是包没下载完全,重新下载重试即可

[mis-ecif@hadoop10-4-0-226 ~]$ tar -zxvf apache-flume-1.6.0-bin.tar.gz
gzip: stdin: unexpected end of file  
tar: Unexpected EOF in archive  
tar: Unexpected EOF in archive
tar: Error is not recoverable: exiting now

若解压成功,可检测安装是否成功:/usr/local/flume/bin/flume-ng version
打印以下信息,则表示安装成功了

[mis-ecif@hadoop10-4-0-226 ~]$ flume-ng version
Flume 1.6.0-transwarp-tdh480
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: Unknown
Compiled by root on Fri Apr  7 07:52:45 UTC 2017
From source with checksum 4031fa0e0507f30090f954451ab3a164

若打印以下信息,可能是因为安装了hbase,将Hbase的hbase-env.sh文件中HBASE_CLASS注释掉即可

[mis-ecif@hadoop10-4-0-226 ~]$ flume-ng version
Could not find or load main class org.apache.flume.tools.GetJavaProperty #加载不了该类
Flume 1.6.0-transwarp-tdh480
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: Unknown
Compiled by root on Fri Apr  7 07:52:45 UTC 2017
From source with checksum 4031fa0e0507f30090f954451ab3a164

2. 开发

cd /usr/local/flume/conf/
cp flume-env.sh.template flume-env.sh
vi flume-env.sh # 修改flume-env.sh中JAVA_HOME变量的值
[root@hadoop10-1-0-144 conf]# vi /local/flume/conf/exec_tail.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/mis-ecif/flume_logs/phoneinfo-20171204.log

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3. 测试

flume-ng agent -c /usr/lib/flume/apache-flume-1.6.0-bin/conf -f /usr/lib/flume/apache-flume-1.6.0-bin/conf/spoon_kafka.conf -n a1 -Dflume.root.logger=INFO,console
echo 'phoneinfo||223.104.7.66||OPPO R9sk||6.0.1||天津市||2017-07-25 06:53:23||中国移动||yingyongbao
' >> /opt/mis-ecif/flume_logs/phoneinfo-20171204.log

echo 'phoneinfo||101.38.64.172||iPhone 6 Plus||10.3.2||北京市||2017-07-25 07:11:40||中国联通' >> /opt/mis-ecif/flume_logs/phoneinfo-20171204.log

控制台若有数据打印,则表示测试成功

4. 更改配置,与kafka集成

# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = flume_demo
#a1.sinks.k1.brokerList = 10.1.0.141:9092,10.1.0.142:9092,10.1.0.143:9092,10.1.0.144:9092
#Kafka集群Broker列表
a1.sinks.k1.brokerList = 10.1.0.144:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 100
# Use a channel which buffers events in memory
a1.channels.c1.type = file
a1.channels.c1.checkpoint = /mnt/disk1/flume/checkpoint #检查点文件存储路径
a1.channels.c1.dataDirs = /mnt/disk1/flume/data #消息数据存储路径
./kafka-topics.sh --zookeeper 10.1.0.144:2181 --create --topic flume_demo --partition 3 --replication-factor 1
./kafka-topics.sh  --list --zookeeper 10.1.0.144:2181
./kafka-console-consumer.sh --topic flume_demo  --bootstrap-server 10.1.0.144:9092
flume-ng agent -c /usr/lib/flume/apache-flume-1.6.0-bin/conf -f /usr/lib/flume/apache-flume-1.6.0-bin/conf/spoon_kafka.conf -n a1 -Dflume.root.logger=INFO,console
echo 'phoneinfo||116.227.248.47||HUAWEI MT7-CL00||6.0||||2017-07-25 07:21:36||中国移动||yingyongbao'  >> /opt/mis-ecif/flume_logs/phoneinfo-20171204.log

查看kafka consumer窗口,若能够正常接收消息,则表示集成kafka成功。

上一篇下一篇

猜你喜欢

热点阅读