一文学会logstash安装和使用

2021-06-22  本文已影响0人  sknfie

概述

Logstash是一款轻量级的、开源的日志收集处理框架,它可以方便的把分散的、多样化的日志搜集起来,并进行自定义过滤分析处理,然后传输到指定的位置,比如某个服务器或者文件。
Logstash的理念很简单,从功能上来讲,它只做三件事情:

Logstash实现的功能主要分为接收数据、解析过滤并转换数据、输出数据三个部分,这三个功能对应的插件依次是input插件、filter插件、output插件,其中,filter插件是可选的,其它两个是必须插件。

Logstash的安装

从elastic官网https://www.elastic.co/downloads/logstash 获取logstash安装包,这里下载的版本是logstash-7.10.2.tar.gz。
将下载下来的安装包直接解压到一个路径下即可完成logstash的安装。
这里我将logstash安装到主机(192.168.2.136)上,将logstash程序安装到/usr/local目录下,基本操作过程如下:

tar -zxvf logstash-7.10.2-linux-x86_64.tar.gz -C /usr/local
mv /usr/local/logstash-7.10.2 /usr/local/logstash

配置文件

logstash的配置文件在安装程序下的config子目录下:

1. 命令行方式

Logstash提供了一个shell脚本/usr/local/logstash/bin/logstash,可以方便快速的启动一个logstash进程,在Linux命令行下,运行如下命令启动Logstash进程:

cd /usr/local/logstash/
bin/logstash -e 'input{stdin{}} output{stdout{codec=>rubydebug}}'

