500 lines or less学习笔记(四)——持续集成系统

2021-08-26  本文已影响0人  简单一点点

本文介绍了一个简单的分布式持续集成系统,包含监听器、调度器和测试运行器三个组件,主要由 Python 语言实现,并包含一部分 shell 脚本。我是在 Windows 下运行的代码,所以对部分地方进行了修改。

原文作者

Malini Das。 Malini Das是一位软件工程师,他热衷于快速开发(但保证安全!)以及解决交叉编程问题。她曾在 Mozilla 担任工具工程师,当前在 Twitch 的技能。你可以通过关注 Malini 的 Twitter 或她的博客来了解她的最新动态。

什么是持续集成系统?

在开发软件时,我们希望能够验证我们的新功能或错误修复是否安全,是否按预期工作。我们通过对代码运行测试来做到这一点。有时,开发人员会在本地运行测试来验证他们的更改是安全的,但是开发人员可能没有时间在运行软件的每个系统上测试他们的代码。此外,随着越来越多的测试被添加,运行所有这些测试(即使只是在本地)变得不太可行。正因为如此,持续集成系统被创造出来。

持续集成(CI)系统是用来测试新代码的专用系统。新代码提交到代码存储库后,持续集成系统负责验证此提交不会导致任何测试失败。为此,系统必须能够获取新更改的代码,运行测试并报告结果。和其他系统一样,它也应该有良好的稳定性。这意味着,如果系统的任何部分发生故障,它应该能够从故障点恢复并继续运行。

这个测试系统应该能够很好地处理负载均衡,这样如果提交的速度比测试的运行速度还要快,我们就可以在合理的时间内得到测试结果。我们可以通过分布式和并行化测试工作来实现这一点。这个项目将展示一个小型的、基本的分布式连续集成系统,它是为可扩展性而设计的。

项目限制和注意事项

本项目使用 Git 作为测试代码的存储库,但只会使用标准的源代码管理调用指令,因此如果你不熟悉 Git,但熟悉其他版本控制系统(VCS),如 svn 或 Mercurial,你仍然可以继续学习。

由于代码长度和单元测试的限制,我简化了测试发现机制。我们将只运行存储库中 tests 目录中的测试。

持续集成系统监视主存储库,该存储库通常托管在 Web 服务器上,而不是 CI 文件系统的本地存储库。对于我们的示例,我们将使用本地存储库而不是远程存储库。

持续集成系统不需要按固定的、定期的时间表运行。你还可以让它们每隔几次提交或每次提交运行一次。对于我们的示例,CI 系统将定期运行。这意味着,如果将其设置为在五秒钟内检查更改,则它将针对五秒钟后所做的最新提交运行测试。它不会测试在这段时间内所做的每一次提交,只测试最近的一次。

此 CI 系统旨在定期检查存储库中的更改。在真实的 CI 系统中,你还可以让存储库监听器得到托管存储库的通知。例如,Github 提供了“post commit hooks”,用于向 URL 发送通知。按照此模型,位于该 URL 的 Web 服务器将调用存储库监听器来响应该通知。由于这在本地建模很复杂,所以我们使用了一个观察者模型,在这个模型中,存储库监听器将检查更改,而不是得到通知。

CI 系统也有一个报告器组件,测试运行程序将其结果报告给一个组件,以便人们可以在网页上看到这些结果。为了简单起见,本项目收集测试结果并将其作为文件存储在调度程序进程本地的文件系统中。

注意,这个 CI 系统使用的架构只是众多可能性中的一种。选择这种方法是为了将我们的案例研究简化为三个主要部分。

引言

持续集成系统的基础结构包括三个部分:监听器,测试任务调度器,和测试运行器。首先监听器会监视代码库,当发生提交时,监听器会通知调度器。之后,调度器会分配给一个可用的测试运行器来完成对应提交版本号的测试。

构建 CI 系统的方式有很多。我们可以将他们全部运行在一台电脑的同一个线程之中。但是这样一来,我们的 CI 系统就会缺少了处理大负载的能力,当很多的提交带来了大量的测试内容时,这种方案非常容易引起工作的积压。同时这种方案的容错率非常低,一旦运行该系统的计算机发生故障或是断电,没有后备的系统完成中断的工作。我们希望我们的 CI 系统应该根据需求尽可能的同时完成多项测试工作,并且在机器发生意外停机时有很好的后备运行方案。

