ela+fiebeat+logstash 容器化 kiban

2019-11-21  本文已影响0人  _str_

ela+fiebeat+logstash 容器化 kibana web数据展示 基础
之前我们已经可以把kibana把日志展示出来了 现在我们可以增加一些新的功能 比如增加过滤条件,在上面这个文章所构建的环境的基础上。

基本做法就是

在日志挖掘的logstash的配置文件中添加filter条件
这里是在 `logstash/logstash_stdout.conf`中添加

使用Grok Filter Plugin 编辑解析Web日志

现在,您有一个工作流程,从Filebeat读取日志行。但是您会注意到日志消息的格式不理想。您要解析日志消息以从日志中创建特定的命名字段。为此,您将使用grok过滤器插件。

因为grok过滤器插件在传入的日志数据中查找模式,所以配置插件需要您决定如何识别用例感兴趣的模式。来自Web服务器日志示例的代表行如下所示:

信息 字段名
IP Address clientip
User ID ident
User Authentication auth
timestamp timestamp
HTTP Verb verb
Request body request
HTTP Version httpversion
HTTP Status Code response
Bytes served bytes
Referrer URL referrer
User agent agent

使用方法,编辑 logstash/logstash_stdout.conf
写入下面的内容:

filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
}

假如你在这之前已经运行了logstashfilebeat。要想生效现在的过滤配置,您需要强制Filebeat从头开始读取日志文件。
不必重新启动Logstash来接收更改,但是需要删除 filebeat下的注册表文件 registry,此文件一般在安装目录下的 data目录下。

由于Filebeat存储在注册表中收集的每个文件的状态,因此删除注册表文件会强制Filebeat读取从头开始捕获的所有文件。

接下来,使用以下命令重新启动Filebeat即可

使用Geoip Filter插件修改数据
除了解析日志数据以获得更好的搜索之外,过滤插件也可以从现有数据中导出补充信息。例如,geoip插件会查找IP地址,从地址中导出地理位置信息,并将该位置信息添加到日志中。

Logstash实例配置为使用geoip过滤器插件,将以下行添加到文件的filter部分

geoip {
        source => "clientip"
    }

完整的示例:

input {
  beats {
    port => 5044
    host => "0.0.0.0"
  }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
    geoip {
        source => "clientip"
    }
}

output {
  elasticsearch {             -----输出到ela中
    hosts => ["elasticsearch:9200"]
    manage_template => false
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
  # stdout { codec => rubydebug }  # 假如有问题,可以打开此行进行调试
}

修改好保存,生效的话,同样先删除Filebeat 注册文件,之后重启 filebeat

kibana截图展示

image.png image.png image.png image.png image.png image.png
测试您的管道编辑
现在,Logstash管道配置为将数据索引到Elasticsearch集群中,您可以查询Elasticsearch

ela的三个表示对应关系 类似于mangodb

索引 ---- 一个数据库
类型 ---- 一个表
文档 ---- 一条数据

过滤nginxlogstash/logstash_stdout.conf

input {
  beats {
    port => 5044
    host => "0.0.0.0"
  }
}
filter {
  if ([fileset][module] == "nginx") {
    if ([fileset][name] == "access") {
      grok {
        match => { "message" => ["%{IPORHOST:[nginx][access][remote_ip]} - %{DATA:[nginx][access][user_name]} \[%{HTTPDATE:[nginx][access][time]}\] \"%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]}\" %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} \"%{DATA:[nginx][access][referrer]}\" \"%{DATA:[nginx][access][agent]}\""] }
        remove_field => "message"
      }
      mutate {
        add_field => { "read_timestamp" => "%{@timestamp}" }
      }
      date {
        match => [ "[nginx][access][time]", "dd/MMM/YYYY:H:m:s Z" ]
        remove_field => "[nginx][access][time]"
      }
      useragent {
        source => "[nginx][access][agent]"
        target => "[nginx][access][user_agent]"
        remove_field => "[nginx][access][agent]"
      }
      geoip {
        source => "[nginx][access][remote_ip]"
        target => "[nginx][access][geoip]"
      }
    }
    else if [fileset][name] == "error" {
      grok {
        match => { "message" => ["%{DATA:[nginx][error][time]} \[%{DATA:[nginx][error][level]}\] %{NUMBER:[nginx][error][pid]}#%{NUMBER:[nginx][error][tid]}: (\*%{NUMBER:[nginx][error][connection_id]} )?%{GREEDYDATA:[nginx][error][message]}"] }
        remove_field => "message"
      }
      mutate {
        rename => { "@timestamp" => "read_timestamp" }
      }
      date {
        match => [ "[nginx][error][time]", "YYYY/MM/dd H:m:s" ]
        remove_field => "[nginx][error][time]"
      }
    }
  }
}
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"] 
    manage_template => false
    index => "ddd---%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
  # stdout { codec => rubydebug }  # 假如有问题,可以打开此行进行调试
}
上一篇 下一篇

猜你喜欢

热点阅读