ElasticSearch

CentOS7 Logstash6.2 简单记录

2017-11-01  本文已影响1000人  坚持到底v2

零、Elastic Stack产品:

https://www.elastic.co/guide/index.html

1、Beats

Beats 是开源的data shipper(数据搬运者),以agent的形式运行,发送各种类型的可操作的数据给elasticsearch。beats可以直接发送数据给elasticsearch或通过先发送给Logstash。Packetbeat、Filebeat、Metricbeat和Winlogbeat是Beats的一些例子,你也可以可以创建你自己的beat。

2、Elasticsearch

Elasticsearch 负责存储、检索和分析。
Elasticsearch是一个易扩展的开源的全文本搜索和分析引擎。
它允许你快速地(接近实时地)存储、搜索和分析大量的数据。
场景应用例子:
(1)你运行一个在线的web存储并允许你的客户搜索你卖的商品,你可以使用Elasticsearch存储你所有的商品catalog和存货清单并提供搜索和自动完成建议;
(2)你想收集日志或事务数据并且你想分析这些数据来预测、统计等。
(3)你运行一个价格提醒平台,你可以将提醒条件放入elasticsearch并使用它的反向搜索能力并推送提醒给客户。

3、Elasticsearch Hadoop

Elasticsearch Hadoop 包含3个类似的、但互相独立的3个子项目,专注于不同的方面。
elasticsearch-hadoop proper是在Hadoop环境中与elasticsearch交互,如果你使用Map/Reduce、Hive、Pig、ApacheSpark、Apache Storm、Cascading,这个项目适合你。
repository-hdfs是使用HDFS作为后端的repository来snapshot/restore from/to elasticsearch。
elasticsearch on YARN是在YARN上运行elasticsearch。

4、Logstash

Logstash 负责集中、转换 & 存储你的Data。

5、Kibana

Kibana 与elasticsearch一起工作,用于用于分析并形象展示数据。你使用它搜索、查看并与数据交互。

6、X-Pack

X-Pack 是elastic stack的扩展,集合了安全、提醒、监控、报告、图形化等能力到同一个包中。


Logstash Docs

https://www.elastic.co/guide/en/logstash/current/index.html


一、logstash介绍

1. Logstash负责集中、转换 & 存储你的Data。

Logstash是一个开源的、服务端的数据处理pipepline,
它接收多个源的数据、对它们进行转换,
然后将它们发送到你喜欢的"stash"(就是Elasticsearch)。

Logstash使用的是插件机制,各种插件在 https://github.com/logstash-plugins
当然也可以自己写插件

