大数据

flume与kafka集成遇到的问题与解决思路

2016-07-15  本文已影响5612人  xcrossed

0x00 背景知识

基本上想去用flume的同学都知道点flume的用途了。flume是一个分布式,可靠的,易用的,可以将不同源的日志进行,收集,汇总,或者存储的中间件。

0x01 使用场景

0x02 flume的使用

flume支持三种不同的agent来发送数据,我这里比较符合的是spooldir这种方式.

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = flume0
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.flume0.type = spooldir
a1.sources.flume0.spoolDir = /data/appdata/download
a1.sources.flume0.fileHeader = false
a1.sources.flume0.ignorePattern = ^(.)*\\.tmp$

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = shopping
a1.sinks.k1.brokerList = 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 100


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 200
a1.channels.c1.transactionCapacity = 200

# Bind the source and sink to the channel
a1.sources.flume0.channels = c1
a1.sinks.k1.channel = c1

0x03 遇到的问题

SpoolDir Source throws IllegalStateException: File has changed size since being read

运行后报上面的错,查了资料说是在flume读这个文件时,该文件不能被继续写入。改了数据生成逻辑,在没有写完成前,以.tmp结尾,写完后,再重命名去掉tmp后缀。涉及到的配置也比较简单

a1.sources.flume0.ignorePattern = ^(.)*\\.tmp$

发送kafka partition上面的数据不均匀,每次发送时,只往一个partition上面发,并没有同时往多个partition上面发。

查了资料,说是发送消息时不指定key将会随机发,但事实上,并没有。
这时,自己用python带的kafka python库直接发送测试,数据是均匀的。说明kafka集群是没问题的。这时候问题出在 kafka sink端。

事情到了这里,似乎需要正面刚这个问题了。
去flume官网下载源文件

public static final String KEY_HDR = "key";

eventKey = headers.get(KEY_HDR);
KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>
          (eventTopic, eventKey, eventBody);
messageList.add(data);

说明key是从event中拿到的,我们只需要在event中构造一个包含key为 key 的header 键值对就能达到目的。

事情到了这里,似乎只要搞定event中加key就可以搞定了。

查询官方文档,发现还有一个拦截器 Interceptor 的玩意儿。

flume默认提供了一些拦截器

我们需要一个能配置headerName的拦截器,找了一下,只有uuid拦截器符合要求。

a1.sources.flume0.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.flume0.interceptors.i1.headerName = key

加上上面二行,重启flume

/usr/local/flume/bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n a1 -Dflume.root.logger=INFO,console

查看kafka-manager中的partition中message的分布,果然妥妥的均匀了。

完整的配置如下:

a1.sources = flume0
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.flume0.type = spooldir
a1.sources.flume0.spoolDir = /data/appdata/download
a1.sources.flume0.fileHeader = false
a1.sources.flume0.ignorePattern = ^(.)*\\.tmp$
a1.sources.flume0.interceptors = i1
a1.sources.flume0.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
a1.sources.flume0.interceptors.i1.headerName = key
a1.sources.flume0.interceptors.i1.preserveExisting = false

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = shopping
a1.sinks.k1.brokerList = 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 100

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 200
a1.channels.c1.transactionCapacity = 200

# Bind the source and sink to the channel
a1.sources.flume0.channels = c1
a1.sinks.k1.channel = c1

使用的版本为flume 1.6

其实,真正没有随机的原因本文并没有直接去找到,只是另辟蹊径解决了问题。

上一篇下一篇

猜你喜欢

热点阅读