使用ELK离线同步Mysql数据(非实时)

2019-04-20  本文已影响0人  DreamsonMa

该文内容为利用Logstash的 logstash-input-jdbc 插件同步Mysql数据,实现Mysql到ElasticSearch的数据异构。这个方案非实时,调度任务最短间延1m。

新增方案

Logstashinput-jdbc-plugin插件天然支持全量,增量同步。

更新方案

使用一个更新时间的字段,作为每次Logstash增量更新的tracking_column。这样Logstash每次增量更新就会根据更新时间来作为标记。索引的document_id必须是数据库中的主键, 这样在每次增量更新的时候, 之前ID相同的数据就会被覆盖, 从而达到update的效果。

删除方案

删除是建立在上面更新的原理之上, 就是再加一个删除标记的字段作软删除。

安装ELK

很简单,按官网文档操作一遍就行。这里给出安装文档连接。文档写得很清晰,基本一看就懂,安装方法也多样。完美。Elasticsearch | Logstash | Kibana

核心配置

添加配置后,重启logstash,等一会儿就能看到结果。配置很简单,对着文档看看就能理解。快速略过看结果。。

[root@local14 ~]# /usr/share/logstash/bin/logstash-plugin install logstash-input-jdbc
[root@local14 ~]# mkdir -p /tmp/logstash ;touch /tmp/logstash/last_run_value.txt
[root@local14 ~]# cat /etc/logstash/conf.d/logstash-mysql.conf 
input {
    stdin {
    }
    jdbc {
        jdbc_connection_string => "jdbc:mysql://192.168.56.101:3306/test?autoReconnect=true&characterEncoding=UTF8"
        jdbc_user => "root"
        jdbc_password => "123456"
        jdbc_driver_library => "/mnt/mysql-connector-java-5.1.30.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "1000"
        statement => "SELECT * from user where  test_id > :sql_last_value"
        schedule => "* * * * *"
        record_last_run => true
        last_run_metadata_path => "/tmp/logstash/last_run_value.txt"
        use_column_value => true
        lowercase_column_names =>true
        tracking_column => "test_id"
        tracking_column_type => "numeric"
        type => "jdbc"
    }
}

filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "test"
    document_type => "user"
    document_id => "%{test_id}"
    #user => "elastic"
    #password => "changeme"
  }
  stdout {
    codec => json_lines
  }
}
[root@local14 ~]# systemctl start logstash ; systemctl enable logstash
[root@local14 ~]# tail -f /var/log/logstash/logstash-plain.log 
[2019-04-19T20:42:00,231][INFO ][logstash.inputs.jdbc     ] (0.000951s) SELECT version()
[2019-04-19T20:42:00,236][INFO ][logstash.inputs.jdbc     ] (0.002074s) SELECT version()
[2019-04-19T20:42:00,263][INFO ][logstash.inputs.jdbc     ] (0.010025s) SELECT count(*) AS `count` FROM (SELECT * from user where  test_id > 491604) AS `t1` LIMIT 1
[2019-04-19T20:43:00,045][INFO ][logstash.inputs.jdbc     ] (0.000790s) SELECT version()
[2019-04-19T20:43:00,053][INFO ][logstash.inputs.jdbc     ] (0.000930s) SELECT version()
[2019-04-19T20:43:00,058][INFO ][logstash.inputs.jdbc     ] (0.001285s) SELECT count(*) AS `count` FROM (SELECT * from user where  test_id > 491604) AS `t1` LIMIT 1
[2019-04-19T20:43:00,101][INFO ][logstash.inputs.jdbc     ] (0.015010s) SELECT * FROM (SELECT * from user where  test_id > 491604) AS `t1` LIMIT 1000 OFFSET 0

对比同步结果

数据库结果 Kibana中结果

存在问题

第一,从上面结果,可以看到任务每分钟执行一次,也就是有1分钟的间延。因此,并不适合做实时性要求高的处理。对实时性要求高的,可以使用canal+撸代码解决。
第二,只能做软删除,如果有硬删要求,还请自行撸代码。
第三,比对的标为更新时间,如果出现遗漏处理情况,请自行脑补。

Mark些ES设计技巧

先记录下,后面会另外写篇详细点的article。

1、多表关联查询
canal -> mq -> db -> es 多表关联(mq-db 用主键关联查),存部分字段即可。通过es查出主键,拿主键再查db (es相当于索引库)

2、单表查询
canal -> mq -> es 单表 ,存全量字段,查询只走es (es做db用)

上一篇下一篇

猜你喜欢

热点阅读