用X-Pack监控 Logstash (https://www.elastic.co/products/x-pack/monitoring)

2. 安装

注意 logstash需要 Java8 。并且不支持Java 9.

检查java version

java -version

2.1 使用二进制包安装

访问 https://www.elastic.co/downloads/logstash
找到你需要的包,解压。

# 下载tar包
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.2.2.tar.gz
# 解压
tar xf logstash-6.2.2.tar.gz 

# 进入到目录
cd logstash-6.2.2/

# 准备一个config file
cat > logstash.conf << EOF
input {
  beats {
    port => "5044"
  }
}

output {
  stdout { 
    codec => rubydebug 
  }
}
EOF

# 运行
bin/logstash -f logstash.conf

# 使用 -e  在命令行上指定配置内容
bin/logstash -e 'input { stdin { } } output { stdout { codec => rubydebug } }'
# 上面的命令执行可能需要等待一会

# 使用 -f或--path.config  指定配置文件 
# 使用 --config.test_and_exit  解析配置文件并报告可能的错误
bin/logstash -f some.conf --config.test_and_exit 

# 如果配置文件通过了检查,执行下面的命令
# 使用 --config.reload.automatic 表示修改了配置文件之后自动重新加载配置,这样你就不用重启Logstash
bin/logstash -f some.conf --config.reload.automatic

2.2 yum方式安装

# 下载并安装公共签名key
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

# 生成 yum 源文件
cat > /etc/yum.repos.d/logstash.repo <<EOF
[logstash-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF

# 安装
sudo yum install logstash
pipeline的骨架:
# 使用#进行注释 
input {
}

filter {
  #filter是可选的
}

output {
}

3. 使用 logstash 解析日志

以Filebeat为例,
首先需要运行一个Filebeat服务端,
然后使用Logstash内置的Beats input来让Logstash可以接收Elastic Beats框架发出的事件。

3.1 配置Filebeat

https://www.jianshu.com/p/b7245ce58c6a
配置Filebeat的配置文件 让其output到Logstash的IP:port(5043端口,为什么是这个端口,看后面),
然后运行filebeat,
Filebeat会尝试连接到Logstash服务器端口。

3.2 接下来配置logstash

创建一个pipeline Configuration,这个pipeline使用beats input plugin来接收Beats的事件。
新建一个文件 first-pipeline.conf

input {
  beats {
    port => "5043"
  }
}

output {
  stdout { 
    codec => rubydebug 
  }
}

然后运行

bin/logstash -f first-pipeline.conf --config.reload.automatic

你会在console看到一些比较原始的信息

3.3 简单配置filter

grok filter是Logstash内置的几个插件之一。
grok filter插件使你能将非结构化的数据解析成结构化的、可查询的数据。
grok 使用正则表达式,例如 web server 的log 可以用 %{COMBINEDAPACHELOG} 来解析

将下面的配置放到配置文件中,然后观看输出,
可以看到除了原始的信息之外,还有解析出的各个字段的值

filter {
  grok {
     match => { 
        "message" => "%{COMBINEDAPACHELOG}"
     }
  }
}

除了将数据解析成可查询的数据之外,filter插件还可以基于提供的数据进行数据扩展。
例如geoip插件能根据IP地址,查找位置等信息

filter {
  grok {
     match => { 
        "message" => "%{COMBINEDAPACHELOG}"
     }
  }
  geoip {
    source => "clientip"
  }
}

现在web logs 已经分解成特定的字段(fields),Logstash pipeline可以将数据index到Elasticsearch集群中了。
将output更改为如下配置,Logstash将通过http协议连接Elasticsearch。

output {
  elasticsearch {
     hosts => [ "localhost:9200","IP2:port2",... ]
  }
}

还可以在input和output中使用多个input和output插件,例如

output {
    elasticsearch {
        hosts => ["IP1:port1", "IP2:port2", "IP3"]
    }
    file {
        path => "/path/to/target/file"
    }
}

4. Logstash是如何工作的?

Logstash处理event的pipeline有3个阶段:
inputs=>filters=>outputs

inputs和outputs支持codecs,让数据在进入或离开pipeline时进行decode或encode,而不用使用额外的filter。

默认的plain codec是每一行算作一条message,如果是其他的codec,例如multiline或json,就不是这样了,关于codec的理解一定要清晰

4.1 一些常用的inputs

4.2 一些常用的filters

你可以设置条件来触发filters

4.3 一些常用的outputs

一个event可以穿过多个outputs,直到所有的output处理完毕,event也就结束了。

4.4 一些常用的Codecs

codecs是基础的流filters,它可以作为input和output的一部分。

4.5 Execution Model(执行模型)

每个input都在其自己的线程中运行,
inputs将event写入到一个通用的Java SynchronousQueue(Java同步队列)。
这个队列不持有events,而是将他们发送到一个空闲的worker(如果没有worker是空闲的就等待阻塞)。
每个pipeline worker thread从这个队列中取一批events,创建一个buffer,然后运行配置好的filters,然后运行配置好的outputs。

每批次获取的event数量(即size of batch)是可以配置的,worker的数量(即worker threads的数量)也是可以配置的。

默认情况下,Logstash在pipeline场景之间(即input->filter和filter->output)使用的是内存有界队列来缓存events。

如果Logstash不安全的退出,内存中存储的events会丢失。

你可以让Logstash将in-flight events持久化到disk。(略)

5. 设置并运行Logstash

5.1 文件夹结构

5.1.1 tar文件解压后的文件夹结构

5.1.2 RPM安装包安装后的文件夹结构

5.2 logstash.yml文件

使用YAML格式编写。
如下所示:

pipeline:
  batch:
    size:125
    delay:5

也可以写成

pipeline.batch.size:125
pipeline.batch.delay:5

包含如下设置:

5.3 命令行参数

(同logstash.yml文件中的配置参数就略了)

5.4 Logging(Logstash自己的日志)

Logstash会记录内部的日志到LS_HOME/logs或/var/log/logstash目录下。
默认的log级别是INFO。
Logstash使用log4j2框架。

当调试时,将loglevel降到debug会收集到更详细的信息。
以前你只能对整个Logstash设置log级别,从5.0以后你可以为特定的子系统单独进行设置。
例如你正在调试output的问题,你可以只对output组件的loglevel进行设置。

Logstash使用一个log4j2.properties(该文件位于config文件夹中)来进行日志设置。
你可以直接修改这个文件,然后必须重启Logstash生效。

slowlog:

可以在logs文件夹中找到。
slowlog可以在logstash.yml中进行配置:

slowlog.threshold.warn (default: -1)
slowlog.threshold.info (default: -1)
slowlog.threshold.debug (default: -1)
slowlog.threshold.trace (default: -1)

默认情况下,这些值被设置为-1纳秒,表示不限制threshold,即没有slowlog。
例子

slowlog.threshold.warn 2s
slowlog.threshold.info 1s
slowlog.threshold.debug 500ms
slowlog.threshold.trace 100ms

在上面的例子中,在一个filter中处理时间超过2s的events会被记录。
日志会记录event的全部和对造成slowness负责的filter配置。

Logging APIs

虽然你可以直接修改log4j2.properties文件并重启Logstash,但是这很无聊并可能会导致不必要的downtime。
你还可以 通过logging api 动态更新logging level.

例子1:设置Logstash的outputs子系统的elasticsearch模块的logger log级别。

PUT /_node/logging
输出:
{ "logger.logstash.outputs.elasticsearch" : "DEBUG" }

例子2:获取各子系统的运行时 logging list

GET /_node/logging?pretty
#可以看到一堆的loggers及其loglevel。

5.5 关闭Logstash:

systemctl stop logstash
kill -SIGTERM {logstash_pid} # 或 kill -15 或 kill

在Logstash安全关闭前,它会:

下列的情形会影响shutdown的过程:

这些情况会使得shutdown过程的时间和成功变得不可预知。
Logstash有一个stall(陷入泥潭)检测机制,分析在shutdown期间pipeline和插件的行为。这个机制产生周期性的信息,包含了在内部队列中inflight event的数量和繁忙的worker线程列表。

可以使用--pipeline.unsafe_shutdown来强制关闭。

6. 配置Logstash

6.1 配置文件示例:

注释使用#开头,并且不需要必须在行的开头

input {
  file {
    path => "/var/log/messages"
    type => "syslog"
  }

  file {
    path => "/var/log/apache/access.log"
    type => "apache"
  }
}

插件可以要求配置项的值必须是特定类型,支持array、lists、boolean、bytes、codec、hash、number、password、uri、path、string

my_bytes => "1113"   # 1113 bytes
my_bytes => "10MiB"  # 10485760 bytes
my_bytes => "100kib" # 102400 bytes
my_bytes => "180 mb" # 180000000 bytes
match => {
  "field1" => "value1"
  "field2" => "value2"
  ...
}

6.2 在pipeline配置文件中访问Event Data和Fields:

events都拥有属性,例如一个apache access log中应该会有状态码(200,404)、请求路径(/或index.html)、HTTP方法(POST/GET)、客户端IP地址等,我们称这些属性为Fields。

一些配置选项需要这些Fields存在才能工作。因为inputs用于生成events,所以这时候还没有任何fields!

使用 ${[FieldReference]} 来引用字段内容,例如 ${type} === ${[type]}

Field references(引用)
语法:[fieldname]

如果你引用的是最顶级的field,你可以省略[],只是简单的使用fieldname。
如果你要引用一个nested field,你可以指定全路径: [top-level field][nested field]。

例如 对如下数据

{
  "agent": "Mozilla/5.0 (compatible; MSIE 9.0)",
  "ip": "192.168.24.44",
  "request": "/index.html"
  "response": {
    "status": 200,
    "bytes": 52353
  },
  "ua": {
    "os": "Windows 7"
  }
}

上面的数据有5个top-level fields,有3个nested fields。
要引用 os field,你可以使用[ua][os]。
例子:

output {
  statsd {
     increment => "apache.%{[response][status]}"
  }
}
Conditionals(条件执行)

只对特定条件的event进行 filter和output
Conditionals 支持if、else if和else的嵌套。
其语法是

if EXPRESSION {
  ...
} else if EXPRESSION {
  ...
} else {
  ...
}

EXPRESSION(表达式)是什么?是比较测试、布尔值逻辑等等

表达式可以很长并且很复杂。
例子:

filter {
  if [action] == "login" {
    mutate { remove_field => "secret" }
  }
}
特殊的fields:
1、@metadata field

@metadata field 在logstash 1.5之后引入,主要用于conditionals。
@metadata的内容不是events的内容。

rubydebug codec 允许你暴露@metadata的内容,如下所示:

stdout { 
  codec => rubydebug { 
      metadata => true 
   } 
}

6.3 在pipeline配置文件中使用环境变量

6.4 logstash配置例子

处理apache2 access logs的例子
input {
  file {
    path => "/tmp/access_log"
    start_position => "beginning"
  }
}

filter {
  if [path] =~ "access" {
    mutate { 
        replace => { 
           "type" => "apache_access" 
        } 
    }
    grok {
      match => { 
            "message" => "%{COMBINEDAPACHELOG}" 
      }
    }
  }
  date {
    match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
  }
  stdout { 
    codec => rubydebug 
  }
}
处理 syslog messages的例子
input {
  tcp {
    port => 5000
    type => syslog
  }
  udp {
    port => 5000
    type => syslog
  }
}

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}

output {
  elasticsearch { 
    hosts => ["localhost:9200"] 
  }
  stdout { 
    codec => rubydebug 
  }
}

6.5 重新加载配置文件:

重新加载配置文件的方式:

在配置文件reload期间,JVM不会重启。

对grok pattern files的更改也会被reloaded,但是不会触发reload。

6.6 处理多行的events

要处理多行的events,Logstash需要知道哪些行是一个单独events的组成部分。
优选的工具是multiline codec,它可以将一个单独的input的多个行合并到一个单独的行。
multiline codec最重要的配置部分是:

例子1 处理Java Stack Traces

java的栈追踪信息除了第一行之外其他都以空格开头,以下就表示所有以空格开头的line都是之前行的一部分

input {
  stdin {
    codec => multiline {
      pattern => "^\s"
      what => "previous"
    }
  }
}
例子2 有些多行的日志在未结束时会在行尾加\

以下就表示所有以\结尾的line都是之后行的一部分

filter {
  multiline {
    type => "somefiletype"
    pattern => "\\$"
    what => "next"
  }
}
例子3 服务的活动日志一般都以一个时间戳开始,然后是其他信息

以下就表示所有不是(negate=true)以时间戳开头的行都是前面行的一部分

input {
  file {
    path => "/var/log/someapp.log"
    codec => multiline {
      pattern => "^%{TIMESTAMP_ISO8601} "
      negate => true
      what => previous
    }
  }
}

6.7 glob pattern支持:

7. 部署并scaling Logstash:

7.1 最小化安装: 一个Logstash实例和一个elasticsearch实例。

7.2 将Logstash分为shipping instance 和 indexing instance

8. 性能调优:

一次改一个配置,然后观察结果。不要一上来就更改pipeline.workers的数量。

性能检查列表(按顺序):

8.1 检查input source和output 目标的性能。

如果input源和output目标都不够快,那Logstash肯定也快不了。

8.2 检查系统信息:

8.3 检查JVM heap:

很多时候CPU的性能降低都是因为 heap size太小,导致JVM频繁的进行垃圾回收。
一个简单的方法是 将heap size变成原来的双倍大小,然后观察性能改善。
不要将heap size设置超过实际的物理内存大小。
至少留1GB给OS和其他进程。
你可以使用jmap命令行工具或使用VisualVM来精确地度量JVM heap。

8.4 调优worker设置:

8.5 调优并Profiling Logstash performance:

Logstash提供了如下的pipeline性能调优选项:

9. 监控APIs

通用选项:
节点信息API

10. plugins插件

bin/logstash-plugin 脚本用于管理插件的生命周期。
你可以使用这个脚本安装、移除、升级插件。

10.1 列出插件:列出当前可用的插件

logstash-plugin list  #列出所有已安装的插件
logstash-plugin list --verbose  #显示插件的版本信息
logstash-plugin list '*namefragment*' #搜索创建
logstash-plugin list --group output #列出output(input、filter、codec)插件

10.2 安装插件:

export HTTP_PROXY=http://127.0.0.1:3128 #设置代理
logstash-plugin install logstash-output-kafka #首先你需要联网,能够访问RubyGems.org来获取插件,一旦安装完毕,你就可以在配置文件中使用它们。
logstash-plugin install /path/to/logstash-output-kafka-1.0.0.gem #本地安装

关于--path.plugins指定插件文件夹,前面已经说了,这个文件夹的目录架构应该是PATH/logstash/TYPE/NAME.rb,TYPE是input、output、filter、codec,NAME就是插件名称。

10.3 更新插件:

logstash-plugin update [logstash-output-kafka] #更新所有插件或指定插件

10.4 删除插件:

logstash-plugin remove logstash-output-kafka

10.5 离线插件包制作:

logstash-plugin prepare-offline-pack --output OUTPUT [PLUGINS]
# OUTPUT指定压缩的插件包要被写往何处,默认是/LOGSTASH_HOME/logstash-offline-plugins-5.2.2.zip
# [PLUGINS] 指定要打进插件包中的插件(一个或多个,可以使用通配符)

例子

bin/logstash-plugin prepare-offline-pack logstash-input-beats 
bin/logstash-plugin prepare-offline-pack logstash-filter-* 
bin/logstash-plugin prepare-offline-pack logstash-filter-* logstash-input-beats

10.6 离线插件包使用:

logstash-plugin install file:///path/to/logstash-offline-plugins-5.2.2.zip

10.7 Event API:

供插件开发者使用和Ruby filter使用,Ruby filter例子

filter {
  ruby {
    code => 'event.set("lowercase_field", event.get("message").downcase)'
  }
}

11. input插件

https://www.elastic.co/guide/en/logstash/current/input-plugins.html
elastic支持的插件查看 https://www.elastic.co/support/matrix#show_logstash_plugins
这里只列出常用的

beats

接收elastic beats框架发出的events

elasticsearch:

读取elasticsearch集群的查询结果,这对重现test logs、reindexing有帮助。
例子:

input {
  # Read all documents from Elasticsearch matching the given query
  elasticsearch {
    hosts => "localhost"
    query => '{ "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ] }'
  }
}

这会创建一个如下格式的elasticsearch query:

curl 'http://localhost:9200/logstash-*/_search?&scroll=1m&size=1000' -d '{
  "query": {
    "match": {
      "statuscode": 200
    }
  },
  "sort": [ "_doc" ]
}'
eventlog

