DBA

使用Elastic Stack搭建日志集中分析平台

2019-08-01  本文已影响8人  mysia

前言

一般我们需要进行日志分析场景:直接在日志文件中 grep、awk 就可以获得自己想要的信息。但在规模较大的场景中,此方法效率低下,面临问题包括日志量太大如何归档、文本搜索太慢怎么办、如何多维度查询。需要集中化的日志管理,所有服务器上的日志收集汇总。常见解决思路是建立集中式日志收集系统,将所有节点上的日志统一收集,管理,访问。

一般大型系统是一个分布式部署的架构,不同的服务模块部署在不同的服务器上,问题出现时,大部分情况需要根据问题暴露的关键信息,定位到具体的服务器和服务模块,构建一套集中式日志系统,可以提高定位问题的效率。

一个完整的集中式日志系统,需要包含以下几个主要特点:

ELK提供了一整套解决方案,并且都是开源软件,之间互相配合使用,完美衔接,高效的满足了很多场合的应用。

Elastic Stack简介

Elastic Stack包括Beats、Elasticsearch、Logstash、Kibana、APM等,ELK是其核心套件。

系统架构

第一种ELK架构,是最简单的一种ELK架构方式。优点是搭建简单,易于上手。缺点是Logstash耗资源较大,运行占用CPU和内存高。另外没有消息队列缓存,存在数据丢失隐患。建议小规模集群使用。此架构首先由Logstash分布于各个节点上搜集相关日志、数据,并经过分析、过滤后发送给远端服务器上的Elasticsearch进行存储。Elasticsearch将数据以分片的形式压缩存储并提供多种API供用户查询,操作。用户亦可以更直观的通过配置Kibana Web Portal方便的对日志查询,并根据数据生成报表。


基本ELK.png

第二种架构,引入了消息队列机制,位于各个节点上的Logstash Agent先将数据/日志传递给Kafka(或者Redis),并将队列中消息或数据间接传递给Logstash,Logstash过滤、分析后将数据传递给Elasticsearch存储。最后由Kibana将日志和数据呈现给用户。因为引入了Kafka(或者Redis),所以即使远端Logstash server因故障停止运行,数据将会先被存储下来,从而避免数据丢失。这种架构适合于较大集群的解决方案,但由于Logstash中心节点和Elasticsearch的负荷会比较重,可将他们配置为集群模式,以分担负荷,这种架构的优点在于引入了消息队列机制,均衡了网络传输,从而降低了网络闭塞尤其是丢失数据的可能性,但依然存在Logstash占用系统资源过多的问题。


ELK进阶架构.png

第三种架构,引入了Logstash-forwarder。首先,Logstash-forwarder将日志数据搜集并统一发送给主节点上的Logstash,Logstash分析、过滤日志数据后发送至Elasticsearch存储,并由Kibana最终将数据呈现给用户。这种架构解决了Logstash在各计算机点上占用系统资源较高的问题。经测试得出,相比Logstash,Logstash-forwarder所占用系统CPU和MEM几乎可以忽略不计。另外,Logstash-forwarder和Logstash间的通信是通过SSL加密传输,起到了安全保障。如果是较大集群,用户亦可以如结构三那样配置logstash集群和Elasticsearch集群,引入High Available机制,提高数据传输和存储安全。更主要的配置多个Elasticsearch服务,有助于搜索和数据存储效率。但在此种架构下发现Logstash-forwarder和Logstash间通信必须由SSL加密传输,这样便有了一定的限制性。

Logstash-forwarder

第四种架构,将Logstash-forwarder替换为Beats。经测试,Beats满负荷状态所耗系统资源和Logstash-forwarder相当,但其扩展性和灵活性有很大提高。Beats platform目前包含有Packagebeat、Topbeat和Filebeat三个产品,均为Apache 2.0 License。同时用户可根据需要进行二次开发。这种架构原理基于第三种架构,但是更灵活,扩展性更强。同时可配置Logstash 和Elasticsearch 集群用于支持大集群系统的运维日志数据监控和查询。

ELK with Beats

一个例子:MySQL日志审计系统

MySQL日志审计系统,采用percona audit插件审计MySQL的访问情况,结果记录到指定文件中。通过Rsyslog将每个MySQL审计日志集中到Rsyslog Server的指定目录中,使用filebeat监控文件变化,上报到kafka。使用Logstash消费数据,把数据过滤切割后,写入ES中,用户通过kibana查询相关数据。

系统架构图如下:


MySQL审计日志系统.png

MySQL审计采用percona的审计插件,配置如下:

