flume经验
2018-05-10 本文已影响0人
cotecc
一、source
1. kafka source
常用参数:
auto.offset.reset,只有当使用新的groupid时,earliest才会生效,因为kafka没有该groupid的offset。对于已经存在的groupid,只要有offset记录,所以会从该groupid的offset开始处消费数据
migrateZookeeperOffsets,不使用zookeeper记录offset
group.id,kafka broker会记录每个groupid的offset,如果需要重新消费数据,使用心得groupid,并配合使用auto.offset.reset=earliest
二、channel
1. channel对比
- file —channel,使用local目录,存储中间数据流程,如果flume死掉重启,可以通过该文件进行恢复,不会丢失数据,但是性能较差,需要读写磁盘
- memory — channel,使用内存做中间存储,性能较好,但是flume死掉重启,在内存尚未保存的数据,会丢失
- kafka — channel,通过kafka记录offset,及时flume死掉重启,也不会丢失数据;缺点是不能使用interceptors进行过滤
三、sink
1. hdfs
常用参数:
serializer.appendNewline,不要在每一行数据后加\n,因为有些数据自带有\n
rollInterval,多长时间sink一个文件到hdfs
其他
1. 直接使用kafka作为channel,sink到hdfs
# 使用kafka直接作为channel(flume中断重启时,会从kafka续传数据),没有source
# agent1是flume名称(启动时命名标识)
agent1.channels = kafka-channel
agent1.sources = no-source
agent1.sinks = hdfs-sink1
#channel是kafka
agent1.channels.kafka-channel.type = org.apache.flume.channel.kafka.KafkaChannel
agent1.channels.kafka-channel.kafka.bootstrap.servers =
agent1.channels.kafka-channel.kafka.topic =
agent1.channels.kafka-channel.kafka.consumer.group.id =
agent1.channels.kafka-channel.migrateZookeeperOffsets = false
#agent1.channels.kafka-source1.kafka.consumer.auto.offset.reset = earliest
agent1.channels.kafka-channel.parseAsFlumeEvent = false
#sink数据到hdfs,配置
agent1.sinks.hdfs-sink1.channel = kafka-channel
agent1.sinks.hdfs-sink1.type = hdfs
agent1.sinks.hdfs-sink1.hdfs.path = /home/dt=%Y%m%d
agent1.sinks.hdfs-sink1.hdfs.filePrefix = events-
agent1.sinks.hdfs-sink1.hdfs.useLocalTimeStamp = true
agent1.sinks.hdfs-sink1.hdfs.writeFormat = Text
agent1.sinks.hdfs-sink1.hdfs.fileType = DataStream
agent1.sinks.hdfs-sink1.hdfs.rollInterval = 30
agent1.sinks.hdfs-sink1.hdfs.rollSize = 0
agent1.sinks.hdfs-sink1.hdfs.rollCount = 0
agent1.sinks.hdfs-sink1.hdfs.batchSize = 100
agent1.sinks.hdfs-sink1.hdfs.txnEventMax = 1000
agent1.sinks.hdfs-sink1.hdfs.callTimeout = 60000
agent1.sinks.hdfs-sink1.hdfs.appendTimeout = 60000
agent1.sinks.hdfs-sink1.serializer.appendNewline = false
2. 过滤source数据
# source中数据,匹配正则的才流到下一层
agent1.sources.kafka-source1.interceptors = i1
agent1.sources.kafka-source1.interceptors.i1.type = regex_filter
agent1.sources.kafka-source1.interceptors.i1.regex = glog