使用Elastic Stack搭建日志集中分析平台
前言
一般我们需要进行日志分析场景:直接在日志文件中 grep、awk 就可以获得自己想要的信息。但在规模较大的场景中,此方法效率低下,面临问题包括日志量太大如何归档、文本搜索太慢怎么办、如何多维度查询。需要集中化的日志管理,所有服务器上的日志收集汇总。常见解决思路是建立集中式日志收集系统,将所有节点上的日志统一收集,管理,访问。
一般大型系统是一个分布式部署的架构,不同的服务模块部署在不同的服务器上,问题出现时,大部分情况需要根据问题暴露的关键信息,定位到具体的服务器和服务模块,构建一套集中式日志系统,可以提高定位问题的效率。
一个完整的集中式日志系统,需要包含以下几个主要特点:
- 收集-能够采集多种来源的日志数据;
- 传输-能够稳定的把日志数据传输到中央系统;
- 存储-如何存储日志数据;
- 分析-可以支持 UI 分析;
- 警告-能够提供错误报告,监控机制;
ELK提供了一整套解决方案,并且都是开源软件,之间互相配合使用,完美衔接,高效的满足了很多场合的应用。
Elastic Stack简介
Elastic Stack包括Beats、Elasticsearch、Logstash、Kibana、APM等,ELK是其核心套件。
-
Elasticsearch是实时全文搜索和分析引擎,提供搜集、分析、存储数据三大功能;是一套开放REST和JAVA API等结构提供高效搜索功能,可扩展的分布式系统。它构建于Apache Lucene搜索引擎库之上。
-
Logstash是一个用来搜集、分析、过滤日志的工具。它支持几乎任何类型的日志,包括系统日志、错误日志和自定义应用程序日志。它可以从许多来源接收日志,这些来源包括 syslog、消息传递(例如 RabbitMQ)和JMX,它能够以多种方式输出数据,包括电子邮件、websockets和Elasticsearch。
-
Kibana是一个基于Web的图形界面,用于搜索、分析和可视化存储在 Elasticsearch指标中的日志数据。它利用Elasticsearch的REST接口来检索数据,不仅允许用户创建他们自己的数据的定制仪表板视图,还允许他们以特殊的方式查询和过滤数据。
-
Beats是轻量级数据采集工具,包括:
- Packetbeat(搜集网络流量数据);
- Topbeat(搜集系统、进程和文件系统级别的 CPU 和内存使用情况等数据);
- Filebeat(搜集文件数据);
- Winlogbeat(搜集 Windows 事件日志数据)
- Metricbeat(收集系统级的 CPU 使用率、内存、文件系统、磁盘 IO 和网络 IO 统计数据);
- Auditbeat(采集linux审计日志);
系统架构
第一种ELK架构,是最简单的一种ELK架构方式。优点是搭建简单,易于上手。缺点是Logstash耗资源较大,运行占用CPU和内存高。另外没有消息队列缓存,存在数据丢失隐患。建议小规模集群使用。此架构首先由Logstash分布于各个节点上搜集相关日志、数据,并经过分析、过滤后发送给远端服务器上的Elasticsearch进行存储。Elasticsearch将数据以分片的形式压缩存储并提供多种API供用户查询,操作。用户亦可以更直观的通过配置Kibana Web Portal方便的对日志查询,并根据数据生成报表。
基本ELK.png
第二种架构,引入了消息队列机制,位于各个节点上的Logstash Agent先将数据/日志传递给Kafka(或者Redis),并将队列中消息或数据间接传递给Logstash,Logstash过滤、分析后将数据传递给Elasticsearch存储。最后由Kibana将日志和数据呈现给用户。因为引入了Kafka(或者Redis),所以即使远端Logstash server因故障停止运行,数据将会先被存储下来,从而避免数据丢失。这种架构适合于较大集群的解决方案,但由于Logstash中心节点和Elasticsearch的负荷会比较重,可将他们配置为集群模式,以分担负荷,这种架构的优点在于引入了消息队列机制,均衡了网络传输,从而降低了网络闭塞尤其是丢失数据的可能性,但依然存在Logstash占用系统资源过多的问题。
ELK进阶架构.png
第三种架构,引入了Logstash-forwarder。首先,Logstash-forwarder将日志数据搜集并统一发送给主节点上的Logstash,Logstash分析、过滤日志数据后发送至Elasticsearch存储,并由Kibana最终将数据呈现给用户。这种架构解决了Logstash在各计算机点上占用系统资源较高的问题。经测试得出,相比Logstash,Logstash-forwarder所占用系统CPU和MEM几乎可以忽略不计。另外,Logstash-forwarder和Logstash间的通信是通过SSL加密传输,起到了安全保障。如果是较大集群,用户亦可以如结构三那样配置logstash集群和Elasticsearch集群,引入High Available机制,提高数据传输和存储安全。更主要的配置多个Elasticsearch服务,有助于搜索和数据存储效率。但在此种架构下发现Logstash-forwarder和Logstash间通信必须由SSL加密传输,这样便有了一定的限制性。
Logstash-forwarder第四种架构,将Logstash-forwarder替换为Beats。经测试,Beats满负荷状态所耗系统资源和Logstash-forwarder相当,但其扩展性和灵活性有很大提高。Beats platform目前包含有Packagebeat、Topbeat和Filebeat三个产品,均为Apache 2.0 License。同时用户可根据需要进行二次开发。这种架构原理基于第三种架构,但是更灵活,扩展性更强。同时可配置Logstash 和Elasticsearch 集群用于支持大集群系统的运维日志数据监控和查询。
ELK with Beats一个例子:MySQL日志审计系统
MySQL日志审计系统,采用percona audit插件审计MySQL的访问情况,结果记录到指定文件中。通过Rsyslog将每个MySQL审计日志集中到Rsyslog Server的指定目录中,使用filebeat监控文件变化,上报到kafka。使用Logstash消费数据,把数据过滤切割后,写入ES中,用户通过kibana查询相关数据。
系统架构图如下:
MySQL审计日志系统.png
MySQL审计采用percona的审计插件,配置如下:
+----------------------------+---------------+
| Variable_name | Value |
+----------------------------+---------------+
| audit_log_buffer_size | 1048576 |
| audit_log_exclude_accounts | |
| audit_log_exclude_commands | |
| audit_log_file | audit.log |
| audit_log_flush | OFF |
| audit_log_format | OLD |
| audit_log_handler | FILE |
| audit_log_include_accounts | |
| audit_log_include_commands | |
| audit_log_policy | ALL |
| audit_log_rotate_on_size | 0 |
| audit_log_rotations | 0 |
| audit_log_strategy | ASYNCHRONOUS |
| audit_log_syslog_facility | LOG_USER |
| audit_log_syslog_ident | percona-audit |
| audit_log_syslog_priority | LOG_INFO |
+----------------------------+---------------+
收集到的审计日志,通过Rsyslog的imfile模块,采集审计日志,之后发送到Rsyslog Server。
input(type="imfile" File="audit.log" Tag="mysqlaudit" Severity="info" Facility="local2")
local2.info @Rsyslog.Server.dns:514
在Rsyslog上,创建相关目录接收上传的日志:
$template mysql_audit_file, "/data/log/mysql_audit_log/%$YEAR%/%$MONTH%/%$DAY%/%fromhost-ip%.log"
if ($syslogfacility-text == 'local2') and ($syslogseverity-text == 'info') then -?mysql_audit_file
& stop
Rsyslog上接收到的文件,通过filebeat上报kafka:
filebeat.prospectors:
- type: log
enabled: true
paths:
- /data/log/mysql_audit_log/*/*/*/*
multiline:
pattern: "^{\"audit_record\":"
negate: true
match: after
document_type: mysqlauditlog
Logstash负责消费kafka的数据,过滤切割后,写入到ES中:
input {
kafka {
add_field => {"myid"=>"mysql_audit"}
bootstrap_servers => ""
group_id => "logstash-mysql-audit"
topics => ["mysql_audit"]
client_id => "mysql_audit"
consumer_threads => 1
auto_offset_reset => "latest"
decorate_events => true
}
}
filter {
if [myid] == "mysql_audit" {
ruby {
code => "
array=event.get('message')
timestamp = array.scan(/\"@timestamp\":\"(.*?)\"/)
if timestamp.length > 0
event.set('timestamp',timestamp[0][0])
end
client_ip = array.scan(/\"client_ip\":\"(.*?)\"/)
if client_ip.length > 0
event.set('client_ip',client_ip[0][0])
end
request = array.scan(/\"request\":\"(.*?)\",/)
if request.length > 0
event.set('request',request[0][0])
end
status = array.scan(/\"status\":\"(.*?)\",/)
if status.length > 0
event.set('status',status[0][0])
end
bytes_in = array.scan(/\"bytes_in\":(\d+)/)
if bytes_in.length > 0
event.set('bytes_in',bytes_in[0][0])
end
type = array.scan(/\"type\":\"(.*?)\",/)
if type.length > 0
event.set('type',type[0][0])
end
query = array.scan(/\"query\":\"(.*?)\",/)
if query.length > 0
event.set('query',query[0][0])
end
bytes_out = array.scan(/\"bytes_out\":(\d+)/)
if bytes_out.length > 0
event.set('bytes_out',bytes_out[0][0])
end
responsetime = array.scan(/\"responsetime\":(\d+)/)
if responsetime.length > 0
event.set('responsetime',responsetime[0][0])
end
response = array.scan(/\"response\":\"(\{.*?\})\",/)
if response.length > 0
event.set('response',response[0][0])
end
port = array.scan(/\"port\":(\d+)/)
if port.length > 0
event.set('port',port[0][0])
end
client_port = array.scan(/\"client_port\":(\d+)/)
if client_port.length > 0
event.set('client_port',client_port[0][0])
end
method = array.scan(/\"method\":\"(.*?)\",/)
if method.length > 0
event.set('method',method[0][0])
end
ip = array.scan(/\"ip\":\"(.*?)\"/)
if ip.length > 0
event.set('ip',ip[0][0])
end
host = array.scan(/\"host\":\"(.*?)\"/)
if host.length > 0
event.set('host',host[0][0])
end
"
}
mutate {
remove_field =>["message"]
}
}
}
output {
if [myid] == "mysql_audit" {
elasticsearch {
hosts => [""]
index => "mysql_audit-%{+YYYY-MM-dd}"
codec => "json"
}
}
}
用户可以在kibana中查询自己所需的数据:
kibana
总结
目前,上报到公司kafka的日志,皆可接入数据库部门的ES,可通过kibana统一查询、分析,协助排查错误、分析性能。后续通过接入更多的beats组件,来丰富ES日志平台的使用场景。