flume笔记

2019-12-24  本文已影响0人  霹雳解锋镝

Flume是一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。

一、主要组件:
1、Agent是一个JVM进程,它以事件的形式将数据从源头送至目的,是Flume数据传输的基本单元。Agent主要有3个部分组成,Source、Channel、Sink
2、Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。
3、Channel是位于Source和Sink之间的缓冲区。因此,Channel允许Source和Sink运作在不同的速率上。Channel是线程安全的,可以同时处理几个Source的写入操作和几个Sink的读取操作。
4、Flume自带两种Channel:Memory Channel和File Channel。
    Memory Channel是内存中的队列。在不需要关心数据丢失的情景下适用。
    File Channel将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。
5、Sink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个Flume Agent。
    Sink是完全事务性的。在从Channel批量删除数据之前,每个Sink用Channel启动一个事务。批量事件一旦成功写出到存储系统或下一个Flume Agent,Sink就利用Channel提交事务。事务一旦被提交,该Channel从自己的内部缓冲区删除事件。
    Sink组件目的地包括hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义。
6、Event传输单元,Flume数据传输的基本单元,以事件的形式将数据从源头送至目的地。
二、flume使用案例
1、监控端口数据
    (1) 查询端口是否被占用
        netstat -tunlp | grep 44444
        
    (2) vim flume-telnet-logger.conf
        # Name the components on this agent
        # 输入源 目的地 缓冲区
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Describe/configure the source
        #输入源类型端口型 主机 端口
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 44444

        # Describe the sink
        #输入目的地是控制台logger类型
        a1.sinks.k1.type = logger

        # Use a channel which buffers events in memory
        #缓冲区类型内存型 1000个event 传输到100时提交事务
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1
    (3) 先开启flume监听端口
        bin/flume-ng agent --conf conf/ --name a1 --conf-file demo/flume-telnet-logger.conf -Dflume.root.logger=INFO,console
    (4) 使用telnet工具向本机的44444端口发送内容
        telnet localhost 44444

2、实时读取本地文件到HDFS案例
    (1) flume-file-hdfs.conf
        # Name the components on this agent
        a2.sources = r2
        a2.sinks = k2
        a2.channels = c2

        # Describe/configure the source
        # 类型为exec可执行命令
        a2.sources.r2.type = exec
        a2.sources.r2.command = tail -F /tmp/hadoop/hive.log
        a2.sources.r2.shell = /bin/bash -c

        # Describe the sink
        a2.sinks.k2.type = hdfs
        a2.sinks.k2.hdfs.path = hdfs://hadoop130:9000/flume/%Y%m%d/%H
        #上传文件的前缀
        a2.sinks.k2.hdfs.filePrefix = logs-
        #是否按照时间滚动文件夹
        a2.sinks.k2.hdfs.round = true
        #多少时间单位创建一个新的文件夹
        a2.sinks.k2.hdfs.roundValue = 1
        #重新定义时间单位
        a2.sinks.k2.hdfs.roundUnit = hour
        #是否使用本地时间戳
        a2.sinks.k2.hdfs.useLocalTimeStamp = true
        #积攒多少个Event才flush到HDFS一次
        a2.sinks.k2.hdfs.batchSize = 1000
        #设置文件类型,可支持压缩
        a2.sinks.k2.hdfs.fileType = DataStream
        #多久生成一个新的文件
        a2.sinks.k2.hdfs.rollInterval = 600
        #设置每个文件的滚动大小
        a2.sinks.k2.hdfs.rollSize = 134217700
        #文件的滚动与Event数量无关
        a2.sinks.k2.hdfs.rollCount = 0
        #最小冗余数
        a2.sinks.k2.hdfs.minBlockReplicas = 1

        # Use a channel which buffers events in memory
        a2.channels.c2.type = memory
        a2.channels.c2.capacity = 1000
        a2.channels.c2.transactionCapacity = 100

        # Bind the source and sink to the channel
        a2.sources.r2.channels = c2
        a2.sinks.k2.channel = c2
    (2) 执行监控配置
        bin/flume-ng agent --conf conf/ --name a2 --conf-file demo/flume-file-hdfs.conf
