canal 系列:ES中nested嵌套类型同步
在日常的业务开发场景中,像 一个人有多套房子,多个住址 ,一篇文章中有多个评论这种需求还是非常常见的。当我们使用 Elasticsearch 来进行存储时, ES 的字段类型是 nested 类型 ,虽然这个效率不高。
当我们使用 canal 对数据进行增量同步到 ES 时,canal-adapter 是否是支持 nested 类型呢?
查看 issue 提问
好遗憾。大佬说暂时不支持。
在本想放弃的时候,看到 issue 中有位小伙伴说,配置 object 兼容 nested ,但是并没有给出解决方案。废话不多说,实践一把,走起。
1. 数据处理
1.1 创建 存在 字段类型为 nested 的索引 canal_test
首先创建 elasticsearch的索引名为 canal_test ,其中 addresses 字段是 nested 类型
curl -XPUT "http://localhost:9200/canal_test" -H 'Content-Type: application/json' -d'
{
"settings": {
"number_of_shards": 1
},
"mappings": {
"dynamic": false,
"properties": {
"addresses": {
"type": "nested",
"properties": {
"address": {
"analyzer": "ik_max_word",
"type": "text"
},
"houseId": {
"type": "long"
},
"zxbs": {
"type": "long"
},
"jwhdm": {
"type": "keyword"
},
"id": {
"type": "long"
}
}
},
"death": {
"type": "boolean"
},
"gender": {
"type": "keyword"
},
"nation": {
"type": "keyword"
},
"zxbs": {
"type": "keyword"
},
"name": {
"type": "keyword",
"fields": {
"fulltext": {
"type": "text"
}
}
},
"residentId": {
"type": "long"
},
"type": {
"type": "long"
}
}
}
}'
1.2 表结构
创建 两张表 t_address(地址表),t_rk (人口表)。一个人可以有多个地址,一对多的关系
CREATE TABLE `t_address` (
`address` varchar(255) DEFAULT NULL,
`houseId` int(11) DEFAULT NULL,
`zxbs` int(11) DEFAULT NULL,
`jwhdm` varchar(255) DEFAULT NULL,
`id` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `t_rk` (
`residentId` int(11) NOT NULL AUTO_INCREMENT,
`type` int(255) DEFAULT NULL,
`zxbs` varchar(255) DEFAULT NULL,
`name` varchar(255) DEFAULT NULL,
`nation` varchar(255) DEFAULT NULL,
`gender` varchar(255) DEFAULT NULL,
`death` char(1) DEFAULT NULL,
PRIMARY KEY (`residentId`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4;
1.3测试数据
INSERT INTO `demo`.`t_rk`(`residentId`, `type`, `zxbs`, `name`, `nation`, `gender`, `death`) VALUES (3, 1, '1', 'huang123', '113', '1', '1');
INSERT INTO `demo`.`t_address`(`address`, `houseId`, `zxbs`, `jwhdm`, `id`) VALUES ('厦门', 12, 23, '23', 3);
INSERT INTO `demo`.`t_address`(`address`, `houseId`, `zxbs`, `jwhdm`, `id`) VALUES ('漳州', 233, 23, '23', 3);
INSERT INTO `demo`.`t_address`(`address`, `houseId`, `zxbs`, `jwhdm`, `id`) VALUES ('泉州', 233, 23, '23', 3);
2. canal-adapter 配置
默认已经熟悉 canal 和 canal-adapter 的使用。在同步到 es 中,我们知道需要为每个索引配置一份 yml 的配置文件,下面创建canal_test.yml 文件 ,同步配置如下
2.1 nested 配置的正确姿势
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
_index: canal_test
_type: _doc
_id: _id
upsert: true
# pk: id
sql: "SELECT
t1.residentId AS _id,
t1.`name`,
t1.death,
t1.gender,
t1.nation,
t1.type,
t1.zxbs,
CONCAT('[',c.address,']') AS addresses
FROM
t_rk t1
LEFT JOIN (
SELECT
id,
GROUP_CONCAT(JSON_OBJECT('address',address,'houseId',houseId)) AS address
FROM
t_address
GROUP BY
id
) c ON c.id = t1.residentId"
objFields:
addresses: object
# etlCondition: "where a.c_time>={}"
commitBatch: 3000
* 重点关注
SELECT
t1.residentId AS _id,
t1.`name`,
t1.death,
t1.gender,
t1.nation,
t1.type,
t1.zxbs,
CONCAT('[',c.address,']') AS addresses
FROM
t_rk t1
LEFT JOIN (
SELECT
id,
GROUP_CONCAT(JSON_OBJECT('address',address,'houseId',houseId)) AS address
FROM
t_address
GROUP BY
id
) c ON c.id = t1.residentId"
配置中的关键
objFields:
addresses: object
2. 测试同步情况
* 获取第 1 步中准备的sql,执行测试数据 sql
canal 执行日志
3. 同步结果
4. 搜索验证
GET canal_test/_search
{
"query":{
"bool": {
"must": [
{
"nested": {
"path": "addresses",
"query": {
"bool": {
"must": [
{
"match_phrase": {
"addresses.address": "漳州"
}
}
]
}
}
}
}
]
}
}
}
查询结果:
好了 ,今天的实践就到这里。你学废了吗?
这边格式有点奇怪,有需要可以到公众号看
https://mp.weixin.qq.com/s/jIVxxHp9GsE2WL2DNPsVJQ