Data Collection with Flume

Flume 6: Flume Log Collection in Practice

2020-09-08  勇于自信
1. Collection Architecture

On the collection side, a Flume agent tails the application log files with a TAILDIR source, runs them through custom interceptors, and writes them into Kafka through Kafka channels; on the consumption side, a second Flume agent reads the Kafka topics with Kafka sources, buffers the events in file channels, and lands them on HDFS with HDFS sinks.

2. Flume Installation

You can refer to the earlier article:
Installing and Deploying Flume
https://www.jianshu.com/p/bd72f04be576
Here we run Flume in cluster mode, so the Flume installation deployed on the master node needs to be distributed to the slave nodes:
]# scp -rp apache-flume-1.7.0-bin slave1:$PWD
]# scp -rp apache-flume-1.7.0-bin slave2:$PWD

3. Flume Configuration

1) Flume configuration analysis



Flume reads the log files directly; the log files are named in the form app-yyyy-mm-dd.log.
2) The concrete Flume configuration is as follows:
(1) Create the file file-flume-kafka.conf in the /usr/local/src/apache-flume-1.7.0-bin/conf directory
vim file-flume-kafka.conf

a1.sources=r1
a1.channels=c1 c2
#configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /usr/local/src/apache-flume-1.7.0-bin/test/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/log/2020-11-03/app.*.log
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2
#interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.zgjy.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.zgjy.flume.interceptor.LogTypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_resource = c1
a1.sources.r1.selector.mapping.topic_action = c2
# configure channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.channels.c1.kafka.topic = topic_resource
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer
# configure channel2
a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.channels.c2.kafka.topic = topic_action
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer

Test log:
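As a hypothetical illustration of the kind of JSON lines the interceptors below expect (only the type and time fields are taken from the interceptor code; the other field names are made up), test records can be appended to the directory that the TAILDIR source watches:

# Append two sample events; the file name matches the app.*.log pattern above
LOG_DIR=/tmp/log/2020-11-03
mkdir -p "$LOG_DIR"
echo '{"type":"resource","time":"1604390400000","resource_id":"r001"}' >> "$LOG_DIR/app.test.log"
echo '{"type":"action","time":"1604390400000","action_id":"a001"}' >> "$LOG_DIR/app.test.log"

Once the agent is running, the first line should end up in topic_resource and the second in topic_action.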


Configuration notes: the TAILDIR source tails the app log files and records its read offsets in log_position.json so collection can resume after a restart. Each event passes through two custom interceptors: the ETL interceptor drops malformed records, and the log-type interceptor writes a topic header. The multiplexing selector then routes events by that header, sending topic_resource events to Kafka channel c1 and topic_action events to Kafka channel c2; the Kafka channels write directly to the corresponding Kafka topics, so this agent needs no sinks.
4. Flume ETL and Log-Type Interceptors

This project defines two custom interceptors: an ETL interceptor and a log-type interceptor.
The ETL interceptor filters out logs whose timestamp is invalid or whose JSON is incomplete.
The log-type interceptor distinguishes resource logs from action logs so that they can be sent to different Kafka topics.

1) Create a Maven project named flume-interceptor
2) Create the package com.zgjy.flume.interceptor
3) Add the following to the pom.xml file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.zgjy</groupId>
    <artifactId>flume-interceptor</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.1.41</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.7.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.5.3</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>

    </build>

</project>

4) Create the LogETLInterceptor class under the com.zgjy.flume.interceptor package
The Flume ETL interceptor LogETLInterceptor is implemented as follows:

package com.zgjy.flume.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

public class LogETLInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        // 1. Read the event body as a UTF-8 string
        byte[] body = event.getBody();
        String log = new String(body, Charset.forName("UTF-8"));
        try {
            // 2. Parse the JSON and validate it according to its type;
            //    keep the event if it is valid, otherwise drop it by returning null
            JSONObject logJson = JSONObject.parseObject(log);
            String type = logJson.getString("type");
            if ("resource".equals(type)) {
                if (LOgUtils.validateResource(logJson)) {
                    return event;
                }
            } else {
                if (LOgUtils.validateAction(logJson)) {
                    return event;
                }
            }
        } catch (Exception e) {
            // Malformed JSON: drop the event
            return null;
        }
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        // Keep only the events that pass single-event validation
        ArrayList<Event> interceptors = new ArrayList<>();
        for (Event event : list) {
            Event intercept = intercept(event);
            if (intercept != null) {
                interceptors.add(intercept);
            }
        }
        return interceptors;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new LogETLInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}


5) The Flume log-type interceptor LogTypeInterceptor is implemented as follows:

package com.zgjy.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class LogTypeInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        // 1. Get the body data
        byte[] body = event.getBody();
        String log = new String(body, Charset.forName("UTF-8"));
        // 2. Get the headers
        Map<String, String> headers = event.getHeaders();
        // 3. Determine the log type and set the "topic" header used by the channel selector
        JSONObject logJson = JSONObject.parseObject(log);
        String type = logJson.getString("type");
        if ("resource".equals(type)) {
            headers.put("topic", "topic_resource");
        } else {
            headers.put("topic", "topic_action");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        ArrayList<Event> interceptors = new ArrayList<>();
        for (Event event:events){
            Event intercept = intercept(event);
            interceptors.add(intercept);
        }
        return interceptors;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

The code of LOgUtils.java is as follows:

package com.zgjy.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.math.NumberUtils;

public class LOgUtils {


    public static boolean validateResource(JSONObject logJson) {
        // Validate the timestamp: it must be a 10-digit (seconds) or 13-digit (milliseconds) number
        String time = logJson.getString("time");
        if (time == null || (time.length() != 10 && time.length() != 13)) {
            return false;
        }
        if (!NumberUtils.isDigits(time)) {
            return false;
        }
        return true;
    }

    public static boolean validateAction(JSONObject logJson) {
        // Validate the timestamp: it must be a 10-digit (seconds) or 13-digit (milliseconds) number
        String time = logJson.getString("time");
        if (time == null || (time.length() != 10 && time.length() != 13)) {
            return false;
        }
        if (!NumberUtils.isDigits(time)) {
            return false;
        }
        return true;
    }
}

6) Package the project
Double-click the assembly:assembly goal in the IDE's Maven panel.
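Equivalently, the jar can be built from the command line in the flume-interceptor project root (a sketch of the same assembly goal):

mvn clean assembly:assembly
# The artifact is written to target/, e.g. flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar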



After packaging, the jar needs to be placed in Flume's lib directory.
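For example (a sketch; the jar name follows the default jar-with-dependencies naming and is an assumption):

# Copy the interceptor jar into the lib directory of the local Flume installation
cp target/flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar /usr/local/src/apache-flume-1.7.0-bin/lib/
# The collection agent also runs on slave2, so ship the jar there as well
scp /usr/local/src/apache-flume-1.7.0-bin/lib/flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar slave2:/usr/local/src/apache-flume-1.7.0-bin/lib/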



8) Distribute Flume to the slave2 node
9) Go to the Flume home directory on the master and slave nodes respectively and start the agent:
bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &
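To confirm an agent is up on a node, a quick check (the same grep pattern the stop script below relies on):

ps -ef | grep file-flume-kafka | grep -v grep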
5. Log Collection Flume Start/Stop Script
#! /bin/bash

case $1 in
"start"){
    for i in master slave2
        do
            echo " --------启动 $i 采集 flume-------"
            ssh $i "nohup /usr/local/src/apache-flume-1.7.0-bin/bin/flume-ng agent --conf-file /usr/local/src/apache-flume-1.7.0-bin/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE > /usr/local/src/apache-flume-1.7.0-bin/out.log 2>&1 &"
        done
};;
"stop"){
    for i in master slave2
        do
            echo " --------停止 $i 采集 flume-------"
            ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs kill"
        done
};;
esac
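Saved as, say, f1.sh and made executable, the script is used like this (the script name is an assumption):

chmod +x f1.sh
./f1.sh start   # start the collection agents on master and slave2
./f1.sh stop    # stop them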

At this point, the Flume agents on the master and slave2 nodes are monitoring and collecting the files under /tmp/log/20200907/.
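A quick way to confirm that the TAILDIR source has picked the files up is to look at its position file (path taken from the source configuration above); it records the inode and offset of every tailed file:

cat /usr/local/src/apache-flume-1.7.0-bin/test/log_position.json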

6. Connecting Flume to Kafka

(1) Start ZooKeeper
Run zkServer.sh start on every node
(2) Start Kafka
On every node, go to the Kafka home directory and run:
./bin/kafka-server-start.sh config/server.properties
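The Kafka channels of the collection agents and the Kafka sources below both refer to the topics topic_resource and topic_action. Depending on the broker settings they may be auto-created on first use; otherwise they can be created manually, for example (run from the Kafka home directory; the ZooKeeper address, partition count, and replication factor are assumptions, and newer Kafka releases use --bootstrap-server instead of --zookeeper):

./bin/kafka-topics.sh --create --zookeeper master:2181 --topic topic_resource --partitions 3 --replication-factor 2
./bin/kafka-topics.sh --create --zookeeper master:2181 --topic topic_action --partitions 3 --replication-factor 2
./bin/kafka-topics.sh --list --zookeeper master:2181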
(3) Consuming from Kafka with Flume
1) Flume configuration analysis



2) The concrete Flume configuration is as follows:
(1) On slave1, create the kafka-flume-hdfs.conf file in the /usr/local/src/apache-flume-1.7.0-bin/conf directory
vim kafka-flume-hdfs.conf
Put the following content into the file:

## Components
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2
## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sources.r1.kafka.topics=topic_resource
## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sources.r2.kafka.topics=topic_action
## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /usr/local/src/apache-flume-1.7.0-bin/checkpoint/behavior1
a1.channels.c1.dataDirs = /usr/local/src/apache-flume-1.7.0-bin/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
## channel2
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /usr/local/src/apache-flume-1.7.0-bin/checkpoint/behavior2
a1.channels.c2.dataDirs = /usr/local/src/apache-flume-1.7.0-bin/data/behavior2/
a1.channels.c2.maxFileSize = 2146435071
a1.channels.c2.capacity = 1000000
a1.channels.c2.keep-alive = 6
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/zgjy/log/topic_resource/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logresource-
## sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/zgjy/log/topic_action/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logaction-
## Avoid generating lots of small files
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollInterval = 3600
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0

## Write compressed output files (lzop)
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop
## Wire sources, channels and sinks together
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2

Configuration notes: each KafkaSource pulls batches of up to 5000 events from its topic (topic_resource for r1, topic_action for r2). The two file channels buffer events on local disk (checkpoint and data directories under the Flume home), so data survives an agent restart. The HDFS sinks write the events to date-partitioned directories under /origin_data/zgjy/log/, rolling files by time (3600 s) and size (128 MB) rather than by event count to avoid many small files, and compress the output with lzop.
(2) Start the Flume consumer agent:
Go to the Flume home directory and run:
./bin/flume-ng agent --conf-file conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE
(3) Start/stop script for the Flume consumer agent:

#! /bin/bash
case $1 in
"start"){
        for i in slave1
        do
                echo " --------启动 $i 消费 flume-------"
                ssh $i "nohup /usr/local/src/apache-flume-1.7.0-bin/bin/flume-ng agent --conf-file /usr/local/src/apache-flume-1.7.0-bin/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/usr/local/src/apache-flume-1.7.0-bin/log.txt 2>&1 &"
        done
};;
"stop"){
        for i in slave1
        do
                echo " --------停止 $i 消费 flume-------"
                ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs kill"
        done
};;
esac

Check: browse the HDFS directory to verify that the data has been collected into HDFS:
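For example, from the command line (the date directories depend on when the agents ran):

hdfs dfs -ls /origin_data/zgjy/log/topic_resource/
hdfs dfs -ls /origin_data/zgjy/log/topic_action/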



If you only need a single channel, using only the ETL interceptor and without routing to different Kafka topics by type, the Flume configuration is as follows:
file-flume-kafka.conf

a1.sources=r1
a1.channels=c1
#configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /usr/local/src/apache-flume-1.7.0-bin/test/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/log/2020-11-07/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1
#interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.zgjy.flume.interceptor.LogETLInterceptor$Builder
# configure channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.channels.c1.kafka.topic = topic_event
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer
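To check that events are actually reaching Kafka in this simplified setup, a console consumer can be attached to the topic (run from the Kafka home directory; flags as in Kafka 0.10+):

./bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic topic_event --from-beginning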

kafka-flume-hdfs.conf

# Components
a1.sources=r1
a1.channels=c1
a1.sinks=k1
## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 1000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sources.r1.kafka.topics=topic_event
# source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 1000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sources.r2.kafka.topics=topic_action
# channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /usr/local/src/apache-flume-1.7.0-bin/checkpoint/behavior1
a1.channels.c1.dataDirs = /usr/local/src/apache-flume-1.7.0-bin/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
# channel2
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /usr/local/src/apache-flume-1.7.0-bin/checkpoint/behavior2
a1.channels.c2.dataDirs = /usr/local/src/apache-flume-1.7.0-bin/data/behavior2/
a1.channels.c2.maxFileSize = 2146435071
a1.channels.c2.capacity = 1000000
a1.channels.c2.keep-alive = 6
# sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/zgjy/log/topic_event/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logevent-
#sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/zgjy/log/topic_action/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logaction-
# Avoid generating lots of small files
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0

# Write compressed output files (lzop)
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop
# Wire sources, channels and sinks together
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2  

Appendix: configuring log output with logback

Configuration:

<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
    <!-- Base directory for the log files -->
    <property name="LOG_HOME" value="/tmp/log"/>

    <!-- Console output -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder
                class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <!-- Output format: %d is the date, %thread the thread name, %-5level the level padded to 5 characters, %msg the log message, %n a newline -->
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- File appender -->
    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- Rolling policy -->
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <!-- Log file path: %d{yyyyMMdd} groups the logs into one directory per day -->
            <FileNamePattern>${LOG_HOME}/%d{yyyyMMdd}/app-%d{yyyy-MM-dd}.%i.log</FileNamePattern>
            <!-- Number of days to keep logs -->
            <MaxHistory>30</MaxHistory>
            <maxFileSize>200MB</maxFileSize>
        </rollingPolicy>
        <!-- Output format -->
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <!-- Only the raw message (%msg) followed by a newline is written -->
            <pattern>%msg%n</pattern>
            <!-- Or use the default pattern -->
            <!--<pattern>${FILE_LOG_PATTERN}</pattern>-->
            <charset>utf8</charset>
        </encoder>
        <!-- Maximum size of a single log file -->
        <!--<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
            <MaxFileSize>100MB</MaxFileSize>
        </triggeringPolicy>-->
    </appender>

    <!-- Asynchronous logging -->
    <appender name="ASYNC_FILE"
              class="ch.qos.logback.classic.AsyncAppender">
        <!-- Do not drop logs: by default, TRACE/DEBUG/INFO events are discarded when the queue is 80% full -->
        <discardingThreshold>0</discardingThreshold>
        <!-- Queue depth; affects performance, default is 256 -->
        <queueSize>512</queueSize>
        <!-- At most one attached appender is allowed -->
        <appender-ref ref="FILE"/>
    </appender>

    <root level="INFO">
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="ASYNC_FILE"/>
<!--        <appender-ref ref="error"/>-->
    </root>

</configuration>

With this configuration, a new date-named directory is created every day, and the logs for that day are written inside it as a series of files capped at 200MB each (as configured above).
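For example, listing the current day's files (directory name format per the FileNamePattern above):

ls -lh /tmp/log/$(date +%Y%m%d)/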


If you want the logs to be written into the same directory every day, without creating new date directories, the configuration needs to be changed in two places.

