flume经验

2018-05-10  作者：cotecc

一、source

1. kafka source

常用参数:（原文的参数表格在转载时丢失,可参考 Flume 官方 User Guide 的 Kafka Source 一节）

二、channel

1. channel对比

三、sink

1. hdfs

常用参数:（原文的参数表格在转载时丢失,可参考 Flume 官方 User Guide 的 HDFS Sink 一节）

其他

1. 直接使用kafka作为channel,sink到hdfs

# Use Kafka itself as the channel (when the Flume agent is restarted it
# resumes from the consumer offsets stored in Kafka) — no source is defined.
# "agent1" is the agent name passed on the flume-ng command line (-n agent1).

agent1.channels = kafka-channel
# Leave the source list empty instead of declaring a dummy name such as
# "no-source": a declared source with no type makes Flume log a warning and
# discard it anyway.
agent1.sources =
agent1.sinks = hdfs-sink1

# The channel is Kafka-backed
agent1.channels.kafka-channel.type = org.apache.flume.channel.kafka.KafkaChannel
agent1.channels.kafka-channel.kafka.bootstrap.servers = 
agent1.channels.kafka-channel.kafka.topic = 
agent1.channels.kafka-channel.kafka.consumer.group.id = 
agent1.channels.kafka-channel.migrateZookeeperOffsets = false
# Prefix fixed: the original read "agent1.channels.kafka-source1.…", which
# matches no declared component, so uncommenting it would silently do nothing.
#agent1.channels.kafka-channel.kafka.consumer.auto.offset.reset = earliest
# Topic carries plain messages (not serialized Flume Avro events), so do not
# parse them as Flume events.
agent1.channels.kafka-channel.parseAsFlumeEvent = false


# HDFS sink: drains events from the Kafka channel into HDFS.
agent1.sinks.hdfs-sink1.channel = kafka-channel
agent1.sinks.hdfs-sink1.type = hdfs
# Date-partitioned path; the %Y%m%d escapes need a timestamp, which is
# supplied by useLocalTimeStamp below (no timestamp header on the events).
agent1.sinks.hdfs-sink1.hdfs.path = /home/dt=%Y%m%d
agent1.sinks.hdfs-sink1.hdfs.filePrefix = events-
agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
# Plain text output; DataStream = no sequence-file wrapping / compression.
agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
# Roll files purely by time (every 30 s); size- and count-based rolling are
# disabled by setting them to 0.
agent1.sinks.hdfs-sink1.hdfs.rollInterval = 30
agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
agent1.sinks.hdfs-sink1.hdfs.batchSize = 100
# NOTE(review): hdfs.txnEventMax and hdfs.appendTimeout do not appear in the
# Flume NG HDFS sink property list and are likely ignored by current Flume
# versions — verify against the Flume release actually deployed.
agent1.sinks.hdfs-sink1.hdfs.txnEventMax = 1000
agent1.sinks.hdfs-sink1.hdfs.callTimeout = 60000
agent1.sinks.hdfs-sink1.hdfs.appendTimeout = 60000
agent1.sinks.hdfs-sink1.serializer.appendNewline = false

2. 过滤source数据

# Interceptor on the Kafka source: only events whose body matches the regex
# are passed downstream (regex_filter with excludeEvents at its default keeps
# matching events and drops the rest).
agent1.sources.kafka-source1.interceptors = i1
agent1.sources.kafka-source1.interceptors.i1.type = regex_filter
# Keep only events containing the substring "glog".
agent1.sources.kafka-source1.interceptors.i1.regex = glog
上一篇下一篇

猜你喜欢

热点阅读