Logstash 从Mysql同步数据到ES
从Mysql
导出上亿数据到ES
中,Logstash
可所谓很方便的工具了,ES
详细的安装步骤前面文章有写(ElasticSearch + Kibana基础搭建),就不赘述了。
这节主要讲下Logstash
的使用:Download Logstash官网地址
首先下载logstash-7.12.0
上传到linux服务器然后解压即可,版本最好是跟es
的保持一致
1.进入logstash-7.12.0
文件夹
$ cd logstash-7.12.0/
2.进入logstash-7.12.0
安装目录,新建一个文件夹,这里面等会需要放你的配置文件,当然你也可以直接放在安装目录
$ mkdir my
3.进入新建文件夹
$ cd my
4.创建一个配置文件,这个conf
中配置你的mysql
链接、过滤规则以及es
的地址和索引模板
$ touch logstash-test-log-sync.conf
5.下载mysql的驱动到my
文件夹下
mysql-connector-java-8.0.19.jar
6.如果你的sql
很复杂,最好是使用文件sql
文件,然后在conf
中配置sql
文件的路径,当然你也可以直接写在conf
文件中。我这里选择使用sql
文件,新建一个sql
文件
$ touch my.sql
写入自己的sql
$ vim my.sql
自定义my
文件夹的目录结构
完整的logstash-test-log-sync.conf
配置
input {
jdbc {
# 设置 MySql/MariaDB 数据库url以及数据库名称
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/ps_manager_test?useUnicode=true&allowMultiQuerie=true&characterEncoding=utf-8&serverTimezone=UTC"
# 用户名和密码
jdbc_user => "root"
jdbc_password => "123456"
# 数据库驱动mysql-connector-java-8.0.19.jar所在位置,可以是绝对路径或者相对路径
jdbc_driver_library => "/usr/local/nbin/logstash-7.12.0/my/mysql-connector-java-8.0.19.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
# 是否开启分页,ture为开启,我这里sql比较复杂所以就放弃使用这个,后面细讲
jdbc_paging_enabled => false
# 分页每页数量
jdbc_page_size => "50"
# 设置时区
jdbc_default_timezone =>"Asia/Shanghai"
# 执行的sql文件路径
statement_filepath => "/usr/local/nbin/logstash-7.12.0/my/my.sql"
#使用这个可以直接写sql语句,但是复杂的语句最好是写在文件内
#statement =>
# 设置定时任务间隔 含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务
schedule => "* * * * *"
#是否需要记录某个字段值,如果为true,我们可以自定义要记录的数据库某个字段值,例如id或date字段。如果为false,记录的是上次执行的标记,默认是一个timestamp
use_column_value => true
#记录上次执行字段值路径。我们可以在sql语句中这么写:WHERE ID > :last_sql_value。其中 :sql_last_value 取得就是该文件中的值,这个last_id会以文件形式存在,上面截图有
last_run_metadata_path => "/usr/local/nbin/logstash-7.12.0/my/last_id"
#如果use_column_value为真,需配置此参数. 指定增量更新的字段名。当然该字段必须是递增的,比如id或date字段。
tracking_column => "id"
# tracking_column 对应字段的类型,只能选择timestamp或者numeric(数字类型),默认numeric,所以可以不写这个配置
tracking_column_type => "numeric"
#如果为true,每次会记录所更新的字段的值,并保存到 last_run_metadata_path 指定的文件中
record_last_run => true
# 是否清除 last_run_metadata_path 的记录,true则每次都从头开始查询所有的数据库记录
clean_run => false
# 是否将字段名称转小写。默认是true。这里注意Elasticsearch是区分大小写的
lowercase_column_names => false
}
}
##过滤、格式化数据 这段单独有讲解,这里就不细说了
filter{
mutate {
add_field => {"temp_ts" => "%{actionTimeStamp}"}
}
date {
match => ["temp_ts", "ISO8601"]
target => "@timestamp"
}
mutate {
remove_field => ["@version","temp_ts","actionTimeStamp"]
}
}
output {
elasticsearch {
# es地址 集群数组hosts => ["127.0.0.1:9200","127.0.0.1:9201"]
hosts => ["127.0.0.1:9200","127.0.0.1:9201"]
# 同步的索引名必须要有@timestamp 不然yyyyMM不起效
index => "ps_sign_log%{+yyyy}"
# 设置_docID和数据相同
document_id => "%{id}"
#自定的模板名称
template_name => "ps_seal_log"
#自定义的模板配置文件
template => "/usr/local/nbin/logstash-7.12.0/my/ps_test_log_template.json"
#是否重写模板
template_overwrite => true
}
# 日志输出形式设置
stdout {
codec => json
#codec => rubydebug
}
}
其他的配置已经在上面的conf中注释了,这里主要单独说一下jdbc_paging_enabled
、jdbc_page_size
这两个参数
jdbc_paging_enabled
: 开启JDBC启用分页
- 值类型为布尔值
- 默认值为
false
jdbc_page_size
:这将导致sql语句分解为多个查询。每个查询将使用限制和偏移量来集体检索完整的结果集。限制大小通过设置jdbc_page_size
。
- 值类型是数字
- 默认值为
100000
分批处理的结果集意思就是,比如你sql
里设置了查询1000
条数据,而你jdbc_page_size
设置的是500
,那么他就会分为两个sql语句,第一条是0~500
,第二条是500~1000
,你可以观察下日志,最后打印的全部sql是这样的:
SELECT * FROM (你的sql查询语句) AS `t1` LIMIT 0 OFFSET 500
SELECT * FROM (你的sql查询语句) AS `t1` LIMIT 500 OFFSET 500
注意哈,别理解成它是帮你sql设定大小分页
比如我开始的sql
,是这样写的,因为数据量特别大,所以一直无法返回数据:
错误
select id,seal_id,user_id,business_type,action_time from ps_log where id > :sql_last_value
正确✔️
select id,seal_id,user_id,business_type,action_time from ps_log where id > :sql_last_value limit 1000
ps_test_log_template
自定义模板的说明
ES
会根据你输入的数据,自己会对数据类型映射,所以你也可以不用自定义模板,也就是说下面这个三个配置可以去掉,也能正常同步数据到ES
#自定的模板名称
template_name => "ps_seal_log"
#自定义的模板配置文件
template => "/usr/local/nbin/logstash-7.12.0/my/ps_test_log_template.json"
#是否重写模板
template_overwrite => true
自定义模板索引千万注意不要加这个,不然logstash
不会去es创建索引模板。
manage_template => false
如果自定义模板ps_test_log_template
,在启动logstash
时,会有打印
模板索引创建详见之前的文章 ElasticSearch + Kibana基础搭建
最后启动Logstash
./bin/logstash -f ./my/logstash-seal-log-sync.conf