Flume 6: Flume Log Collection in Practice
1. Collection Architecture
2. Installing Flume
For reference, see the earlier article:
Installing and Deploying Flume
https://www.jianshu.com/p/bd72f04be576
Here we run in cluster mode, so the Flume installation deployed on the master node needs to be distributed to the slave nodes:
]# scp -rp apache-flume-1.7.0-bin slave1:$PWD
]# scp -rp apache-flume-1.7.0-bin slave2:$PWD
3. Flume Configuration
1) Configuration overview
Flume tails the application log files directly; the log file names follow the format app-yyyy-mm-dd.log.
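The exact log schema is not shown here, but judging from the interceptor code later in this section, each line is a JSON object carrying at least a type field ("resource" or "action") and a time field (a 10- or 13-digit epoch timestamp). Two hypothetical example lines:
{"type":"resource","time":"1604394000000"}
{"type":"action","time":"1604394000123"}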
2) The concrete Flume configuration is as follows:
(1) Create the file file-flume-kafka.conf under /usr/local/src/apache-flume-1.7.0-bin/conf
vim file-flume-kafka.conf
a1.sources=r1
a1.channels=c1 c2
#configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /usr/local/src/apache-flume-1.7.0-bin/test/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/log/2020-11-03/app.*.log
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2
#interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.zgjy.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.zgjy.flume.interceptor.LogTypeInterceptor$Builder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_resource = c1
a1.sources.r1.selector.mapping.topic_action = c2
# configure channel1
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.channels.c1.kafka.topic = topic_resource
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer
# configure channel2
a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.channels.c2.kafka.topic = topic_action
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer
Test log:
Configuration notes: the TAILDIR source tails the matched log files and records read offsets in log_position.json, so collection resumes where it left off after a restart; the two custom interceptors (built in the next section) filter out malformed records and write a topic value into each event header; the multiplexing selector routes events to channel c1 or c2 based on that header; and each KafkaChannel publishes its events directly to the corresponding Kafka topic, so no sink is needed in this agent.
4. Flume ETL and Log-Type Interceptors
This project defines two custom interceptors: an ETL interceptor and a log-type interceptor.
The ETL interceptor filters out log records whose timestamp is invalid or whose JSON is incomplete.
The log-type interceptor distinguishes the different log types so that they can be routed to different Kafka topics (topic_resource and topic_action here).
1) Create a Maven project named flume-interceptor
2) Create the package com.zgjy.flume.interceptor
3) Add the following to pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.zgjy</groupId>
    <artifactId>flume-interceptor</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.1.41</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.7.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.5.3</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
4) Create the LogETLInterceptor class under the com.zgjy.flume.interceptor package
The Flume ETL interceptor LogETLInterceptor is implemented as follows:
package com.zgjy.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

public class LogETLInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Parse the event body as JSON and keep the event only if it passes validation.
        byte[] body = event.getBody();
        String log = new String(body, Charset.forName("UTF-8"));
        try {
            JSONObject logJson = JSONObject.parseObject(log);
            String type = logJson.getString("type");
            if ("resource".equals(type)) {
                if (LogUtils.validateResource(logJson)) {
                    return event;
                }
            } else {
                if (LogUtils.validateAction(logJson)) {
                    return event;
                }
            }
        } catch (Exception e) {
            // Malformed JSON: drop the event.
            return null;
        }
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        // Validate each event in the batch and drop the ones that fail.
        List<Event> results = new ArrayList<>();
        for (Event event : list) {
            Event intercepted = intercept(event);
            if (intercepted != null) {
                results.add(intercepted);
            }
        }
        return results;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            // Must return the ETL interceptor itself, not LogTypeInterceptor.
            return new LogETLInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
5) The Flume log-type interceptor LogTypeInterceptor is implemented as follows:
package com.zgjy.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class LogTypeInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // 1. Read the event body.
        byte[] body = event.getBody();
        String log = new String(body, Charset.forName("UTF-8"));

        // 2. Read the event headers.
        Map<String, String> headers = event.getHeaders();

        // 3. Determine the log type and write the target topic into the header;
        //    the multiplexing selector uses this header to pick the channel.
        JSONObject logJson = JSONObject.parseObject(log);
        String type = logJson.getString("type");
        if ("resource".equals(type)) {
            headers.put("topic", "topic_resource");
        } else {
            headers.put("topic", "topic_action");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        ArrayList<Event> interceptors = new ArrayList<>();
        for (Event event : events) {
            Event intercept = intercept(event);
            interceptors.add(intercept);
        }
        return interceptors;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
LogUtils.java is as follows:
package com.zgjy.flume.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.math.NumberUtils;

public class LogUtils {

    public static boolean validateResource(JSONObject logJson) {
        // Validate the timestamp: it must be a 10- or 13-digit number.
        String time = logJson.getString("time");
        if (time == null || (time.length() != 10 && time.length() != 13)) {
            return false;
        }
        if (!NumberUtils.isDigits(time)) {
            return false;
        }
        return true;
    }

    public static boolean validateAction(JSONObject logJson) {
        // Same timestamp check for action logs.
        String time = logJson.getString("time");
        if (time == null || (time.length() != 10 && time.length() != 13)) {
            return false;
        }
        if (!NumberUtils.isDigits(time)) {
            return false;
        }
        return true;
    }
}
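Before packaging, the two interceptors can be sanity-checked locally by pushing a few hand-written events through them. This test class is not part of the original project, and the sample log lines are made up:

package com.zgjy.flume.interceptor;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;

public class InterceptorSmokeTest {

    public static void main(String[] args) {
        Interceptor etl = new LogETLInterceptor.Builder().build();
        Interceptor typer = new LogTypeInterceptor.Builder().build();

        String[] samples = {
                "{\"type\":\"resource\",\"time\":\"1604394000000\"}", // valid 13-digit timestamp
                "{\"type\":\"action\",\"time\":\"160439\"}",          // timestamp too short -> dropped
                "not json at all"                                      // malformed JSON -> dropped
        };

        for (String line : samples) {
            Event event = EventBuilder.withBody(line, StandardCharsets.UTF_8);
            Event kept = etl.intercept(event);
            if (kept == null) {
                System.out.println("dropped: " + line);
            } else {
                typer.intercept(kept); // stamps the topic header used by the selector
                System.out.println("kept, topic=" + kept.getHeaders().get("topic"));
            }
        }
    }
}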
6) Package the project
Double-click the assembly:assembly goal (for example in the IDE's Maven panel).
After packaging, copy the resulting jar into Flume's lib directory.
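Equivalently, the packaging and copy can be done from the command line; the jar name follows from the artifactId and version in pom.xml, and the Flume path matches the layout used in this article:
mvn clean package assembly:assembly
cp target/flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar /usr/local/src/apache-flume-1.7.0-bin/lib/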
7) Distribute the Flume directory (now containing the interceptor jar) to the slave2 node
8) On the master and slave nodes, go to the Flume home directory and start the agent:
bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &
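Once the agent is running and new log lines are appended, one way to confirm that events reach Kafka is the console consumer that ships with Kafka (run from the Kafka home directory; the flags assume a reasonably recent Kafka version):
bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic topic_resource --from-beginning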
5. Log-Collection Flume Start/Stop Script
#! /bin/bash
case $1 in
"start"){
for i in master slave2
do
echo " --------启动 $i 采集 flume-------"
ssh $i "nohup /usr/local/src/apache-flume-1.7.0-bin/bin/flume-ng agent --conf-file /usr/local/src/apache-flume-1.7.0-bin/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE > /usr/local/src/apache-flume-1.7.0-bin/out.log 2>&1 &"
done
};;
"stop"){
for i in master slave2
do
echo " --------停止 $i 采集 flume-------"
ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs kill"
done
};;
esac
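Assuming the script is saved as, say, f1.sh (the name is arbitrary) and made executable, it is used like this:
chmod +x f1.sh
./f1.sh start
./f1.sh stop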
At this point, the Flume agents on the master and slave2 nodes are monitoring and collecting the files under the dated directory beneath /tmp/log/.
6. Connecting Flume to Kafka
(1) Start ZooKeeper
Run zkServer.sh start on every node.
(2) Start Kafka
On every node, go to the Kafka home directory and run:
./bin/kafka-server-start.sh config/server.properties
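If the brokers are not configured to auto-create topics, topic_resource and topic_action have to be created first. A sketch, assuming an older Kafka release that still uses --zookeeper (newer versions take --bootstrap-server instead), with partition and replication counts chosen arbitrarily:
bin/kafka-topics.sh --create --zookeeper master:2181 --topic topic_resource --partitions 3 --replication-factor 2
bin/kafka-topics.sh --create --zookeeper master:2181 --topic topic_action --partitions 3 --replication-factor 2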
(3) Consuming the Kafka data with Flume
1) Configuration overview
2) The concrete Flume configuration is as follows:
(1) On slave2, create the file kafka-flume-hdfs.conf under /usr/local/src/apache-flume-1.7.0-bin/conf
vim kafka-flume-hdfs.conf
Add the following content to the file:
## components
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2
## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sources.r1.kafka.topics=topic_resource
## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sources.r2.kafka.topics=topic_action
## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /usr/local/src/apache-flume-1.7.0-bin/checkpoint/behavior1
a1.channels.c1.dataDirs = /usr/local/src/apache-flume-1.7.0-bin/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
## channel2
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /usr/local/src/apache-flume-1.7.0-bin/checkpoint/behavior2
a1.channels.c2.dataDirs = /usr/local/src/apache-flume-1.7.0-bin/data/behavior2/
a1.channels.c2.maxFileSize = 2146435071
a1.channels.c2.capacity = 1000000
a1.channels.c2.keep-alive = 6
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/zgjy/log/topic_resource/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logresource-
##sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/zgjy/log/topic_action/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logaction-
## avoid producing lots of small files
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollInterval = 3600
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0
## output LZO-compressed files
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop
## wiring: bind sources and sinks to channels
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
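Note that the HDFS sink paths use time escapes (%Y-%m-%d), which require a timestamp header on each event. If the events arriving from Kafka do not carry one, the sink cannot resolve the path; one common workaround, not shown in the original configuration and added here as an assumption, is to let the sink fall back to local time:
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k2.hdfs.useLocalTimeStamp = true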
Configuration notes: each KafkaSource pulls batches of up to 5000 events from its topic; the file channels buffer events on disk with checkpoints, so data survives an agent restart; and the HDFS sinks write LZO-compressed files, rolling a new file every hour or when it reaches 128 MB rather than producing many small files.
(2) Start the consuming Flume agent:
From the Flume home directory, run:
./bin/flume-ng agent --conf-file conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE
(3) Start/stop script for the consuming Flume agent:
#! /bin/bash
case $1 in
"start"){
for i in slave1
do
echo " --------启动 $i 消费 flume-------"
ssh $i "nohup /usr/local/src/apache-flume-1.7.0-bin/bin/flume-ng agent --conf-file /usr/local/src/apache-flume-1.7.0-bin/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/usr/local/src/apache-flume-1.7.0-bin/log.txt 2>&1 &"
done
};;
"stop"){
for i in slave1
do
echo " --------停止 $i 消费 flume-------"
ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs kill"
done
};;
esac
Check: browse the target directory in HDFS; the data has been collected into HDFS:
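For example, a quick check from the command line (the dated subdirectory depends on when the data was collected):
hdfs dfs -ls /origin_data/zgjy/log/topic_resource/
hdfs dfs -ls /origin_data/zgjy/log/topic_action/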
If you only need a single channel, using only the ETL interceptor and without splitting events across Kafka topics, the Flume configurations simplify to the following:
file-flume-kafka.conf
a1.sources=r1
a1.channels=c1
#configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /usr/local/src/apache-flume-1.7.0-bin/test/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/log/2020-11-07/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1
#interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.zgjy.flume.interceptor.LogETLInterceptor$Builder
# configure channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.channels.c1.kafka.topic = topic_event
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer
kafka-flume-hdfs.conf
# components
a1.sources=r1
a1.channels=c1
a1.sinks=k1
## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 1000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sources.r1.kafka.topics=topic_event
# channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /usr/local/src/apache-flume-1.7.0-bin/checkpoint/behavior1
a1.channels.c1.dataDirs = /usr/local/src/apache-flume-1.7.0-bin/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
# sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/zgjy/log/topic_event/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logevent-
# avoid producing lots of small files
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
# output LZO-compressed files
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
# wiring: bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
Appendix: Configuring Application Logging with logback
Configuration:
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false">
    <!-- Base directory for the log files -->
    <property name="LOG_HOME" value="/tmp/log"/>

    <!-- Console output -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <!-- %d date, %thread thread name, %-5level level padded to 5 characters, %msg message, %n newline -->
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- Rolling log file -->
    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- Rolling policy: one directory per day (%d{yyyyMMdd}), files further split by size -->
        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <FileNamePattern>${LOG_HOME}/%d{yyyyMMdd}/app-%d{yyyy-MM-dd}.%i.log</FileNamePattern>
            <!-- Days of history to keep -->
            <MaxHistory>30</MaxHistory>
            <maxFileSize>200MB</maxFileSize>
        </rollingPolicy>
        <!-- Write only the raw message, one record per line -->
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>%msg%n</pattern>
            <charset>utf8</charset>
        </encoder>
    </appender>

    <!-- Asynchronous logging -->
    <appender name="ASYNC_FILE" class="ch.qos.logback.classic.AsyncAppender">
        <!-- 0 = never discard events, even when the queue is 80% full -->
        <discardingThreshold>0</discardingThreshold>
        <!-- Queue depth (default 256); affects throughput -->
        <queueSize>512</queueSize>
        <!-- At most one downstream appender can be attached -->
        <appender-ref ref="FILE"/>
    </appender>

    <root level="INFO">
        <appender-ref ref="STDOUT"/>
        <appender-ref ref="ASYNC_FILE"/>
    </root>
</configuration>
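With this configuration in place, the application only needs to log the JSON record through SLF4J; logback produces the rolling files that the TAILDIR source tails. A minimal sketch, with a hypothetical class name and hand-written log content:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AppLogDemo {

    private static final Logger logger = LoggerFactory.getLogger(AppLogDemo.class);

    public static void main(String[] args) {
        // The FILE appender writes only %msg%n, so each call below produces one raw
        // JSON line under /tmp/log/<yyyyMMdd>/, ready for the TAILDIR source.
        logger.info("{\"type\":\"resource\",\"time\":\"" + System.currentTimeMillis() + "\"}");
        logger.info("{\"type\":\"action\",\"time\":\"" + System.currentTimeMillis() + "\"}");
    }
}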
With this configuration, a new dated directory is created each day, and within that day's directory the logs are split into files of at most 200 MB (the size configured above), as shown below:
If you would rather have the logs written into the same directory every day, without creating a new dated directory, the configuration needs to change in two places, as follows: