logstash 同步 mysql到elasticSearch

2021-10-29  本文已影响0人  王宣成

下载mysql jdbc包
https://dev.mysql.com/downloads/connector/j/

image.png

解压放入doker容器目录

 /usr/share/logstash/config/mysql-connector-java-8.0.27.jar

elk 安装配置 https://www.jianshu.com/p/3043c1644868

新建blog库 user表,article表

新建文件 logstash/pipeline/jdbc.conf

input {
    stdin {
    
    }
    
    jdbc {
        jdbc_driver_library => "/usr/share/logstash/config/mysql-connector-java-8.0.27.jar"
        jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://172.21.28.138:3306/blog"
        jdbc_user => "blog"
        jdbc_password => "blog"
        
        # 这里类似crontab,可以定制定时操作,比如每分钟执行一次同步(分 时 天 月 年),默认每分钟一次
        schedule => "* * * * *"
        
        # 需要执行的sql
        statement => "SELECT * FROM user WHERE update_time >= :sql_last_value"
        #statement_filepath => "/usr/local/logstash/sql/user.sql"
        
        # 是否需要记录某个column 的值,如果record_last_run为真,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.
        use_column_value => true
        
        # numeric 数值类型 || timestamp 时间戳
        tracking_column_type => "timestamp"
        # 递增字段的名称 如果 use_column_value 为真,需配置此参数
        tracking_column => "update_time"
        
        # 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
        #record_last_run => true
        #last_run_metadata_path => "./logstash_capital_bill_last_id"
        
        # 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
        clean_run => false
        
        type => "jdbc-blog-user"
    }
    
    jdbc {
        jdbc_driver_library => "/usr/share/logstash/config/mysql-connector-java-8.0.27.jar"
        jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://172.21.28.138:3306/blog"
        jdbc_user => "blog"
        jdbc_password => "blog"
        schedule => "* * * * *"
        statement => "SELECT * FROM article WHERE update_time >= :sql_last_value"
        use_column_value => true
        tracking_column_type => "timestamp"
        tracking_column => "update_time"
        clean_run => false
        type => "jdbc-blog-article"
    }
}

filter {
    
}

output {

    if [type] == "jdbc-blog-user" { 
        elasticsearch {
            hosts => "172.21.28.138:9200"
            user => "elastic"
            password => "123456"
            index => "jdbc-blog-user"
            document_id => "%{id}"
        }
    }
     
    if [type] == "jdbc-blog-article" { 
        elasticsearch {
            hosts => "172.21.28.138:9200"
            user => "elastic"
            password => "123456"
            index => "jdbc-blog-article"
            document_id => "%{id}"
        }
     }

}


上一篇下一篇

猜你喜欢

热点阅读