python 配置钉钉机器人,推送禅道bug统计

2020-12-30  本文已影响0人  renyjenny

禅道11.7版本开始,webhook支持集成钉钉工作消息通知。具体设置见禅道开源版使用手册
因为公司的禅道版本较低,升级或迁移又比较麻烦。为了方便统计bug信息并自动通知到钉钉群,写了一个脚本用来获取当前项目的bug列表,然后统计待解决的、待回归的bug,以及具体bug信息,最后通过钉钉机器人,以markdown的方式发送到群里。代码参考python 爬虫禅道任务自动推送到钉钉

效果展示.png

一、配置钉钉群机器人

群设置-群智能助手--添加机器人


群机器人.png

安全设置我设置的是加签,webhook和密钥待会代码里会用到。

二、python代码

# run.py
# !/usr/bin/env python
# _*_ coding: utf-8 _*_

from chandao import account, my_task

__author__ = "reny"

if __name__ == '__main__':
    cfg = account.Account()
    task = my_task.MyTask(cfg)
    task.request()
# config.json
{
 "host": "http://192.168禅道地址",
 "account": "账号",
 "password": "密码"
}
# account.py
# !/usr/bin/env python
# _*_ coding: utf-8 _*_

import json
import sys
import os


class Account(object):
    def __init__(self):
        config = os.path.join(sys.path[0], "config.json")
        if not os.path.exists(config):
            print(u"config.json 配置文件不存在")
        else:
            with open(config, "r") as configFile:
                table = json.load(configFile)
                self._host = table["host"]
                self._account = table["account"]
                self._password = table["password"]

    def get_host(self):
        # print("Host: " + self._host)
        return self._host

    def get_account(self):
        print("Account: " + self._account)
        return self._account

    def get_password(self):
        print("Password: " + self._password)
        return self._password
# my_zentao.py
# !/usr/bin/env python
# _*_ coding: utf-8 _*_


import requests
import random
from dingtalkchatbot.chatbot import DingtalkChatbot


class MyZenTao(object):
    def __init__(self, config):
        self._config = config
        self._loginurl = config.get_host() + "/zentao/user-login.html"
        self._session = requests.session()

        # 钉钉机器人
        webhook = '钉钉机器人webhook'  
        secret = '密钥'
        self._robot = DingtalkChatbot(webhook, secret=secret)

    def login(self):
        ua_list = [
            'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36',
            'Mozilla/5.0 (X11; CrOS x86_64 10066.0.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36'
        ]
        ua = random.choice(ua_list)

        body = {
            "account": self._config.get_account(),
            "password": self._config.get_password(),
            # "referer": "http://127.0.0.1/zentao/my/",
            "keepLogin[]": "on",
        }

        response = self._session.post(self._loginurl, headers={
            "User-agent": ua,
        }, data=body)

        with response:
            content = response.text
            if "parent.location=" in content:
                print("登录成功!")
                return True
            elif "登录失败,请检查您的用户名或密码是否填写正确" in content:
                print("登录失败,用户名或密码不对")
                return False
            else:
                print("登录失败,其它问题:%s" % content)
                return False

    # 钉钉机器人发送禅道任务链接
    def send_markdown(self, title, text):
        self._robot.send_markdown(title=title, text=text, is_at_all=True)

# my_task.py
# !/usr/bin/env python
# _*_ coding: utf-8 _*_

from chandao import my_zentao
from lxml import etree
import datetime


class MyTask(my_zentao.MyZenTao):
    def request(self):
        url = self._config.get_host() + "bug列表地址"

        success = self.login()
        if success is True:
            response = self._session.get(url)
            with response:
                content = response.text
                html = etree.HTML(content)

                tasks = html.xpath("//table[@id='bugList']//tr")
                count = todo_num = 0
                bug_list = []

                for i, task in enumerate(tasks):
                    id = task.xpath("./td[@class='c-id cell-id']//label//text()")  # 任务id
                    if len(id) == 0:
                        continue

                    id_str = str(id[0])

                    # 总数增加
                    count = count + 1
                    status = task.xpath("./td[contains(@class, 'c-status')]/@title")  # 状态
                    if status[0] != '激活':
                        continue
                    todo_num = todo_num + 1

                    # 组合bug list
                    content = task.xpath("./td[@class='c-title text-left']/@title")
                    content_str = str(content[0])

                    assigned_to = task.xpath("./td[contains(@class, 'c-assignedTo')]//text()")
                    assigned_to_str = str(assigned_to[1])

                    bug_list_item = "{0} {1}===>{2}".format(id_str, content_str, assigned_to_str)
                    bug_list.append(bug_list_item)

            # markdown格式
            msg = "### [截止{},共有{}个bug未解决,{}个bug待回归。](禅道地址)".format(
                datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), todo_num, count - todo_num)
            msg = msg + "\n\n* * *\n"
            for i in bug_list:
                msg = msg + '\n- ' + str(i)

            print(msg)
            self.send_markdown("禅道提醒", msg)

三、配置定时任务

# 每周一到周五,下午4点执行
crontab -e
0 16 * * 2-6 python /root/chandao/run.py
上一篇 下一篇

猜你喜欢

热点阅读