查询 GitLab 中存在某个分支的项目,并判断该分支是否已合并
2021-08-15 本文已影响0人
liukeless
应用场景
在微服务的场景下,往往一个需求版本会涉及到很多的服务模块,它们有着共同的分支名称,但是由于开发时间跨度较长,到后期都忘记了涉及到哪些模块需要发版;
你是否遇到过这样的情况:发版之后才发现上个版本的代码没有合并进来,导致线上版本缺少上个版本的功能;
解决方案
本篇提供了一个python脚本,专门解决上面两类问题。主要功能如下:
- 检查存在目标分支的项目有哪些
- 检查目标分支是否合并到某分支(比如发版完成后需要开发分支合并到master分支)
- 检查稳定分支(如 master)是否已合并到目标分支(在多版本并行开发的场景下,基于 master 有两个并行开发分支 dev1、dev2;dev1 上线之后合并到了 master 分支,却忘记把 master 合并到 dev2 分支,导致 dev2 上线时缺少 dev1 分支的代码)
废话不多说,show code
以下几个参数需要自行替换;
- private_token gitlab页面生成token
- wxwork_webhock 企业微信群机器人的 webhook 地址,如果需要把结果发送到企业微信群聊则需要提供
- gitlab_host gitlab的地址
- 关键参数 target_branch_name 、merge_to 、merged_from 需要分析的分支名称,具体参考注释
"""
Based on the GitLab REST API: https://docs.gitlab.com/ee/api/
Authorization uses a GitLab private token, created in the GitLab UI under
User -> Settings -> Access Tokens.
"""
import json
import traceback
from datetime import datetime
from itertools import groupby
from operator import itemgetter
import requests
# NOTE(review): new_image is not referenced anywhere in the visible code —
# confirm it is still needed before removing this import.
from image_util import new_image
# Authentication token, generated from the GitLab web UI.
private_token = "*****"
# Minimum access level a project must grant in order to be listed; 30 = developer.
min_access_level = 30
# True: main() renders each result as an ASCII table row.
# False: main() renders a verbose line that also shows the last commit.
print_simple = True
# True: main() appends a de-duplicated list of the matching project names.
merge_project = True
# WeCom (WeChat Work) group-robot webhook URL used by send_wxwork().
wxwork_webhock = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=******"
# Base address of the GitLab instance and the API version prefix.
gitlab_host = "https://git.xxxx.com"
gitlab_api_version = "/api/v4"
base_path = gitlab_host + gitlab_api_version
# Endpoint listing all projects.
all_projects_path = base_path + "/projects"
# Endpoint template for looking up one branch: format(project_id, branch_name).
branches_path = all_projects_path + "/{}/repository/branches/{}"
# Endpoint template listing the refs a commit has been pushed to:
# format(project_id, commit_sha).
commits_push_to = base_path + "/projects/{}/repository/commits/{}/refs"
# Branch names to search for in every project.
target_branch_name = ["dev-1", "dev-2"]
# Namespaces to skip, matched on both "kind" and "name".
# Example: namespace_exclude = [{"kind": "user", "name": "xxx"}]
namespace_exclude = []
# Namespaces to restrict the search to (same shape as namespace_exclude).
# When non-empty, search_branch() only keeps projects matching this list and
# the exclude list no longer has any effect.
namespace_include = []
# For each branch listed here, check whether the target branch's last commit
# is contained in it (post-release check: was the target branch merged into
# the stable branch?).
merge_to = []
# For each branch listed here, check whether its last commit is contained in
# the target branch (pre-release check: was the previous version merged in?).
merged_from = ["master"]
def obtain_all_projects(per_page, page):
    """Fetch the project list from GitLab, following pagination to the end.

    Only projects where the token's user has at least ``min_access_level``
    are requested, ordered by id.

    Args:
        per_page: Number of projects requested per page.
        page: Page number to start from.

    Returns:
        A list with the project dicts of ``page`` and every following page.

    Raises:
        Exception: If any page request returns a non-2xx status code.
    """
    project_list = []
    current_page = page
    # Iterate rather than recurse once per page, so a very large instance
    # cannot run into Python's recursion limit.
    while True:
        args = {"per_page": per_page, "page": current_page,
                "min_access_level": min_access_level, "order_by": "id"}
        resp = requests.get(url=all_projects_path, params=args,
                            headers={"PRIVATE-TOKEN": private_token})
        if not is_success(resp.status_code):
            raise Exception("获取所有项目列表失败")
        project_list.extend(json.loads(resp.content))
        # The header is empty on the last page; .get() also tolerates it
        # being absent altogether instead of raising KeyError.
        next_page = resp.headers.get("X-Next-Page")
        if not next_page:
            break
        current_page = int(next_page)
    return project_list
def is_success(code):
    """Return True when ``code`` is a 2xx HTTP status code, else False."""
    return 200 <= code < 300