为了构建一个负载能力强并且容错率又高的 CI 系统,在本项目中,上述的每一个组件都以独立的进程运行。每个进程之间完全独立,并且每个进程可以同时运行多个实例。这种方案在很多的测试工作需要同时展开时很有用。我们可以并行运行多个测试运行器的实例,每个测试运行器独立工作,这样就可以有效的解决测试队列积压的问题。

在本项目中这些组件虽然运行在独立的进程上,但是相互之间可以通过套接字进行通信,这样我们就可以在网络中的不同主机上分别运行这些进程。我们会为每一个进程分配一个地址/端口,这样每个进程之间就可以通过向分配到的地址发送消息来互相通信。

通过分布式的架构,我们可以做到在硬件发生故障时即时的进行处理。我们可以把监听器,测试任务调度器,和测试运行器分别运行在不同的机器上,他们可以通过网络保持相互通信。当他们之中的任何一个发生问题时,我们可以安排一台新的主机上线运行发生问题的进程。这样一来这个系统就会有非常高的容错率。

在本项目并没有包含自动恢复的代码。自动恢复的功能取决于你使用的分布式系统的结构。在实际的使用中,CI 系统通常运行在支持故障转移(举个例子,当分布式系统中的一个机器发生故障,我们设定好的后备机器会自动接手中断的工作)的分布式系统之中。

对于本项目,这些进程中的每一个都将在本地以不同的本地端口手动启动。

项目文件结构

项目中每个组件的 Python 文件结构如下:监听器 (repo_observer.py),测试任务调度器(dispatcher.py),测试运行器(test_runner.py)。上述每个进程之间通过套接字通信,我们将用于实现通信功能的代码统一的放在 helpers.py 中。这样就可以让每个组件直接从这个文件中导入通信函数,而不用在每个组件中重复的写这段代码。

另外,我们还用到了 bash 脚本。这些脚本用来执行一些简单的 bash 和 git 的操作,直接通过 bash 脚本要比利用 Python 提供的系统级别的模块(比如,os 或者 subprocess 之类的)要更方便一些。

最后,我们还建立了一个 tests 目录来存放我们需要 CI 系统运行的测试样例。在这个目录中包含两个用于测试的样例,其中一个样例模拟了样例通过时的情况,另一个则模拟了失败时的情况。

初始设置

虽然我们的 CI 系统是为分布式的运行而设计的,但是为了在理解 CI 系统运行原理的过程中不受网络因素的影响,我们会在同一台计算机上运行所有的组件。当然,如果你想要试一试分布式的运行环境,你也可以将每一个组件分别运行到不同的主机上。

持续集成系统通过监听代码的变动来触发测试,所以在开始之前我们需要设置一个用于监听的代码储存库。

我们称这个用于测试的项目为 test_repo:

$ mkdir test_repo 
$ cd test_repo 
$ git init

这是开发人员迁入代码的主存储库。监听器模块通过检查 commit (提交)来进行代码更新的监听,所以我们至少需要一次的 commit 才能进行监听器模块的测试。

将 tests 文件夹拷贝到 test_repo 中,然后提交:

$ cp -r /this/directory/tests /path/to/test_repo/ 
$ cd /path/to/test\_repo 
$ git add tests/ 
$ git commit -m ”add tests”

现在,我们的代码仓库中的 master 分支上有了一次提交。

监听器组件需要一份单独的代码拷贝来检测新的提交。让我们从 master 分支做一份代码拷贝,起名为 test_repo_clone_obs

$ git clone /path/to/test_repo test_repo_clone_obs

测试运行器同样需要一个自己的代码拷贝,这样它才能在 commit 发生时运行相关的测试。我们同样从 master 分支做一份代码拷贝,并起名为 test_repo_clone_runner

$ git clone /path/to/test_repo test_repo_clone_runner

组件

