Filebeat Logstash 日志预警
2021-12-03 本文已影响0人
深入浅出
1、Filebeat抓取日志,配置filebeat-demo.yml
# ================================== General ===================================
#用于缓冲要发布的事件的内部队列配置。默认mem(内存队列)
queue:
mem:
#内存队列的最大缓冲事件数
events: 2048
#发布所需的最小事件数,设置为0则发布事件直接输出使用,无需等待
flush.min_events: 1536
#达到flush.min_events的最大等待事件,设置为0则无需等待
flush.timeout: 1s
max_procs: 1 #设置可以同时执行的最大CPU数。默认值为系统中可用的逻辑CPU的数量
#=========================== Filebeat inputs =============================
filebeat.inputs:
- type: log #日志输入
enabled: true #配置生效
paths: #采集日志路径
- /var/log/temp.log
include_lines: ['ERROR'] #包含的正则表达式列表,只采集包含ERROR的记录
harvester_buffer_size: 51200 #50k 每个收割机获取文件时使用的缓冲区大小
max_bytes: 10485760 #10M 单个日志消息的最大字节数,超出部分丢弃
scan_frequency: 10s #扫描频率,默认10秒
fields: #属性中添加附加信息的可选字段,区分数据来源
type: demo
ip: pre
fields_under_root: true #将自定义字段作为顶级字段存储到到输出文档中,默认false
# ================================== Outputs ===================================
output.logstash:
hosts: ["127.0.0.1:5044"]
2、Logstash接收日志并过滤及报警,配置logstash-demo.conf
input {
beats {
port => "5044"
}
}
filter {
if [type] == "droplog" {
#Grok正则捕获,从非结构化数据中派生出结构
grok {
match => [
"message", "\[%{LOGLEVEL:log_level}\]\[%{TIMESTAMP_ISO8601:log_date}\]\[%{DATA:log_file}\]%{GREEDYDATA:log_type}\|\|ctx_format=%{GREEDYDATA:log_format}\|\|msg=\[%{GREEDYDATA:req_method} failed\]\|\|requestParams=%{GREEDYDATA:req_params}"
]
remove_field => ["message","@version"]
}
#解析json结构的数据
json {
source => "req_params"
target => "json_content"
remove_field => "req_params"
}
#预警-数值统计
metrics {
# 定义metrics计数器数据保存的字段名
meter => [ "demo_event_%{log_level}" ]
# 给该metrics添加tag标签,用于区分metrics
add_tag => [ "demo_metric" ]
# 每隔5分钟统计一次
flush_interval => 300
# 每隔5分钟清空计数器
clear_interval => 300
}
if "droplog_metric" in [tags] {
ruby {
path => "/etc/logstash/warning.rb"
script_params => { "quantity" => 1 }
}
}
}
}
output {
if "droplog_metric" in [tags] {
#邮件预警
email {
port => "587"
address => "smtp.qq.com"
username => "from@qq.com"
password => "vvurrjfxjcpscaag"
authentication => "plain"
use_tls => true
from => "from@qq.com"
subject => "droplog告警"
to => "to@qq.com"
via => "smtp"
body => "预警:120秒内droplog日志超过3条,请注意排查!"
}
#短信预警
http {
http_method => "post"
url => "http://xxx.xxx.com/sms/batch_send"
format => "message"
message => '{"phone":["15811112222"],"content":"预警:300秒内droplog日志超过3条,请注意排查!"}'
}
}
#调用后台API接口
if [type] == "droplog" {
http {
http_method => "post"
url => "http://xxx.xxx.com/methodName"
format => "message"
message => '%{json_content}'
retry_failed => true
automatic_retries => 2
}
}
}
warning.rb
def register(params)
@limited_quantity = params["quantity"]
end
def filter(event)
if event.get("[demo_event_ERROR][count]") >= @limited_quantity
return [event]
else
return [] # return empty array to cancel event
end
end
QQ邮箱需要开启客户端授权
邮箱设置 > 账户
客户端授权.png
3 grok调试
- 安装并启动
elasticsearch&kibana
- 访问
grok
调试地址
http://127.0.0.1:5601/app/dev_tools#/grokdebugger
4 supervisor长期运行
在/opt/supervisor/conf/conf.d
目录下增加配置
logstash
配置
[program:logstash]
user=root
directory=/opt/logstash-7.15.2
command=/opt/logstash-7.15.2/bin/logstash -f /opt/logstash-7.15.2/config/logstash-demo.conf
autostart=true
autorestart=true
filebeat
配置
[program:filebeat]
user=root
directory=/opt/filebeat-7.15.2
command=/opt/filebeat-7.15.2/filebeat -c /opt/filebeat-7.15.2/filebeat-demo.yml
autostart=true
autorestart=true
常用命令
#加载配置
supervisorctl -c /opt/supervisor/conf/supervisord.conf update
#全部启动
supervisorctl -c /opt/supervisor/conf/supervisord.conf start all
#指定关闭
supervisorctl -c /opt/supervisor/conf/supervisord.conf stop logstash
supervisorctl -c /opt/supervisor/conf/supervisord.conf stop filebeat
#指定启动
supervisorctl -c /opt/supervisor/conf/supervisord.conf start logstash
supervisorctl -c /opt/supervisor/conf/supervisord.conf start filebeat
#指定重启
supervisorctl -c /opt/supervisor/conf/supervisord.conf restart logstash
#查看状态
supervisorctl -c /opt/supervisor/conf/supervisord.conf status