def _namespace_selected(project):
    """Apply the namespace_include / namespace_exclude filters to one project."""
    namespace = project["namespace"]

    def matches(rule):
        return namespace["kind"] == rule["kind"] and namespace["name"] == rule["name"]

    # A non-empty include list decides on its own; the exclude list is only
    # consulted when no include list is configured.
    if namespace_include:
        return any(matches(rule) for rule in namespace_include)
    return not any(matches(rule) for rule in namespace_exclude)


def _commit_branch_names(project_id, project_name, commit_id):
    """Return the names of all branches containing a commit.

    Returns None when GitLab answers "Commit Not Found"; raises Exception on
    any other non-2xx response.
    """
    names = set()
    page = 1
    while True:
        # type=branch keeps same-named tags out of the answer; the endpoint is
        # paginated, so every page has to be read to avoid false negatives.
        resp = requests.get(url=commits_push_to.format(project_id, commit_id),
                            params={"type": "branch", "per_page": 100, "page": page},
                            headers={"PRIVATE-TOKEN": private_token})
        if not is_success(resp.status_code):
            if resp.status_code == 404 and "Commit Not Found" in resp.text:
                return None
            raise Exception("获取commit推送到【{}】分支列表失败".format(project_name))
        for ref in json.loads(resp.content):
            if ref.get("type", "branch") == "branch":
                names.add(ref["name"])
        next_page = resp.headers.get("X-Next-Page")
        if not next_page:
            return names
        page = int(next_page)


def _format_commit_time(created_at):
    """Render a commit timestamp as ``YYYY-MM-DD HH:MM:SS``.

    Only the leading date-time part is parsed, so any UTC offset and any
    (or no) fractional seconds are accepted; the wall-clock time as written
    in the timestamp is kept. Unparseable values are returned as text rather
    than failing the whole project.
    """
    try:
        parsed = datetime.strptime(created_at[:19], "%Y-%m-%dT%H:%M:%S")
    except (TypeError, ValueError):
        return str(created_at)
    return parsed.strftime("%Y-%m-%d %H:%M:%S")


def search_branch(projects):
    """Find the projects that have a target branch and report its merge state.

    For every project that passes the namespace filters and every name in
    ``target_branch_name``, the branch is looked up; when it exists, one
    result dict is produced with the keys ``namespace``, ``projectName``,
    ``branch``, ``commitTime``, ``committer``, ``title``, ``mergeTo`` and
    ``mergedFrom``.

    ``mergeTo`` maps each ``merge_to`` branch to "True"/"False" (is the target
    branch's last commit contained in it) or "No Commit". ``mergedFrom`` maps
    each ``merged_from`` branch to "True"/"False" (is that branch's last
    commit contained in the target branch), "No Branch" or "No Commit".

    Args:
        projects: Project dicts as returned by the GitLab projects API.

    Returns:
        A list of result dicts. A failure for one project/branch pair is
        printed with its traceback and skipped; it does not abort the scan.
    """
    results = []
    for p in projects:
        if not _namespace_selected(p):
            continue
        for b in target_branch_name:
            try:
                branch = obtain_branch(p["id"], p["name"], b)
                if not branch:
                    continue
                last_commit = branch["commit"]

                merge_to_result = {}
                if merge_to:
                    # The containing branches do not depend on the merge_to
                    # entry, so fetch them once for all entries.
                    containing = _commit_branch_names(p["id"], p["name"], last_commit["id"])
                    for t in merge_to:
                        if containing is None:
                            merge_to_result[t] = "No Commit"
                        else:
                            merge_to_result[t] = "True" if t in containing else "False"

                merged_from_result = {}
                for f in merged_from:
                    from_branch = obtain_branch(p["id"], p["name"], f)
                    if not from_branch:
                        merged_from_result[f] = "No Branch"
                        continue
                    containing = _commit_branch_names(p["id"], p["name"],
                                                      from_branch["commit"]["id"])
                    if containing is None:
                        merged_from_result[f] = "No Commit"
                    else:
                        merged_from_result[f] = "True" if b in containing else "False"

                results.append({"namespace": p["namespace"]["kind"] + "/" + p["namespace"]["name"],
                                "projectName": p["name"],
                                "branch": branch["name"],
                                "commitTime": _format_commit_time(last_commit["created_at"]),
                                "committer": last_commit["committer_name"],
                                "title": last_commit["title"],
                                "mergeTo": merge_to_result,
                                "mergedFrom": merged_from_result
                                })
            except Exception:
                # Best effort: report the failure and carry on with the next
                # branch/project.
                traceback.print_exc()
                print("获取项目【{}】分支列表失败".format(p["name"]))
    return results
def obtain_branch(project_id, project_name, branch_name):
    """Look up a single branch of a project.

    Args:
        project_id: GitLab project id.
        project_name: Project name, used only in the error message.
        branch_name: Name of the branch to look up.

    Returns:
        The branch dict returned by GitLab, or None when GitLab reports that
        the branch does not exist.

    Raises:
        Exception: On any other non-2xx response.
    """
    from urllib.parse import quote

    # Branch names may contain "/" (e.g. "feature/x"); as a URL path segment
    # the name has to be percent-encoded or the request hits a different route.
    encoded_name = quote(branch_name, safe="")
    resp = requests.get(url=branches_path.format(project_id, encoded_name),
                        headers={"PRIVATE-TOKEN": private_token})
    if is_success(resp.status_code):
        return json.loads(resp.content)
    # Match on status code plus message instead of exact body equality, so
    # formatting differences in the JSON body do not turn this into an error.
    if resp.status_code == 404 and "Branch Not Found" in resp.text:
        return None
    raise Exception("获取项目【{}】分支列表失败".format(project_name))
