ELK文集

ElasticSearch数据导入/导出至Logstash

2017-09-20  本文已影响158人  lily_佳忆

Logstash 是一款强大的数据处理工具,它可以实现数据传输,格式处理,格式化输出,常用于日志处理、或一些具有一定格式的数据导入到ES的处理。

Logstash 的安装使用步骤:

1、下载安装包
wget https://artifacts.elastic.co/downloads/logstash/logstash-5.6.1.tar.gz
2、解压安装包 : tar -zxvf logstash-5.6.1.tar.gz
部分数据集较大,需要在/bin/logstash目录下添加: exportLS_HEAP_SIZE=10g
3、在config路径下创建配置文件,如 test.conf
4、进入 logstash-5.6.1目录,执行bin/logstash -f test.conf

Logstash 的三个阶段:

input 数据输入端,接收源数据,如:
file:从文件中读取
syslog:监听系统日志信息,并解析成RFC3164格式。

Filter 数据规范,进行格式处理,如:
grok: 通过正则解析和结构化任何文本日志。
mutate: 对字段进行转换,如重命名、删除、替换字段等。
drop: 丢弃字段(日志)

output 将数据输出到指定位置,如:
file: 将事件数据写入到文件内;
elasticsearch: 发送事件数据到 Elasticsearch;

logstash导入数据时常用的参数:

参数 类型 描述 示例
path array 用来设置导入目标文件的文件路径 path => [ "/data/test.log", "/data/.txt" ]     path => "/data/.txt"
exclude array 排除文件,支持glob 展开 exclude => "/data/*.txt"
codec codec 用于对规范数据数据进行解码 codec => json
discover_interval number 设置logstash读取新文件的时间间隔,默认值是15秒 discover_interval => 15
max_open_files number 配置当前input可以监控的文件的最大值 max_open_files => 10
close_older number 设置结束时间,即如果在限制时间段内没有更新内容,就关闭监听它的文件句柄,默认是3600秒 close_older => 3600
delimiter string 设置行与行之间的分隔符,默认使用\n delimiter => "\n"
sincedb_path string 记录数据读取的位置,配置存放该位置的文件路径 sincedb_path => "/data/sincedb_test.txt"
sincedb_write_interval number logstash 每隔多久写一次sincedb文件,默认是15秒 sincedb_write_interval => 15
start_position string 设置logstash是从“beginning”还是“end”去读取文件,默认是“end” start_position => "beginning"
stat_interval number logstash每隔多久检查被监听文件是否有更新,默认是1秒 stat_interval => 1
tags array 添加一个任意的数字作为当前事件的标签 tags => 12
add_field hash 在事件中加入一个field add_field => { test }
type string type属性主要在filter场景中使用 type => "testlog"

从elasticsearch导出数据至文件 demo :

input {
elasticsearch {
hosts => "192.168.31.130:9200" #配置 elasticsearch的地址及index
index => "test"
query => '{ "query": {"match_all" : {} } }' #配置匹配模式
size => 10000 #配置遍历数据
scroll => "5m" # 配置遍历时间间隔
docinfo => true
}
}
output {
file {
#配置文件导出路径
path => "/data01/test.txt"
}
}

将json文件导入elasticsearch配置demo:

input {
file {
path => "/data/*.log" #配置读取的文件
discover_interval => 5
start_position => "beginning" #从文件开始位置读取
sincedb_path => "/data/sincedb_test.txt" #记录读取的位置
sincedb_write_interval => 15
codec => json { #配置文本类型
charset => "UTF-8"
}
}
}
output {
elasticsearch {
hosts => "192.168.31.130:9200"
index => "test"
}
}

从elasticsearch导入到另一个elasticsearch配置demo:

input {
elasticsearch {
hosts => "192.168.31.130:9200"
index => "test*"
query => '{ "query": {"match_all" : {} } }'
size => 10000
scroll => "5m"
docinfo => true
}
}
output {
elasticsearch {
hosts => "192.168.31.130:9200"
index => "%{[@metadata][_index]}"
document_type => "%{[@metadata][_type]}"
document_id => "%{[@metadata][_id]}"
}
}

持续更新。。。

上一篇下一篇

猜你喜欢

热点阅读