Flink学习笔记(3):Sink to JDBC

2016-10-05  本文已影响8142人  郭寻抚

1. 前言

1.1 说明

本文通过一个Demo程序,演示Flink从Kafka中读取数据,并将数据以JDBC的方式持久化到关系型数据库中。通过本文,可以学习如何自定义Flink Sink和Flink Steaming编程的步骤。

1.2 软件版本

1.3 依赖jar包

请将以下依赖放在pom.xml中。这里使用的关系型数据是PostgreSQL,也可以换成其它关系型数据库的驱动程序。

 <properties>
    <flink.version>1.1.2</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <version>9.1-901-1.jdbc4</version>
    </dependency>
</dependencies>

2. 自定义Sink

2.1 内置的Streaming Connector

Flink 内置了一些Streaming Connector,用于和第三方的系统交互。截至到当前为止,Flink支持以下Connector。括号中的source代表数据从这些第三方系统中流入Flink中,sink代表数据从Flink流到这些第三方系统中。

除此之外,Flink还允许我们自定义source和sink。本文所述例子是从Kafka中读取数据,并把数据写入数据库中;由于Flink已经内置了Kafka source,因此还需要自定义JDBC sink。

2.2 自定义JDBC sink

下面的代码就是一个JDBC sink的实现,其效果就是向PostgreSQL数据库中插入数据,具体请看代码中的注释说明。

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;


public class PostgreSQLSink extends RichSinkFunction<Tuple3<String,String,String>> {

    private static final long serialVersionUID = 1L;

    private Connection connection;
    private PreparedStatement preparedStatement;
    /**
     * open方法是初始化方法,会在invoke方法之前执行,执行一次。
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        // JDBC连接信息
        String USERNAME = "postgres" ;
        String PASSWORD = "********";
        String DRIVERNAME = "org.postgresql.Driver";
        String DBURL = "jdbc:postgresql://192.168.1.213/flink";
        // 加载JDBC驱动
        Class.forName(DRIVERNAME);
        // 获取数据库连接
        connection = DriverManager.getConnection(DBURL,USERNAME,PASSWORD);
        String sql = "insert into kafka_message(
                        timeseq, thread, message) values (?,?,?)";
        preparedStatement = connection.prepareStatement(sql);
        super.open(parameters);
    }

    /**
     * invoke()方法解析一个元组数据,并插入到数据库中。
     * @param data 输入的数据
     * @throws Exception
     */
    @Override
    public  void invoke(Tuple3<String,String,String> data) throws Exception{
        try {
            String timeseq = data.getField(0);
            String thread = data.getField(1);
            String message = data.getField(2);
            preparedStatement.setString(1,timeseq);
            preparedStatement.setString(2,thread);
            preparedStatement.setString(3,message);
            preparedStatement.executeUpdate();
        }catch (Exception e){
            e.printStackTrace();
        }

    };

    /**
     * close()是tear down的方法,在销毁时执行,关闭连接。
     */
    @Override
    public void close() throws Exception {
        if(preparedStatement != null){
            preparedStatement.close();
        }
        if(connection != null){
            connection.close();
        }
        super.close();
    }
}

3. Flink Streaming Job 编程

3.1 Flink Stream编程的步骤

Flink job 编程基本上都是由一些基本部分组成:

  1. 获得一个 execution environment
  2. 加载/创建初始数据(Source)
  3. 指定在该数据上进行的转换(Transformations)
  4. 指定计算结果的存储地方(Sink)
  5. 启动程序执行。

3.2 Kafka-Flink-DB

下面的代码,是一个Flink Job,从Kafka中读取消息,并把消息写到关系型数据库中。

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaToDB {

    public static void main(String[] args) throws Exception {
        // 解析参数
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
        if (parameterTool.getNumberOfParameters() < 4) {
            System.out.println("Missing parameters!");
            System.out.println("\nUsage: Kafka --topic <topic> " +
                    "--bootstrap.servers <kafka brokers> "+
                    "--zookeeper.connect <zk quorum> --group.id <some id>");
            return;
        }

        // 获取StreamExecutionEnvironment。
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
        // create a checkpoint every 5 secodns
        env.enableCheckpointing(5000); 
        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(parameterTool); 

        // source
        DataStream<String> sourceStream = env.addSource(
                new FlinkKafkaConsumer08<String>(parameterTool.getRequired("topic"),
                        new SimpleStringSchema(), parameterTool.getProperties()));
        // Transformation,这里仅仅是过滤了null。
        DataStream<Tuple3<String, String, String>> messageStream = sourceStream
                .map(new InputMap())
                .filter(new NullFilter());
        //sink
        messageStream.addSink(new PostgreSQLSink());

        env.execute("Write into PostgreSQL");
    }
    
    // 过滤Null数据。
    public static class NullFilter implements FilterFunction<Tuple3<String, String, String>>{
        @Override
        public boolean filter(Tuple3<String, String, String> value) throws Exception {
            return value != null;
        }
    }
    
    // 对输入数据做map操作。
    public static class InputMap implements MapFunction<String, Tuple3<String, String, String>> {
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple3<String, String, String> map(String line) throws Exception {
            // normalize and split the line
            String[] arr = line.toLowerCase().split(",");
            if (arr.length > 2) {
                return new Tuple3<>(arr[0], arr[1], arr[2]);
            }
            return null;
        }
    }

}

4. 把Job提交Flink集群

将上面的代码打包成jar后,通过下面的命令把job提交到Flink集群上。其中-c指定了flink-db.jar的Main class,其余的参数是本文job所用的kafka相关的参数。

bin/flink run -c com.bigknow.flink.KafkaToDB examples/flink-db.jar \
--topic my-topic \
 --bootstrap.servers 192.168.1.170:9092 \
--zookeeper.connect 192.168.1.170:2181 \
--group.id test01`

(完)

上一篇 下一篇

猜你喜欢

热点阅读