def calculate_max(results):
    """Compute the column widths needed to print ``results`` as a table.

    Each width starts at a fixed minimum and grows to the longest value found
    in the corresponding field; the ``mergeTo`` and ``mergedFrom`` dicts are
    measured by the length of their ``str()`` form.

    Args:
        results: Result dicts with the keys ``namespace``, ``projectName``,
            ``branch``, ``committer``, ``title``, ``mergeTo`` and
            ``mergedFrom``.

    Returns:
        A dict mapping ``<field>Max`` keys to widths in characters.
    """
    max_length = {"namespaceMax": 9, "projectNameMax": 7, "branchMax": 6, "committerMax": 1, "titleMax": 1,
                  "mergeToMax": 7, "mergedFromMax": 10}
    # Result field -> key of the width it contributes to.
    width_keys = {"namespace": "namespaceMax", "projectName": "projectNameMax", "branch": "branchMax",
                  "committer": "committerMax", "title": "titleMax",
                  "mergeTo": "mergeToMax", "mergedFrom": "mergedFromMax"}
    for row in results:
        for field, width_key in width_keys.items():
            # str() leaves the string fields unchanged and renders the dicts.
            max_length[width_key] = max(max_length[width_key], len(str(row[field])))
    return max_length
def _table_rule(widths):
    """Build a horizontal table rule such as ``+-----+----+`` for the widths."""
    return "+-" + "-+-".join("-" * width for width in widths) + "-+"


def _table_row(cells, widths):
    """Build one ``| a | b |`` table row with every cell left-aligned."""
    padded = ("{:<{}}".format(cell, width) for cell, width in zip(cells, widths))
    return "| " + " | ".join(padded) + " |"


def main():
    """Scan all projects and print a per-branch report of the findings.

    The report starts with a timestamp line, then lists the matching projects
    grouped by branch (as an ASCII table, or as verbose lines when
    ``print_simple`` is False) and, when ``merge_project`` is set, ends with
    the de-duplicated, sorted list of matching project names. The whole
    report is printed once at the end.
    """
    send_list = ["=======>> Start Run At {}".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))]
    projects = obtain_all_projects(100, 1)
    results = search_branch(projects)
    col_max = calculate_max(results)

    # Rule and header are the same for every branch group, so build them once.
    widths = [col_max[key] for key in
              ("namespaceMax", "projectNameMax", "branchMax", "mergeToMax", "mergedFromMax")]
    header_line = _table_rule(widths)
    table_header = _table_row(("namespace", "project", "branch", "mergeTo", "mergedFrom"), widths)

    # groupby only groups adjacent items, hence the sort on the same key.
    results.sort(key=itemgetter("branch"))
    for branch, items in groupby(results, key=itemgetter("branch")):
        send_list.append("存在分支【{:<{}}】的项目列表".format(branch, col_max["branchMax"]))
        send_list.append(header_line)
        send_list.append(table_header)
        send_list.append(header_line)
        for x in items:
            if print_simple:
                print_str = _table_row(
                    (x["namespace"], x["projectName"], x["branch"], str(x["mergeTo"]), str(x["mergedFrom"])),
                    widths)
            else:
                print_str = "namespace: {:<{}} project: {:<{}} branch: {:<{}} mergeTo: {:<{}} mergedFrom: {:<{}} lastCommit: {} {:<{}} {:<{}}".format(
                    x["namespace"], col_max["namespaceMax"], x["projectName"], col_max["projectNameMax"],
                    x["branch"],
                    col_max["branchMax"], str(x["mergeTo"]), col_max["mergeToMax"], str(x["mergedFrom"]),
                    col_max["mergedFromMax"],
                    x["commitTime"], x["committer"], col_max["committerMax"], x["title"], col_max["titleMax"]
                )
            # Rows are only collected here; the report is printed once below,
            # so verbose rows no longer show up twice in the output.
            send_list.append(print_str)
        send_list.append(header_line)

    if merge_project:
        send_list.append("\n合并项目列表:")
        send_list.extend(sorted({x["projectName"] for x in results}))

    print("\n".join(send_list))
    # To also push the report to the WeCom group robot, call:
    # send_wxwork("\n".join(send_list))
def send_wxwork(msg):
    """Post ``msg`` as a text message to the WeCom group-robot webhook.

    The raw response body is printed; no status check is performed.
    """
    payload = {"msgtype": "text", "text": {"content": msg}}
    resp = requests.post(
        url=wxwork_webhock,
        headers={"Content-Type": "application/json"},
        json=payload,
    )
    print(resp.content)
# Run the report only when executed as a script, not when imported.
if __name__ == '__main__':
    main()