收藏

Filebeat Logstash 安装配置

2021-12-08  本文已影响0人  深入浅出

1 安装及配置filebeat

1.1 安装

文件filebeat-7.15.2-linux-x86_64.tar.gz下载至/opt/src目录

[root@neiwang ~]# cd /opt/src/
[root@neiwang src]# tar -zxvf filebeat-7.15.2-linux-x86_64.tar.gz -C /opt/
[root@neiwang ~]# cd /opt/
[root@neiwang opt]# mv filebeat-7.15.2-linux-x86_64/ filebeat-7.15.2/

1.2 配置

[root@neiwang ~]# cd /opt/filebeat-7.15.2
[root@neiwang config]# vim filebeat-demo.yml

filebeat-demo.yml添加如下配置

#用于缓冲要发布的事件的内部队列配置
queue:
  #内存队列
  mem:
    #内存队列的最大缓冲事件数
    events: 2048
    #发布所需的最小事件数,设置为0则发布事件直接输出,无需等待
    flush.min_events: 1536
    #达到flush.min_events的最大等待事件,设置为0则无需等待
    flush.timeout: 1s
#设置可以同时执行的最大CPU数。默认值为系统中可用的逻辑CPU的数量
max_procs: 1

filebeat.inputs:
#日志输入
- type: log
  #配置生效
  enabled: true
  #采集日志路径
  paths:
    - /data/ecms-8030.log
    - /data/ecms-8031.log
  #包含的正则表达式列表,只采集包含ERROR的记录
  include_lines: ['ERROR']
  #50k 每个收割机获取文件时使用的缓冲区大小
  harvester_buffer_size: 51200
  #10M 单个日志消息的最大字节数,超出部分丢弃
  max_bytes: 10485760
  #扫描频率,默认10秒
  scan_frequency: 10s
  #匹配所有以 [ 开头的行,并且后面非以 [ 开头的行将被追加到匹配的那一行。
  #multiline.pattern: '^\['
  #匹配所有以 [xxxx-xx-xx 开头的行,并且后面非以 [xxxx-xx-xx 开头的行将被追加到匹配的那一行。
  multiline.pattern: '^\[[0-9]{4}-[0-9]{2}-[0-9]{2}'
  multiline.negate: true
  multiline.match: after
  #属性中添加附加信息的可选字段,区分数据来源
  fields:
    type: ecms
    env: prod
  #将自定义字段作为顶级字段存储到到输出文档中,默认false
  fields_under_root: true

#日志输入
- type: log
  #配置生效
  enabled: true
  #采集日志路径
  paths:
    - /opt/logstash-7.15.2/logs/logstash-plain.log
  #包含的正则表达式列表,只采集包含ERROR的记录
  include_lines: ['ERROR']
  #50k 每个收割机获取文件时使用的缓冲区大小
  harvester_buffer_size: 51200
  #10M 单个日志消息的最大字节数,超出部分丢弃
  max_bytes: 10485760
  #扫描频率,默认10秒
  scan_frequency: 10s
  #属性中添加附加信息的可选字段,区分数据来源
  fields:
    type: logstash
    env: prod
  #将自定义字段作为顶级字段存储到到输出文档中,默认false
  fields_under_root: true

output.logstash:
  hosts: ["127.0.0.1:5044"]

1.3 启动

[root@neiwang ~]# /opt/filebeat-7.15.2/filebeat -e -c /opt/filebeat-7.15.2/filebeat-demo.yml

2 安装及配置logstash

2.1 安装

文件logstash-7.15.2-linux-x86_64.tar.gz下载至/opt/src目录

[root@neiwang ~]# cd /opt/src/
[root@neiwang src]# tar -zxvf logstash-7.15.2-linux-x86_64.tar.gz -C /opt/

2.2 配置

处理Filebeat发来的数据

[root@neiwang ~]# cd /opt/logstash-7.15.2/config/
[root@neiwang config]# vim logstash-demo.conf

logstash-demo.conf添加如下配置

input {
  beats {
    port => "5044"
  }
}

filter {
  if [type] == "ecms" {
    #Grok正则捕获,从非结构化数据中派生出结构
    grok {
      match => [
        "message", "\[%{TIMESTAMP_ISO8601:log_date}\] \[%{DATA:thread}\] %{LOGLEVEL:log_level} %{DATA:class} \- %{GREEDYDATA:log_msg}"
      ]
    }
    
    #预警-数值统计
    metrics {
      #定义metrics计数器数据保存的字段名
      meter => [ "ecms_event_%{log_level}" ]
      #给该metrics添加tag标签,用于区分metrics
      add_tag => [ "ecms_metric" ]
      #每隔5分钟统计一次
      flush_interval => 300
      #每隔5分钟清空计数器
      clear_interval => 300
    }
  
    if "ecms_metric" in [tags] {
      ruby {
        #5分钟内异常日志数量大于等于1报警
        path => "/etc/logstash/warning.rb"
        script_params => { "quantity" => 1 }
      }
    }

    mutate {
      remove_field => ["message", "log_msg"]
    }
  }

  if [type] == "logstash" {
    #预警-数值统计
    metrics {
      #定义metrics计数器数据保存的字段名
      meter => [ "logstash_event_error" ]
      #给该metrics添加tag标签,用于区分metrics
      add_tag => [ "logstash_metric" ]
      #每隔5分钟统计一次
      flush_interval => 300
      #每隔5分钟(flush_interval + 1秒)清空计数器
      clear_interval => 301
      #10秒内的message数据才统计,避免延迟
      ignore_older_than => 10
    }
  
    if "logstash_metric" in [tags] {
      ruby {
        #如果日志级别是ERROR的日志count小于1条,就忽略此事件(即不发送任何消息)。
        code => 'event.cancel if event.get("[logstash_event_error][count]") < 1'
      }
    }
  }
}

output {
  if "ecms_metric" in [tags] {
    email {
      port           =>    "587"
      address        =>    "smtp-n.global-mail.cn"
      username       =>    "from@qq.com"
      password       =>    "123456"
      use_tls        =>    "true"
      from           =>    "from@qq.com"
      subject        =>    "万古云签日志报警-PROD"
      to             =>    "to@qq.com"
      via            =>    "smtp"
      body           =>    "预警:万古云签存在ERROR日志,请注意排查!"
    }
  }

  if "logstash_metric" in [tags] {
    email {
      port           =>    "587"
      address        =>    "smtp-n.global-mail.cn"
      username       =>    "from@qq.com"
      password       =>    "123456"
      use_tls        =>    "true"
      from           =>    "from@qq.com"
      subject        =>    "Logstash日志报警-PROD"
      to             =>    "to@qq.com"
      via            =>    "smtp"
      body           =>    "预警:Logstash存在ERROR日志,请注意排查!"
    }
  }
  
  if [type] == "ecms" {
    #输出到Redis
    redis {
      data_type => "list"
      key => "logstash-%{[type]}-%{[env]}-%{+yyyy.MM.dd}"
      host => "127.0.0.1"
      port => 6379
      db => 0
    }
  }

  if [type] == "logstash" {
    #输出到Redis
    redis {
      data_type => "list"
      key => "logstash-%{[type]}-%{[env]}-%{+yyyy.MM.dd}"
      host => "127.0.0.1"
      port => 6379
      db => 0
    }
  }
}

warning.rb

def register(params)
    @limited_quantity = params["quantity"]
end

def filter(event)
    if event.get("[ecms_event_ERROR][count]") >= @limited_quantity
        return [event]
    else
        return [] # return empty array to cancel event
    end
end

2.3 启动

[root@neiwang ~]# /opt/logstash-7.15.2/bin/logstash -f /opt/logstash-7.15.2/config/logstash-demo.conf

2.4 logback日志格式

<!-- 日志输出格式 -->
<!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符-->
<property name="log.pattern" value="[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%thread] %-5level %logger{50} - %msg%n"/>

2.5 grok调试

  1. 安装并启动elasticsearch&kibana
  2. 访问grok调试地址
    http://115.28.77.238:5601/app/dev_tools#/grokdebugger

3 supervisor长期运行

/opt/supervisor/conf/conf.d目录下增加配置

[program:logstash]
user=root
directory=/opt/logstash-7.15.2
command=/opt/logstash-7.15.2/bin/logstash -f /opt/logstash-7.15.2/config/logstash-demo.conf
autostart=true
autorestart=true
[program:filebeat]
user=root
directory=/opt/filebeat-7.15.2
command=/opt/filebeat-7.15.2/filebeat -c /opt/filebeat-7.15.2/filebeat-demo.yml
autostart=true
autorestart=true
#加载配置
supervisorctl -c /opt/supervisor/conf/supervisord.conf update
#全部启动
supervisorctl -c /opt/supervisor/conf/supervisord.conf start all
#指定关闭
supervisorctl -c /opt/supervisor/conf/supervisord.conf stop filebeat
supervisorctl -c /opt/supervisor/conf/supervisord.conf stop logstash
supervisorctl -c /opt/supervisor/conf/supervisord.conf stop all
#指定启动
supervisorctl -c /opt/supervisor/conf/supervisord.conf start logstash
supervisorctl -c /opt/supervisor/conf/supervisord.conf start filebeat
#指定重启
supervisorctl -c /opt/supervisor/conf/supervisord.conf restart logstash
#查看状态
supervisorctl -c /opt/supervisor/conf/supervisord.conf status

4 文献

elastic stack:https://www.elastic.co/cn/elastic-stack/
beats:https://www.elastic.co/cn/beats/
Filebeat Reference:https://www.elastic.co/guide/en/beats/filebeat/current/index.html
logstash:https://www.elastic.co/cn/logstash/
Logstash Reference:https://www.elastic.co/guide/en/logstash/current/index.html
grok-patterns:https://github.com/elastic/logstash/blob/v1.4.2/patterns/grok-patterns
Logstash 实用介绍:https://www.elastic.co/cn/blog/a-practical-introduction-to-logstash
ELK教程:http://docs.flycloud.me/docs/ELKStack/index.html

上一篇下一篇

猜你喜欢

热点阅读