拉取windows的event log。

exec

捕获shell命令的输出作为event。周期性的运行一个shell命令并捕获所有的输出作为event。

file

从文件流获取events,一般来说是根据tail -fn0来读取文件发生的变化,但是也可以从头开始读文件。一般是一行作为一个event,如果你想将多行合并到一个event中,你可能需要使用multiline codec 或 filter。这个插件目标是最终文件的变化,而不是从头到尾地读取文件然后将其整个内容合并到一个单独的event中。 插件会记录当前读取到的位置(通过sincedb文件),这就让即使logstash重启也不会丢失lines。默认情况下,sincedb文件位于运行logstash的用户根目录下,其文件名中有被读取的文件的名称,如果这个文件被重命名了,那么sincedb也就失去作用了。 处理File rotation:可以检测到,不管file是通过重命名还是copy操作被rotated了。

generator

生成随机的log event,用于测试

heartbeat

生成heartbeat event,用于测试

http

通过HTTP或HTTPS接收event

http_poller

解码一个HTTP API的输出到events

log4j

通过TCP socket接收来自log4j SocketAppender 的event。只对log4j 1.x有效。

pipe

接收来自一个长时间运行的command pipe的流events

redis

从Redis实例读取events

sqlite

基于SQLite数据库的rows创建events