+----------------------------+---------------+
| Variable_name              | Value         |
+----------------------------+---------------+
| audit_log_buffer_size      | 1048576       |
| audit_log_exclude_accounts |               |
| audit_log_exclude_commands |               |
| audit_log_file             | audit.log     |
| audit_log_flush            | OFF           |
| audit_log_format           | OLD           |
| audit_log_handler          | FILE          |
| audit_log_include_accounts |               |
| audit_log_include_commands |               |
| audit_log_policy           | ALL           |
| audit_log_rotate_on_size   | 0             |
| audit_log_rotations        | 0             |
| audit_log_strategy         | ASYNCHRONOUS  |
| audit_log_syslog_facility  | LOG_USER      |
| audit_log_syslog_ident     | percona-audit |
| audit_log_syslog_priority  | LOG_INFO      |
+----------------------------+---------------+

收集到的审计日志,通过Rsyslog的imfile模块,采集审计日志,之后发送到Rsyslog Server。

input(type="imfile" File="audit.log" Tag="mysqlaudit" Severity="info" Facility="local2")
local2.info                             @Rsyslog.Server.dns:514

在Rsyslog上,创建相关目录接收上传的日志:

$template mysql_audit_file, "/data/log/mysql_audit_log/%$YEAR%/%$MONTH%/%$DAY%/%fromhost-ip%.log"
if ($syslogfacility-text == 'local2') and ($syslogseverity-text == 'info') then -?mysql_audit_file
& stop

Rsyslog上接收到的文件,通过filebeat上报kafka:

filebeat.prospectors:
- type: log
  enabled: true
  paths:
    - /data/log/mysql_audit_log/*/*/*/*
  multiline:
    pattern: "^{\"audit_record\":"
    negate: true
    match: after
  document_type: mysqlauditlog

Logstash负责消费kafka的数据,过滤切割后,写入到ES中:

input {
  kafka {
    add_field => {"myid"=>"mysql_audit"}
    bootstrap_servers => ""
    group_id => "logstash-mysql-audit"
    topics => ["mysql_audit"]
    client_id => "mysql_audit"
    consumer_threads => 1
    auto_offset_reset => "latest"
    decorate_events => true
  }
}

filter {
  if [myid] == "mysql_audit" {
    ruby {
      code => "
        array=event.get('message')

        timestamp = array.scan(/\"@timestamp\":\"(.*?)\"/)
        if timestamp.length > 0
                event.set('timestamp',timestamp[0][0])
        end
 
        client_ip = array.scan(/\"client_ip\":\"(.*?)\"/)
        if client_ip.length > 0
                event.set('client_ip',client_ip[0][0])
        end

        request = array.scan(/\"request\":\"(.*?)\",/)
        if request.length > 0
                event.set('request',request[0][0])
        end

        status = array.scan(/\"status\":\"(.*?)\",/)
        if status.length > 0
                event.set('status',status[0][0])
        end

        bytes_in = array.scan(/\"bytes_in\":(\d+)/)
        if bytes_in.length > 0
                event.set('bytes_in',bytes_in[0][0])
        end 

        type = array.scan(/\"type\":\"(.*?)\",/)
        if type.length > 0
                event.set('type',type[0][0])
        end 

        query = array.scan(/\"query\":\"(.*?)\",/)
        if query.length > 0
                event.set('query',query[0][0])
        end

        bytes_out = array.scan(/\"bytes_out\":(\d+)/)
        if bytes_out.length > 0
                event.set('bytes_out',bytes_out[0][0])
        end

        responsetime = array.scan(/\"responsetime\":(\d+)/)
        if responsetime.length > 0
                event.set('responsetime',responsetime[0][0])
        end

        response = array.scan(/\"response\":\"(\{.*?\})\",/)
        if response.length > 0
                event.set('response',response[0][0])
        end 

        port = array.scan(/\"port\":(\d+)/)
        if port.length > 0
                event.set('port',port[0][0])
        end  

        client_port = array.scan(/\"client_port\":(\d+)/)
        if client_port.length > 0
                event.set('client_port',client_port[0][0])
        end

        method = array.scan(/\"method\":\"(.*?)\",/)
        if method.length > 0
                event.set('method',method[0][0])
        end

        ip = array.scan(/\"ip\":\"(.*?)\"/)
        if ip.length > 0
                event.set('ip',ip[0][0])
        end

        host = array.scan(/\"host\":\"(.*?)\"/)
        if host.length > 0
                event.set('host',host[0][0])
        end

      "
    }
    mutate {
      remove_field =>["message"]
    }
  }
}

output {
  if [myid] == "mysql_audit" {
    elasticsearch {
      hosts => [""]
      index => "mysql_audit-%{+YYYY-MM-dd}"
      codec => "json"
    }
  }
}

用户可以在kibana中查询自己所需的数据:


kibana

总结

目前,上报到公司kafka的日志,皆可接入数据库部门的ES,可通过kibana统一查询、分析,协助排查错误、分析性能。后续通过接入更多的beats组件,来丰富ES日志平台的使用场景。

上一篇下一篇

猜你喜欢

热点阅读