数客联盟

如何配置KafkaChannel

2017-06-12  本文已影响46人  Woople

完整配置

# Define spooling source
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /tmp/flume
a1.sources.s1.channels = c1

a1.sources.s1.interceptors = i1
a1.sources.s1.interceptors.i1.type = regex_extractor
a1.sources.s1.interceptors.i1.regex =.*\\|(.*)\\|.*
a1.sources.s1.interceptors.i1.serializers = e1
a1.sources.s1.interceptors.i1.serializers.e1.name = key

# Define a kafka channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = host1:6667,host2:6667,host3:6667
a1.channels.c1.kafka.topic = test
a1.channels.c1.parseAsFlumeEvent = false

a1.channels = c1
a1.sources = s1
a1.sinks =k1

例如在/tmp/flume下面放置一个文件,内容a|b|c
那么通过上面的配置,消费一下kafka的test,看一下结果

sh /usr/hdp/2.5.0.0-1245/kafka/bin/kafka-console-consumer.sh --zookeeper host1:2181 --topic test  --property print.key=true

b       a|b|c
上一篇 下一篇

猜你喜欢

热点阅读