stdin

从标准输入读取events

syslog

读取syslog message作为event。syslog是一个非常混乱的术语。这个input仅支持RFC3164加一些小的修改。日期格式必须是RFC3164格式或ISO8601。其它部分也必须遵守RFC3164。如果你不使用RFC3164,不要使用这个input。

tcp

从TCP socket读取event。和stdin和file inputs插件类似,每一个event被假定为一行文本。其可配置项有

add_field=>{}
codec=>"plain"
host=>"0.0.0.0"
id="someStr" #为插件的configuration增加一个唯一ID,如果没有提供ID,logstash会生成一个。强烈推荐你设置一个ID。
mode=>"server" #还可以设置为client,表示连接到一个server
port=>"some number"
proxy_protocol=>false #目前只支持v1
tags=>[ {id => 1,name => bob},{id => 2,name => jane} ] #设置任意数量的任意tags
type=>"someStr" #type一般用于启动filter。
udp

从UDP 读取events

unix

从UNIX socket读取event

xmpp

通过XMPP/Jabber协议接收events

log4j2

(实测安装有问题) 项目主页 https://github.com/jurmous/logstash-log4j2
安装 bin/logstash-plugin install logstash-input-log4j2 #如果不能安装,请先运行logstash
查看 bin/logstash-plugin list | grep log4j2
配置文件

