Flink SQL CDC: Joining MySQL Tables and Writing the Result to Elasticsearch
2020-08-12
Jaming
GitHub repository: https://github.com/ververica/flink-cdc-connectors
Flink CDC Connectors supports:
MySQL Database: 5.7, 8.0.x
JDBC Driver: 8.0.16
PostgreSQL Database: 9.6, 10, 11, 12
JDBC Driver: 42.2.12
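Previously, we would use Debezium or Canal to capture the binlog into Kafka. The CDC connectors embed Debezium, so Flink can read the change stream directly. This shortens the data collection pipeline and makes development and operations easier.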
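Because the connector reads the MySQL binlog through the embedded Debezium engine, the source database needs binary logging enabled in row format. A quick check might look like the sketch below. The expected values in the comments come from Debezium's MySQL connector requirements, not from this post, so treat them as assumptions to verify against your own setup.

```sql
-- Run in a MySQL client, not in the Flink SQL client.
SHOW VARIABLES LIKE 'log_bin';          -- binary logging should be ON
SHOW VARIABLES LIKE 'binlog_format';    -- Debezium expects ROW
SHOW VARIABLES LIKE 'binlog_row_image'; -- FULL is the recommended setting
```

If any of these differ, the MySQL server configuration would need to be adjusted before the CDC source tables can pick up changes.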
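- MySQL has two tables: orders and customers.
- The orders table serves as the fact table and the customers table as the dimension table; the two are joined with a left join.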
1. First, put the following JARs in Flink's lib directory:
flink-sql-connector-mysql-cdc-1.0.0.jar
flink-sql-connector-elasticsearch7_2.11-1.11.1.jar
[Screenshot: my Flink lib directory]
2. Start the Flink cluster:
./bin/start-cluster.sh
3. Start the SQL client:
./bin/sql-client.sh embedded
4. Create the two tables in MySQL:
customers:
CREATE TABLE `test11`.`customers` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `first_name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `last_name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `email` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 3 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
orders:
CREATE TABLE `test11`.`orders` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `price` decimal(10, 2) DEFAULT NULL,
  `cid` int(11) DEFAULT NULL,
  `pid` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 3 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
5. Create three tables in the SQL client: two CDC source tables and one Elasticsearch sink table.
CREATE TABLE customers (
  id INT NOT NULL,
  first_name STRING,
  last_name STRING,
  email STRING
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'xiao100',
  'port' = '3306',
  'username' = 'root',
  'password' = 'root',
  'database-name' = 'test11',
  'table-name' = 'customers'
);
CREATE TABLE orders (
  id INT NOT NULL,
  price DECIMAL(10,3),
  cid INT,
  pid INT
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'xiao100',
  'port' = '3306',
  'username' = 'root',
  'password' = 'root',
  'database-name' = 'test11',
  'table-name' = 'orders'
);
CREATE TABLE res (
  id INT,
  first_name STRING,
  price DECIMAL(10,3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://xiao100:9200',
  'index' = 'res'
);
Then run the following statement in the SQL client:
INSERT INTO res
SELECT o.id, c.first_name, o.price
FROM orders o
LEFT JOIN customers c ON c.id = o.cid;
6. Check the job's status in the Flink web UI:
[Screenshot: the running job in the Flink web UI]
Insert some data into both tables in MySQL.
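As a sketch of what that test data could look like, the statements below insert a few rows into both MySQL tables. All names, emails, and prices are made up for illustration; the post does not show the data the author actually used.

```sql
-- Run in a MySQL client, not in the Flink SQL client.
-- All values below are hypothetical sample data.
INSERT INTO test11.customers (id, first_name, last_name, email)
VALUES (1, 'Alice', 'Wang', 'alice@example.com'),
       (2, 'Bob', 'Li', 'bob@example.com');

INSERT INTO test11.orders (id, price, cid, pid)
VALUES (1, 19.90, 1, 100),
       (2, 5.50, 2, 101),
       (3, 42.00, 3, 102);  -- cid 3 has no matching customer row yet
```

Since the job uses a left join, the order with cid 3 should still reach the res index, with first_name left empty until a matching customer appears.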


Open Kibana to view the results:

You can then update the dimension table and check whether the result table is updated as well.
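One way to try this, as a minimal sketch: change a customer's first_name in MySQL and look at the matching document again. The id and the new name are hypothetical and assume a customer with id 1 exists and is referenced by at least one order.

```sql
-- Run in a MySQL client; the id and the new name are hypothetical.
UPDATE test11.customers
SET first_name = 'Alicia'
WHERE id = 1;
```

Because res declares id as its primary key, the Elasticsearch sink writes in upsert mode and uses that key as the document id. The documents for orders with cid = 1 should therefore be overwritten with the new first_name rather than duplicated.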