监听器(repo_observer.py

监听器的任务是监听代码库中的改动,并在发现新提交时通知测试任务分配器。为了保证我们的 CI 系统与各种版本控制系统(并不是所有的 VCS 都有内置的通知系统)都能够兼容,我们设定 CI 系统定时检查代码库是否有新的提交,而不是等待 VCS 在代码提交时发送通知。

监听器会定时轮询存储库,当观察到更改时,监听器会向分配器推送需要运行测试的代码的提交ID。监听器通过获取当前的提交 ID 来检查新的提交,然后将本地库更新至这个版本,最后将这个版本与远程库最近一次的提交 ID 进行比对。这样,监听器中本地的当前版本与远程的最新版本不一致时就判定为发生了新的提交。在我们的 CI 系统中,监听器只会向分配器推送最近的一次提交。这意味着,如果在一次的轮询周期内发生了两次提交,监听器只会为最近的一次运行测试。通常来讲,CI 系统会为自上一次更新以来的每一次的提交运行相应的测试。但是为了简单起见,这次我们搭建的 CI 系统采取了仅为最后一次提交运行测试的方案。

监听器必须清楚自己监听的到底是哪一个存储库,我们之前已经在 /path/to/test_repo_clone_obs 建立了一份用于监听的存储库拷贝。我们的监听器会使用这份拷贝进行检测。为了监听器能够使用这份拷贝,我们在调用 repo_observer.py 时会传入这个拷贝的路径。监听器会利用这份拷贝从主仓库中拉取最新的代码。

同样,我们还需要为监听器提供测试任务分配器的地址,这样监听器推送的消息才能传递到分配器中。在运行监听器时,可以通过命令行参数 --dispatcher-server 来传递分配器的地址。如果不手动传入地址,分配器的默认地址取值为:localhost:8888

def poll():
    parser = argparse.ArgumentParser()
    parser.add_argument("--dispatcher-server",
                        help="dispatcher host:port, " \
                        "by default it uses localhost:8888",
                        default="localhost:8888",
                        action="store")
    parser.add_argument("repo", metavar="REPO", type=str,
                        help="path to the repository this will observe")
    args = parser.parse_args()
    dispatcher_host, dispatcher_port = args.dispatcher_server.split(":")

当调用了监听器脚本后,会直接从 poll() 开始运行。这个函数会将命令行的参数传递进来,并开始一个无限的 while 循环。这个 while 循环会定期的检查存储库的变化。这个循环中所做的第一个工作就是运行 bash 脚本update_repo.sh[1]

    while True:
        try:
            # 调用更新存储库的 bash 脚本并检查更新。如果发现更新,它会删除
            # 当前工作目录的 .commit_id 文件
            subprocess.check_output(["./update_repo.sh", args.repo])
        except subprocess.CalledProcessError as e:
            raise Exception("Could not update and check repository. " +
                            "Reason: %s" % e.output)

update_repo.sh 用于识别新的提交并通知监听器。它首先记录当前所在的提交 ID,然后拉取最新的代码,接着检查最新的提交 ID。如果二者匹配,说明代码没有变动,所以监听器不会作出任何响应。但是,如果提交 ID 间存在不同,就意味着有了新的提交。这时,update_repo.sh 会创建一个叫 .commit_id 的文件来记录最新的提交 ID。

update_repo.sh 的具体步骤如下:首先,我们的脚本源自于一个叫 run_or_fail.sh 的文件。run_or_fail.sh 提供了一些 shell 脚本的辅助函数。通过这些函数我们可以运行指定的脚本并可以在运行出错时输出错误信息。

#!/bin/bash

source run_or_fail.sh 

接下来,我们的脚本会试图删除 .commit_id 文件。因为 repo_observer.py 会不断循环的调用 updaterepo.sh,如果在上一次的调用中产生了 .commit_id 文件,并且其中储存的版本ID我们在上一次轮询中已经完成了测试,就会造成混乱。所以我们在每次都会先删除上一次的 .commit_id 文件。

bash rm -f .commit_id

在删除了文件之后(在文件已经存在的情况下),它会验证我们正在观察的存储库是否存在,然后将其重置为最新的提交,以防任何原因导致它不同步。

run_or_fail "Repository folder not found!" pushd $1 1> /dev/null
run_or_fail "Could not reset git" git reset --hard HEAD

再之后,读取 git 的日志,将其中最后一次的提交 ID 解析出来。

COMMIT=$(run_or_fail "Could not call 'git log' on repository" git log -n1)
if [ $? != 0 ]; then
  echo "Could not call 'git log' on repository"
  exit 1
fi
COMMIT_ID=`echo $COMMIT | awk '{ print $2 }'`

接下来,拉取存储库,获取最近所有的更改,并得到最新的提交ID。

run_or_fail "Could not pull from repository" git pull
COMMIT=$(run_or_fail "Could not call 'git log' on repository" git log -n1)
if [ $? != 0 ]; then
  echo "Could not call 'git log' on repository"
  exit 1
fi
NEW_COMMIT_ID=`echo $COMMIT | awk '{ print $2 }'`

最后,如果新得到的提交 ID 与上一次的 ID 不匹配,我们就知道在两次轮询间发生了新的提交,所以我们的脚本应该将新的提交ID储存在 .commit_id 文件中。

# if the id changed, then write it to a file
if [ $NEW_COMMIT_ID != $COMMIT_ID ]; then
  popd 1> /dev/null
  echo $NEW_COMMIT_ID > .commit_id
fi

repo_observer.py 中的 update_repo.sh 脚本运行结束后,监听器会检查 .commit_id 是否存在。如果文件存在,我们就知道在上一次的轮询后又发生了新的提交,我们需要通知测试样例调度器来开始测试。监听器会通过连接并发送一个'status'请求来检查调度器服务的运行状态,以保证它处在可以正常接受指令的状态正常工作状态。

        if os.path.isfile(".commit_id"):
            try:
                response = helpers.communicate(dispatcher_host,
                                               int(dispatcher_port),
                                               "status")
            except socket.error as e:
                raise Exception("Could not communicate with dispatcher server: %s" % e)

如果调度器返回一个“OK”,监听器就会读取 .commit_id 文件中最新的提交ID,并使用 dispatch:<commit ID> 请求将 ID 发送到调度器中。监听器会每隔 5 秒发送一次指令。如果发生任何错误,监听器同样会每隔 5s 进行一次重试。

           if response == "OK":
                # 调度器已存在,让我们发送测试
                commit = ""
                with open(".commit_id", "r") as f:
                    commit = f.readline()
                response = helpers.communicate(dispatcher_host,
                    int(dispatcher_port), "dispatch:%s" % commit)
                if response != "OK":
                    raise Exception("Could not dispatch the test: %s" % response)
                print("dispatched!")
            else:
                # 调度器出现了错误
                raise Exception("Could not dispatch the test: %s" % response) 
        time.sleep(20)

监听器会永远重复这一操作,直到你使用 KeyboardInterrupt (Ctrl+C)终止监听器发送进程或发送终止信号,。

测试任务调度器(dispatcher.py

测试任务调度器是一个用来为测试运行器分配测试任务的独立进程。它会在一个指定端口监听来自存储库监听器及测试运行器的请求。调度器允许测试运行器主动注册,当监听器发送一个提交 ID 时,它会将测试工作分配给一个已经注册的测试运行器。同时,它还可以平稳的处理测试运行器遇到的各种问题,当一个运行器发生故障,它可以立即将该运行器运行测试的提交 ID 重新分配给一个新的测试运行器。

dispatch.py 脚本从 serve 函数开始运行。首先,它会解析你设定的分配器的地址及端口:

def serve():
    parser = argparse.ArgumentParser()
    parser.add_argument("--host",
                        help="dispatcher's host, by default it uses localhost",
                        default="localhost",
                        action="store")
    parser.add_argument("--port",
                        help="dispatcher's port, by default it uses 8888",
                        default=8888,
                        action="store")
    args = parser.parse_args()

这里我们会开启分配器进程以及一个 runner_checker 函数进程,和一个 redistribute 函数进程。

    server = ThreadingTCPServer((args.host, int(args.port)), DispatcherHandler)
    print("serving on %s:%s" % (args.host, int(args.port)))

    ...

    runner_heartbeat = threading.Thread(target=runner_checker, args=(server,))
    redistribute = threading.Thread(target=redistribute, args=(server,))
    try:
        runner_heartbeat.start()
        redistribute.start()
        # 激活 server; 一直运行直到
        # 用 Ctrl+C 或 Cmd+C 打断程序
        server.serve_forever()
    except(KeyboardInterrupt, Exception):
        # 如果发生异常,则杀死进程
        server.dead = True
        runner_heartbeat.join()
        redistribute.join()

runner_checker 函数会定期的 ping 每一个注册的运行器,来确保他们都处于正常工作的状态。如果有运行器没有响应,该函数就会将其从注册的运行器池中删除,并且之前分配给它的提交 ID 会被重新分配给一个新的可用的运行器。函数会在 pending_commits 变量中记录运行受到运行器失去响应影响的提交ID。

    def runner_checker(server):
        def manage_commit_Lists(runner):
            for commit, assigned_runner in server.dispatched_commits.items():
                if assigned_runner == runner:
                    del server.dispatched_commits[commit]
                    server.pending_commits.append(commit)
                    break
            server.runners.remove(runner)

        while not server.dead:
            time.sleep(10)
            for runner in server.runners:               
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    response = helpers.communicate(runner["host"], int(runner["port"]),
                        "ping")
                    if response != "pong":
                        print("removing runner %s" % runner)
                        manage_commit_Lists(runner)
                except socket.error as e:
                    manage_commit_Lists(runner)

redistribute 用来将 pending_commits 中记录的提交 ID 进行重新分配。redistribute 运行时会不断的检查 pending_commits 文件,一旦发现 pending_commits 中存在提交 ID,函数会调用 dispatch_tests 方法来分配这个提交 ID。

    def redistribute(server):
        while not server.dead:
            for commit in server.pending_commits:
                print "running redistribute"
                print server.pending_commits
                dispatch_tests(server, commit)
                time.sleep(5)

dispatch_tests 函数用来从已注册的运行器池中返回一个可用的运行器。如果得到了一个可用的运行器,函数会发送一个带有提交 ID 的运行测试指令。如果当前没有可用的运行器,函数会在 2s 的休眠之后重复上述过程。如果分配成功了,函数会在 dispatched_commits 变量中记录提交 ID 及该提交 ID 的测试正在由哪一个运行器运行。如果提交ID在 pending_commits 中, dispatch_tests 函数会在重新分配后将提交ID从 pending_commits 中删除。

def dispatch_tests(server, commit_id):
    # NOTE: 通常我们不会永远运行
    while True:
        print("trying to dispatch to runners")
        for runner in server.runners:
            response = helpers.communicate(runner["host"],
                int(runner["port"]), "runtest:%s" % commit_id)
            if response == "OK":
                print("adding id %s" % commit_id)
                server.dispatched_commits[commit_id] = runner
                if commit_id in server.pending_commits:
                    server.pending_commits.remove(commit_id)
                return
        time.sleep(5)

调度器服务用到了标准库中的一个叫 SocketServer 的非常简单的网络服务器模块。SocketServer 模块中有四种基本的服务器类型:TCP,UDP, UnixStreamServerUnixDatagramServer。为了保证我们的数据传输连续稳定,我们使用基于 TCP 协议的套接字(UDP 并不能保证数据的稳定和连续)。

SocketServer 中提供的默认的 TCPServer 最多只支持同时处理一个连接。所以当调度器与一个运行器建立连接之后,就无法再与监听器建立连接了。此时来自监听器的连接只能等待第一个连接完成并断开才能建立与调度器的连接。这对于我们的项目而言并不是很理想,在我们预想中,调度器应该直接而迅速的同时与所有运行器及监听器进行通信。

为了使调度器可以同时处理多个连接,我们使用了一个自定义的类 ThreadingTCPServer 来为默认的 SocketServer 类增加多线程运行的功能。也就是说无论何时调度器接收到连接请求,它都会新建一个线程来处理这个连接。这使调度器可以同时处理多个连接。

class ThreadingTCPServer(socketserver.ThreadingTCPServer):
    runners = [] # 测试运行器池
    dead = False # 向其它线程指示是否还在运行
    dispatched_commits = {} # 我们派遣的 commit
    pending_commits = [] # 尚未派遣的任务

调度器通过为每一个请求定义处理程序来工作。在继承自 SocketServerBaseRequestHandler 类的 DispatcherHandler 类中定义了处理的方法。这个基类只要求我们定义一个 handle 函数,每当有连接请求时就会调用它。我们将这个函数的写在 DispatcherHandler中,并且确保在每一次请求到来时,这个函数能够被调用。这个函数会不断地监听发来的请求(self.request 会携带请求信息),并解析请求中的指令。

class DispatcherHandler(socketserver.BaseRequestHandler):
    """
    调度器的 RequestHandler 类
    它会将传入的 commit 派遣给测试运行器并处理它们发过来的请求和
    测试结果。
    """

    command_re = re.compile(r"(\w+)(:.+)*")
    BUF_SIZE = 1024

    def handle(self):
        # self.request 是连接这个客户端的 TCP socket
        self.data = self.request.recv(self.BUF_SIZE).decode('utf-8').strip()
        print('receive data: ' + self.data)
        command_groups = self.command_re.match(self.data)
        if not command_groups:
            self.request.sendall("Invalid command")
            return
        command = command_groups.group(1)

这个函数可以处理如下指令:statusregisterdispatch,以及 results。其中 status 函数用来检测调度器服务是否处于运行状态。

        if command == "status":
            print("in status")
            self.request.sendall("OK".encode('utf-8'))

为了让调度器的功能生效,我们需要注册至少一个测试运行器。当 register 命令和一个主机端口对被调用时,它将运行器的信息存储在一个列表,(运行器对象会附件到 ThreadingTCPServer 对象中)。以便在以后需要为运行器提供一个提交 ID 来运行测试时与运行器通信。

        elif command == "register":
            # 将测试运行器添加到我们的池子中
            print("register")
            address = command_groups.group(2)
            host, port = re.findall(r":(\w*)", address)
            runner = {"host": host, "port": port}
            self.server.runners.append(runner)
            print(" runner %s has registered" % runner)
            self.request.sendall("OK".encode('utf-8'))

dispatch 由存储库监听器用于为提交分配测试运行器。此命令的格式为 dispatch:<commit ID>。调度器从该消息中解析出提交 ID 并将其发送给测试运行器。

        elif command == "dispatch":
            print("going to dispatch")
            commit_id = command_groups.group(2)[1:]
            if not self.server.runners:
                self.request.sendall("No runners are registered".encode('utf-8'))
            else:
                # The coordinator can trust us to dispatch the test
                self.request.sendall("OK".encode('utf-8'))
                dispatch_tests(self.server, commit_id)

results 指令会由测试运行器在上报测试结果时调用。此命令的用法为 results:<commit ID>:<length of results data in bytes>:<results><commit ID>用于标识测试报告对应的提交ID。<length of results data in bytes> 用于计算结果数据使用需要多大的缓冲区。最后,<results> 中是实际报告信息。

        elif command == "results":
            print("got test results")
            results = command_groups.group(2)[1:]
            results = results.split(":")
            commit_id = results[0]
            length_msg = int(results[1])
            # 3 代表发送命令中的冒号数量
            remaining_buffer = self.BUF_SIZE - (len(command) + len(commit_id) + len(results[1]) + 3)
            if length_msg > remaining_buffer:
                self.data += self.request.recv(length_msg - remaining_buffer).strip()
            del self.server.dispatched_commits[commit_id]
            if not os.path.exists("test_results"):
                os.makedirs("test_results")
            with open("test_results/%s" % commit_id, "w") as f:
                data = self.data.split(":")[3:]
                data = "\n".join(data)
                f.write(data)
            self.request.sendall("OK".encode('utf-8'))

测试运行器(test_runner.py

测试运行器负责对给定的提交 ID 运行测试并上报测试结果。它仅与调度器通信,调度器负责为其提供需要运行测试的提交ID,并接收测试结果。

test_runner.py 文件被调用后会首先调用 serve 函数以启动测试运行器服务,并启动一个线程来运行 dispatcher_checker 函数。由于此启动过程与 repo_observer.pydispatcher.py 的启动过程非常相似,因此我们在这里就不再赘述。

dispatcher_checker 函数每五秒对调度器执行一次 ping 操作,以确保它仍然在正常运行。这个操作主要是出于资源管理上的考虑。如果对应的调度器已经关闭,那么测试运行器也会关闭。否则测试运行器就只能空跑,无法接收新的任务也无法提交之前任务产生报告。

     def dispatcher_checker(server):
        # 检查调度器是否出故障。如果出现问题,我们将关闭它
        # 当调度器程序返回后它可能没有相同的主机/端口
        while not server.dead:
            time.sleep(30) # 稍微改长点
            if(time.time() - server.last_communication) > 10:
                try:
                    response = helpers.communicate(
                        server.dispatcher_server["host"],
                        int(server.dispatcher_server["port"]),
                        "status"
                    )
                    if response != "OK":
                        print("Dispatcher is no longer functional")
                        server.shutdown()
                        return
                except socket.error as e:
                    print("Can't communicate with dispatcher: %s" % e)
                    server.shutdown()
                    return

测试运行器的服务与调度器相同都是 ThreadingTCPServer,它需要多线程运行,因为调度器既会向它下发提交 ID,也可能在测试运行的期间 ping 它是否在运行状态。

class ThreadingTCPServer(socketserver.ThreadingTCPServer):
    dispatcher_server = None # 保存调度服务器的 host/port 信息
    last_communication = None # 追踪来自调度器的最后一次v通信
    busy = False # 状态标志
    dead = False # 状态标

整个通信流是从调度器向测试运行器发送需要运行测试的提交 ID 开始的。如果测试运行器可以运行测试,它会发送确认消息响应调度器,然后关闭第一个连接。为了使测试运行器在运行测试的同时还能接受来自调度器的请求,它会单独启动一个线程来运行测试。

这样,当调度器在测试运行器正在运行测试的时候发来一个请求(比如一个 ping 请求),运行器服务将在一个单独的线程上作出响应,测试运行器的测试在另一个线程上仍在运行。这样测试运行器就可支持同时运行多个任务了。还有一种替代多线程运行的设计是在调度器与测试运行器间建立一个长连接。但这样会在调度器端消耗大量的内存来维持连接,另外这种方式还容易受网络影响,比如突然的断线。

测试运行器会从调度器接收两种消息。第一种是 ping 消息 ,调度器用这个消息来验证测试运行器是否仍处于活跃状态。

class TestHandler(SocketServer.BaseRequestHandler):
    ...

    def handle(self):
        ....
        if command == "ping":
            print("pinged")
            self.server.last_communication = time.time()
            self.request.sendall("pong".encode('utf-8'))

另一个是 runtest,它的格式是 runtest:<commit ID> 。这条指令用于分配器下发需要测试的提交 ID。当接收到 runtest 时,测试运行器将检查当前是否有正在运行的测试。如果有,它会给调度器返回 BUSY 的响应。如果没有,它会返回 OK,将其状态设置为 busy 并运行其 run_tests 函数。

        elif command == "runtest":
            print("got runtest command: am I busy? %s" % self.server.busy)
            if self.server.busy:
                self.request.sendall("BUSY".encode('utf-8'))
            else:
                self.request.sendall("OK".encode('utf-8'))
                print("running")
                commit_id = command_groups.group(2)[1:]
                self.server.busy = True
                self.run_tests(commit_id, self.server.repo_folder)
                self.server.busy = False

这个函数会调用一个叫 test_runner_script.sh 的 shell 脚本,该脚本会将存储库更新到给定的提交ID。脚本返回后,如果存储库已经被成功的更新,运行器会使用 unittest 运行测试并将结果收集到一个文件中。测试运行完毕后,测试运行器将读入结果报告文件,并将报告发送给调度器。

    def run_tests(self, commit_id, repo_folder):
        # update repo
        output = subprocess.check_output(["./test_runner_script.sh",
                                        repo_folder, commit_id])
        print(output)
        # 运行测试
        test_folder = os.path.join(repo_folder, "tests")
        suite = unittest.TestLoader().discover(test_folder)
        result_file = open("results", "w")
        unittest.TextTestRunner(result_file).run(suite)
        result_file.close()
        result_file = open("results", "r")
        # 将结果发给调度器
        output = result_file.read()
        helpers.communicate(self.server.dispatcher_server["host"],
            int(self.server.dispatcher_server["port"]),
            "results:%s:%s:%s" % (commit_id, len(output), output))

test_runner_script.sh 的内容如下 :

#!/bin/bash
REPO=$1
COMMIT=$2
source run_or_fail.sh
run_or_fail "Repository folder not found" pushd "$REPO" 1> /dev/null
run_or_fail "Could not clean repository" git clean -d -f -x
run_or_fail "Could not call git pull" git pull
run_or_fail "Could not update to given commit hash" git reset --hard "$COMMIT"

要运行 test_runner.py ,必须将其指向存储库的副本。你可以使用我们先前创建的 /path/to/test_repo test_repo_clone_runner 副本作为启动参数。默认情况下, test_runner.py将在 localhost 的 8900-9000 端口上启动,并尝试连接到 localhost:8888 上的调度服务器。你可以通过一些可选参数来更改这些值。--host--port 参数用于指定运行测试运行器服务器的地址和端口,--dispatcher-server 参数指定调度器的地址。

控制流程图

下图是该系统的概述图。图中假设所有三个文件( repo_observer.py , dispatcher.py和test_runner.py )都已在运行,并描述了每个进程在新的提交发生时所采取的操作。

diagram.png

运行代码

我们可以在本地运行这个简单的 CI 系统,为每个进程使用不同的终端 shell。我们首先启动调度器,它默认运行在端口 8888 上:

$ python dispatcher.py

打开一个新的的 shell,我们启动测试运行器(这样它就可以在调度器中注册了):

$ python test_runner.py <path/to/test_repo_clone_runner>

测试运行器将自动为自己分配端口,范围为 8900-9000。你可以根据需求运行多个测试运行器。

最后,在另一个新 shell 中,让我们启动代码库监听器:

$ python repo_observer.py --dispatcher-server=localhost:8888 <path/to/repo_clone_obs>

现在一切准备就绪,让我们触发一些测试玩一下吧!根据设计我们需要创建一个新的提交来触发测试。切换到你的主存储库中, 随便改点什么:

$ cd /path/to/test_repo
$ touch new_file
$ git add new_file
$ git commit -m"new file" new_file

然后 repo_observer.py 识别到有一个新的提交产生了,之后通知调度器。你可以在它们各自的 shell 窗口中查看它们的运行日志。当调度器收到测试结果,它就会将它们保存在此存储库中的 test_results/ 文件夹中,并使用提交ID作为文件名。

错误处理

该 CI 系统中包括一些简单的错误处理。

如果你将 test_runner.py 进程杀掉,dispatcher.py 能够识别该运行器已经不再活跃,并将其从运行器池中移除。

你也可以模拟网络或系统故障,在测试运行器执行测试的时候将它杀死。这时,调度器会识别到运行器已经挂了,它会将挂掉的运行器从运行器池中移除,并将这个运行器之前在执行的任务分配给池中其它的运行器。

如果你杀掉调度器,那么监听器会直接报错。测试运行器也会发现调度器不再运行,并自动关闭。

总结

通过将关注点分解到各自的流程,我们构建了一个分布式的持续集成系统的的基础。通过套接字请求实现进程间的通信,我们的 CI 系统可以分布式的运行在不同的机器上,这增强了我们的系统可靠性和可扩展性。

这套 CI 系统现在的功能仍然非常简单,你可以对它进行各种扩展以实现更多功能。以下是一些改进建议。

每次提交运行测试

当前系统将定期检查是否有新的提交并对最近的一次提交运行测试。这个设计可以改为每次提交都触发测试。你可以修改定期检查程序,获取在两次轮询中发生的所有提交来实现这个功能。

更智能的运行器

如果测试运行器检测到调度器没有响应,则它将停止运行。当测试运行器正在运行测试时,也会立即关闭!如果测试运行器可以有一段时的等待期或者长期运行(如果你并不在乎它对资源的占用)来等待调度器恢复可能会更好一些。这样当调度器恢复时,运行器既可以将之前执行的测试的报告重新发回调度器。这样可以避免因调度器故障而引起的重复任务,在对每一个提交都执行测试时,这将很大程度上节约运行器资源。

报告展示

在真正的 CI 系统中,测试报告一般会发送到一个单独的收集结果的报告服务中。报告服务会将结果发送到某个地方供查看,或是设置一些通知规则,在遇到故障或其他一些特殊的情况下通知相关人员。你可以为我们的 CI 系统创建一个独立的报告进程,替换掉调度器的报告收集功能。这个新的进程可以是一个 Web 服务(或链接到一个Web服务上), 这样我们就可以在网页上直接在线查看测试报告,甚至可以用一个邮件服务器来实现测试失败时的提醒。

测试运行器管理器

在当前的系统中,我们必须手动运行 test_runner.py 文件来启动测试运行器。你可以创建一个测试运行器管理器进程,通过这个进程来管理查看所有运行器上的负载和来自调度器的请求,对运行器的数量进行相应的调整。这个进程会接受所有的测试任务,根据任务启动测试运行器,并在任务少的时候减少运行器的实例。

遵循这些建议,你可以使这个简单的 CI 系统更加健壮并且容错率更高,并且具有与其他系统(比如一个网页版的报告查看器)集成的能力。

如果你希望了解现在的持续集成系统可以实现到什么样的灵活性,我建议你去看看 Jenkins,这是一个用 Java 编写的非常强大的开源 CI 系统。它提供了一个基本的 CI 系统,同时也允许使用插件进行扩展。你可以通过 GitHub 访问其源代码。另一个推荐的项目是Travis CI ,它是用 Ruby 编写的,其源代码也可以通过 GitHub 得到。

这是了解 CI 系统如何工作以及如何自己构建 CI 系统的尝试。现在你应该对制作一个可靠的分布式系统所需的内容有了更深入的了解,希望你可以利用这些知识开发更复杂的解决方案。


  1. 使用 bash 是因为我们需要检查文件是否存在、创建文件和使用 Git,而 shell 脚本是实现这一点最直接、最简单的方法。或者,你可以使用跨平台的 Python 包;例如,Python 的 os 内置模块可以用于访问文件系统,GitPython 可以用于 Git 访问,但它们执行操作的方式没那么直接。

上一篇下一篇

猜你喜欢

热点阅读