3、实时读取目录文件到HDFS案例
    (1) 创建配置文件flume-dir-hdfs.conf
        # Name the components on this agent
        a3.sources = r3
        a3.sinks = k3
        a3.channels = c3

        # Describe/configure the source
        # 定义类型为目录  
        a3.sources.r3.type = spooldir
        a3.sources.r3.spoolDir = /tmp/hadoop/upload
        # 文件上传完后缀
        a3.sources.r3.fileSuffix = .COMPLETED
        # 是否有文件头 
        a3.sources.r3.fileHeader = true
        #忽略所有以.tmp结尾的文件,不上传
        a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

        # Describe the sink
        a3.sinks.k3.type = hdfs
        a3.sinks.k3.hdfs.path = hdfs://hadoop130:9000/flume/upload/%Y%m%d/%H
        #上传文件的前缀
        a3.sinks.k3.hdfs.filePrefix = upload-
        #是否按照时间滚动文件夹
        a3.sinks.k3.hdfs.round = true
        #多少时间单位创建一个新的文件夹
        a3.sinks.k3.hdfs.roundValue = 1
        #重新定义时间单位
        a3.sinks.k3.hdfs.roundUnit = hour
        #是否使用本地时间戳
        a3.sinks.k3.hdfs.useLocalTimeStamp = true
        #积攒多少个Event才flush到HDFS一次
        a3.sinks.k3.hdfs.batchSize = 100
        #设置文件类型,可支持压缩
        a3.sinks.k3.hdfs.fileType = DataStream
        #多久生成一个新的文件
        a3.sinks.k3.hdfs.rollInterval = 600
        #设置每个文件的滚动大小大概是128M
        a3.sinks.k3.hdfs.rollSize = 134217700
        #文件的滚动与Event数量无关
        a3.sinks.k3.hdfs.rollCount = 0
        #最小冗余数
        a3.sinks.k3.hdfs.minBlockReplicas = 1

        # Use a channel which buffers events in memory
        a3.channels.c3.type = memory
        a3.channels.c3.capacity = 1000
        a3.channels.c3.transactionCapacity = 100

        # Bind the source and sink to the channel
        a3.sources.r3.channels = c3
        a3.sinks.k3.channel = c3
    (2) 执行监控配置  
        bin/flume-ng agent --conf conf/ --name a3 --conf-file demo/flume-dir-hdfs.conf
