Flume Series 6: Flume Output to Kafka

2021-12-27  只是甲

1. Requirement Description

Today's requirement is to ship the Hive run logs to Kafka; Flink then consumes the Kafka data, processes it, and hands the output to the backend team.

2. Implementation Steps

2.1 Create the Kafka topic

Create a topic named flume_to_kafka with three partitions and three replicas to receive the data:

cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/kafka/bin/
./kafka-topics.sh --create --zookeeper hp2:2181,hp3:2181,hp4:2181 --replication-factor 3 --partitions 3 --topic flume_to_kafka
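
To confirm the topic was created with the expected layout (three partitions, three replicas), you can describe it from the same bin directory:

./kafka-topics.sh --describe --zookeeper hp2:2181,hp3:2181,hp4:2181 --topic flume_to_kafka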

2.2 Flume configuration file

cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567
vi conf/job/flume-file-kafka.conf

Add the following content:

# 01 define source, channel, sink names
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# 02 define source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /tmp/root/hive.log

# 03 define sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = flume_to_kafka
a1.sinks.k1.brokerList = hp2:9092,hp3:9092,hp4:9092
# -1 (all): wait for the full ISR to acknowledge each write
a1.sinks.k1.requiredAcks = -1
# number of events sent to Kafka per batch
a1.sinks.k1.batchSize = 20

# 04 define channel
a1.channels.c1.type = memory
# maximum number of events held in the memory queue
a1.channels.c1.capacity = 1000
# maximum number of events per transaction (one put/take commit),
# so it must not exceed the channel capacity
a1.channels.c1.transactionCapacity = 100

# 05 bind source and sink to the channel
a1.sources.r1.channels = c1   # one source can write to multiple channels
a1.sinks.k1.channel = c1      # one sink reads from exactly one channel
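
Two things worth noting. First, the original config set a1.sinks.k1.type = logger on one line and then immediately overrode it with the Kafka sink type; only the KafkaSink line is kept above. Second, topic, brokerList, requiredAcks and batchSize are the legacy Kafka sink property names; on Flume 1.7+ (CDH 6 ships Flume 1.9) they still work but log deprecation warnings at startup. A minimal sketch of the equivalent new-style keys, assuming the same brokers and topic:

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = flume_to_kafka
a1.sinks.k1.kafka.bootstrap.servers = hp2:9092,hp3:9092,hp4:9092
# acks = -1 (all) waits for the full ISR, matching requiredAcks = -1
a1.sinks.k1.kafka.producer.acks = -1
a1.sinks.k1.flumeBatchSize = 20

Also keep in mind that a memory channel loses buffered events if the agent crashes; a file channel trades some throughput for durability.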

2.3 Start the Flume agent

cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/
bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/job/flume-file-kafka.conf
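
The command above runs the agent in the foreground. For testing it is handy to also see Flume's own log on the console; to keep the agent alive after the shell exits, a common pattern (the nohup wrapper and output path here are my own addition, not part of the original setup) is:

nohup bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/job/flume-file-kafka.conf -Dflume.root.logger=INFO,console > /tmp/flume-file-kafka.out 2>&1 &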

2.4 Verify the output

Consume the topic to check that data is arriving:

/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/kafka/bin/kafka-console-consumer.sh --from-beginning --bootstrap-server 10.31.1.124:9092 --topic flume_to_kafka
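
If the consumer prints nothing, generate some traffic: any Hive statement will append to hive.log, or, as a quick smoke test (assuming your user can write to the file), append a line yourself and watch it arrive:

echo "flume_to_kafka smoke test $(date)" >> /tmp/root/hive.log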