storm capacity性能状态和kafka分区状态检查监控
2023-04-24 本文已影响0人
平凡的运维之路
storm 性能状态检查监控
代码说明
- 代码说明
#!/usr/bin/python3
# -*- coding:utf-8 -*-
# NOTE: the coding declaration must be within the first two lines of the file
# to take effect (PEP 263); the original placed it after an import.
import os
import sys
from decimal import Decimal

# Detect the RHEL/CentOS major version so the matching stdlib modules can be
# imported: EL4/5/6 ship Python 2 (ConfigParser, reload/setdefaultencoding),
# newer releases ship Python 3 (configparser).
VersionShell = " cat /etc/redhat-release |awk -F'release ' '{print $2}'|cut -d. -f1 | cut -b 1"
OSVersion = int([Errorinfo for Errorinfo in os.popen(VersionShell)][0].split("\n")[0])
if OSVersion in (4, 5, 6):
    import time, logging.config, ConfigParser, requests
    # Python 2 only: force utf-8 as the default encoding for the mixed
    # Chinese/ASCII alarm strings below.
    reload(sys)
    sys.setdefaultencoding('utf8')
else:
    import time, logging.config, configparser, requests
def get_component_info(storm_id):
    """Query the Storm UI topology API for *storm_id* and log an alarm for
    every bolt whose capacity is at or above Capacity_threshold.

    Relies on module-level globals loaded in __main__ from cfg/config.ini:
    storm_ui_curl, Capacity_threshold, Timeout and logger.
    """
    user_headers = {
        'User-Agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36',
        'Accept' : 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
        'Accept-Encoding' : 'gzip,deflate',
        'Accept-Language' : 'zh-CN,zh;q=0.9',
        'Upgrade-Insecure-Requests' : '1'}
    topology_api_url = storm_ui_curl + "/api/v1/topology/" + storm_id
    logger.info("Requests Api Url: " + topology_api_url)
    # Consistency/robustness fix: use the same configurable timeout and error
    # handling as get_topology_id() so one hung or failed topology query
    # cannot abort the whole check run (the original had no timeout at all).
    try:
        response = requests.get(topology_api_url, headers=user_headers, timeout=int(Timeout))
        bolts = response.json().get("bolts") or []
    except Exception as Error:
        logger.error("Calling interface Error Return: " + str(Error) + " exit!!!")
        return
    for bolt_msg in bolts:
        bolt = dict(bolt_msg)
        # str() guards against the API returning capacity as a number, which
        # would break the string concatenation below.
        capacity = str(bolt.get("capacity"))
        boltId = bolt.get("boltId")
        if Decimal(capacity) < Decimal(Capacity_threshold):
            continue
        # Distinguish DB-writer (Mongo) bolts from compute bolts in the alarm
        # text; other bolt types are deliberately not alarmed on.
        if "MongoBolt" in boltId:
            alarm_msg = storm_id.split("-")[0] + "拓扑中的入库bolt," + boltId + "比Capacity性能指标设置阈值" + Capacity_threshold + "大,当前是:" + capacity + ",请检查storm服务状态或mongodb入库延迟问题及缓存数据清理!!!"
            logger.error(alarm_msg)
        elif "ComputeBolt" in boltId:
            alarm_msg = storm_id.split("-")[0] + "拓扑中的计算bolt," + boltId + "比Capacity性能指标设置阈值" + Capacity_threshold + "大,当前是:" + capacity + ",请检查storm服务状态或mongodb入库延迟问题及缓存数据清理!!!"
            logger.error(alarm_msg)
