【elasticsearch实操】1、logstash导入mys
2020-03-21 本文已影响0人
cutieagain
es安装
logstash安装
下载mysql的connector的jar包
mysql的connector的jar包
我这里是用的是mysql-connector-java-5.1.38.jar
安装相关插件
#安装jdbc的入插件
bin/logstash-plugin install logstash-input-jdbc
#安装elasticsearch的出插件
bin/logstash-plugin install logstash-output-elasticsearch
编写logstash-mysql-es.conf文件用于启动数据传输
input {
jdbc {
# mysql相关jdbc配置
jdbc_connection_string => "jdbc:mysql://127.0.0.1/database_name?useUnicode=true&characterEncoding=utf8&useSSL=true"
jdbc_user => "xxx"
jdbc_password => "xxx"
# jdbc连接mysql驱动的文件目录,可去官网下载:https://dev.mysql.com/downloads/connector/j/
jdbc_driver_library => "../config/jar/mysql-connector-java-5.1.38.jar"
# the name of the driver class for mysql
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => true
jdbc_page_size => "50000"
jdbc_default_timezone =>"Asia/Shanghai"
# mysql文件, 也可以直接写SQL语句在此处,如下:
statement => "select * from exp_store where create_time >= :sql_last_value"
# statement_filepath => "./config/jdbc.sql"
# 这里类似crontab,可以定制定时操作,比如每分钟执行一次同步(分 时 天 月 年)
schedule => "* * * * *"
#type => "jdbc"
# 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
#record_last_run => true
# 是否需要记录某个column 的值,如果record_last_run为真,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.
use_column_value => true
# 如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 一般是mysql主键
tracking_column => "create_time"
tracking_column_type => "timestamp"
last_run_metadata_path => "./logstash_ineyes_exp_store_last_id"
# 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
clean_run => false
#是否将 字段(column) 名称转小写
lowercase_column_names => false
}
}
output {
elasticsearch {
hosts => "127.0.0.1:9200"
index => "ineyes_exp_store"
document_id => "%{id}"
template_overwrite => true
}
# 这里输出调试,正式运行时可以注释掉
stdout {
codec => json_lines
}
}
启动数据传输
#启动数据传输
nohup bin/logstash -f config/transfer-config/logstash-mysql-es.conf > logs/logstash.out &
启动问题
block in load_driver_jars
Sending Logstash logs to /Users/cutie/workspace/elasticsearch/logstash-7.6.0/logs which is now configured via log4j2.properties
[2020-03-21T17:07:30,361][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2020-03-21T17:07:30,466][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"7.6.0"}
[2020-03-21T17:07:32,347][INFO ][org.reflections.Reflections] Reflections took 33 ms to scan 1 urls, producing 20 keys and 40 values
[2020-03-21T17:07:33,115][INFO ][logstash.outputs.elasticsearch][main] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://127.0.0.1:9200/]}}
[2020-03-21T17:07:33,281][WARN ][logstash.outputs.elasticsearch][main] Restored connection to ES instance {:url=>"http://127.0.0.1:9200/"}
[2020-03-21T17:07:33,326][INFO ][logstash.outputs.elasticsearch][main] ES Output version determined {:es_version=>7}
[2020-03-21T17:07:33,330][WARN ][logstash.outputs.elasticsearch][main] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
[2020-03-21T17:07:33,527][INFO ][logstash.outputs.elasticsearch][main] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//127.0.0.1:9200"]}
[2020-03-21T17:07:33,571][INFO ][logstash.outputs.elasticsearch][main] Using default mapping template
[2020-03-21T17:07:33,604][WARN ][org.logstash.instrument.metrics.gauge.LazyDelegatingGauge][main] A gauge metric of an unknown type (org.jruby.specialized.RubyArrayOneObject) has been create for key: cluster_uuids. This may result in invalid serialization. It is recommended to log an issue to the responsible developer/development team.
[2020-03-21T17:07:33,609][INFO ][logstash.javapipeline ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>12, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1500, "pipeline.sources"=>["/Users/cutie/workspace/elasticsearch/logstash-7.6.0/config/transfer-config/logstash-mysql-es.conf"], :thread=>"#<Thread:0x4f7fd90b run>"}
[2020-03-21T17:07:33,631][INFO ][logstash.outputs.elasticsearch][main] Attempting to install template {:manage_template=>{"index_patterns"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s", "number_of_shards"=>1}, "mappings"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}
[2020-03-21T17:07:33,649][INFO ][logstash.outputs.elasticsearch][main] Installing elasticsearch template to _template/logstash
[2020-03-21T17:07:34,622][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"}
[2020-03-21T17:07:34,683][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2020-03-21T17:07:34,976][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9601}
/Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/cronline.rb:77: warning: constant ::Fixnum is deprecated
{ 2054 rufus-scheduler intercepted an error:
2054 job:
2054 Rufus::Scheduler::CronJob "* * * * *" {}
2054 error:
2054 2054
2054 LogStash::PluginLoadingError
2054 unable to load /Users/cutie/workspace/elasticsearch/logstash-7.6.0/config/transfer-config/logstash-mysql-es.conf from :jdbc_driver_library, no such file to load -- /Users/cutie/workspace/elasticsearch/logstash-7.6.0/config/transfer-config/logstash-mysql-es.conf
2054 /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.0.0/lib/logstash/plugin_mixins/jdbc/jdbc.rb:151:in `block in load_driver_jars'
2054 org/jruby/RubyArray.java:1814:in `each'
2054 /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.0.0/lib/logstash/plugin_mixins/jdbc/jdbc.rb:144:in `load_driver_jars'
2054 /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.0.0/lib/logstash/plugin_mixins/jdbc/jdbc.rb:166:in `open_jdbc_connection'
2054 /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.0.0/lib/logstash/plugin_mixins/jdbc/jdbc.rb:242:in `execute_statement'
2054 /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.0.0/lib/logstash/inputs/jdbc.rb:309:in `execute_query'
2054 /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/logstash-integration-jdbc-5.0.0/lib/logstash/inputs/jdbc.rb:276:in `block in run'
2054 /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:234:in `do_call'
2054 /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:258:in `do_trigger'
2054 /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:300:in `block in start_work_thread'
2054 /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:299:in `block in start_work_thread'
2054 org/jruby/RubyKernel.java:1446:in `loop'
2054 /Users/cutie/workspace/elasticsearch/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:289:in `block in start_work_thread'
2054 tz:
2054 ENV['TZ']:
2054 Time.now: 2020-03-21 17:08:00 +0800
2054 scheduler:
2054 object_id: 2052
2054 opts:
2054 {:max_work_threads=>1}
2054 frequency: 0.3
2054 scheduler_lock: #<Rufus::Scheduler::NullLock:0x403e43b8>
2054 trigger_lock: #<Rufus::Scheduler::NullLock:0x7eba4eef>
2054 uptime: 26.022160999999997 (26s23)
2054 down?: false
2054 threads: 2
2054 thread: #<Thread:0x67e9c5b3>
2054 thread_key: rufus_scheduler_2052
2054 work_threads: 1
2054 active: 1
2054 vacant: 0
2054 max_work_threads: 1
2054 mutexes: {}
2054 jobs: 1
2054 at_jobs: 0
2054 in_jobs: 0
2054 every_jobs: 0
2054 interval_jobs: 0
2054 cron_jobs: 1
2054 running_jobs: 1
2054 work_queue: 0
}
需要检查配置文件,检查数据库连接是否正确等
成功运行启动
启动后会定时根据sql拉取指定的数据已供后续的操作
2020-03-21T23:13:00,350][INFO ][logstash.inputs.jdbc ][main] (0.006952s) SELECT version()
[2020-03-21T23:13:00,373][INFO ][logstash.inputs.jdbc ][main] (0.019545s) SELECT version()
[2020-03-21T23:13:00,391][INFO ][logstash.inputs.jdbc ][main] (0.013471s) SELECT count(*) AS `count` FROM (select * from exp_store where create_time >= '2020-01-16 15:00:34') AS `t1` LIMIT 1
[2020-03-21T23:13:00,417][INFO ][logstash.inputs.jdbc ][main] (0.023679s) SELECT * FROM (select * from exp_store where create_time >= '2020-01-16 15:00:34') AS `t1` LIMIT 50000 OFFSET 0
{"time_out":null,"create_time":"2020-01-16T07:00:34.000Z","retention_pay_amount":0,"exp_mail_id":2114,"express_name":null,"receiver_city":null,"receiver_address":null,"exp_no":null,"retention_status":0,"paid_exp_delay_reward":false,"express_id":null,"receiver_province":null,"courier_id":null,"update_time":"2020-01-16T07:00:34.000Z","arriver_time":"2020-01-16T07:00:34.000Z","push_type":null,"exp_locker_no":"C46E7B1A1D1D","exp_store_success_reason":null,"quit_store_exp_reason":null,"exp_type":"EXP_LOCKER","receiver_phone":null,"exp_agent_company_id":-1,"reverse_receiver_phone":null,"exp_locker_detail_id":150,"exp_locker_manager_id":12,"paid_exp_delivery_pay":false,"exp_locker_detail_money":0,"express_status":9,"take_delivery_code":null,"waybill_no":null,"push_flag":null,"exp_locker_id":5,"shop_id":null,"id":2106,"receiver_district":null,"leave_time":null,"delivery_pay_amount":0,"@version":"1","receiver_name":null,"is_secret":false,"@timestamp":"2020-03-21T15:13:00.419Z"}
配置logstash以服务方式运行
https://www.jianshu.com/p/54cdddf89989
如何优雅关停logstash:https://www.elastic.co/guide/en/logstash/current/shutdown.html
数据库新增字段同步是否有影响
数据库新增字段,必须先在es中声明新字段的mapping信息,新增后再在数据库中新增并进行同步;原来的数据无影响