ELK文集我爱编程大数据应用

logstash+kafka收集hue日志,分析用户下载数据情况

2018-05-21  本文已影响10人  jhonshonjs

1.文档编写目的

主要是为了给大家提供一种监控某个服务运行情况或者是针对该服务日志进行一些其他分析功能的技术选型方式。本篇文档将针对大数据集群的hue组件,用户在使用hue查询hive数据后会有一个下载数据的操作,这个操作将严重影响hue服务的稳定性。所以我们决定通过logstash+kafka的方式分析hue日志,监控下载数据的操作。

  1. 环境描述
  2. 利用logstash收集hue日志并发送到kafka
  3. 编写python代码消费Kafka消息,并分析日志,最后发送钉钉报警
  4. 验证报警机制
  1. logstash已安装并能正常运行
  2. Kafka已创建相关topic

2.环境描述

  1. hue的日志都分布在.........*上的/var/log/hue/目录下。
  2. logstash版本logstash-6.2.3(里面有个坑:这个版本必须用java8才能使用,由于生产环境都是jdk1.7的所以这里要单独配置环境:编辑bin/logstash 添加jdk1.8的路径)
  3. kafka地址:.........**,端口都是9092(由于版本较新不需要zookeeper的相关信息)。topic信息:hue-download-group。

3.logstash收集hue日志

4.代码编写

`from kafka import KafkaConsumer
import re
import requests
import json

dingTalkProdURL = '*****'

def findDownload(inputMessage):
#一条日志中包含‘download’就是在下载数据
if 'download' in inputMessage:
print inputMessage
elements = re.split(r"[ ]+", inputMessage)
# server ip , time , user ip , user name
return (elements[1],elements[3],elements[6],elements[7])

def postToDingTalk(monitorMessage):
#封装发送的消息
messageBody = {}
messageBody["msgtype"] = "markdown"
messageBody["markdown"] = {}
messageBody["markdown"]["title"] = "Hue Download Monitor"
messageBody["markdown"]["text"] = "### Download User Name: " + monitorMessage[3] + "\n" +
"1. User IP: " + monitorMessage[2] + "\n" +
"2. Download Time: " + monitorMessage[1] + "\n" +
"3. Hue Server: " + monitorMessage[0] + "\n"
messageBody["at"] = {}
messageBody["at"]["atMobiles"] = []
messageBody["at"]["isAtAll"] = 'false'

header = {}
header["Content-Type"] = "application/json; charset=utf-8"
print json.dumps(messageBody)
r = requests.post(dingTalkProdURL, json.dumps(messageBody), headers=header)
print r.status_code
print r.reason

if name == 'main':
# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('hue-access-log',
group_id='hue-download-group',
bootstrap_servers=['...:9092', '...:9092', '...**:9092'])
for message in consumer:
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: message.value.decode('utf-8')
# print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
# message.offset, message.key,
# message.value))
elementTuple = findDownload(message.value)
if elementTuple != None:
#将报警信息封装好后post到钉钉的接口
postToDingTalk(elementTuple)`

5.验证报警机制的准确性

6.总结

上一篇下一篇

猜你喜欢

热点阅读