4、单数据源多出口
    配置1个接收日志文件的source和两个channel、两个sink,分别输送给flume-flume-hdfs和flume-flume-dir。
    (1)创建flume-file-flume.conf
        # Name the components on this agent
        a1.sources = r1
        a1.sinks = k1 k2
        a1.channels = c1 c2
        # 将数据流复制给多个channel
        a1.sources.r1.selector.type = replicating

        # Describe/configure the source
        a1.sources.r1.type = exec
        a1.sources.r1.command = tail -F /tmp/hadoop/hive.log
        a1.sources.r1.shell = /bin/bash -c

        # Describe the sink
        a1.sinks.k1.type = avro
        a1.sinks.k1.hostname = hadoop132
        a1.sinks.k1.port = 4141

        a1.sinks.k2.type = avro
        a1.sinks.k2.hostname = hadoop132
        a1.sinks.k2.port = 4142

        # Describe the channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        a1.channels.c2.type = memory
        a1.channels.c2.capacity = 1000
        a1.channels.c2.transactionCapacity = 100

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1 c2
        a1.sinks.k1.channel = c1
        a1.sinks.k2.channel = c2
        
        注:Avro是由Hadoop创始人Doug Cutting创建的一种语言无关的数据序列化和RPC框架。
        注:RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。  
    (2) 创建flume-flume-hdfs.conf
        # Name the components on this agent
        a2.sources = r1
        a2.sinks = k1
        a2.channels = c1

        # Describe/configure the source
        a2.sources.r1.type = avro
        a2.sources.r1.bind = hadoop132
        a2.sources.r1.port = 4141

        # Describe the sink
        a2.sinks.k1.type = hdfs
        a2.sinks.k1.hdfs.path = hdfs://hadoop130:9000/flume2/%Y%m%d/%H
        #上传文件的前缀
        a2.sinks.k1.hdfs.filePrefix = flume2-
        #是否按照时间滚动文件夹
        a2.sinks.k1.hdfs.round = true
        #多少时间单位创建一个新的文件夹
        a2.sinks.k1.hdfs.roundValue = 1
        #重新定义时间单位
        a2.sinks.k1.hdfs.roundUnit = hour
        #是否使用本地时间戳
        a2.sinks.k1.hdfs.useLocalTimeStamp = true
        #积攒多少个Event才flush到HDFS一次
        a2.sinks.k1.hdfs.batchSize = 100
        #设置文件类型,可支持压缩
        a2.sinks.k1.hdfs.fileType = DataStream
        #多久生成一个新的文件
        a2.sinks.k1.hdfs.rollInterval = 600
        #设置每个文件的滚动大小大概是128M
        a2.sinks.k1.hdfs.rollSize = 134217700
        #文件的滚动与Event数量无关
        a2.sinks.k1.hdfs.rollCount = 0
        #最小冗余数
        a2.sinks.k1.hdfs.minBlockReplicas = 1

        # Describe the channel
        a2.channels.c1.type = memory
        a2.channels.c1.capacity = 1000
        a2.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a2.sources.r1.channels = c1
        a2.sinks.k1.channel = c1
        
    (3) 创建flume-flume-dir.conf  
        # Name the components on this agent
        a3.sources = r1
        a3.sinks = k1
        a3.channels = c2

        # Describe/configure the source
        a3.sources.r1.type = avro
        a3.sources.r1.bind = hadoop132
        a3.sources.r1.port = 4142

        # Describe the sink
        a3.sinks.k1.type = file_roll
        a3.sinks.k1.sink.directory = /demo/flume3

        # Describe the channel
        a3.channels.c2.type = memory
        a3.channels.c2.capacity = 1000
        a3.channels.c2.transactionCapacity = 100

        # Bind the source and sink to the channel
        a3.sources.r1.channels = c2
        a3.sinks.k1.channel = c2
    (4) 执行配置文件
        bin/flume-ng agent --conf conf/ --name a3 --conf-file demo/group1/flume-flume-dir.conf
        bin/flume-ng agent --conf conf/ --name a2 --conf-file demo/group1/flume-flume-hdfs.conf
        bin/flume-ng agent --conf conf/ --name a1 --conf-file demo/group1/flume-file-flume.conf
        
