logstash 根据时间增量抽取
2020-08-10 本文已影响0人
觉释
input {
jdbc {
#数据库驱动路径(linux写法:/home/mysql/mysql-connector-java-5.1.24.jar)
jdbc_driver_library => "D:\Program Files\logstash-7.8.0\mysql-connector-java-5.1.38.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
#数据库连接相关配置()
jdbc_connection_string => "jdbc:mysql://localhost:3306/testdata"
jdbc_user => "root"
jdbc_password => "root"
#任务计划,多久执行一次,在此每分钟执行一次(每分钟开始,和cron位不一样"*/5 * * * *")
schedule =>"* * * * *"
#启用追踪,则需要指定tracking_column,默认是timestamp()
use_column_value => true
#指定追踪的字段,在此我设置的追踪的字段为updateTime(这个是es的对应字段不是sql)
tracking_column => "updateTime"
#追踪字段的类型,目前只有数字(numeric)和时间类型(timestamp),默认是数字类型()
tracking_column_type => "timestamp"
#是否将字段名称转小写,当字段已经为小写时,不用此项()
lowercase_column_names => false
#记录最后一次运行的结果()
record_last_run => true
#上面运行结果的保存位置(新建个文件,会把上次sql_last_value记录下来,也就是更新时间)
last_run_metadata_path => " \Program Files\logstash-7.8.0\bin\jdbc-position.txt"
#执行的语句,也可以通过statement_filepath指定sql的文件()
#注意执行语句后的()where update_time>:sql_last_value
#判断是否更新的主要环节会记录上次运行的时间并且下次执行时大于存储记录的时间则对mysql中该行读取进行更新()
#对于设置id也就是数值型做更新字段的话并不适用于增量更新的逻辑尽量不去使用()
statement => "SELECT id,name,update_time as updateTime FROM testdb where update_time >:sql_last_value"
#我们设置的字段看下es mapping的说明()
#使用sql文件来读取
#statement_filepath => "/data/test.sql"
#如果配置多个数据源需要用type来区分
type => "testdb"
}
}
#这部分我也不会()
filter {
}
output {
#使用if语句判断type来指定输出的块()
if[type]=="testdb"{
#es输出块(当然logstach还有其他的输出)
elasticsearch{
#ES的ip地址和端口(也可以写成["",""]多个)
hosts => "http://localhost:9200"
#ES索引库名称()
index => "testdb"
#ES主键()
document_id => "%{id}"
#ESType()
document_type => "testdb"
}
}
stdout {
# JSON格式输出()
codec => json_lines
}
}
逻辑是这样的当我们数据改变时一定要更新update_time字段,
而在logstach增量更新初始化时会记录时间1,
第二次跑时数据库时间变化则更新这条数据到es
执行 .\logstash.bat -f .\mysql.conf
linux
bin目录下 nohup ./logstash -f sqlcslz-lin.conf &
查看进程 ps aux | grep logstach