# Fetch the list of running topology ids from the Storm UI.
def get_topology_id():
    """Return the ids of all topologies reported by the Storm UI summary API.

    Returns an empty list when the API call or JSON parsing fails (the error
    is logged, not raised).  Relies on module-level globals storm_ui_curl,
    Timeout and logger.
    """
    id_list = []
    user_headers = {
        'User-Agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36',
        'Accept' : 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
        'Accept-Encoding' : 'gzip,deflate',
        'Accept-Language' : 'zh-CN,zh;q=0.9',
        'Upgrade-Insecure-Requests' : '1'}
    get_id_api = storm_ui_curl + "/api/v1/topology/summary"
    logger.info("Requests Api Url: " + get_id_api + " get topology id ")
    # Single try block: the original split the request and the JSON parsing
    # into two, so a failed request left RetrunRsyncData undefined and the
    # second handler reported a misleading NameError instead of the real one.
    try:
        summary_resp = requests.get(get_id_api, headers=user_headers, timeout=int(Timeout))
        for line in summary_resp.json().get("topologies"):
            id_list.append(dict(line).get("id"))
        logger.info("get storm id list: " + str(id_list))
    except Exception as ErrorMsg:
        logger.error("Calling interface Error Return: " + str(ErrorMsg) + " exit!!!")
    return id_list
# Send an alarm to the network-management system (Apex).
def SendApex(ApexInfo):
    """Forward *ApexInfo* to Apex via rsyslog (/bin/logger, local0.crit) when
    the IsSendApex config switch is the string "True"; otherwise only log
    that sending is disabled.
    """
    if IsSendApex != "True":
        logger.info("Not Send Rsyslog Info data to Apex ")
        return
    os.system("/bin/logger -p local0.crit \"ccod: result=ERROR " + ApexInfo + " \"")
# Suppress an alarm once it has been sent three times.
def RepeatSendMsg(Msg):
    """Rate-limit duplicate alarms: allow a given message at most 3 times.

    Sent messages are recorded one per line in WirteMsgfilename; the file is
    truncated once a day at 00:10 so suppressed alarms can fire again.
    Returns True when the message may be sent (and records it), False when it
    has already been sent 3 times.

    Fix: the original shelled out to grep/echo with the raw message
    interpolated into the command line (quoting/injection hazard, and the
    message was interpreted as a regex); this uses plain file I/O instead.
    """
    now = time.localtime()
    if now.tm_hour == 0 and now.tm_min == 10:
        # Daily reset of the dedup log.
        open(WirteMsgfilename, "w").close()
    try:
        with open(WirteMsgfilename) as record_file:
            Countnum = sum(1 for line in record_file if Msg in line)
    except IOError:
        # No record file yet: nothing has been sent.
        Countnum = 0
    logger.info("Repeat count: " + str(Countnum))
    if Countnum < 3:
        with open(WirteMsgfilename, "a") as record_file:
            record_file.write(Msg + "\n")
        return True
    else:
        return False
def Check_kafka_Leader_status():
    """Run kafka-topics.sh --describe and raise an Apex alarm for any
    partition whose Leader is -1 (no leader elected).

    Honours the Is_open_sudo switch: when "True" the command runs as
    kafka_run_user via sudo, otherwise directly under the current user.
    Relies on module-level globals kafka_comm_path, zookeerp_ip_port,
    kafka_run_user, Is_open_sudo and logger.
    """
    describe_cmd = kafka_comm_path + " --describe --zookeeper " + zookeerp_ip_port
    if "True" == Is_open_sudo:
        shell = ("sudo -i -u " + kafka_run_user + " /bin/sh -c ' source ~/.bash_profile ; "
                 + describe_cmd + " |grep Leader | grep -v offset'")
    else:
        shell = ("/bin/sh -c ' source ~/.bash_profile ; "
                 + describe_cmd + " |grep Leader | grep -v offset '")
    shell_msg = os.popen(shell)
    logger.info("run shell command info: " + shell)
    msg_list = [msg for msg in shell_msg]
    logger.info("run shell msg return: " + str(msg_list) + " Partition num count: " + str(len(msg_list)))
    if not msg_list:
        # BUG FIX: the original logged `topics + " Not get Topic info "` but
        # `topics` is never defined anywhere in the script, so this branch
        # raised NameError exactly when the failure should have been reported.
        logger.error("Not get Topic info ")
        return
    for Partition_info in msg_list:
        # kafka-topics.sh output is tab-separated:
        # \tTopic: X\tPartition: N\tLeader: N\tReplicas: ...\tIsr: ...
        fields = Partition_info.split("\t")
        Topic = fields[1].split()[1]
        Partition = fields[2].split()[1]
        Leader = fields[3].split()[1]
        if Leader == "-1":
            alarm_msg = "kafka node Topic " + Topic + " Partition " + Partition + " Leader " + Leader + " No leader found for partition 0"
            logger.error(alarm_msg)
            SendApex(alarm_msg)
        else:
            alarm_msg = "kafka Node Topic " + Topic + " Partition " + Partition + " Leader " + Leader + " status ok"
            logger.info(alarm_msg)
