利用logstash将mysql多表数据增量同步到es
2020-07-01 本文已影响0人
Sunny捏
同步原理:
第一次发送sql请求查询,修改时间参数值是为系统最开始的时间(1970年),可以查询的
到所有大于1970年的数据,并且会将最后一条数据的update_time时间记录下来,
作为下一次定时查询的条件
一、启动es + kibana
如何安装,以及如何运行,这里就不做描述,没有装过的,可以参考我的这篇文章
https://www.jianshu.com/p/f52d9c843bd8
二、安装mysql
查询mysql版本
docker search mysql
通过docker下载MySQL5.7版本
如何安装docker,不是本文重点,这里不做多描述
docker pull mysql:5.7 (这里选择的是第一个mysql镜像, :5.7选择的5.7版本)
docker pull mysql # 拉取最新版mysql镜像
运行mysql
docker run -p 3306:3306 --name mysql -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7
账号:root
密码:123456
通过工具连接mysql
新建数据库
DROP DATABASE IF EXISTS `test`;
CREATE DATABASE `test` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
USE test;
#商品表
DROP TABLE IF EXISTS `goods`;
CREATE TABLE `goods` (
`id` int(11) NOT NULL,
`name` varchar(255) DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO `goods` VALUES (1, '黎明电脑', '2020-07-01 00:40:19');
INSERT INTO `goods` VALUES (2, '黎明手机', '2020-07-01 00:40:32');
#用户表
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
`id` int(11) NOT NULL,
`name` varchar(255) DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO `user` VALUES (1, '黎明1', '2020-07-01 00:40:01');
INSERT INTO `user` VALUES (2, '黎明2', '2020-07-01 00:40:09');
现在已经启动了3台容器了
三、下载logstash源码包
官方地址
https://www.elastic.co/cn/downloads/logstash
国内加速下载网址
https://www.newbe.pro/Mirrors/Mirrors-Logstash/
下载地址
wget https://mirrors.huaweicloud.com/logstash/6.7.2/logstash-6.7.2.zip
下载zip命令解压
yum -y install zip
解压
unzip logstash-6.7.2.zip
四、下载mysql驱动
可能会有人疑问?为什么要下载mysql驱动
因为logstash需要连接mysql,并查询表数据,才确定是否同步数据
如下,是maven仓库,所有版本mysql驱动连接
https://mvnrepository.com/artifact/mysql/mysql-connector-java
我的数据库是5.7版本,我这里下载5.1.47的驱动了,当然如果你们的数据库是8.0以上的版本,那么就下相应的版本就行
现在两种下载方式
1.下载到本地,然后通过ftp工具上传到服务器
2.在服务器上下载,右击复制链接地址,通过wget命令下载即可
五、进入logstash目录,安装同步插件
安装会有点慢,大概2分钟左右吧
bin/logstash-plugin install logstash-input-jdbc
bin/logstash-plugin install logstash-output-elasticsearch
六、添加Mysql与ES同步配置
进入logstash/config目录下,新建 user.conf文件
vim user.conf
添加内容
input {
jdbc {
jdbc_driver_library => "/usr/local/software/my/mysql-connector-java-5.1.47.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://192.168.137.11:3306/test"
jdbc_user => "root"
jdbc_password => "123456"
schedule => "* * * * *"
statement => "SELECT * FROM user WHERE update_time >= :sql_last_value"
use_column_value => true
tracking_column_type => "timestamp"
tracking_column => "update_time"
last_run_metadata_path => "syncpoint_table"
}
}
output {
elasticsearch {
# ES的IP地址及端口
hosts => ["192.168.137.11:9200"]
# 索引名称 可自定义
index => "user"
# 需要关联的数据库中有有一个id字段,对应类型中的id
document_id => "%{id}"
document_type => "user"
}
stdout {
# JSON格式输出
codec => json_lines
}
}
进入bin目录启动
./logstash -f ../config/user.conf
可以看到下图,如我标记的地方,logstash在第一次进行同步数据,会先从1970年开始,进行一次同步数据
之后每隔一分钟,会以最后的update_time作为条件,查询是否同步数据,如果查询的结果update_time时间大于所记录的update_time时间,则会继续同步数据,接下来在记录最后一次同步的update_time时间,依次类推
然后我们通过kibana,查询一下我们的索引结果
七、多表同步
到此,我们的单表同步已经完成,接下来我们开始实现多表同步
规则如下:
一个表,一个配置
多个表,多个配置
需要同步多少表,就需要加多少配置
当然配置的内容都差不多,改的地方是查询的表名,和es的索引以及类型的名称
添加第二张表的配置,配置就是上面的配置,稍微改动即可
进入logstash/config目录,修改配置文件
vim pipelines.yml
编辑文件
直接到最后,添加配置
- pipeline.id: table1
path.config: "/usr/local/software/my/logstash-6.7.2/config/user.conf"
- pipeline.id: table2
path.config: "/usr/local/software/my/logstash-6.7.2/config/goods.conf"
启动方式稍有改变
进入bin目录
./logstash
这里goods同步,为什么不是1970年呢,因为之前同步一次过,logstash会帮你记录,所以就以logstash最后一次同步时间计算
现在商品表也同步数据了
那如何证明,能够多表同步呢,很简单,我们修改两个表的数据,看是否都能查询的到,如下图,就可以证明商品表和用户表,都是根据各自表的最后时间进行同步的数据的
注意:有数据才会创建索引哦