flink-cdc 读取mysql数据

2022-07-01  本文已影响0人  远行的夜色

通过 Flink CDC 的 MySQL Connector 读取 MySQL 的变更数据,并写入到其他系统或数据库。使用前需要先开启 MySQL 的 binlog 功能(binlog_format 需设置为 ROW)。

1. 导入 Maven 依赖

 <!-- Dependencies for the Flink CDC demo: MySQL driver, JSON, logging, Lombok, Canal client, Flink CDC and Flink runtime. -->
 <dependencies>
    <!-- MySQL JDBC driver -->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.19</version>
    </dependency>
    <!-- JSON library; the Java code in this article imports classes from the com.alibaba.fastjson2 package -->
 <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>2.0.6</version>
    </dependency>
    <!-- Logging dependencies -->
    <!-- NOTE(review): ${slf4j.version} is not defined in this snippet; it must be declared under <properties> in the POM. -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-to-slf4j</artifactId>
      <version>2.14.0</version>
    </dependency>
    <!-- Lombok annotations (@Slf4j, @Data, ...) used by the Java classes in this article -->
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.16.18</version>
    </dependency>
    <!-- NOTE(review): no Canal class is referenced by the Java code shown in this article; confirm this dependency is needed. -->
    <dependency>
      <groupId>com.alibaba.otter</groupId>
      <artifactId>canal.client</artifactId>
      <version>1.1.6</version>
    </dependency>
    <!-- Flink CDC connector for MySQL -->
    <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-connector-mysql-cdc</artifactId>
      <version>2.2.1</version>
    </dependency>
    <!-- Flink runtime modules, all at version 1.13.6 -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.12</artifactId>
      <version>1.13.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.12</artifactId>
      <version>1.13.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.13.6</version>
    </dependency>
    <!-- NOTE(review): <type>test-jar</type> pulls the planner's test artifact rather than the main jar; this looks unintended, confirm. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_2.12</artifactId>
      <version>1.13.6</version>
      <type>test-jar</type>
    </dependency>
  </dependencies>

2. 新建Flink-cdc测试类

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Demo job that reads MySQL change events through the Flink CDC source and
 * hands each event (as a JSON string) to {@code CustomSink}.
 */
@Slf4j
public class FlinkCDC {

    /**
     * Builds the streaming job and runs it.
     *
     * @param args optional; when non-empty, {@code args[0]} is used as the value of
     *             {@code execution.savepoint.path}, e.g.
     *             {@code D:\flink\checkpoints\8bdf5d49bb1b4cda56aaa0a590fc2cef\chk-55}
     * @throws Exception propagated from job construction or execution
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment(buildConfiguration(args));

        // Checkpoint with an interval argument of 3000.
        env.enableCheckpointing(3000);
        // Checkpoints are stored on the local file system.
        // Alternative: env.getCheckpointConfig().setCheckpointStorage("file:///D:\\flink\\checkpoints");
        env.setStateBackend(new FsStateBackend("file:///D:\\flink\\checkpoints"));

        env
            .fromSource(buildSource(), WatermarkStrategy.noWatermarks(), "MySQL Source")
            // four parallel source tasks
            .setParallelism(4)
            // single sink task; the original author's note: parallelism 1 keeps message ordering
            .addSink(new CustomSink())
            .setParallelism(1);
        // Alternative for debugging: .print().setParallelism(1);

        env.execute("flinkcdc");
    }

    /**
     * Creates the MySQL CDC source for table {@code user.log_info}, using the
     * custom JSON deserializer and starting from the latest binlog position.
     */
    private static MySqlSource<String> buildSource() {
        return MySqlSource.<String>builder()
            .hostname("127.0.0.1")
            .port(3306)
            // captured database
            .databaseList("user")
            // captured table
            .tableList("user.log_info")
            .username("root")
            .password("password")
            // custom deserialization of change records
            .deserializer(new CustomDeserialization())
            // StartupOptions.initial(): full snapshot first, then incremental.
            // StartupOptions.latest(): read from the latest binlog position, incremental only.
            .startupOptions(StartupOptions.latest())
            .build();
    }

    /**
     * Creates the Flink configuration. When a first program argument is present
     * it is set as {@code execution.savepoint.path} so that a restarted job
     * resumes reading from that checkpoint location.
     */
    private static Configuration buildConfiguration(String[] args) {
        Configuration flinkConf = new Configuration();
        // Hard-coded example:
        // flinkConf.setString("execution.savepoint.path", "file:///D:\\flink\\checkpoints\\cc52b93fd24977e5388f0a19a30d49d2\\chk-87");
        if (ArrayUtils.isNotEmpty(args)) {
            String restorePath = args[0];
            flinkConf.setString("execution.savepoint.path", restorePath);
        }
        return flinkConf;
    }
}