input {
  log4j2 {
    port => 7000
    mode => "server"
  }
}

output {
  stdout { codec => rubydebug }
}

12. output插件

https://www.elastic.co/guide/en/logstash/current/output-plugins.html

elasticsearch

存储logs到Elasticsearch。

file

将event写到磁盘上。

http
pipe

将event的输出作为另一个程序的标准输入

redis

将events发送到一个Redis queue,使用RPUSH命令。

stdout
tcp
udp

13. filter插件

https://www.elastic.co/guide/en/logstash/current/filter-plugins.html

aggregate

将属于一个task的多个events的信息聚合在一起,并最终聚合到最后一个events中。一定要将worker数量设置成1,否则处理顺序错乱会导致未知的结果。

alter

允许你做一些 在mutate filter中没有包含的对field的操作。

cidr

检查IP地址是否包含在network blocks中

clone

复制Event

collate

校对Event,通过time或count

csv

解析逗号分割的值到不同的fields

date

解析fields中的date作为Logstash的timestamp 。
例子:第一个值是field的name,然后是可能的时间格式

date {
  match => [ "logdate", "MMM dd yyyy HH:mm:ss","MMM  d yyyy HH:mm:ss", "ISO8601" ] 
}
dissect

使用delimiters(定界符)来将数据分割到Fields

