python 脚本备份gitlab分支(打tag)

2021-02-18  本文已影响0人  liukeless

现在使用git作为版本控制的趋势已经很明显, gitlab又是用的比较多的一个开源的git服务端;开发过程中会有很多dev分支,在发布生产后一般会把这些分支备份(打tag), 然后合并到master分支,之后就可以删除分支了。下面的python脚本 就是自动化实现这一过程的。

需要修改 host为gitlab域名,git_user_name ,git_pwd git登录账号密码,group_project 需要处理的分组和项目,exclude_branch 需要排除的分支;
基于 # GitLab Community Edition 12.10.6版本

可以基于last_commit判断是否已经备份打tag了

"""
用于gitlab已发布版本分支, 打Tag归档, 可选删除归档分支
python3
pip install BeautifulSoup
pip install requests
pip install lxml
"""
import requests
from bs4 import BeautifulSoup
# gitlab的域名  需要自己更改
host = "https://git.xxx.com"
login_index = host + "/users/sign_in"
login_url = host + "/users/auth/ldapmain/callback"
project_index_url = host + "/{}/{}"
branches_api = host + "/api/v4/projects/{}/repository/branches/?page={}&per_page={}"
tags_api = host + "/api/v4/projects/{}/repository/tags/?page={}&per_page={}"
add_tag_url = project_index_url + "/-/tags"
tree_url = project_index_url + "/-/tree/{}"
graphql = host + "/api/graphql"
branches_url = project_index_url + "/branches"
delete_branch_url = branches_url + "/{}"

# git登录的账号密码
git_user_name = "root"
git_pwd = "*****"

# 需要处理的分组和项目, Administrator = root 组名可以打开gitlab项目首页 地址栏显示即是【https://git.xxx.com/组名/项目名】, 建议先fork项目到私人仓库测试,检查没问题再在上游仓库执行
# group_project = {'plat': ['trade'], 'root': ['pay']}
group_project = {'core': ['trade','settle','service-api',]}
# 需要排除创建tag的分支,未发布的分支无需归档
exclude_branch = ['master', 'release', 'dev-v5.8.5', 'dev-v5.8.4', 'dev-v5.8.6', 'dev-v5.8.4.1', 'dev-v5.8.3.5']
# 归档完成是否删除分支,第一次测试请改为False,受保护的分支需要受权限才能删除
delete_bak_branch = True

tag_suffix = '-Tag'
header = {
    'authority': 'git.xxx.com',
    'method': 'POST',
    'path': '/api/graphql',
    'scheme': 'https',
    'accept': '*/*',
    'accept-encoding': 'gzip, deflate, br',
    'accept-language': 'zh-CN,zh;q=0.9',
    'content-type': 'application/json',
    'sec-fetch-dest': 'empty',
    'sec-fetch-mode': 'cors',
    'sec-fetch-site': 'same-origin',
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36'
}


def delete_branch(group, project, branch, session):
    if delete_bak_branch:
        # print('开始删除 Group=【%s】 Project=【%s】 Branch=【%s】' % (group, project, branch))
        token = obtain_token(session, branches_url.format(group, project))
        header['x-csrf-token'] = token
        result = session.delete(url=delete_branch_url.format(group, project, branch), headers=header)
        if is_success(result.status_code):
            print('完成删除 Group=【%s】 Project=【%s】 Branch=【%s】' % (group, project, branch))
    pass


def obtain_project(k, v, session):
    result = session.get(project_index_url.format(k, v))
    if not is_success(result.status_code):
        raise Exception('获取项目id失败')
    soup = BeautifulSoup(result.content, 'lxml')
    project_id = soup.body.attrs['data-project-id']
    link = soup.find_all(attrs={'name': 'csrf-token'})[0]
    token = link['content']
    return {'token': token, 'project_id': project_id}