5、单数据源多出口(负载均衡)
    (1) 创建flume-netcat-flume.conf
        # Name the components on this agent
        a1.sources = r1
        a1.channels = c1
        a1.sinkgroups = g1
        a1.sinks = k1 k2

        # Describe/configure the source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 44444

        #负载均衡 轮询模式
        a1.sinkgroups.g1.processor.type = load_balance
        a1.sinkgroups.g1.processor.backoff = true
        a1.sinkgroups.g1.processor.selector = round_robin
        a1.sinkgroups.g1.processor.selector.maxTimeOut=10000

        # Describe the sink
        a1.sinks.k1.type = avro
        a1.sinks.k1.hostname = hadoop132
        a1.sinks.k1.port = 4141

        a1.sinks.k2.type = avro
        a1.sinks.k2.hostname = hadoop132
        a1.sinks.k2.port = 4142

        # Describe the channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinkgroups.g1.sinks = k1 k2
        a1.sinks.k1.channel = c1
        a1.sinks.k2.channel = c1
    (2) 创建flume-flume1.conf
        # Name the components on this agent
        a2.sources = r1
        a2.sinks = k1
        a2.channels = c1

        # Describe/configure the source
        a2.sources.r1.type = avro
        a2.sources.r1.bind = hadoop130
        a2.sources.r1.port = 4141

        # Describe the sink
        a2.sinks.k1.type = logger

        # Describe the channel
        a2.channels.c1.type = memory
        a2.channels.c1.capacity = 1000
        a2.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a2.sources.r1.channels = c1
        a2.sinks.k1.channel = c1
    (3) 创建flume-flume2.conf
        # Name the components on this agent
        a3.sources = r1
        a3.sinks = k1
        a3.channels = c2

        # Describe/configure the source
        a3.sources.r1.type = avro
        a3.sources.r1.bind = hadoop102
        a3.sources.r1.port = 4142

        # Describe the sink
        a3.sinks.k1.type = logger

        # Describe the channel
        a3.channels.c2.type = memory
        a3.channels.c2.capacity = 1000
        a3.channels.c2.transactionCapacity = 100

        # Bind the source and sink to the channel
        a3.sources.r1.channels = c2
        a3.sinks.k1.channel = c2
    (4) 执行配置文件
        bin/flume-ng agent --conf conf/ --name a3 --conf-file demo/group2/flume-flume2.conf -Dflume.root.logger=INFO,console
        bin/flume-ng agent --conf conf/ --name a2 --conf-file demo/group2/flume-flume1.conf -Dflume.root.logger=INFO,console
        bin/flume-ng agent --conf conf/ --name a1 --conf-file demo/group2/flume-netcat-flume.conf
    (5) 使用telnet工具向本机的44444端口发送内容
        telnet localhost 44444
6、多数据源汇总案例
    多Source汇总数据到单Flume
    (1) 创建flume1.conf
        # Name the components on this agent
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Describe/configure the source
        a1.sources.r1.type = exec
        a1.sources.r1.command = tail -F /opt/module/group.log
        a1.sources.r1.shell = /bin/bash -c

        # Describe the sink
        a1.sinks.k1.type = avro
        a1.sinks.k1.hostname = hadoop131
        a1.sinks.k1.port = 4141

        # Describe the channel
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1
    (2) 创建flume2.conf
        # Name the components on this agent
        a2.sources = r1
        a2.sinks = k1
        a2.channels = c1

        # Describe/configure the source
        a2.sources.r1.type = netcat
        a2.sources.r1.bind = hadoop130
        a2.sources.r1.port = 44444

        # Describe the sink
        a2.sinks.k1.type = avro
        a2.sinks.k1.hostname = hadoop132
        a2.sinks.k1.port = 4141

        # Use a channel which buffers events in memory
        a2.channels.c1.type = memory
        a2.channels.c1.capacity = 1000
        a2.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a2.sources.r1.channels = c1
        a2.sinks.k1.channel = c1
    (3) 创建flume3.conf
        # Name the components on this agent
        a3.sources = r1
        a3.sinks = k1
        a3.channels = c1

        # Describe/configure the source
        a3.sources.r1.type = avro
        a3.sources.r1.bind = hadoop132
        a3.sources.r1.port = 4141

        # Describe the sink
        # Describe the sink
        a3.sinks.k1.type = logger

        # Describe the channel
        a3.channels.c1.type = memory
        a3.channels.c1.capacity = 1000
        a3.channels.c1.transactionCapacity = 100

        # Bind the source and sink to the channel
        a3.sources.r1.channels = c1
        a3.sinks.k1.channel = c1
    (4) 执行配置文件
        bin/flume-ng agent --conf conf/ --name a3 --conf-file demo/group3/flume3.conf -Dflume.root.logger=INFO,console
        bin/flume-ng agent --conf conf/ --name a2 --conf-file demo/group3/flume2.conf
        bin/flume-ng agent --conf conf/ --name a1 --conf-file demo/group3/flume1.conf
    (5) 追加数据
        echo 'hello' > group.log
        telnet hadoop104 44444
上一篇下一篇

猜你喜欢

热点阅读