CentOS7 Logstash6.2 简单记录
零、Elastic Stack产品:
https://www.elastic.co/guide/index.html
1、Beats
Beats 是开源的data shipper(数据搬运者),以agent的形式运行,发送各种类型的可操作的数据给elasticsearch。beats可以直接发送数据给elasticsearch或通过先发送给Logstash。Packetbeat、Filebeat、Metricbeat和Winlogbeat是Beats的一些例子,你也可以可以创建你自己的beat。
2、Elasticsearch
Elasticsearch 负责存储、检索和分析。
Elasticsearch是一个易扩展的开源的全文本搜索和分析引擎。
它允许你快速地(接近实时地)存储、搜索和分析大量的数据。
场景应用例子:
(1)你运行一个在线的web存储并允许你的客户搜索你卖的商品,你可以使用Elasticsearch存储你所有的商品catalog和存货清单并提供搜索和自动完成建议;
(2)你想收集日志或事务数据并且你想分析这些数据来预测、统计等。
(3)你运行一个价格提醒平台,你可以将提醒条件放入elasticsearch并使用它的反向搜索能力并推送提醒给客户。
3、Elasticsearch Hadoop
Elasticsearch Hadoop 包含3个类似的、但互相独立的3个子项目,专注于不同的方面。
elasticsearch-hadoop proper是在Hadoop环境中与elasticsearch交互,如果你使用Map/Reduce、Hive、Pig、ApacheSpark、Apache Storm、Cascading,这个项目适合你。
repository-hdfs是使用HDFS作为后端的repository来snapshot/restore from/to elasticsearch。
elasticsearch on YARN是在YARN上运行elasticsearch。
4、Logstash
Logstash 负责集中、转换 & 存储你的Data。
5、Kibana
Kibana 与elasticsearch一起工作,用于用于分析并形象展示数据。你使用它搜索、查看并与数据交互。
6、X-Pack
X-Pack 是elastic stack的扩展,集合了安全、提醒、监控、报告、图形化等能力到同一个包中。
Logstash Docs
https://www.elastic.co/guide/en/logstash/current/index.html
一、logstash介绍
1. Logstash负责集中、转换 & 存储你的Data。
Logstash是一个开源的、服务端的数据处理pipepline,
它接收多个源的数据、对它们进行转换,
然后将它们发送到你喜欢的"stash"(就是Elasticsearch)。
- 接收数据用input
- 解析并转换数据用filter
- 输出数据用output
Logstash使用的是插件机制,各种插件在 https://github.com/logstash-plugins
当然也可以自己写插件
用X-Pack监控 Logstash (https://www.elastic.co/products/x-pack/monitoring)
2. 安装
注意 logstash需要 Java8 。并且不支持Java 9.
检查java version
java -version
2.1 使用二进制包安装
访问 https://www.elastic.co/downloads/logstash
找到你需要的包,解压。
# 下载tar包
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.2.2.tar.gz
# 解压
tar xf logstash-6.2.2.tar.gz
# 进入到目录
cd logstash-6.2.2/
# 准备一个config file
cat > logstash.conf << EOF
input {
beats {
port => "5044"
}
}
output {
stdout {
codec => rubydebug
}
}
EOF
# 运行
bin/logstash -f logstash.conf
# 使用 -e 在命令行上指定配置内容
bin/logstash -e 'input { stdin { } } output { stdout { codec => rubydebug } }'
# 上面的命令执行可能需要等待一会
# 使用 -f或--path.config 指定配置文件
# 使用 --config.test_and_exit 解析配置文件并报告可能的错误
bin/logstash -f some.conf --config.test_and_exit
# 如果配置文件通过了检查,执行下面的命令
# 使用 --config.reload.automatic 表示修改了配置文件之后自动重新加载配置,这样你就不用重启Logstash
bin/logstash -f some.conf --config.reload.automatic
2.2 yum方式安装
# 下载并安装公共签名key
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
# 生成 yum 源文件
cat > /etc/yum.repos.d/logstash.repo <<EOF
[logstash-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF
# 安装
sudo yum install logstash
pipeline的骨架:
# 使用#进行注释
input {
}
filter {
#filter是可选的
}
output {
}
3. 使用 logstash 解析日志
以Filebeat为例,
首先需要运行一个Filebeat服务端,
然后使用Logstash内置的Beats input来让Logstash可以接收Elastic Beats框架发出的事件。
3.1 配置Filebeat
https://www.jianshu.com/p/b7245ce58c6a
配置Filebeat的配置文件 让其output到Logstash的IP:port(5043端口,为什么是这个端口,看后面),
然后运行filebeat,
Filebeat会尝试连接到Logstash服务器端口。
3.2 接下来配置logstash
创建一个pipeline Configuration,这个pipeline使用beats input plugin来接收Beats的事件。
新建一个文件 first-pipeline.conf
input {
beats {
port => "5043"
}
}
output {
stdout {
codec => rubydebug
}
}
然后运行
bin/logstash -f first-pipeline.conf --config.reload.automatic
你会在console看到一些比较原始的信息
3.3 简单配置filter
grok filter是Logstash内置的几个插件之一。
grok filter插件使你能将非结构化的数据解析成结构化的、可查询的数据。
grok 使用正则表达式,例如 web server 的log 可以用 %{COMBINEDAPACHELOG} 来解析
将下面的配置放到配置文件中,然后观看输出,
可以看到除了原始的信息之外,还有解析出的各个字段的值
filter {
grok {
match => {
"message" => "%{COMBINEDAPACHELOG}"
}
}
}
除了将数据解析成可查询的数据之外,filter插件还可以基于提供的数据进行数据扩展。
例如geoip插件能根据IP地址,查找位置等信息
filter {
grok {
match => {
"message" => "%{COMBINEDAPACHELOG}"
}
}
geoip {
source => "clientip"
}
}
现在web logs 已经分解成特定的字段(fields),Logstash pipeline可以将数据index到Elasticsearch集群中了。
将output更改为如下配置,Logstash将通过http协议连接Elasticsearch。
output {
elasticsearch {
hosts => [ "localhost:9200","IP2:port2",... ]
}
}
还可以在input和output中使用多个input和output插件,例如
output {
elasticsearch {
hosts => ["IP1:port1", "IP2:port2", "IP3"]
}
file {
path => "/path/to/target/file"
}
}
4. Logstash是如何工作的?
Logstash处理event的pipeline有3个阶段:
inputs=>filters=>outputs
- inputs 生成events,
- filters 修改它们,
- outputs 将他们发送到其他位置。
inputs和outputs支持codecs,让数据在进入或离开pipeline时进行decode或encode,而不用使用额外的filter。
- input codecs在数据进入到input之前对其进行decode操作,
- output codecs在数据离开output之前对其进行encode操作。
默认的plain codec是每一行算作一条message,如果是其他的codec,例如multiline或json,就不是这样了,关于codec的理解一定要清晰
4.1 一些常用的inputs
-
file: 读取一个文件,类似于UNIX的tail -fn0 someFile
-
syslog: 监听514端口的syslog messages,并使用RFC3164格式进行解析
-
redis: 从redis服务器读取,使用redis channels和redis lists。
Redis经常作为一个中间人,先将数据发送到Redis,然后logstash再从Redis读取。 -
beats: 处理Filebeat发出的事件,Filebeat用于将文件内容发送到远端logstash。
4.2 一些常用的filters
你可以设置条件来触发filters
-
grok: 解析并结构化任意数据。有很多内置的patterns供你使用。
-
mutate: 在event fields上执行通用的转换。
你可以rename、remove、replace、modify event fields。 -
drop: 丢弃events,例如debug events。
-
clone: 制作eventde一份拷贝,可能增加或移除一些fields。
-
geoip: 为IP地址增加一些关于地理位置的信息(可以在Kibana中展示amazing charts)
4.3 一些常用的outputs
一个event可以穿过多个outputs,直到所有的output处理完毕,event也就结束了。
-
elasticsearch: 发送数据到elasticsearch。
-
file: 发送数据到文件。
-
graphite: 发送数据到graphite,这是一个开源的工具用于存储和graphing metrics。
-
statsd: 发送数据到statsd,这是一个服务,它监听统计数据,例如counters和timers,通过UDP发送,并发送聚合数据到一个或多个可插拔的后端服务。
4.4 一些常用的Codecs
codecs是基础的流filters,它可以作为input和output的一部分。
-
json、plain、msgpack
-
multiline:合并多行文本event,例如合并java Exception和stacktrace信息多行文本到一个单独的event。
4.5 Execution Model(执行模型)
每个input都在其自己的线程中运行,
inputs将event写入到一个通用的Java SynchronousQueue(Java同步队列)。
这个队列不持有events,而是将他们发送到一个空闲的worker(如果没有worker是空闲的就等待阻塞)。
每个pipeline worker thread从这个队列中取一批events,创建一个buffer,然后运行配置好的filters,然后运行配置好的outputs。
每批次获取的event数量(即size of batch)是可以配置的,worker的数量(即worker threads的数量)也是可以配置的。
默认情况下,Logstash在pipeline场景之间(即input->filter和filter->output)使用的是内存有界队列来缓存events。
如果Logstash不安全的退出,内存中存储的events会丢失。
你可以让Logstash将in-flight events持久化到disk。(略)
5. 设置并运行Logstash
5.1 文件夹结构
5.1.1 tar文件解压后的文件夹结构
-
bin/ 包含了 logstash启动脚本和logstash-plugin安装插件的脚本等可执行文件
-
config/ 包含了logstash.yml和jvm.options文件等配置文件(命令行参数--path.settings)
-
logs/ 包含了日志文件。(命令行参数--path.logs)
-
plugins/ 包含了本地的,非Ruby-Gem的插件文件,每个plugin都对应一个子文件夹。推荐只用于开发(实际没看到这个文件夹) 。(命令行参数 --path.plugins)
5.1.2 RPM安装包安装后的文件夹结构
-
/usr/share/logstash/ 默认安装路径,以下使用${Logstash_home}代替
-
${Logstash_home}/bin 包含了 logstash启动脚本和logstash-plugin安装插件的脚本等可执行文件
-
/etc/logstash 包含了logstash.yml和jvm.options文件等配置文件(命令行参数--path.settings),这个文件夹还包含了startup.options。
logstash.yml包含了启动Logstash需要的命令行参数。
jvm.options的每一行都指令了JVM配置flags。
startup.options文件包含了bin/system-install脚本需要的选项,system-install脚本用于生成与你系统相对应的启动脚本。当你安装Logstash包时,system-install脚本在安装的最后执行(使用startup.options文件)来设置logstash.yml中的参数,例如user、group、service name等信息。在你更改startup.options文件后需要再次运行system-install脚本才能生效。 -
/etc/logstash/conf.d 包含了pipeline的配置文件,可以指定文件夹、文件和通配符,不要在该文件夹中存放除了pipeline配置文件之外的文件! (命令行参数 --path.config)
-
/var/log/logstash #包含了日志文件。(命令行参数--path.logs)
-
${Logstash_home}/plugins/ #包含了本地的,非Ruby-Gem的插件文件,每个plugin都对应一个子文件夹。推荐只用于开发(实际没看到这个文件夹) 。(命令行参数 --path.plugins)
5.2 logstash.yml文件
使用YAML格式编写。
如下所示:
pipeline:
batch:
size:125
delay:5
也可以写成
pipeline.batch.size:125
pipeline.batch.delay:5
包含如下设置:
-
node.name 默认是机器名
-
pipeline.workers 如果你发现event正在堆积,或者CPU够用,建议增大这个值,默认是CPU的核数。
-
pipeline.output.workers 表示每个output插件实例使用的workers的数量,默认是1.
-
pipeline.batch.size 更大的值一般更有效率,但是考虑到内存,你或许需要增加JVM heap size,通过设置LS_HEAP_SIZE变量。默认是125。
-
pipeline.batch.delay 在分发一个batch到filters和workers之前最长等待的时间,默认是5(ms)
-
pipeline.unsafe_shutdown 当设置为true时,将强制Logstash退出,即使在shutdown期间仍然有inflight event。默认为false,表示Logstash会拒绝退出直到所有接收的event被push到了outputs中。
-
path.config pipeline配置文件或文件夹,如果使用了通配符或指定了文件夹,配置文件按alphabetical(字母)顺序读取。
-
config.test_and_exit 当设置为true时,检查配置文件是否有效并退出。注意grok patterns不会被检查。默认为false。
-
config.reload.automatic 当设置为true时,会周期性检查配置是否发生了更改并reload。也可以手动发送SIGHUP信号来reload。默认为false
-
config.reload.interval 默认为3(s)
-
config.debug 当设置为true时,会显示fully compiled Configuration as a debug log message。你必须同时设置log.level:debug。 警告!log信息会包含传递给插件的密码。
-
log.level 有fatal、error、warn、info、debug、trace。默认值是info。
-
log.format 日志格式。可选值有json、plain(使用的是Object#.inspect)。默认值是plain。
-
queue.type 可选值有memory和persisted,默认是memory。关于persisted的相关参数略
-
http.host metrics REST endpoint的绑定的地址。默认是127.0.0.1
-
http.port metrics REST endpoint的绑定的端口。默认是9600
-
path.logs Logstash日志存放的文件夹。默认是${Logstash_home}/logs
-
path.plugins 查找自定义plugins的文件夹。你可以多次使用来指定多个路径。期望的文件夹结构为: PATH/logstash/TYPE/NAME.rb 。TYPE是inputs/filters/outputs/codecs,NAME是插件的名称。
5.3 命令行参数
(同logstash.yml文件中的配置参数就略了)
-
--version # -V 显示版本
-
--help # -h
-
--node.name 默认机器名
-
--path.config CONFIG_PATH -f 指定文件或文件夹。
如果多次指定,只取最后一个有效。
可以使用通配符,例如 --path.config '/tmp/{one,two,three}' -
--config.string CONFIG_STRING -e 在命令行上指定pipeline config字符串
-
--pipeline.workers COUNT -w 指定worker的数量
-
--pipeline.batch.size SIZE -b
-
--pipeline.batch.delay DELAY_IN_MS -u
-
--pipeline.unsafe_shutdown 等同于设置pipeline.unsafe_shutdown为true
-
--path.plugins PATH -p
-
--path.logs PATH -l
-
--log.level LEVEL
-
--config.debug
-
--interactive SHELL -i 丢带shell中而不是正常运行,可选的值有irb和pry
-
--config.test_and_exit -t 等同于设置config.test_and_exit为true
-
--config.reload.automatic -r
-
--config.reload.interval RELOAD_INTERVAL
-
--http.host HTTP_HOST
-
--http.port HTTP_PORT
-
--log.format FORMAT
-
--path.settings SETTINGS_DIR 指定包含logstash.yml、log4j2.properties等配置文件的文件夹。
也可以通过设置环境变量LS_SETTINGS_DIR来指定。
5.4 Logging(Logstash自己的日志)
Logstash会记录内部的日志到LS_HOME/logs或/var/log/logstash目录下。
默认的log级别是INFO。
Logstash使用log4j2框架。
当调试时,将loglevel降到debug会收集到更详细的信息。
以前你只能对整个Logstash设置log级别,从5.0以后你可以为特定的子系统单独进行设置。
例如你正在调试output的问题,你可以只对output组件的loglevel进行设置。
Logstash使用一个log4j2.properties(该文件位于config文件夹中)来进行日志设置。
你可以直接修改这个文件,然后必须重启Logstash生效。
slowlog:
可以在logs文件夹中找到。
slowlog可以在logstash.yml中进行配置:
slowlog.threshold.warn (default: -1)
slowlog.threshold.info (default: -1)
slowlog.threshold.debug (default: -1)
slowlog.threshold.trace (default: -1)
默认情况下,这些值被设置为-1纳秒,表示不限制threshold,即没有slowlog。
例子
slowlog.threshold.warn 2s
slowlog.threshold.info 1s
slowlog.threshold.debug 500ms
slowlog.threshold.trace 100ms
在上面的例子中,在一个filter中处理时间超过2s的events会被记录。
日志会记录event的全部和对造成slowness负责的filter配置。
Logging APIs
虽然你可以直接修改log4j2.properties文件并重启Logstash,但是这很无聊并可能会导致不必要的downtime。
你还可以 通过logging api 动态更新logging level.
例子1:设置Logstash的outputs子系统的elasticsearch模块的logger log级别。
PUT /_node/logging
输出:
{ "logger.logstash.outputs.elasticsearch" : "DEBUG" }
例子2:获取各子系统的运行时 logging list
GET /_node/logging?pretty
#可以看到一堆的loggers及其loglevel。
5.5 关闭Logstash:
systemctl stop logstash
kill -SIGTERM {logstash_pid} # 或 kill -15 或 kill
在Logstash安全关闭前,它会:
- 停止input、filter和output插件
- 处理所有的in-flight events
- 结束Logstash进程
下列的情形会影响shutdown的过程:
- 一个input plugin正在缓慢的接收数据
- 一个缓慢的filter,例如执行sleep(10000)的ruby filter或一个elasticsearch filter正在执行非常缓慢的查询。
- 一个断开的output插件正等待重新连接以便flush inflight events。
这些情况会使得shutdown过程的时间和成功变得不可预知。
Logstash有一个stall(陷入泥潭)检测机制,分析在shutdown期间pipeline和插件的行为。这个机制产生周期性的信息,包含了在内部队列中inflight event的数量和繁忙的worker线程列表。
可以使用--pipeline.unsafe_shutdown来强制关闭。
6. 配置Logstash
6.1 配置文件示例:
注释使用#开头,并且不需要必须在行的开头
input {
file {
path => "/var/log/messages"
type => "syslog"
}
file {
path => "/var/log/apache/access.log"
type => "apache"
}
}
插件可以要求配置项的值必须是特定类型,支持array、lists、boolean、bytes、codec、hash、number、password、uri、path、string
-
Array 示例:(现在已经基本弃用。只是仍然用来处理hashes或mixed types的lists,这些都不用核查。) users => [ {id => 1,name => bob},{id => 2,name => jane} ]
-
Lists 示例: path => [ "/var/log/messages", "/var/log/*.log" ]
-
Boolean示例:ssl_enable => true #注意true 或 false 没有引号
-
Bytes示例:
my_bytes => "1113" # 1113 bytes
my_bytes => "10MiB" # 10485760 bytes
my_bytes => "100kib" # 102400 bytes
my_bytes => "180 mb" # 180000000 bytes
-
Codec示例:codec => "json"
-
Hash示例:
match => {
"field1" => "value1"
"field2" => "value2"
...
}
-
Number示例:port => 33
-
Password示例:不会被logger或打印出来 my_password => "password"
-
URI示例:my_uri => "http://foo:bar@example.net"
-
Path示例:my_path => "/tmp/logstash"
-
String示例:name => 'It's a beautiful day'
6.2 在pipeline配置文件中访问Event Data和Fields:
events都拥有属性,例如一个apache access log中应该会有状态码(200,404)、请求路径(/或index.html)、HTTP方法(POST/GET)、客户端IP地址等,我们称这些属性为Fields。
一些配置选项需要这些Fields存在才能工作。因为inputs用于生成events,所以这时候还没有任何fields!
使用 ${[FieldReference]} 来引用字段内容,例如 ${type} === ${[type]}
Field references(引用)
语法:[fieldname]
如果你引用的是最顶级的field,你可以省略[],只是简单的使用fieldname。
如果你要引用一个nested field,你可以指定全路径: [top-level field][nested field]。
例如 对如下数据
{
"agent": "Mozilla/5.0 (compatible; MSIE 9.0)",
"ip": "192.168.24.44",
"request": "/index.html"
"response": {
"status": 200,
"bytes": 52353
},
"ua": {
"os": "Windows 7"
}
}
上面的数据有5个top-level fields,有3个nested fields。
要引用 os field,你可以使用[ua][os]。
例子:
output {
statsd {
increment => "apache.%{[response][status]}"
}
}
Conditionals(条件执行)
只对特定条件的event进行 filter和output
Conditionals 支持if、else if和else的嵌套。
其语法是
if EXPRESSION {
...
} else if EXPRESSION {
...
} else {
...
}
EXPRESSION(表达式)是什么?是比较测试、布尔值逻辑等等
-
你可以使用 == != < <= > >=
-
你可以使用正则 =~ !~ (例如 someStr =~ pattern)
-
你可以使用 in 、 not in
-
支持 and or nand xor !
-
使用 () 分组
表达式可以很长并且很复杂。
例子:
filter {
if [action] == "login" {
mutate { remove_field => "secret" }
}
}
特殊的fields:
1、@metadata field
@metadata field 在logstash 1.5之后引入,主要用于conditionals。
@metadata的内容不是events的内容。
rubydebug codec 允许你暴露@metadata的内容,如下所示:
stdout {
codec => rubydebug {
metadata => true
}
}
6.3 在pipeline配置文件中使用环境变量
-
使用 ${var} 引用环境变量
-
在Logstash启动的时候,每个引用会被替换成值,之后就不会再动态变化。
-
区分大小写
-
引用的环境变量不存在会导致Configuration错误。
-
你可以使用 ${var:defaultValue}来指定一个默认值
6.4 logstash配置例子
处理apache2 access logs的例子
input {
file {
path => "/tmp/access_log"
start_position => "beginning"
}
}
filter {
if [path] =~ "access" {
mutate {
replace => {
"type" => "apache_access"
}
}
grok {
match => {
"message" => "%{COMBINEDAPACHELOG}"
}
}
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
}
stdout {
codec => rubydebug
}
}
处理 syslog messages的例子
input {
tcp {
port => 5000
type => syslog
}
udp {
port => 5000
type => syslog
}
}
filter {
if [type] == "syslog" {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
add_field => [ "received_at", "%{@timestamp}" ]
add_field => [ "received_from", "%{host}" ]
}
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
}
stdout {
codec => rubydebug
}
}
6.5 重新加载配置文件:
重新加载配置文件的方式:
- --config.reload.automatic 需要配置文件的内容允许重新加载,默认每3秒检查一次配置文件的变化。
- kill -1 14175 ; 或 kill -HUP 14175
在配置文件reload期间,JVM不会重启。
对grok pattern files的更改也会被reloaded,但是不会触发reload。
6.6 处理多行的events
要处理多行的events,Logstash需要知道哪些行是一个单独events的组成部分。
优选的工具是multiline codec,它可以将一个单独的input的多个行合并到一个单独的行。
multiline codec最重要的配置部分是:
- pattern :指定一个正则表达式。那些匹配到的lines被认为是作为之前行的继续或是新的multiline event的开始。你可以使用grok正则表达式模版。
- what : 有2个可选值 previous或next。previous表示匹配到的行是前面的行的一部分。next表示那些匹配到的行是后面行的一部分。
- negate: true或false(默认值是false)。表示反向匹配。
例子1 处理Java Stack Traces
java的栈追踪信息除了第一行之外其他都以空格开头,以下就表示所有以空格开头的line都是之前行的一部分
input {
stdin {
codec => multiline {
pattern => "^\s"
what => "previous"
}
}
}
例子2 有些多行的日志在未结束时会在行尾加\
以下就表示所有以\结尾的line都是之后行的一部分
filter {
multiline {
type => "somefiletype"
pattern => "\\$"
what => "next"
}
}
例子3 服务的活动日志一般都以一个时间戳开始,然后是其他信息
以下就表示所有不是(negate=true)以时间戳开头的行都是前面行的一部分
input {
file {
path => "/var/log/someapp.log"
codec => multiline {
pattern => "^%{TIMESTAMP_ISO8601} "
negate => true
what => previous
}
}
}
6.7 glob pattern支持:
- * 匹配任意文件或任意字符。但是不匹配隐藏文件(.开头的文件),要匹配.开头的文件,使用 {*,.*}
- ** 递归匹配任意的文件夹
- ? 匹配任意单个字符
- [set] 匹配区间内的任意单个字符,例如 [a-z] [^a-z]反向匹配
- {p,q} 匹配 或者是p或者是q。可以不是单个字符,可以大于2个选项,等同于正则表达式中的(foo|bar)
- \ 转义用
7. 部署并scaling Logstash:
7.1 最小化安装: 一个Logstash实例和一个elasticsearch实例。
7.2 将Logstash分为shipping instance 和 indexing instance
- shipping instance 用于将input信息输出到message queue中,可以有多个这样的shipping instance
- indexing instance 用于将messages queue中的信息index到elasticsearch集群中
8. 性能调优:
一次改一个配置,然后观察结果。不要一上来就更改pipeline.workers的数量。
性能检查列表(按顺序):
8.1 检查input source和output 目标的性能。
如果input源和output目标都不够快,那Logstash肯定也快不了。
8.2 检查系统信息:
- CPU: 是否负荷很重。可以使用top -H查看。如果CPU使用率很高,那就调到第(3)步(检查JVM heap),然后阅读第(4)步(调优worker设置)。
- memory:记住Logstash运行于一个Java VM内。这意味着Logstash总是使用分配给它的最大的内存。内存不足时可能会导致Logstash使用swap分区。
- I/O使用情况:监控磁盘I/O以便检查disk的负荷。如果你使用了file output,有可能会填满你的磁盘 ;如果Logstash产生了很多的错误日志,也可能会导致磁盘占满; 你可以使用iostat、dstat或其他类似的工具来监控磁盘I/O。
- 网络I/O:如果你的input和output使用了大量的网络操作。你可以使用dstat或iftop等工具来监控网络。
8.3 检查JVM heap:
很多时候CPU的性能降低都是因为 heap size太小,导致JVM频繁的进行垃圾回收。
一个简单的方法是 将heap size变成原来的双倍大小,然后观察性能改善。
不要将heap size设置超过实际的物理内存大小。
至少留1GB给OS和其他进程。
你可以使用jmap命令行工具或使用VisualVM来精确地度量JVM heap。
8.4 调优worker设置:
- 使用 --pipeline.workers COUNT选项来增加workers的数量。将其扩展到CPU核数的整数倍是安全的。
- 每个output默认只能使用一个单独的pipeline worker。你可以增加这个值,通过修改配置文件中的output的worker设置。
- 你还可以调优output的batch size。对于很多的outputs,例如elasticsearch output,这个设置和I/O操作的size一致。例如elasticsearch output,这个设置和batch size一致。
8.5 调优并Profiling Logstash performance:
Logstash提供了如下的pipeline性能调优选项:
- pipeline.workers
- pipeline.batch.size
- pipeline.batch.delay
9. 监控APIs
通用选项:
- (a) ?pretty=true 表示返回的JSON信息会被pretty格式化(影响性能,只在debug时使用!)
- (b) human=true 表示返回plain文本,而不是JSON格式。默认是false。
节点信息API
- (a) GET /_node/<types> #types是可选的,默认是显示pipeline、os、jvm信息,写上type就只显示type的信息。
- (b) GET /_node/plugins #获取所有的当前已安装的Logstash插件。这个API返回的是 bin/logstash-plugin list --verbose显示的内容。
- (c) GET /_node/stats/<types> #检索Logstash运行时stats。types是可选的,用于只显示types类型的信息,可选的值有jvm、process、pipeline、reloads、os。
- (d) GET /_node/hot_threads #输出JSON格式的文本,包含the top hot threads(负载最高的线程)的breakdown(分解)。可选的参数有
- threads=3 #设置显示的hot threads的数量。
- human=false #使用plain格式而不是JSON格式。
- ignore_idle_threads=true #不显示idle的threads。
10. plugins插件
bin/logstash-plugin 脚本用于管理插件的生命周期。
你可以使用这个脚本安装、移除、升级插件。
10.1 列出插件:列出当前可用的插件
logstash-plugin list #列出所有已安装的插件
logstash-plugin list --verbose #显示插件的版本信息
logstash-plugin list '*namefragment*' #搜索创建
logstash-plugin list --group output #列出output(input、filter、codec)插件
10.2 安装插件:
export HTTP_PROXY=http://127.0.0.1:3128 #设置代理
logstash-plugin install logstash-output-kafka #首先你需要联网,能够访问RubyGems.org来获取插件,一旦安装完毕,你就可以在配置文件中使用它们。
logstash-plugin install /path/to/logstash-output-kafka-1.0.0.gem #本地安装
关于--path.plugins指定插件文件夹,前面已经说了,这个文件夹的目录架构应该是PATH/logstash/TYPE/NAME.rb,TYPE是input、output、filter、codec,NAME就是插件名称。
10.3 更新插件:
logstash-plugin update [logstash-output-kafka] #更新所有插件或指定插件
10.4 删除插件:
logstash-plugin remove logstash-output-kafka
10.5 离线插件包制作:
logstash-plugin prepare-offline-pack --output OUTPUT [PLUGINS]
# OUTPUT指定压缩的插件包要被写往何处,默认是/LOGSTASH_HOME/logstash-offline-plugins-5.2.2.zip
# [PLUGINS] 指定要打进插件包中的插件(一个或多个,可以使用通配符)
例子
bin/logstash-plugin prepare-offline-pack logstash-input-beats
bin/logstash-plugin prepare-offline-pack logstash-filter-*
bin/logstash-plugin prepare-offline-pack logstash-filter-* logstash-input-beats
10.6 离线插件包使用:
logstash-plugin install file:///path/to/logstash-offline-plugins-5.2.2.zip
10.7 Event API:
供插件开发者使用和Ruby filter使用,Ruby filter例子
filter {
ruby {
code => 'event.set("lowercase_field", event.get("message").downcase)'
}
}
11. input插件
https://www.elastic.co/guide/en/logstash/current/input-plugins.html
elastic支持的插件查看 https://www.elastic.co/support/matrix#show_logstash_plugins
这里只列出常用的
beats
接收elastic beats框架发出的events
elasticsearch:
读取elasticsearch集群的查询结果,这对重现test logs、reindexing有帮助。
例子:
input {
# Read all documents from Elasticsearch matching the given query
elasticsearch {
hosts => "localhost"
query => '{ "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ] }'
}
}
这会创建一个如下格式的elasticsearch query:
curl 'http://localhost:9200/logstash-*/_search?&scroll=1m&size=1000' -d '{
"query": {
"match": {
"statuscode": 200
}
},
"sort": [ "_doc" ]
}'
eventlog
拉取windows的event log。
exec
捕获shell命令的输出作为event。周期性的运行一个shell命令并捕获所有的输出作为event。
file
从文件流获取events,一般来说是根据tail -fn0来读取文件发生的变化,但是也可以从头开始读文件。一般是一行作为一个event,如果你想将多行合并到一个event中,你可能需要使用multiline codec 或 filter。这个插件目标是最终文件的变化,而不是从头到尾地读取文件然后将其整个内容合并到一个单独的event中。 插件会记录当前读取到的位置(通过sincedb文件),这就让即使logstash重启也不会丢失lines。默认情况下,sincedb文件位于运行logstash的用户根目录下,其文件名中有被读取的文件的名称,如果这个文件被重命名了,那么sincedb也就失去作用了。 处理File rotation:可以检测到,不管file是通过重命名还是copy操作被rotated了。
generator
生成随机的log event,用于测试
heartbeat
生成heartbeat event,用于测试
http
通过HTTP或HTTPS接收event
http_poller
解码一个HTTP API的输出到events
log4j
通过TCP socket接收来自log4j SocketAppender 的event。只对log4j 1.x有效。
pipe
接收来自一个长时间运行的command pipe的流events
redis
从Redis实例读取events
sqlite
基于SQLite数据库的rows创建events
stdin
从标准输入读取events
syslog
读取syslog message作为event。syslog是一个非常混乱的术语。这个input仅支持RFC3164加一些小的修改。日期格式必须是RFC3164格式或ISO8601。其它部分也必须遵守RFC3164。如果你不使用RFC3164,不要使用这个input。
tcp
从TCP socket读取event。和stdin和file inputs插件类似,每一个event被假定为一行文本。其可配置项有
add_field=>{}
codec=>"plain"
host=>"0.0.0.0"
id="someStr" #为插件的configuration增加一个唯一ID,如果没有提供ID,logstash会生成一个。强烈推荐你设置一个ID。
mode=>"server" #还可以设置为client,表示连接到一个server
port=>"some number"
proxy_protocol=>false #目前只支持v1
tags=>[ {id => 1,name => bob},{id => 2,name => jane} ] #设置任意数量的任意tags
type=>"someStr" #type一般用于启动filter。
udp
从UDP 读取events
unix
从UNIX socket读取event
xmpp
通过XMPP/Jabber协议接收events
log4j2
(实测安装有问题) 项目主页 https://github.com/jurmous/logstash-log4j2
安装 bin/logstash-plugin install logstash-input-log4j2 #如果不能安装,请先运行logstash
查看 bin/logstash-plugin list | grep log4j2
配置文件
input {
log4j2 {
port => 7000
mode => "server"
}
}
output {
stdout { codec => rubydebug }
}
12. output插件
https://www.elastic.co/guide/en/logstash/current/output-plugins.html
elasticsearch
存储logs到Elasticsearch。
file
将event写到磁盘上。
http
pipe
将event的输出作为另一个程序的标准输入
redis
将events发送到一个Redis queue,使用RPUSH命令。
stdout
tcp
udp
13. filter插件
https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
aggregate
将属于一个task的多个events的信息聚合在一起,并最终聚合到最后一个events中。一定要将worker数量设置成1,否则处理顺序错乱会导致未知的结果。
alter
允许你做一些 在mutate filter中没有包含的对field的操作。
cidr
检查IP地址是否包含在network blocks中
clone
复制Event
collate
校对Event,通过time或count
csv
解析逗号分割的值到不同的fields
date
解析fields中的date作为Logstash的timestamp 。
例子:第一个值是field的name,然后是可能的时间格式
date {
match => [ "logdate", "MMM dd yyyy HH:mm:ss","MMM d yyyy HH:mm:ss", "ISO8601" ]
}
dissect
使用delimiters(定界符)来将数据分割到Fields
dns
执行一个标准的或反的DNS lookup
drop
丢掉所有events
elapsed
计算两个events之间消耗的时间
elasticsearch
拷贝前一个event的fields到当前的events
environment
存储环境变量作为metadata sub-fields。
extractnumbers
从一个字符串中提取数字
geoip
增加IP地址的地理信息
grok
使用正则来解析非结构化的数据。
Logstash有大约120个默认的patterns。
语法:
%{NUMBER:duration} %{IP:client}
NUMBER、IP是pattern,duration、client是解析后的field名称
整体的例子:
grok {
match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
}
i18n
移除fields中的特殊字符
json
解析json events。
例子: target不写则默认被存储到event的root(top level)中
json {
source => "message"
target="doc"
}
json_encode
将一个field序列化成JSON
kv
解析key-value pairs
metaevent
增加任意的fields到event
mutate
对fields执行变化操作。
例子
# 将field1的value转换成integer类型
mutate {
convert => { "field1" => "integer" }
}
# 替换
mutate {
gsub => [ "field1","/","_", "field2","[\\?#-]","." ]
}
# field1如果不是array类型就没有效果
mutate {
join => { "field1" => "," }
}
mutate {
lowercase => [ "field1" ]
}
# 将两个field合并
mutate {
merge => { "dest_field" => "added_field" }
}
range
对指定的字段检查size或length limits
ruby
执行任意的ruby代码
sleep
睡眠特定的时间
split
将多行的信息split成单独的events
translate
将field的内容替换。
useragent
将user agent字符串解析到fields中
uuid
增加一个uuid到events
xml
将xml解析到fields。
14. codec插件
https://www.elastic.co/guide/en/logstash/current/codec-plugins.html
json
如果数据是一个JSON array,会创建多个events。
如果你要解析\n分割的JSON消息,请使用json_lines。
如果接收到的payload不是JSON,它会使用plain text并增加一个_jsonparsefailure的tag,然后payload被存储到message field中。
json_lines
line
multiline
将多行messages压缩合并成一个单独的事件,最初的目标是合并java exception and stacktrace和使用\分割的多行。
plain
rubydebug
将你输出的event使用Ruby Awesome Print库格式化。