if __name__ == "__main__":
    # Resolve the config path relative to the current working directory.
    # os.getcwd() replaces the original `for dirpath in os.popen("pwd")`
    # subprocess round-trip.
    cfgpath = os.path.join(os.getcwd(), "cfg/config.ini")
    if OSVersion in (4, 5, 6):
        conf = ConfigParser.ConfigParser()   # Python 2 stdlib name
    else:
        conf = configparser.ConfigParser()
    conf.read(cfgpath)
    # Logging setup; logger.conf is expected alongside config.ini.
    logging.config.fileConfig("./cfg/logger.conf")
    logger = logging.getLogger("rotatfile")
    logger.setLevel(logging.INFO)
    # Config values below are read as module-level globals by the check
    # functions above.
    storm_ui_curl = conf.get("main", "storm_ui_curl")
    Timeout = conf.get("main", "api_timeout")
    Capacity_threshold = conf.get("main", "Capacity_threshold")
    IsSendApex = conf.get("main", "IsSendApex")
    WirteMsgfilename = conf.get("main", "WirteMsgfilename")
    zookeerp_ip_port = conf.get("main", "zookeerp_ip_port")
    kafka_comm_path = conf.get("main", "kafka_comm_path")
    kafka_run_user = conf.get("main", "kafka_run_user")
    Is_open_sudo = conf.get("main", "Is_open_sudo")
    # 1) Storm bolt capacity check.
    logger.info("Check Storm Capacity num Status ")
    # `topology_id` avoids shadowing the builtin `id`; iterating an empty
    # list is a no-op, so the original len() guard is unnecessary.
    for topology_id in get_topology_id():
        get_component_info(topology_id)
    logger.info("====================================")
    # 2) Kafka partition leader check.
    logger.info("Check kafka Partition Leader Status ")
    Check_kafka_Leader_status()