[2021-06-21T06:57:35,776][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
Hello world
{
      "@version" => "1",
       "message" => "Hello world",
          "host" => "centos7_9-mod",
    "@timestamp" => 2021-06-21T10:58:55.457Z
}
###2.配置文件启动方式

使用-e参数在命令行中指定配置是很常用的方式,但是如果logstash需要配置更多规则的话,就必须把配置固化到文件里,这就是logstash事件配置文件,如果把上面命令行执行的logstash命令,写到一个配置文件logstash-1.conf中,就变成如下内容:

input { 
  stdin { }
}
output {
  stdout { codec => rubydebug }
}

这就是最简单的Logstash事件配置文件。此时,可以使用logstash的-f参数来读取配置文件,然后启动logstash进程,操作如下:

bin/logstash -f config/logstash-1.conf

通过这种方式也可以启动logstash进程,不过这种方式启动的进程是在前台运行的,要放到后台运行,可通过nohup命令实现,操作如下:

nohup bin/logstash -f config/logstash-1 &
tail -f nohup.out

输入插件

1.基本语法组成

input {
输入插件
}
filter {
过滤匹配插件
}
output {
输出插件
}

Logstash配置文件有如下三部分组成,其中input、output部分是必须配置,filter部分是可选配置。

2.从文件读取数据

input {
    file {
        path => ["/var/log/secure"]
        type => "system"
        start_position => "beginning"
    }
}
output {
    stdout{
        codec=>rubydebug    
    }
}

start_position表示按时间戳记录的地方开始读取,如果没有时间戳则从头开始读取,默认情况下,logstash会从文件的结束位置开始读取数据。type用来标记事件类型,通常会在输入区域通过type标记事件类型。

bin/logstash -f config/logstash-file.conf

[2021-06-21T22:16:50,615][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
{
          "type" => "system",
       "message" => "Jun 21 06:28:22 centos7_9-mod sshd[18648]: Accepted password for root from 192.168.2.233 port 51559 ssh2",
    "@timestamp" => 2021-06-22T02:16:50.787Z,
          "host" => "centos7_9-mod",
      "@version" => "1",
          "path" => "/var/log/secure"
}
{
          "type" => "system",
       "message" => "Jun 21 06:28:22 centos7_9-mod sshd[18648]: pam_unix(sshd:session): session opened for user root by (uid=0)",
    "@timestamp" => 2021-06-22T02:16:50.788Z,
          "host" => "centos7_9-mod",
      "@version" => "1",
          "path" => "/var/log/secure"
}

3.从标准输入读取数据

input{
    stdin{
        add_field=>{"key"=>"ok"}
        tags=>["add field"]
        type=>"mytype"
    }
}
output {
    stdout{
        codec=>rubydebug    
    }
}

type和tags是logstash的两个特殊字段, type一般会放在input中标记事件类型, tags主要用于在事件中增加标签,以便在后续的处理流程中使用,主要用于filter或output阶段。

bin/logstash -f config/logstash-stdin.conf

hello
{
          "tags" => [
        [0] "add field"
    ],
      "@version" => "1",
           "key" => "ok",
       "message" => "hello",
    "@timestamp" => 2021-06-22T02:49:57.721Z,
          "host" => "centos7_9-mod",
          "type" => "mytype"
}

4.从网络读取TCP数据

input {
  tcp {
    port => "5044"
  }
}

filter {
  grok {
    match => { "message" => "%{SYSLOGLINE}" }
  }
}

output {
    stdout{
        codec=>rubydebug
    }
}

5044端口是logstash启动的tcp监听端口。注意这里用到了日志过滤"LogStash::Filters::Grok"功能.

bin/logstash -f config/logstash-net.conf

[2021-06-21T23:04:32,647][INFO ][logstash.inputs.tcp      ][main][b59e9fd08e530a78065b475d52a49eae15a65cead44875d4e7226628ab208d0c] Starting tcp input listener {:address=>"0.0.0.0:5044", :ssl_enable=>"false"}

//在node03上执行
yum install nc //没有nc先安装
nc 192.168.2.136 5044 < /var/log/secure

{
       "pid" => "9459",
       "message" => [
        [0] "Jun 21 23:10:37 node03 sshd[9459]: pam_unix(sshd:session): session opened for user root by (uid=0)",
        [1] "pam_unix(sshd:session): session opened for user root by (uid=0)"
    ],
    "@timestamp" => 2021-06-22T03:22:24.082Z,
          "host" => "192.168.2.193",
     "logsource" => "node03",
       "program" => "sshd",
          "port" => 44478,
      "@version" => "1",
     "timestamp" => "Jun 21 23:10:37"
}

编码插件

1、codec插件之plain

input{
    stdin {
    }
}
output{
    stdout {
         codec => "plain"
        }
}

plain是一个空的解析器,它可以让用户自己指定格式,也就是说输入是什么格式,输出就是什么格式。

bin/logstash -f config/logstash-plain.conf

hello 
2021-06-22T03:39:09.687Z centos7_9-mod hello

2、codec插件之json

input {
    stdin {
        }
    }
output {
    stdout {
        codec => json
        }
}

如果发送给logstash的数据内容为json格式,可以在input字段加入codec=>json来进行解析,这样就可以根据具体内容生成字段,方便分析和储存。
如果想让logstash输出为json格式,可以在output字段加入codec=>json

bin/logstash -f config/logstash-json.conf

hello
{"@version":"1","@timestamp":"2021-06-22T03:43:07.588Z","message":"hello","host":"centos7_9-mod"}

输入是什么,输出是什么:

input {
    stdin {
        }
    }
output {
    stdout {
        codec => line { format => "%{message}"}
        }
}

bin/logstash -f config/logstash-json.conf
hello
hello

过滤器插件(Filter)

1、Grok 正则捕获

grok是一个十分强大的logstash filter插件,它可以通过正则解析任意文本,将非结构化日志数据格式为结构化和方便查询的结构。它是目前logstash 中解析非结构化日志数据最好的方式。

[root@centos7_9-mod patterns]# pwd
/usr/local/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns
[root@centos7_9-mod patterns]# cat grok-patterns
USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
EMAILLOCALPART [a-zA-Z][a-zA-Z0-9_.+-=:]+
EMAILADDRESS %{EMAILLOCALPART}@%{HOSTNAME}
INT (?:[+-]?(?:[0-9]+))
BASE10NUM (?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+)))
NUMBER (?:%{BASE10NUM})
BASE16NUM (?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+))
。。。。。。

Grok 的语法规则是:
%{语法: 语义}
这里的“语法”指的就是匹配的模式,例如使用NUMBER模式可以匹配出数字,IP模式则会匹配出127.0.0.1这样的IP地址

input {
  stdin {
        }
}

filter {
  grok {
    match => { "message" => "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}" }
  }
}

output {
    stdout{
        codec=>rubydebug
    }
}

查看执行结果:

