Notes on consuming Kafka with Logstash and importing into Elasticsearch

2017-12-13

Source JSON data:

{"vaid": 4, "layer_type": 0, "clid": 4, "chid": "1073266302", "sid_type": 2, "create_time": 1512724518.1473277, "pid": 1777607729, "content_type": 3, "sid": 1142957589, "seq": 1771, "seq_duration": 1.0026666666666666, "sub_seq_count": 0, "stage": 2, "state": 5, "ctime": 1512726502.305, "mtime": 1512726502.3050795}

Requirements:
Convert ctime, mtime, and create_time to milliseconds and store them in ES as the date type (a quick arithmetic check is sketched below)
Convert seq_duration to float
Convert sub_seq_count to integer
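
For reference only (not part of the original pipeline): multiplying the sample timestamps by 1000 and truncating gives the epoch_millis values the three date fields should end up holding.

echo '1512724518.1473277 1512726502.305 1512726502.3050795' \
  | awk '{ for (i = 1; i <= NF; i++) printf "%d\n", $i * 1000 }'
# prints 1512724518147, 1512726502305 and 1512726502305, one per line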

Create an Elasticsearch template

curl -X PUT http://10.86.0.107:9200/_template/live_seq_monitor -d '{
    "template": "live_seq_monitor-*",
    "settings": {
        "number_of_shards": 1,
        "index.refresh_interval": "5s"
    },
    "mappings": {
        "seq": {
            "properties": {
                "@timestamp": {
                    "type": "date"
                },
                "@version": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "chid": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "clid": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "content_type": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "create_time": {
                    "type": "date",
                    "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                },
                "ctime": {
                    "type": "date",
                    "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                },
                "host": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "layer_type": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "mtime": {
                    "type": "date",
                    "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                },
                "pid": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "seq": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "seq_duration": {
                    "type": "float"
                },
                "sid": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "sid_type": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "stage": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "state": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "sub_seq_count": {
                    "type": "integer"
                },
                "vaid": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                }
            }
        }
    }
}'
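
If the PUT succeeds, Elasticsearch answers with {"acknowledged":true}. As a quick check, the stored template can be read back with a GET against the same endpoint (note that on Elasticsearch 6.x and later the PUT above also needs a -H 'Content-Type: application/json' header):

curl -X GET 'http://10.86.0.107:9200/_template/live_seq_monitor?pretty'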

Set up the Logstash configuration file

input {
    kafka {
        bootstrap_servers => "10.86.0.106:9092"
        topics => ["live_seq_monitor"]
        group_id => "logstash"
        auto_offset_reset => "earliest"
        consumer_threads => 6
        decorate_events => true
    }
}
filter {
    json {
        source => "message"
        remove_field => [ "message"]
    }
    ruby {
        code => "
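            # the source timestamps are epoch seconds as floats; multiply by 1000 to match the epoch_millis format in the template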
            event.set('create_time',event.get('create_time')*1000)
            event.set('ctime',event.get('ctime')*1000)
            event.set('mtime',event.get('mtime')*1000)
        "
    }
    mutate {
        convert => [
            "sub_seq_count", "integer",
            "ctime", "integer",
            "mtime", "integer",
            "create_time", "integer",
            "seq_duration", "float",
            "sid_type", "string",
            "clid", "string",
            "pid", "string",
            "chid", "string",
            "layer_type", "string",
            "sid", "string",
            "vaid", "string",
            "content_type", "string",
            "stage", "string",
            "state", "string",
            "seq", "string"
        ]
    }
}
output {
    stdout {
        codec => rubydebug
    }
    elasticsearch {
        index => "live_seq_monitor-%{+YYYY.MM.dd}"
        hosts => ["10.86.0.107:9200"]
    }
}
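
Assuming the config above is saved as live_seq_monitor.conf (the file name here is just a placeholder), the syntax can be validated and the pipeline started like this:

bin/logstash -f live_seq_monitor.conf --config.test_and_exit   # validate the config only
bin/logstash -f live_seq_monitor.conf                          # start consuming from Kafka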

When I tested on the command line earlier I ran into quite a few pitfalls; once the pipeline was written into a config file, the errors went away.
The ruby code block simply calls the event's get and set to rewrite the timestamp fields.

bin/logstash -e 'input { stdin { } } filter { json { source => "message" } ruby { code => "event.get('\''message'\'')" } mutate { convert => ["sub_seq_count","integer","seq_duration","float","sid_type","string","clid","string","pid","string","chid","string","layer_type","string","sid","string","vaid","string","content_type","string","stage","string","state","string","seq","string"] } } output { stdout { codec => rubydebug } elasticsearch { index => "live_seq_monitor-%{+YYYY.MM.dd}" hosts => ["127.0.0.1:9200"] } }'
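
Once a few records have been consumed, the daily index and its mapping can be inspected to confirm that seq_duration, sub_seq_count and the three time fields got the expected types (host and index name follow the config above):

curl -X GET 'http://10.86.0.107:9200/live_seq_monitor-*/_search?size=1&pretty'
curl -X GET 'http://10.86.0.107:9200/live_seq_monitor-*/_mapping?pretty'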