配置文件说明
- 配置说明
[main]
#storm ui地址
storm_ui_curl = http://172.16.100.90:8080
#超时时间
api_timeout = 2
#storm检查对应性能指标数,该数值大于1.0则基本上入库就有延迟
Capacity_threshold = 1.5
IsSendApex = True
#storm性能检查重复记录文件
WirteMsgfilename = ./log/DuplicateInfo.txt
#zookeeper地址和端口
zookeerp_ip_port = 192.168.128.1:2181
#kafka脚本路径
kafka_comm_path = /data/zks/kafka_2.11-0.8.2.2/bin/kafka-topics.sh
#是否开启sudo权限,不开启sudo,则部署在kafka安装的用户下
Is_open_sudo = True
#kafka运行用户sudo时,需要指定用户
kafka_run_user = root
部署说明
-
部署方式
http://xxxx.xxx.xxxx/Deploymentpackage/CheckStormStatus.tar.gz 通过链接公司vpn进行 # [ccodsupport@xxx ] cd apex [ccodsupport@xxx apex ] tar xvf CheckStormStatus.tar.gz [ccodsupport@xxx cfg]$ more config.ini [main] #storm ui地址 storm_ui_curl = http://192.168.127.2:8080 #超时时间 api_timeout = 2 #storm检查对应性能指标数,该数值大于1.0则基本上入库就有延迟 Capacity_threshold = 1.5 #Capacity 阈值说明 #apacity:计算公式为Capacity = Spout 或者 Bolt 调用 execute 方法处理的消息数量 × 消息平均执行时间/时间区间。如果这个值越接近1,说明Spout或者Bolt基本一直在调用 execute 方法,因此并行度不够,需要扩展这个组件的 Executor数量 # IsSendApex = True #storm性能检查重复记录文件 WirteMsgfilename = "./log/DuplicateInfo.txt" #zookeerp地址和端口 zookeerp_ip_port = 192.168.127.2:2181 #kafka脚本路径 kafka_comm_path = /data/zks/kafka_2.11-0.8.2.2/bin/kafka-topics.sh #是否开启sudo权限 Is_open_sudo = True #kafka运行用户sudo时,需要指定用户 kafka_run_user = root
-
修改好配置文件,启动运行,查看是否有异常。
[ccodsupport@TK_MDB_1 CheckStormStatus]$ cd /home/ccodsupport/apex/CheckStormStatus && ./StormStatus
运行结果:
2023-03-16 15:09:29 139657615288064 StormStatus.py:151 INFO Check Storm Capacity num Status
2023-03-16 15:09:29 139657615288064 StormStatus.py:57 INFO Requests Api Url: http://192.168.127.2:8080/api/v1/topology/summary get topology id
2023-03-16 15:09:29 139657615288064 StormStatus.py:67 INFO get storm id list: [u'IVRMESSAGETopology-2-1666188471', u'TSrvappraiseTopology-4-1666188482', u'RAGSETopology-3-1666188476', u'CallDetailTopology-1-1666188466', u'BxRecordTopology-5-1666188486', u'SrRecordTopology-7-1666188497', u'FastdfsUrlTopology-6-1666188492', u'SdCallResultTopology-8-1666188503']
2023-03-16 15:09:29 139657615288064 StormStatus.py:30 INFO Requests Api Url: http://192.168.127.2:8080/api/v1/topology/IVRMESSAGETopology-2-1666188471
2023-03-16 15:09:31 139657615288064 StormStatus.py:30 INFO Requests Api Url: http://192.168.127.2:8080/api/v1/topology/TSrvappraiseTopology-4-1666188482
2023-03-16 15:09:34 139657615288064 StormStatus.py:30 INFO Requests Api Url: http://192.168.127.2:8080/api/v1/topology/RAGSETopology-3-1666188476
2023-03-16 15:09:43 139657615288064 StormStatus.py:30 INFO Requests Api Url: http://192.168.127.2:8080/api/v1/topology/CallDetailTopology-1-1666188466
2023-03-16 15:09:44 139657615288064 StormStatus.py:30 INFO Requests Api Url: http://192.168.127.2:8080/api/v1/topology/BxRecordTopology-5-1666188486
2023-03-16 15:09:44 139657615288064 StormStatus.py:30 INFO Requests Api Url: http://192.168.127.2:8080/api/v1/topology/SrRecordTopology-7-1666188497
2023-03-16 15:09:44 139657615288064 StormStatus.py:30 INFO Requests Api Url: http://192.168.127.2:8080/api/v1/topology/FastdfsUrlTopology-6-1666188492
2023-03-16 15:09:45 139657615288064 StormStatus.py:30 INFO Requests Api Url: http://192.168.127.2:8080/api/v1/topology/SdCallResultTopology-8-1666188503
2023-03-16 15:09:45 139657615288064 StormStatus.py:159 INFO ====================================
2023-03-16 15:09:45 139657615288064 StormStatus.py:160 INFO Check kafka Partition Leader Status
2023-03-16 15:09:45 139657615288064 StormStatus.py:107 INFO run shell command info: sudo -i -u kafka /bin/sh -c ' source ~/.bash_profile ; /home/kafka/kafka_2.11-0.8.2.2/bin/kafka-topics.sh --describe --zookeeper 172.16.100.18:2181 |grep Leader | grep -v offset'
2023-03-16 15:09:48 139657615288064 StormStatus.py:110 INFO run shell msg return: ['\tTopic: SmartDialerTest\tPartition: 0\tLeader: 1\tReplicas: 1\tIsr: 1\n', '\tTopic: agentProxy\tPartition: 0\tLeader: 2\tReplicas: 2\tIsr: 2\n', '\tTopic: agentStateDetail\tPartition: 0\tLeader: 2\tReplicas: 2,0,1\tIsr: 0,1,2\n', '\tTopic: call_detail\tPartition: 0\tLeader: 1\tReplicas: 1,0,2\tIsr: 0,1,2\n', '\tTopic: chatLog\tPartition: 0\tLeader: 2\tReplicas: 2,0,1\tIsr: 0,1,2\n', '\tTopic: ent_record_bx_table\tPartition: 0\tLeader: 1\tReplicas: 1,2,0\tIsr: 0,1,2\n', '\tTopic: ent_record_fastdfs_url\tPartition: 0\tLeader: 1\tReplicas: 1,0,2\tIsr: 0,1,2\n', '\tTopic: ent_record_sr_table\tPartition: 0\tLeader: 1\tReplicas: 1,0,2\tIsr: 0,1,2\n', '\tTopic: ivr_message\tPartition: 0\tLeader: 0\tReplicas: 0,1,2\tIsr: 0,1,2\n', '\tTopic: new_r_ags_e\tPartition: 0\tLeader: 2\tReplicas: 2,0,1\tIsr: 2,0,1\n', '\tTopic: sd-monitor\tPartition: 0\tLeader: 2\tReplicas: 2\tIsr: 2\n', '\tTopic: sd2slee-instruct-ip\tPartition: 0\tLeader: 0\tReplicas: 0\tIsr: 0\n', '\tTopic: sd_call_result\tPartition: 0\tLeader: 0\tReplicas: 0\tIsr: 0\n', '\tTopic: session_detail\tPartition: 0\tLeader: 1\tReplicas: 1\tIsr: 1\n', '\tTopic: t_srvappraise\tPartition: 0\tLeader: 2\tReplicas: 2,1,0\tIsr: 0,1,2\n', '\tTopic: traffic_agent\tPartition: 0\tLeader: 0\tReplicas: 0\tIsr: 0\n', '\tTopic: universal\tPartition: 0\tLeader: 2\tReplicas: 2\tIsr: 2\n'] Partition num count: 17
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic SmartDialerTest Partition 0 Leader 1 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic agentProxy Partition 0 Leader 2 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic agentStateDetail Partition 0 Leader 2 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic call_detail Partition 0 Leader 1 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic chatLog Partition 0 Leader 2 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic ent_record_bx_table Partition 0 Leader 1 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic ent_record_fastdfs_url Partition 0 Leader 1 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic ent_record_sr_table Partition 0 Leader 1 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic ivr_message Partition 0 Leader 0 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic new_r_ags_e Partition 0 Leader 2 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic sd-monitor Partition 0 Leader 2 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic sd2slee-instruct-ip Partition 0 Leader 0 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic sd_call_result Partition 0 Leader 0 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic session_detail Partition 0 Leader 1 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic t_srvappraise Partition 0 Leader 2 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic traffic_agent Partition 0 Leader 0 status ok
2023-03-16 15:09:48 139657615288064 StormStatus.py:123 INFO kafka Node Topic universal Partition 0 Leader 2 status ok
- 添加定时任务
0 */2 * * * cd /home/ccodsupport/apex/DetailedContrast && ./StormStatus &>/dev/null