bin/logstash -f config/logstash-grok.conf

172.16.213.132 [16/Jun/2020:16:24:19 +0800] "GET / HTTP/1.1" 403 5039
{
     "timestamp" => "16/Jun/2020:16:24:19 +0800",
      "referrer" => "\"GET / HTTP/1.1\"",
       "message" => "172.16.213.132 [16/Jun/2020:16:24:19 +0800] \"GET / HTTP/1.1\" 403 5039",
      "@version" => "1",
      "host" => "centos7_9-mod",
       "@timestamp" => 2021-06-22T08:05:06.358Z,
      "bytes" => "5039",
      "response" => "403",
      "clientip" => "172.16.213.132"
}

删除多余的message,时间格式化,删除字段

input {
  stdin {
        }
}

filter {
  grok {
    match => { "message" => "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}" }
    remove_field => [ "message" ]
  }
   date {
        match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
    }
    mutate { 
       remove_field  =>  ["timestamp"] 
    }
}

output {
    stdout{
        codec=>rubydebug
    }
}

2、时间处理(Date)

下面是date插件的一个配置示例(这里仅仅列出filter部分):

filter {
    grok {
        match => ["message", "%{HTTPDATE:timestamp}"]
    }
    date {
        match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
    }
}

date插件是对于排序事件和回填旧数据尤其重要,它可以用来转换日志记录中的时间字段,变成LogStash::Timestamp对象,然后转存到@timestamp字段里.

3、数据修改(Mutate)

mutate插件是Logstash另一个非常重要插件。它提供了丰富的基础类型数据处理能力。包括重命名、删除、替换和修改日志事件中的字段。

filter {    mutate {        convert => ["filed_name", "integer"]    }}  #字段类型转换功能
filter {    mutate {        gsub => ["filed_name_1", "/" , "_"]    }}  #正则表达式替换匹配字段
filter {    mutate {        split => {"filed_name_2", "|"}    }}  #分隔符分割字符串为数组
filter {    mutate {        rename => {"old_field" => "new_field"}    }}    #重命名字段
filter {    mutate {        remove_field  =>  ["timestamp"]    }}  #删除字段

举个例子:

input {
  stdin {
        }
}

filter {
  grok {
    match => { "message" => "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}" }
    remove_field => [ "message" ]
  }
   date {
        match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
    }
    mutate { 
       rename => {"response" => "response_new"} 
       convert => ["bytes", "float"] 
       gsub => ["referrer", "\"" , ""] 
       split => ["clientip", "."]
       remove_field  =>  ["timestamp"] 
    }
}

output {
    stdout{
        codec=>rubydebug
    }
}

172.16.213.132 [16/Jun/2020:16:24:19 +0800] "GET / HTTP/1.1" 403 5039
{
           "bytes" => 5039.0,
            "host" => "centos7_9-mod",
      "@timestamp" => 2020-06-16T08:24:19.000Z,
    "response_new" => "403",
        "clientip" => [
        [0] "172",
        [1] "16",
        [2] "213",
        [3] "132"
    ],
        "referrer" => "GET / HTTP/1.1",
        "@version" => "1"
}

Logstash输出插件(output)

output {    stdout {        codec => rubydebug    }}  #输出到标准输出(stdout)
output {    file {        path => "/data/log/%{+yyyy-MM-dd}/%{host}_%{+HH}.log"    }    #保存为文件
output {    file {        path => “/data/log/%{+yyyy-MM-dd}/%{host}_%{+HH}.log.gz”        codec => line { format => “%{message}”}        gzip => true    }  #保存为文件(file),保存日志原始格式

输出到文件:

input {
  stdin {
        }
}

filter {
  grok {
    match => { "message" => "%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}" }
  }
  date {
        match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
  }
  mutate {
       remove_field  =>  ["timestamp"]
  }
}

output {
  file {
     path => "/data/log/%{+yyyy-MM-dd}/%{host}_%{+HH}.log" 
     codec => line { format => "%{message}" }
  } 
}

参考

Logstash 最佳实践:
http://doc.yonyoucloud.com/doc/logstash-best-practice-cn/index.html
grok调式工具:
http://grokdebug.herokuapp.com/

上一篇 下一篇

猜你喜欢

热点阅读