Experiment: replacing Logstash log collection with Filebeat


Solving the tag (label) problem

Scenario: switch an existing ELK pipeline to a Filebeat + Kafka + ELK architecture (NGINX logs are used here for a simple experiment).

Problems to solve and pitfalls encountered: both log files travel through the same Kafka topic, so each event needs a label (the fields.log_type custom field below) that lets Logstash tell access logs from error logs.

Filebeat configuration file

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /log-dir/access.log
  fields:
    log_type: access    # custom field; lets downstream consumers identify this stream
- type: log
  enabled: true
  paths:
    - /log-dir/error.log
  fields:
    log_type: error     # same mechanism for the error log
output.kafka:
  enabled: true
  hosts: ["10.66.200.220:9092","10.66.201.120:9092","10.66.202.201:9092"]
  topic: nginx_logs_app
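
Before wiring up Logstash, it is worth confirming that events actually arrive on the topic. Below is a minimal sketch, assuming the kafka-python package is installed and the broker addresses above are reachable; the topic name and field layout come straight from the Filebeat config:

# Quick check that Filebeat events reach the nginx_logs_app topic
# (sketch; assumes the kafka-python package).
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "nginx_logs_app",
    bootstrap_servers=["10.66.200.220:9092", "10.66.201.120:9092"],
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for event in consumer:
    doc = event.value
    # Filebeat puts the raw line in "message" and the custom field
    # under "fields.log_type" ("access" or "error").
    print(doc.get("fields", {}).get("log_type"), doc.get("message"))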

Logstash configuration file
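
For context, each Kafka payload written by Filebeat is a JSON document. A hypothetical, trimmed-down example of its shape (the exact field set varies by Filebeat version) shows why codec => "json" in the input below lets the filter address [fields][log_type] directly:

# Illustrative subset of a Filebeat Kafka payload (field set varies by version).
event = {
    "@timestamp": "2019-03-07T04:00:00.000Z",
    "message": '10.0.0.1 - - [07/Mar/2019:12:00:00 +0800] "GET / HTTP/1.1" 200 612',
    "fields": {"log_type": "access"},  # the custom field set in filebeat.yml
    "source": "/log-dir/access.log",
}
# After the json codec decodes this, the Logstash conditional
# `if [fields][log_type] == "access"` matches the nested field.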

input {
    kafka {
        bootstrap_servers => "10.66.200.220:9092,10.66.201.120:9092" ## multiple brokers: one comma-separated string (this option is a string, not an array)
        topics => ["nginx_logs_app"] ## topic(s) to consume
        group_id => "idaas_log" ## consumer group id; Kafka uses it to track this consumer
        codec => "json" ## important: decode the Filebeat JSON payload so the filter can address fields directly
        consumer_threads => 10 ## must not exceed the topic's partition count
        decorate_events => true ## attach Kafka metadata (topic, partition, offset) under [@metadata][kafka]
    }
}
filter {
    if [fields][log_type] == "access" { ## route on the custom field added by Filebeat
        mutate {
            split => ["message", " "] ## turn the raw line into an array of space-separated tokens
            add_field => { "ip" => "%{[message][0]}" }
            add_field => { "method" => "%{[message][5]}" }
            add_field => { "statuscode" => "%{[message][8]}" }
            remove_field => [ "message" ]
        }
    }
}
output {
    if [fields][log_type] == "access" {
        elasticsearch {
            hosts => "elasticsearchlog-lb.elasticsearch-log"
            ssl => false
            index => "filebeat_test"
        }
    } else if [fields][log_type] == "error" {
        elasticsearch {
            hosts => "elasticsearchlog-lb.elasticsearch-log"
            ssl => false
            index => "filebeat_error_test"
        }
    }
}
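
The indices 0, 5 and 8 used in the mutate filter come from splitting an NGINX combined-format access line on single spaces. A small sketch of that mapping (the sample line is made up; it assumes the default combined log format):

# Why [message][0] / [message][5] / [message][8] become ip / method / statuscode:
# token positions after splitting a default NGINX combined-format line on spaces.
sample = ('10.0.0.1 - - [07/Mar/2019:12:00:00 +0800] '
          '"GET /index.html HTTP/1.1" 200 612 "-" "curl/7.61.1"')

tokens = sample.split(" ")
print(tokens[0])  # 10.0.0.1 -> ip
print(tokens[5])  # "GET     -> method (the leading quote of "$request" comes along)
print(tokens[8])  # 200      -> statuscode

Note that token 5 keeps the opening quote of "$request", so the extracted method field contains it; a grok pattern would parse this more cleanly, but split keeps the experiment simple.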