def main():
    session = requests.Session()
    # 获取token
    token = obtain_token(session, login_index)
    # 登录
    user_name = login(session, token)
    for k in group_project:
        for v in group_project[k]:
            tags = []
            branches = []
            for p in range(1, 10):
                pair = obtain_project(k, v, session)
                header['x-csrf-token'] = pair['token']
                branch_result = session.get(url=branches_api.format(pair['project_id'], p, 100), headers=header)
                tag_result = session.get(url=tags_api.format(pair['project_id'], p, 100), headers=header)
                if not is_success(branch_result.status_code) or not is_success(tag_result.status_code):
                    raise Exception('obtain branch and tags error')
                branch_page = branch_result.json()
                tag_page = tag_result.json()
                have_more = False
                if branch_page:
                    for x in branch_page:
                        branches.append(x['name'])
                    have_more = True
                if tag_page:
                    for y in tag_page:
                        tags.append(y['name'])
                    have_more = True
                if not have_more:
                    break
            deal_branches = set(branches) - set(exclude_branch)
            print('过滤之后待处理分支列表=%s' % (str(deal_branches)))
            for branch in deal_branches:
                rel_add_tag_url = add_tag_url.format(k, v)
                add_token_url = rel_add_tag_url + '/new'
                new_tag = obtain_tag_name(branch, tags, k, v, session)
                if not new_tag:
                    delete_branch(k, v, branch, session)
                    continue
                add_tag_param = {'utf8': '✓', 'tag_name': new_tag, 'ref': branch,
                                 'message': '自动归档创建Tag-by-' + user_name,
                                 'authenticity_token': obtain_token(session, add_token_url)}
                # print('开始归档 Group=【%s】 Project=【%s】 Branch=【%s】Tag=【%s】' % (k, v, branch, new_tag))
                result = session.post(rel_add_tag_url, add_tag_param)
                if is_success(result.status_code):
                    print('完成归档 Group=【%s】 Project=【%s】 Branch=【%s】Tag=【%s】' % (k, v, branch, new_tag))
                    delete_branch(k, v, branch, session)
                else:
                    print('归档失败 Group=【%s】 Project=【%s】 Branch=【%s】Tag=【%s】' % (k, v, branch, new_tag))

    print("over")


def is_success(code):
    if 200 <= code < 300:
        return True
    else:
        return False


def check_commit(branch, branch_last_commit, tag_name, tag_last_commit):
    code1 = branch_last_commit.status_code
    code2 = tag_last_commit.status_code
    flag = is_success(code1) and is_success(code2)
    if not flag:
        # 检查失败, 认为不是同一个分支
        print('请求获取最后提交记录请求失败branch_last_commit=【%d】【%d】' % (code1, code2))
        return False
    try:
        branch_json = branch_last_commit.json()
        tag_json = tag_last_commit.json()
        sha1 = branch_json['data']['project']['repository']['tree']['lastCommit']['sha']
        sha2 = tag_json['data']['project']['repository']['tree']['lastCommit']['sha']
        if sha1 == sha2:
            print('判断Branch=【%s】 Tag=【%s】最新Commit=【%s】记录相同,不再生成新的Tag' % (branch, tag_name, sha1))
            return True
    except Exception as e:
        print(e)
    return False


