Flink Custom UDTF: Syncing an Array Type to ES
2022-03-07
lodestar
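Goal: sync the test table from MySQL to Elasticsearch (ES), converting the tags column (a comma-separated string) into an array so that it is stored as an array field in ES.
Structure of the test table in MySQL: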
CREATE TABLE `test` (
`id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT,
`name` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`tags` varchar(1000) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '',
PRIMARY KEY (`id`)
) ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8mb4 COLLATE=utf8mb4_general_ci;
Sample data:
[Image: rows of the test table]
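Index mapping in ES (ES has no dedicated array type; a keyword field can hold either a single value or an array of values):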
PUT info-flow-test3
{
"mappings": {
"properties": {
"id": {
"type": "keyword"
},
"tags": {
"type": "keyword"
}
}
}
}
Table definitions in Flink:
CREATE TABLE es_info_flow_test3 (
id string,
tags ARRAY < string >,
PRIMARY KEY (id) NOT ENFORCED -- The primary key is optional. If it is defined, it is used as the document ID; otherwise the document ID is a random value.
)
WITH (
'connector' = 'elasticsearch-7',
'hosts' = '127.0.0.1:9200',
'index' = 'info-flow-test3',
'username' = 'elastic',
'password' = '123456'
);
CREATE TABLE mysqlcdc_test (
id INT, tags string, PRIMARY KEY (id) NOT ENFORCED
)
WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'admin',
'password' = '123456',
'database-name' = 'test_db',
'table-name' = 'test'
);
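The Flink job script is as follows:
-- ASI_UDTF is the custom UDTF; it emits one ARRAY<STRING> (tag_arr) per source row
INSERT INTO es_info_flow_test3 (id, tags)
SELECT CAST(m.id AS STRING) AS id, T.tag_arr
FROM mysqlcdc_test AS m,
  LATERAL TABLE (ASI_UDTF(m.tags)) AS T (tag_arr);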
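The comma between mysqlcdc_test and LATERAL TABLE(...) is an inner (cross) lateral join. Each source row is paired with every row the table function emits for it. This matters here because the MySQL schema defaults tags to ''. The plain-Java model below illustrates that semantic only and is not Flink code. The classes In and Out, the method crossJoinLateral, and the sample row with id 4 are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Plain-Java illustration (not Flink code) of how
//   FROM mysqlcdc_test m, LATERAL TABLE(ASI_UDTF(m.tags)) AS T(tag_arr)
// combines each source row with the rows the table function emits.
public class LateralJoinModel {

    // Hypothetical source row: (id INT, tags STRING).
    public static final class In {
        final int id;
        final String tags;
        public In(int id, String tags) { this.id = id; this.tags = tags; }
    }

    // Hypothetical joined row: (id STRING, tags ARRAY<STRING>).
    public static final class Out {
        public final String id;
        public final String[] tags;
        Out(String id, String[] tags) { this.id = id; this.tags = tags; }
        @Override public String toString() { return id + " -> " + Arrays.toString(tags); }
    }

    // Inner (cross) lateral join: one output row per emitted row;
    // an input row for which the function emits nothing is dropped.
    public static List<Out> crossJoinLateral(List<In> rows, Function<String, List<String[]>> udtf) {
        List<Out> result = new ArrayList<>();
        for (In row : rows) {
            for (String[] emitted : udtf.apply(row.tags)) {
                result.add(new Out(String.valueOf(row.id), emitted)); // CAST(id AS STRING)
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical sample rows; row 4 has the schema's default '' for tags.
        List<In> rows = Arrays.asList(new In(2, "3,4,5"), new In(3, "6"), new In(4, ""));

        // A function that emits one array per row, but nothing for null or ''.
        Function<String, List<String[]>> udtf = t -> (t == null || t.isEmpty())
                ? Collections.<String[]>emptyList()
                : Collections.singletonList(t.split(","));

        for (Out o : crossJoinLateral(rows, udtf)) {
            System.out.println(o); // prints "2 -> [3, 4, 5]" then "3 -> [6]"; row 4 is dropped
        }
    }
}
```

There are two ways to keep rows for which the function emits nothing. Flink SQL's LEFT JOIN LATERAL TABLE(ASI_UDTF(m.tags)) AS T(tag_arr) ON TRUE pads the function's columns with NULL. Alternatively, the function itself can always emit exactly one array, possibly empty.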
To write the custom UDTF itself, follow the Alibaba Cloud guide below. Note that the function must be built with Java 8.
https://help.aliyun.com/document_detail/188055.html
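The linked guide covers packaging and registering the function. The sketch below is not the author's code. It assumes the core of such a UDTF is a routine that turns the comma-separated tags string into a String[]. In a real job, this routine would sit inside a class extending Flink's org.apache.flink.table.functions.TableFunction<String[]>, whose public eval(String tags) method would pass the result to collect(...) once per input row. The class name TagSplitter and its trimming and empty-value rules are assumptions. Flink is left out so the snippet compiles on plain Java 8.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: the split logic a UDTF's eval(String) could delegate to.
// Plain Java 8, no Flink dependency.
public class TagSplitter {

    // Converts "3, 4,,5" into ["3", "4", "5"].
    // Assumption: null or blank input yields an empty array (never null), so the
    // function can still emit exactly one row with an empty tags array.
    public static String[] split(String tags) {
        if (tags == null || tags.trim().isEmpty()) {
            return new String[0];
        }
        List<String> out = new ArrayList<>();
        for (String part : tags.split(",")) {
            String tag = part.trim();
            if (!tag.isEmpty()) { // skip empty entries such as those from ",,"
                out.add(tag);
            }
        }
        return out.toArray(new String[0]);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(split("3,4,5")));   // [3, 4, 5]
        System.out.println(Arrays.toString(split("6")));       // [6]
        System.out.println(Arrays.toString(split(" a, ,b,"))); // [a, b]
        System.out.println(Arrays.toString(split(null)));      // []
    }
}
```

The sketch emits one whole array per row rather than one row per tag. That is what lets the lateral join produce a single ARRAY<STRING> column, which the ES sink writes as a JSON array. Returning an empty array for the default '' also keeps those rows in the output of the comma (cross) join used in the INSERT statement.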
After you upload the JAR package, a success response confirms that the package was accepted.
Check the type of tags in ES:
POST info-flow-test3/_search
Response:
{
  "took" : 0,
  "timed_out" : false,
  "hits" : {
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "info-flow-test3",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "id" : "2",
          "tags" : [
            "3",
            "4",
            "5"
          ]
        }
      },
      {
        "_index" : "info-flow-test3",
        "_type" : "_doc",
        "_id" : "3",
        "_score" : 1.0,
        "_source" : {
          "id" : "3",
          "tags" : [
            "6"
          ]
        }
      }
    ]
  }
}