尚硅谷大数据技术之Flume
第5章 Flume高级之自定义MySQLSource
5.1 自定义Source说明
Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。官方提供的source类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些Source。
如:实时监控MySQL,从MySQL中获取数据传输到HDFS或者其他存储框架,所以此时需要我们自己实现MySQLSource。
官方也提供了自定义source的接口:
官网说明:https://flume.apache.org/FlumeDeveloperGuide.html#source
5.3 自定义MySQLSource组成
image.png5.2 自定义MySQLSource步骤
根据官方说明自定义MySqlSource需要继承AbstractSource类并实现Configurable和PollableSource接口。
实现相应方法:
getBackOffSleepIncrement()//暂不用
getMaxBackOffSleepInterval()//暂不用
configure(Context context)//初始化context
process()//获取数据(从MySql获取数据,业务处理比较复杂,所以我们定义一个专门的类——SQLSourceHelper来处理跟MySql的交互),封装成Event并写入Channel,这个方法被循环调用
stop()//关闭相关的资源
5.4 代码实现
5.4.1 导入Pom依赖
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
</dependencies>
5.4.2 添加配置信息
在ClassPath下添加jdbc.properties和log4j. properties
jdbc.properties:
dbDriver=com.mysql.jdbc.Driver
dbUrl=jdbc:mysql://hadoop102:3306/mysqlsource?useUnicode=true&characterEncoding=utf-8
dbUser=root
dbPassword=000000
log4j. properties:
--------console-----------
log4j.rootLogger=info,myconsole,myfile
log4j.appender.myconsole=org.apache.log4j.ConsoleAppender
log4j.appender.myconsole.layout=org.apache.log4j.SimpleLayout
log4j.appender.myconsole.layout.ConversionPattern =%d [%t] %-5p [%c] - %m%n
log4j.rootLogger=error,myfile
log4j.appender.myfile=org.apache.log4j.DailyRollingFileAppender
log4j.appender.myfile.File=/tmp/flume.log
log4j.appender.myfile.layout=org.apache.log4j.PatternLayout
log4j.appender.myfile.layout.ConversionPattern =%d [%t] %-5p [%c] - %m%n
5.4.3 SQLSourceHelper
1)属性说明:
属性 说明(括号中为默认值)
runQueryDelay 查询时间间隔(10000)
batchSize 缓存大小(100)
startFrom 查询语句开始id(0)
currentIndex 查询语句当前id,每次查询之前需要查元数据表
recordSixe 查询返回条数
table 监控的表名
columnsToSelect 查询字段(*)
customQuery 用户传入的查询语句
query 查询语句
defaultCharsetResultSet 编码格式(UTF-8)
2)方法说明:
方法 说明
SQLSourceHelper(Context context) 构造方法,初始化属性及获取JDBC连接
InitConnection(String url, String user, String pw) 获取JDBC连接
checkMandatoryProperties() 校验相关属性是否设置(实际开发中可增加内容)
buildQuery() 根据实际情况构建sql语句,返回值String
executeQuery() 执行sql语句的查询操作,返回值List<List<Object>>
getAllRows(List<List<Object>> queryResult) 将查询结果转换为String,方便后续操作
updateOffset2DB(int size) 根据每次查询结果将offset写入元数据表
execSql(String sql) 具体执行sql语句方法
getStatusDBIndex(int startFrom) 获取元数据表中的offset
queryOne(String sql) 获取元数据表中的offset实际sql语句执行方法
close() 关闭资源
3)代码分析
image.png
本教程由尚硅谷教育大数据研究院出品,如需转载请注明来源,欢迎大家关注尚硅谷公众号(atguigu)了解更多。