自定义flume拦截器-练习1
2021-11-27 本文已影响0人
夜希辰
参考文章1:Flume 自定义 Interceptor(拦截器)
参考文章2:java静态内部类和非静态内部类对外部类属性的使用
问题1:flume自定义拦截器时,为什么要分单event处理,和多个event处理
问题2:静态内部类,创建外部类对象并访问外部类对象
问题3:avro source 、avro sink 定义
问题4:avro source 、avro sink 采集通道还有问题,数据传输不过去
一、flume拦截器介绍
拦截器是简单的插件式组件,设置在source和channel之间。source接收到的事件event,在写入channel之前,拦截器都可以进行转换或者删除这些事件。每个拦截器只处理同一个source接收到的事件。可以自定义拦截器。
本篇文章主要讲解自定义连接器。flume内置连接器,可参考该文章。
二、自定义连接器
需求:在bigdata02机器上,监听44444端口。将包含hello的数据发送到bigdata03机器控制台,将不包含hello的数据发送到bigdata03机器控制台。
步骤:
1、自定义flume拦截器
2、在bigdata02、bigdata03、bigdata04服务器上编写conf配置文件。flume2.conf 、flume3.conf、flume4.conf
3、测试
1、自定义flume拦截器
1)引入 pom 依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ydm</groupId>
<artifactId>flumeinterceptor1127</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<!--包名,一般是域名的反写-->
<groupId>org.apache.flume</groupId>
<!--项目名-->
<artifactId>flume-ng-core</artifactId>
<!--所需要的jar的版本-->
<version>1.7.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
2)编写拦截器类
package com.atguigu.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* 定义flume连接器
* 1、实现org.apache.flume.interceptor.Interceptor 接口,需要重写接口中的方法
* 接口比如电脑的接口,插座的接口,虽然是不同厂家生产的,但是我们都可以用。
* 接口是一个公共的规范,只要符合规范,大家都可以使用,java中的接口更多体现在对行为的抽象。
*/
public class TypeInterceptor implements Interceptor {
//声明一个存放事件的集合
private List<Event> addHeaderEvents;
@Override
public void initialize() {
//初始化
addHeaderEvents = new ArrayList<>();
}
//单个事件拦截
@Override
public Event intercept(Event event) {
//1.获取事件中的投信息
Map<String, String> headers = event.getHeaders();
//2.获取事件中的body 信息
// event.getBody(); 返回字节数组,需要将数组转化为字符串
String body = new String(event.getBody());
//3.根据body中是否含有hello单词来决定添加怎样的头信息
if(body.contains("hello")){
//4.添加头信息
headers.put("type","hello");
}else {
//4.添加头信息
headers.put("type","unhello");
}
return event;
}
//批量事件拦截
@Override
public List<Event> intercept(List<Event> events) {
//1.清空集合
addHeaderEvents.clear();
//2.遍历events
for(Event event: events){
//3.给每一个事件添加头信息
addHeaderEvents.add(intercept(event));
}
//4.返回结果
return addHeaderEvents;
}
@Override
public void close() {
}
/**
* 定义一个静态内部类
* 内部类:就是在一个类中定义一个类。举例:在一个类A的内部定义一个类B,类B就称为内部类
* EG:生活中,在笔记本内部有CPU,笔记本可以看成外部类,CPU可以看成内部类
*
* 静态内部类
* 1.只可以访问外部类的静态属性,包括静态私有属性
* 2.不可以访问外部类的非静态属性,包括私有属性。但可以通过new 外部类().成员的方式访问
*
* 使用内部类最吸引人的原因是:每个内部类都能独立地继承一个(接口的)实现
*
*/
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new TypeInterceptor();
}
//配置信息
@Override
public void configure(Context context) {
}
}
}
2、在bigdata02、bigdata03、bigdata04服务器上编写conf配置文件
1) 在bigdata02服务器上,cd /usr/flume/conf 在conf目录下新建flume2.conf文件
#name
a2.sources = r1
a2.channels = c1 c2
a2.sinks = k1 k2
#source
a2.sources.r1.type = netcat
a2.sources.r1.bind = localhost
a2.sources.r1.port = 44444
#Interceptor
a1.sources.r1.interceptors = i1
#全类名
a1.sources.r1.interceptors.i1.type = com.atguigu.interceptor.TypeInterceptor$Builder
#channel selector
a2.sources.r1.selector.type = multiplexing
a2.sources.r1.selector.header = type
a2.sources.r1.selector.mapping.hello = c1
a2.sources.r1.selector.mapping.unhello = c2
#channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
#sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop103
a2.sinks.k1.port = 4142
a2.sinks.k2.type = avro
a2.sinks.k2.hostname = hadoop104
a2.sinks.k2.port = 4142
#bind
a2.sources.r1.channels = c1 c2
a2.sources.k1.channel = c1
a2.sources.k2.channel = c2
2) 在bigdata03服务器上,cd /usr/flume/conf 在conf目录下新建flume3.conf文件
#name
a3.sources = r1
a3.channels = c1
a3.sinks = k1
#source
a3.sources.r1.type = avro
a3.sources.r1.bind = bigdata03
a3.sources.r1.prot = 4142
#channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
#sink
a3.sinks.k1.type = logger
#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
3) 在bigdata03服务器上,cd /usr/flume/conf 在conf目录下新建flume4.conf文件
#name
a4.sources = r1
a4.channels = c1
a4.sinks = k1
#source
a4.sources.r1.type = avro
a4.sources.r1.bind = bigdata04
a4.sources.r1.prot = 4142
#channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 1000
a4.channels.c1.transactionCapacity = 100
#sink
a4.sinks.k1.type = logger
#Bind
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1
3、测试
flume3 和 flume4 需要先启动,flume2 需要连接flume3 和 flume4,若先启动 flume2 会报连接不上(也可以无视错误日志,先启动)
cd /opt/apache-flume-1.7.0-bin
bin/flume-ng agent --conf conf/ --name a3 --conf-file /tmp/flume-job/interceptor/flume3 -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf/ --name a2 --conf-file /tmp/flume-job/interceptor/flume2 -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf/ --name a1 --conf-file /tmp/flume-job/interceptor/flume1 -Dflume.root.logger=INFO,console
先启动bigdata03\bigdata04 的配置文件