3.自定义反序列化类

import com.alibaba.fastjson2.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import java.util.List;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

/**
 * Custom Debezium deserializer that turns each change record into a JSON string.
 *
 * <p>The emitted JSON object carries the keys {@code database}, {@code tableName},
 * {@code before}, {@code after} and {@code type}.
 */
public class CustomDeserialization implements DebeziumDeserializationSchema<String> {

    /** Topic segments needed: index 1 is read as the database, index 2 as the table. */
    private static final int MIN_TOPIC_SEGMENTS = 3;

    /**
     * Converts one change record to JSON and emits it.
     *
     * @param sourceRecord the change record; its topic is split on {@code '.'} and must
     *                     yield at least three segments
     * @param collector    receives the resulting JSON string
     * @throws IllegalArgumentException if the topic has fewer than three dot-separated segments
     * @throws Exception declared by the implemented interface
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector)
        throws Exception {

        // Database and table names come from the 2nd and 3rd topic segments.
        String topic = sourceRecord.topic();
        String[] segments = topic.split("\\.");
        if (segments.length < MIN_TOPIC_SEGMENTS) {
            // Fail with a descriptive message instead of a bare ArrayIndexOutOfBoundsException.
            throw new IllegalArgumentException(
                "Cannot extract database/table from topic '" + topic
                    + "': expected at least " + MIN_TOPIC_SEGMENTS + " dot-separated segments");
        }
        String database = segments[1];
        String tableName = segments[2];

        // Row images before and after the change; either may be absent.
        Struct value = (Struct) sourceRecord.value();
        JSONObject beforeJson = structToJson(value.getStruct("before"));
        JSONObject afterJson = structToJson(value.getStruct("after"));

        // Operation name in lower case (READ / DELETE / UPDATE / CREATE); "create" is reported as "insert".
        // Locale.ROOT keeps the lower-casing independent of the JVM default locale.
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase(java.util.Locale.ROOT);
        if ("create".equals(type)) {
            type = "insert";
        }

        // Assemble the output object.
        JSONObject res = new JSONObject();
        res.put("database", database);
        res.put("tableName", tableName);
        res.put("before", beforeJson);
        res.put("after", afterJson);
        res.put("type", type);

        collector.collect(res.toString());
    }

    /**
     * Copies every field of {@code struct} into a new JSON object keyed by field name.
     *
     * @param struct the row image, may be {@code null}
     * @return the populated object, or an empty object when {@code struct} is {@code null}
     */
    private static JSONObject structToJson(Struct struct) {
        JSONObject json = new JSONObject();
        if (struct == null) {
            return json;
        }
        List<Field> structFields = struct.schema().fields();
        for (Field field : structFields) {
            json.put(field.name(), struct.get(field));
        }
        return json;
    }

    /** Declares that this schema produces {@code String} elements. */
    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

4.自定义Sink输出

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Sink that parses each incoming JSON string into a {@code TableData<LogInfo>}
 * and prints it. Forwarding to a real target system is left as a TODO.
 */
public class CustomSink extends RichSinkFunction<String> {

    /**
     * Handles one element of the stream.
     *
     * @param value   JSON string to parse into {@code TableData<LogInfo>}
     * @param context sink context supplied by the caller; not used here
     * @throws Exception declared by the overridden method
     */
    @Override
    public void invoke(String value, Context context) throws Exception {
        TableData<LogInfo> tableData =
            JSON.parseObject(value, new TypeReference<TableData<LogInfo>>() {});

        // Print the parsed event (println handles a null result without throwing).
        System.out.println(tableData);

        // TODO: persist to another system / middleware (MQ, etc.) / database as needed, e.g.
        //   send to a message queue such as RabbitMQ or Kafka:
        //     rabbitmqtemplate.send(tableData);
        //   or save to a database:
        //     testMapper.insert(tableData.getAfter());
    }
}


/**
 * Generic holder for one parsed change event: where it came from and the row
 * images before and after the change.
 *
 * @param <T> type used for both the {@code before} and {@code after} row images
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TableData<T> {

    // Name of the database the event belongs to.
    private String database;

    // Name of the table the event belongs to.
    private String tableName;

    // NOTE(review): CustomDeserialization in this file writes the operation under the JSON
    // key "type" and never writes an "update" key, so this field is presumably never
    // populated when parsing its output; confirm whether it should be named "type".
    private String update;

    // Row image before the change.
    private T before;

    // Row image after the change.
    private T after;
}

上一篇下一篇

猜你喜欢

热点阅读