Reading MySQL data with flink-cdc
2022-07-01
远行的夜色
This post uses the flink-cdc MySQL connector to read change data from MySQL and write it to other systems or databases. The MySQL binlog must be enabled first.
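A quick way to verify the binlog prerequisite, assuming you can run these statements with a sufficiently privileged MySQL user:

-- should return ON; if not, enable log_bin in the MySQL server config and restart
SHOW VARIABLES LIKE 'log_bin';
-- should return ROW so the connector can capture row-level changes
SHOW VARIABLES LIKE 'binlog_format';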
1. Add the Maven dependencies
<dependencies>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.19</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.6</version>
    </dependency>
    <!-- logging dependencies -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.18</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.6</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.13.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.13.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.13.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.13.6</version>
        <type>test-jar</type>
    </dependency>
</dependencies>
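Note that the snippet above references a ${slf4j.version} property without defining it; a minimal properties block would look like the following (the 1.7.x value is only an assumed example, align it with your logging setup):

<properties>
    <!-- assumed example version; adjust to your environment -->
    <slf4j.version>1.7.30</slf4j.version>
</properties>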
2. Create the Flink CDC test class
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

@Slf4j
public class FlinkCDC {

    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("127.0.0.1")
                .port(3306)
                .databaseList("user") // set captured database
                .tableList("user.log_info") // set captured table
                .username("root")
                .password("password")
                // custom deserialization (see section 3)
                .deserializer(new CustomDeserialization())
                // StartupOptions.initial(): full snapshot first, then incremental binlog reads
                // .startupOptions(StartupOptions.initial())
                // StartupOptions.latest(): incremental only, start from the latest binlog position
                .startupOptions(StartupOptions.latest())
                .build();

        Configuration config = new Configuration();
        // config.setString("execution.savepoint.path", "file:///D:\\flink\\checkpoints\\cc52b93fd24977e5388f0a19a30d49d2\\chk-87");
        // Optionally passed in at startup
        if (ArrayUtils.isNotEmpty(args)) {
            String lastCheckpointPath = args[0];
            // e.g. D:\flink\checkpoints\8bdf5d49bb1b4cda56aaa0a590fc2cef\chk-55
            // On restart, pass the latest checkpoint path so the job resumes reading from that checkpoint
            config.setString("execution.savepoint.path", lastCheckpointPath);
        }

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
        // enable checkpointing every 3 seconds
        env.enableCheckpointing(3000);
        // env.getCheckpointConfig().setCheckpointStorage("file:///D:\\flink\\checkpoints");
        // checkpoint storage location, here the local filesystem
        env.setStateBackend(new FsStateBackend("file:///D:\\flink\\checkpoints"));

        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // set 4 parallel source tasks
                .setParallelism(4)
                .addSink(new CustomSink()).setParallelism(1);
                // .print().setParallelism(1); // use parallelism 1 for the sink to keep message ordering

        env.execute("flinkcdc");
    }
}
3. Custom deserialization class
import com.alibaba.fastjson2.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import java.util.List;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
/**
 * Custom deserializer: converts Debezium SourceRecords into JSON strings.
 */
public class CustomDeserialization implements DebeziumDeserializationSchema<String> {

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector)
            throws Exception {
        JSONObject res = new JSONObject();
        // extract database and table name from the record topic
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];
        Struct value = (Struct) sourceRecord.value();
        // extract the "before" row image
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            Schema beforeSchema = before.schema();
            List<Field> beforeFields = beforeSchema.fields();
            for (Field field : beforeFields) {
                Object beforeValue = before.get(field);
                beforeJson.put(field.name(), beforeValue);
            }
        }
        // extract the "after" row image
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            List<Field> afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(), afterValue);
            }
        }
        // determine the operation type: READ, DELETE, UPDATE or CREATE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("create".equals(type)) {
            type = "insert";
        }
        // assemble the output JSON
        res.put("database", database);
        res.put("tableName", tableName);
        res.put("before", beforeJson);
        res.put("after", afterJson);
        res.put("type", type);
        // emit the record downstream
        collector.collect(res.toString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
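For reference, an UPDATE on a row of user.log_info flows through this deserializer and is emitted as a JSON string roughly shaped like the following (the id and content columns are only hypothetical examples):

{
  "database": "user",
  "tableName": "log_info",
  "before": {"id": 1, "content": "old value"},
  "after": {"id": 1, "content": "new value"},
  "type": "update"
}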
4. Custom sink output
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class CustomSink extends RichSinkFunction<String> {

    @Override
    public void invoke(String value, Context context) throws Exception {
        TableData<LogInfo> tableData =
                JSON.parseObject(value, new TypeReference<TableData<LogInfo>>() {});
        System.out.println(tableData.toString());
        // TODO: write to another system / middleware (e.g. an MQ) / another database as needed
        // e.g. forward to RabbitMQ or Kafka for downstream processing
        // rabbitmqtemplate.send(tableData);
        // or persist to a database
        // testMapper.insert(tableData.getAfter());
    }
}
The generic wrapper that mirrors the JSON produced by CustomDeserialization:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class TableData<T> {
    private String database;
    private String tableName;
    // operation type: insert / update / delete / read
    private String type;
    private T before;
    private T after;
}