def obtain_tag_name(branch, tags, group, project_name, session):
    tag_name = branch + tag_suffix
    project_path = group + '/' + project_name
    if tag_name in tags:
        params = "{\"operationName\":\"pathLastCommit\",\"variables\":{\"projectPath\":\"" + project_path + "\",\"ref\":\"" + branch + "\",\"path\":\"\"},\"query\":\"query pathLastCommit($projectPath: ID!, $path: String, $ref: String!) {\\n  project(fullPath: $projectPath) {\\n    repository {\\n      tree(path: $path, ref: $ref) {\\n        lastCommit {\\n          sha\\n          title\\n          description\\n          message\\n          webUrl\\n          authoredDate\\n          authorName\\n          authorGravatar\\n          author {\\n            name\\n            avatarUrl\\n            webUrl\\n            __typename\\n          }\\n          signatureHtml\\n          pipelines(ref: $ref, first: 1) {\\n            edges {\\n              node {\\n                detailedStatus {\\n                  detailsPath\\n                  icon\\n                  tooltip\\n                  text\\n                  group\\n                  __typename\\n                }\\n                __typename\\n              }\\n              __typename\\n            }\\n            __typename\\n          }\\n          __typename\\n        }\\n        __typename\\n      }\\n      __typename\\n    }\\n    __typename\\n  }\\n}\\n\"}"
        token = obtain_token(session, project_index_url.format(group, project_name))
        header['x-csrf-token'] = token
        branch_last_commit = session.request(method='POST', url=graphql, data=params.encode('utf-8'), headers=header)
        tree_token = obtain_token(session, tree_url.format(group, project_name, tag_name))
        tree_params = "{\"operationName\":\"pathLastCommit\",\"variables\":{\"projectPath\":\"" + project_path + "\",\"ref\":\"" + tag_name + "\",\"path\":\"\"},\"query\":\"query pathLastCommit($projectPath: ID!, $path: String, $ref: String!) {\\n  project(fullPath: $projectPath) {\\n    repository {\\n      tree(path: $path, ref: $ref) {\\n        lastCommit {\\n          sha\\n          title\\n          description\\n          message\\n          webUrl\\n          authoredDate\\n          authorName\\n          authorGravatar\\n          author {\\n            name\\n            avatarUrl\\n            webUrl\\n            __typename\\n          }\\n          signatureHtml\\n          pipelines(ref: $ref, first: 1) {\\n            edges {\\n              node {\\n                detailedStatus {\\n                  detailsPath\\n                  icon\\n                  tooltip\\n                  text\\n                  group\\n                  __typename\\n                }\\n                __typename\\n              }\\n              __typename\\n            }\\n            __typename\\n          }\\n          __typename\\n        }\\n        __typename\\n      }\\n      __typename\\n    }\\n    __typename\\n  }\\n}\\n\"}"
        header['x-csrf-token'] = tree_token
        tag_last_commit = session.request(method='POST', url=graphql, data=tree_params.encode('utf-8'), headers=header)
        flag = check_commit(branch, branch_last_commit, tag_name, tag_last_commit)
        if flag:
            return None
        suffix = '({})'
        count = 0
        while True:
            count = count + 1
            new_tag_name = tag_name + suffix.format(count)
            if new_tag_name not in tags:
                return new_tag_name
            else:
                tree_token = obtain_token(session, tree_url.format(group, project_name, new_tag_name))
                tree_params = "{\"operationName\":\"pathLastCommit\",\"variables\":{\"projectPath\":\"" + project_path + "\",\"ref\":\"" + new_tag_name + "\",\"path\":\"\"},\"query\":\"query pathLastCommit($projectPath: ID!, $path: String, $ref: String!) {\\n  project(fullPath: $projectPath) {\\n    repository {\\n      tree(path: $path, ref: $ref) {\\n        lastCommit {\\n          sha\\n          title\\n          description\\n          message\\n          webUrl\\n          authoredDate\\n          authorName\\n          authorGravatar\\n          author {\\n            name\\n            avatarUrl\\n            webUrl\\n            __typename\\n          }\\n          signatureHtml\\n          pipelines(ref: $ref, first: 1) {\\n            edges {\\n              node {\\n                detailedStatus {\\n                  detailsPath\\n                  icon\\n                  tooltip\\n                  text\\n                  group\\n                  __typename\\n                }\\n                __typename\\n              }\\n              __typename\\n            }\\n            __typename\\n          }\\n          __typename\\n        }\\n        __typename\\n      }\\n      __typename\\n    }\\n    __typename\\n  }\\n}\\n\"}"
                header['x-csrf-token'] = tree_token
                tag_last_commit = session.request(method='POST', url=graphql, data=tree_params.encode('utf-8'),
                                                  headers=header)
                flag = check_commit(branch, branch_last_commit, tag_name, tag_last_commit)
                if flag:
                    return None
    else:
        return tag_name


def login(session, token):
    params = {'utf8': '✓', 'username': git_user_name, 'password': git_pwd, 'remember_me': '1',
              'authenticity_token': token}
    response = session.post(login_url, params)
    if not is_success(response.status_code):
        raise Exception('登录git失败')
    soup = BeautifulSoup(response.content, 'lxml')
    user_name = soup.find_all(attrs={'class': 'user-name'})[0].text
    return user_name


def obtain_token(session, url):
    response = session.get(url)
    if not is_success(response.status_code):
        raise Exception('获取Token失败')
    content = response.text
    soup = BeautifulSoup(content, 'lxml')
    link = soup.find_all(attrs={'name': 'csrf-token'})[0]
    token = link['content']
    return token


if __name__ == '__main__':
    main()


上一篇下一篇

猜你喜欢

热点阅读