Using Flume to Capture Log Content in Real Time and Insert It into MySQL

2018-11-28  CNSTT

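Preface:

This article covers using Flume with a custom sink on Windows to read a log file in real time and write its content to a MySQL table. First make sure your flume-ng can start; any steps skipped here can be looked up online.

1. Create the MySQL table: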

CREATE TABLE `fruit` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `salesman` varchar(20) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;

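2. Create the custom sink:

Flume does not ship with a sink that writes to MySQL, so we write a custom sink that connects to the database.

2.1. Open Eclipse and create a new Maven project

Check "Create a simple project (skip archetype selection)"
Group Id:

org.flume.mysql.sink

Artifact Id:

flumedemo
2.2. Configure pom.xml

The <build>...</build> section sets up the plugins used later for Maven packaging.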

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.flume.mysql.sink</groupId>
  <artifactId>flumedemo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-sdk</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-configuration</artifactId>
            <version>1.7.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.25</version>
        </dependency>
        
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                   <artifactId>maven-assembly-plugin</artifactId>
                   <configuration>
                        <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                        <archive>
                             <manifest>
                                  <mainClass></mainClass>
                             </manifest>
                        </archive>
                   </configuration>
                   <executions>
                        <execution>
                             <id>make-assembly</id>
                             <phase>package</phase>
                             <goals>
                                  <goal>single</goal>
                             </goals>
                        </execution>
                   </executions>
              </plugin>

        </plugins>
    </build>
</project>
2.3. Create the package and classes

Right-click src/main/java and create a new package org.flume.mysql.sink
Entity class Fruit.java

package org.flume.mysql.sink;

public class Fruit {
    private String name;
    private String salesman;
    
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getSalesman() {
        return salesman;
    }
    public void setSalesman(String salesman) {
        this.salesman = salesman;
    }
    
}

Custom sink MysqlSink.java

package org.flume.mysql.sink;

import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
 
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
 
public class MysqlSink extends AbstractSink implements Configurable {

    private static final Logger LOG = LoggerFactory.getLogger(MysqlSink.class);
    private String hostname;
    private String port;
    private String databaseName;
    private String tableName;
    private String user;
    private String password;
    private PreparedStatement preparedStatement;
    private Connection conn;
    private int batchSize;

    public MysqlSink() {
        LOG.info("Start MysqlSink");
    }

    public void configure(Context context) {
        hostname = context.getString("hostname");
        Preconditions.checkNotNull(hostname, "hostname must be set!!");
        port = context.getString("port");
        Preconditions.checkNotNull(port, "port must be set!!");
        databaseName = context.getString("databaseName");
        Preconditions.checkNotNull(databaseName, "databaseName must be set!!");
        tableName = context.getString("tableName");
        Preconditions.checkNotNull(tableName, "tableName must be set!!");
        user = context.getString("user");
        Preconditions.checkNotNull(user, "user must be set!!");
        password = context.getString("password");
        Preconditions.checkNotNull(password, "password must be set!!");
        batchSize = context.getInteger("batchSize", 100);
        Preconditions.checkArgument(batchSize > 0, "batchSize must be a positive number!!");
    }

    @Override
    public void start() {
        super.start();
        try {            
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        String url = "jdbc:mysql://" + hostname + ":" + port + "/" + databaseName;        

        try {
            conn = DriverManager.getConnection(url, user, password);
            conn.setAutoCommit(false);
            // Create the PreparedStatement used for the inserts
            preparedStatement = conn.prepareStatement("insert into " + tableName +
                    " (name,salesman) values (?,?)");

        } catch (SQLException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    @Override
    public void stop() {
        super.stop();
        if (preparedStatement != null) {
            try {
                preparedStatement.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    public Status process() throws EventDeliveryException {
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        Event event;
        String content;
        List<Fruit> infos = Lists.newArrayList();
        transaction.begin();        
        try {
            for (int i = 0; i < batchSize; i++) {
                event = channel.take();     // take one event from the channel
                LOG.info("i : " + i);
                if (event != null) {        // process the event
                    // event body looks like "apple,wang"; decoded with the platform default charset
                    content = new String(event.getBody());
                    LOG.info("content : " + content);
                    Fruit fruit=new Fruit();
                    if (content.contains(",")) {
                        // text before the first comma is fruit.name
                        fruit.setName(content.substring(0, content.indexOf(",")));
                        // text after the first comma is fruit.salesman
                        fruit.setSalesman(content.substring(content.indexOf(",") + 1));
                    }else{
                        fruit.setName(content);                       
                    }
                    infos.add(fruit);
                } else {
                    result = Status.BACKOFF;
                    //LOG.info("result : " + result);
                    break;
                }
            }

            if (infos.size() > 0) {
                preparedStatement.clearBatch();
                for (Fruit temp : infos) {
                    preparedStatement.setString(1, temp.getName());
                    preparedStatement.setString(2, temp.getSalesman());
                    preparedStatement.addBatch();
                }
                preparedStatement.executeBatch();
                conn.commit();
            }
            transaction.commit();       // commit the Flume transaction
        } catch (Exception e) {
            try {
                transaction.rollback(); // roll back the Flume transaction
                //LOG.info("------------transaction.rollback()------------");
            } catch (Exception e2) {
                LOG.error("Exception in rollback. Rollback might not have been " +
                        "successful.", e2);
            }
            LOG.error("Failed to commit transaction. " +
                    "Transaction rolled back.", e);
            Throwables.propagate(e);
        } finally {
            transaction.close();
        }
        return result;
    }
}
Note: the LOG.info("***"); statements are debug output, so that they can be seen later in the flume-ng console output.
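
The parsing and batching logic inside process() can be tried without a running Flume agent. The following is a minimal sketch, not the sink itself: the class SinkLogicSketch and its methods parseLine and takeBatch are hypothetical names, and a plain java.util.Queue stands in for the Flume channel purely so the example runs on its own.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-alone sketch of the logic in MysqlSink.process();
// it uses no Flume or JDBC classes.
public class SinkLogicSketch {

    /**
     * Splits a line such as "apple,wang" at the first comma into
     * {name, salesman}. Without a comma the whole line is the name
     * and salesman stays null, as in the sink above.
     */
    public static String[] parseLine(String content) {
        int comma = content.indexOf(',');
        if (comma < 0) {
            return new String[] { content, null };
        }
        return new String[] { content.substring(0, comma), content.substring(comma + 1) };
    }

    /**
     * Takes at most batchSize lines from the queue and stops early when
     * it is empty (the point where the real sink sets Status.BACKOFF).
     */
    public static List<String[]> takeBatch(Queue<String> channel, int batchSize) {
        List<String[]> batch = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            String line = channel.poll(); // null when the queue is empty
            if (line == null) {
                break;
            }
            batch.add(parseLine(line));
        }
        return batch;
    }

    public static void main(String[] args) {
        Queue<String> channel = new ArrayDeque<>();
        channel.add("apple,wang");
        channel.add("banana,peng");
        channel.add("lemon");
        // With batchSize 2 only the first two lines are taken in this call.
        for (String[] row : takeBatch(channel, 2)) {
            System.out.println(row[0] + " / " + row[1]);
        }
    }
}
```

Running main prints the first two rows; "lemon" stays in the queue for the next call, which mirrors how each process() call handles at most batchSize events.
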
2.4. Package with Maven

Right-click the project flumedemo
1. Maven → Update Project
2. Run As → Maven clean
3. Run As → Maven install (the first run may take a while)
Packaging is complete.

From the target directory, copy flumedemo-0.0.1-SNAPSHOT-jar-with-dependencies.jar into Flume's lib directory

3. Configure conf:

In D:\com\apache-flume-1.8.0-bin\logs create an empty fruitdata.log (data is added later)
In D:\com\apache-flume-1.8.0-bin\conf create fruit.conf

fruit.conf

agent1.sources = source1
agent1.sinks = mysqlSink
agent1.channels = channel1

# Exec source: the command tail -f picks up lines appended to the file in real time
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -f D:/com/apache-flume-1.8.0-bin/logs/fruitdata.log
agent1.sources.source1.channels = channel1

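# MysqlSink settings; type is the fully qualified class name (package.Class)
# databaseName, tableName, user and password are the database name, table name,
# user name and password. Comments go on their own lines: text written after a
# value becomes part of that value.
agent1.sinks.mysqlSink.type = org.flume.mysql.sink.MysqlSink
agent1.sinks.mysqlSink.hostname = localhost
agent1.sinks.mysqlSink.port = 3306
agent1.sinks.mysqlSink.databaseName = flume
agent1.sinks.mysqlSink.tableName = fruit
agent1.sinks.mysqlSink.user = root
agent1.sinks.mysqlSink.password = ****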
agent1.sinks.mysqlSink.channel = channel1

agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100
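
A note on comments in this file: the agent configuration is read as a Java properties file, where only lines that begin with # (or !) are comments. Anything written after a value on the same line, such as a trailing // note, is read as part of the value. A small sketch of that behaviour using java.util.Properties directly follows; the class name PropertiesCommentSketch and the key sink.databaseName are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical sketch: shows how java.util.Properties treats text that
// follows a value on the same line.
public class PropertiesCommentSketch {

    /** Loads properties-format text and returns the value stored under key. */
    public static String valueOf(String text, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // Not expected when reading from an in-memory string.
            throw new IllegalStateException(e);
        }
        return props.getProperty(key);
    }

    public static void main(String[] args) {
        // A trailing // note is not a comment; it ends up inside the value.
        String inline = "sink.databaseName = flume     //database name\n";
        // A comment on its own line, starting with #, is ignored.
        String ownLine = "# database name\nsink.databaseName = flume\n";

        System.out.println("[" + valueOf(inline, "sink.databaseName") + "]");
        System.out.println("[" + valueOf(ownLine, "sink.databaseName") + "]");
    }
}
```

The first line printed is [flume     //database name] and the second is [flume], so a value annotated inline would not be the plain database name the sink expects.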

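Windows has no built-in tail -f command; it took a long search to find one, thanks to those who shared it.
For the tail download, see the zip at the end of the article.
Putting it in C:\Windows\System32 is not recommended.
Extract it to D:\tail and append ;D:\tail to the PATH environment variable.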
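
For readers unfamiliar with what tail -f provides: it keeps watching a file and emits each line as it is appended. A rough Java sketch of the same idea is shown here, reading on demand rather than blocking. FileFollower and readNewLines are hypothetical names, UTF-8 is an assumed file encoding, and this is neither how the tail utility nor how Flume's exec source is implemented.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "follow a growing file", in the spirit of tail -f.
public class FileFollower {
    private final String path;
    private long offset; // position up to which the file has been read

    public FileFollower(String path) {
        this.path = path;
        this.offset = 0L;
    }

    /** Returns the lines appended since the previous call. */
    public List<String> readNewLines() throws IOException {
        List<String> lines = new ArrayList<>();
        try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
            if (file.length() < offset) {
                offset = 0L; // file was truncated, start again from the top
            }
            file.seek(offset);
            String raw;
            while ((raw = file.readLine()) != null) {
                // readLine maps each byte to one char, so recover the bytes
                // and decode them as UTF-8 (assumed encoding of the log).
                byte[] bytes = raw.getBytes(StandardCharsets.ISO_8859_1);
                lines.add(new String(bytes, StandardCharsets.UTF_8));
            }
            offset = file.getFilePointer();
        }
        return lines;
    }

    public static void main(String[] args) throws Exception {
        // Usage: java FileFollower <path-to-log>; polls once per second.
        FileFollower follower = new FileFollower(args[0]);
        while (true) {
            for (String line : follower.readNewLines()) {
                System.out.println(line);
            }
            Thread.sleep(1000L);
        }
    }
}
```

The sketch treats a last line that has no trailing newline yet as complete, which a production follower would need to handle more carefully.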

4. Experiment:

In D:\com\apache-flume-1.8.0-bin, Shift + right-click and open cmd

D:\com\apache-flume-1.8.0-bin>flume-ng agent -c conf -f conf/fruit.conf -n agent1 -property "flume.root.logger=INFO,console"

Now append data to fruitdata.log, with fields separated by a comma

apple,wang
banana,peng
kiwifruit,cnstt
lemon,bob

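Add the rows one at a time and watch them appear in the database table as well; this works because tail -f reads the file in real time.

That completes using Flume with a custom sink on Windows to read a log and write it to a MySQL table. Chinese text comes out garbled; I will keep looking into this.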
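
One possible cause of the garbled Chinese (an assumption, not verified against this setup) is that new String(event.getBody()) decodes the bytes with the platform default charset, which may differ from the encoding of the log file. The sketch below only demonstrates the effect of decoding UTF-8 bytes with a matching and a mismatching charset; CharsetSketch and decode are hypothetical names.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: the same bytes decoded with two different charsets.
public class CharsetSketch {

    /** Decodes an event body with an explicitly chosen charset. */
    public static String decode(byte[] body, Charset charset) {
        return new String(body, charset);
    }

    public static void main(String[] args) {
        // "\u82f9\u679c" is the Chinese word for apple, followed by the salesman.
        String original = "\u82f9\u679c,wang";
        byte[] body = original.getBytes(StandardCharsets.UTF_8);

        // Matching charset: the text survives the round trip.
        System.out.println(decode(body, StandardCharsets.UTF_8).equals(original));      // prints true
        // Mismatching charset: the Chinese characters are garbled.
        System.out.println(decode(body, StandardCharsets.ISO_8859_1).equals(original)); // prints false
        // The charset that new String(byte[]) would use on this machine.
        System.out.println("default charset: " + Charset.defaultCharset());
    }
}
```

If this turns out to be the cause, two places worth checking are the charset passed when decoding the event body in the sink and the character encoding settings of the MySQL connection and table.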

Thanks for reading; if this helped, please leave a ❤!