dns

执行一个标准的或反的DNS lookup

drop

丢掉所有events

elapsed

计算两个events之间消耗的时间

elasticsearch

拷贝前一个event的fields到当前的events

environment

存储环境变量作为metadata sub-fields。

extractnumbers

从一个字符串中提取数字

geoip

增加IP地址的地理信息

grok

使用正则来解析非结构化的数据。
Logstash有大约120个默认的patterns。
语法:

%{NUMBER:duration} %{IP:client}

NUMBER、IP是pattern,duration、client是解析后的field名称
整体的例子:

grok {
  match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
}
i18n

移除fields中的特殊字符

json

解析json events。
例子: target不写则默认被存储到event的root(top level)中

json { 
  source => "message" 
  target="doc" 
}
json_encode

将一个field序列化成JSON

kv

解析key-value pairs

metaevent

增加任意的fields到event

mutate

对fields执行变化操作。
例子

# 将field1的value转换成integer类型
mutate { 
  convert => { "field1" => "integer" }
}

# 替换
mutate { 
  gsub => [ "field1","/","_", "field2","[\\?#-]","." ]
} 

# field1如果不是array类型就没有效果
mutate { 
  join => { "field1" => "," } 
} 

mutate { 
  lowercase => [ "field1" ]
} 

# 将两个field合并
mutate { 
  merge => { "dest_field" => "added_field" } 
} 
range

对指定的字段检查size或length limits

ruby

执行任意的ruby代码

sleep

睡眠特定的时间

split

将多行的信息split成单独的events

translate

将field的内容替换。

useragent

将user agent字符串解析到fields中

uuid

增加一个uuid到events

xml

将xml解析到fields。

14. codec插件

https://www.elastic.co/guide/en/logstash/current/codec-plugins.html

json

如果数据是一个JSON array,会创建多个events。
如果你要解析\n分割的JSON消息,请使用json_lines。
如果接收到的payload不是JSON,它会使用plain text并增加一个_jsonparsefailure的tag,然后payload被存储到message field中。

json_lines
line
multiline

将多行messages压缩合并成一个单独的事件,最初的目标是合并java exception and stacktrace和使用\分割的多行。

plain
rubydebug

将你输出的event使用Ruby Awesome Print库格式化。

上一篇下一篇

猜你喜欢

热点阅读