一些收藏python

三个小时,我用 Python 做了一个服务监控系统

2022-01-14  本文已影响0人  初心不变_叶子

前几天简书崩溃的事情大家还记得么?

现在数据已经恢复了,大家不必担心,这件事虽然处理慢了点,但终归是没有造成什么严重后果。

这件事让我熬了个夜,跑了无数次单元测试,回复了近百条用户反馈,更新两个公告,同时还要对自己的服务进行停运和降级处理。

我发现这个故障,是在守联的官方群里,而证实的方法不只是自己打开简书 App 看看,而是需要连上服务器跑 JRT 的单元测试,因为故障可能只存在于手机端,也可能软件并没有报错,但数据有问题。

但除了用户反馈,是不是还有其它方式,可以更快发现故障呢?

用户反馈这一渠道,我们是被动方,而我们可以通过每隔一段时间主动尝试的方式,进行主动监控。

国内外有很多相关的服务,但大多数是收费的,而且可定制性不高。我找了一圈,GitHub 上并没有符合我心意、足够轻量级的服务监控框架。

软件开发行业有句老话:不要重复发明轮子,意思是有现成的东西就要加以利用,不要自己实现一套一样的。

但是既然在这个领域还没有轮子,那咱们就自己造!

需求分析与设计

简单梳理了一下,这个系统要实现以下功能:

第五个的实现方式与今天的主题关系不大,暂且不提。

监控任务是多样的,有些任务要访问网址,根据错误码进行服务状态的判断,有些任务要寻找特定网页元素,还有些要判断一个值是否在区间内,我们不可能逐个定义好这些任务类型,否则程序的后期维护将会很繁琐。

我们使用函数来封装每个监控任务。由于所有监控函数中都没有需要动态变化的规则,所以我们不需要向函数传入参数。

函数的内部逻辑可能是多种多样的,但可以分为成功和失败两种类型,成功只有一种可能性,而失败有很多种,所以我们引入了错误码机制。为了便于分析,我们引入一个可选的消息字段。

我们定义的函数返回值如下:

提到定时执行,我们第一个想到的就是 Python 开发中非常常用的 apscheduler 库,它可以帮助我们处理复杂的调度逻辑,同时保持简洁与高度可定制化。

这里需要提到它的“事件”机制,简单来说,它允许我们设定一个函数,在特定事件发生时自动调用这个函数,并将事件的详细信息作为参数传入。

为了实现各种执行结果的处理,我们使用它提供的以下两个事件:

执行日志存档这个需求,我们第一个想到的就是将日志存入 txt 文件,但随着数据的不断增加,日志文件将越来越大,读取速度也会受到影响。在大量结构化数据的处理中,txt 文件并不是值得推荐的解决方案。

提到大量结构化数据,对信息技术稍有了解的小伙伴可能已经想到了:我们可以用数据库存储它们。

为了保证程序的简洁,我们使用 SQLite 数据库。这个名字大家可能有些陌生,但它是世界上装机量最大的数据库,微信的本机聊天记录就存放在这个数据库里。

然后呢?我们需要手写 SQL 语句来实现存储?这样未免有些太麻烦了,而且会带来一些不必要的安全问题。

好在我们有 ORM,全称 Object Relational Mapping,对象关系映射。

简单来说,它可以帮助我们把数据库变成 Python 中的对象,我们可以通过修改对象属性的方式,对数据库进行操作,而 SQL 的拼接与执行则隐藏在内部,不需要我们过多干预。

我们使用轻量级 ORM 框架 Peewee。

最后是消息推送。大家想到的可能是推送到微信上,但微信官方并没有对个人账号提供这一接口,而企业微信注册流程较为复杂,Server 酱这类服务需要付出额外的成本,虽然不高,但体验终究不算良好。

我之前写过一篇文章:用飞书实现脚本运行状态推送,所以,飞书,今天我又来了。

实现思路和技术选型完成,可以开始写代码了。

监控函数注册

打开编辑器,新建一个文件夹,git init 初始化存储库,然后新建 .gitignore 文件,让 Git 忽略部分文件,不将其添加到版本库中。

除了加入各类缓存的目录外,我们还需要将 *.db*.yaml 这两项加入排除列表。数据库文件很好理解,无用数据提交上去会污染版本库,YAML 则是一种配置文件格式,我们要在其中填入自己的飞书机器人鉴权信息,如果提交到了版本库上会存在安全风险。

软件开发安全守则指出,严禁将鉴权信息写入代码,严格做好数据与代码的分离,我们在开发过程中,应时刻谨记这一原则。

我们使用设计模式中的策略模式实现监控函数的注册:

策略模式,指对象有某个行为,但是在不同的场景中,该行为有不同的实现算法。

这里,我们使用 Python 的装饰器对其稍加修改,其实现思路来源于《流畅的 Python》一书,如果大家有基础的 Python 程序开发经验,可以找来细细阅读。

注册监控函数的装饰器代码如下:

def register_task(task_name: str = None, run_cron: str = None) -> Callable:
    """将一个函数注册为监控任务

    Args:
        func (Callable): 监控任务函数
        task_name (str): 监控任务名称, Defaults to None.
        run_cron (str): 监控任务运行的 cron 表达式, Defaults to None.
    """
    if not run_cron:
        raise ValueError("run_cron 不能为空")

    def outer(func: Callable):
        @wraps(func)
        def inner(task_name: str = None, run_cron: str = None):
            if task_name:
                registered_funcs[task_name] = [func, run_cron]
            else:
                registered_funcs[func.__name__] = [func, run_cron]
            return func
        return inner(task_name, run_cron)
    return outer

这一装饰器函数将会接收两个参数:监控任务的名称和任务运行的 cron 表达式,并将它们存入字典 registered_funcs 中。

cron 是 Linux 下的一种计划任务语法,可以用简单的字符串表示定时执行规则。

我们可以这样使用这个装饰器函数。

@register_task(name="测试监控任务", cron="0 1-59 * * * *")
def test_task():
    pass

注册这一函数后,registered_funcs 的值如下:

{'测试监控任务': [<function test_task at 0x000001C04B677700>, '0 1-59 * * * *']}

这里的 cron 表达式代表每分钟的第一秒执行一次。

这里给出一个简书主站的监控任务实现:

@register_task("简书主站监控", "0 1-59 * * * *")  # 每分钟执行一次
def JianshuMainPageMonitor():
    try:
        response = httpx_get("https://www.jianshu.com/")
    except Exception as e:
        success = False
        status_code = -1
        message = str(e)
    else:
        message = ""
        status_code = response.status_code
        if status_code != 200:
            success = False
        else:
            success = True
    return (success, status_code, message)

可以看到,我们的函数返回上文定义的三个字段。

日志存储

我们需要用定义 Python 类的方式定义数据库结构。

运行日志:

class RunLog(Model):
    id = IntegerField(primary_key=True)
    time = DateTimeField()
    level = IntegerField()
    message = CharField()

    class Meta:
        database = SqliteDatabase("log.db")

监控日志:

class MonitorLog(Model):
    id = IntegerField(primary_key=True)
    time = DateTimeField()
    monitor_name = CharField()
    successed = BooleanField()
    status_code = IntegerField()
    message = CharField()

    class Meta:
        database = SqliteDatabase("log.db")

这里的各种字段,都是 Peewee 提供的对象,它们会在数据库表创建时对应到不同的数据类型。

我们将两类日志存放在同一个数据库文件 log.db 中,便于统一管理。程序中对数据库的查询不多,性能需求不敏感,故不需要新建索引。

最后,我们编写一个函数,初始化数据库并创建对应的表:

def init_db() -> None:
    """初始化数据库"""
    RunLog.create_table()
    MonitorLog.create_table()

数据库有了,但为了后期维护考虑,我们需要封装一下,将高级接口暴露给程序的其它部分使用。

先定义数字与日志等级对应如下:

封装部分代码省略,注意将错误捕获并写入运行日志。

配置文件管理

我们使用 YAML 格式的配置文件,这是一种比 Json 更简介易读的格式,PyYAML 库实现了对它的读写操作。

目前我们要配置的字段不多,先定义默认配置文件,Python 命名规范中,常量使用全大写字母,下划线分隔:

DEFALUT_CONFIG = {
    "message_service": {
        "app_id": "",
        "app_secret": "",
        "email": ""
    }
}

读写配置文件的函数如下:

def CreateDefaultConfig() -> None:
    with open("config.yaml", "w", encoding="utf-8") as f:
        dump(DEFALUT_CONFIG, f, indent=4, allow_unicode=True)


def GetConfig() -> Dict:
    try:
        with open("config.yaml", "r", encoding="utf-8") as f:
            return load(f, SafeLoader)
    except FileNotFoundError:  # 配置文件不存在
        CreateDefaultConfig()
        return GetConfig()

这里我们对配置文件不存在的处理方式是使用默认值创建,然后返回其内容,实际情况下可以改为文件不存在时报错,将第一次运行时无配置文件的处理逻辑交给主程序实现。

消息推送

申请飞书开发者账号、新建应用、授权与获取鉴权的操作方式已经在用飞书实现脚本运行状态推送这篇文章中写过,这里不再赘述。

我们从原先的项目中将请求飞书 API 相关的代码复制过来,并稍作修改:

def SendFeishuMessage(app_id: str, app_secret: str, email: str, message: str) -> None:
    headers = {"Content-Type": "application/json; charset=utf-8"}
    data_to_get_token = {"app_id": app_id, "app_secret": app_secret}
    headers["Authorization"] = "Bearer " + httpx_post("https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal", headers=headers, json=data_to_get_token).json()["tenant_access_token"]

    data_to_send_message = {"email": email, "msg_type": "text", "content": {"text": message}}
    response = httpx_post("https://open.feishu.cn/open-apis/message/v4/send/", headers=headers, json=data_to_send_message)
    if response.json()["code"] != 0:
        AddRunLog(1, f"向{email}发送飞书消息失败,错误码:{response.json()['code']}")
    else:
        AddRunLog(3, f"向{email}发送飞书消息成功,消息代码:{response.json()['data']['message_id']}")

然后使用类似的方式,通过更改 data_to_send_message 的方式自定义消息的格式,实现以下三种消息类型的封装:

这三个函数都有三个相同的参数:

我们对配置文件管理模块稍加改动,生成空白的配置文件,然后将信息填入其中:

message_service:
    app_id: 'cli_a157**********0d'
    app_secret: 'TSll************************ituf'
    email: 'ye************11@qq.com'

事件处理

上文已经提到,我们需要用到 apscheduler 的两个事件,分别是任务执行成功和任务执行失败。

但注意,任务执行失败只有可能是出现了未捕获的异常,但执行成功有多种情况,有可能因为错误被捕获,或者数据超出范围造成失败,但在这些情况下,该任务在调度器看来依然是“执行成功”。

当然,也可以通过更改代码逻辑的方式,使任务在满足一定条件时主动抛出异常,两者孰优孰劣可以自行定夺。

我们定义的事件处理函数需要接收一个参数:event,这是 apscheduler 中对事件信息的封装。

首先,我们定义一个任务运行成功事件的处理函数:

def JobExecutedSuccessfully(event) -> None:

查阅文档,找到 event 对象的可用属性,其中有我们需要的内容:

使用 Python 的元组解包特性赋值相关变量,代码如下:

success, status_code, message = event.retval

之后,我们需要判断任务是否成功,如果不成功,就在终端输出相应提示,并推送告警信息,最后将相关数据存入数据库。

但是,我们细细一想,会发现一个问题:我们的程序会每分钟监测一次简书主页的状态,如果简书崩溃 60 分钟,我们就会收到 60 条相同的告警信息,这显然是没有必要的,而且如此频繁地调用飞书的 API 可能导致访问超限,继而影响其它告警信息的发送。

所以我们需要一种方式,对该任务的历史状态进行检测,判断其告警信息是否已经发送过。

由于我们已经设计过,每次任务执行时,都会将数据入库,所以我们只需以任务名称作为筛选条件,获取其上一次运行的结果即可,如果上一次运行未能成功,则告警信息已经发送过。

在日志服务模块中添加一个函数,实现这一功能:

def IsFailedUntilNow(monitor_name: str) -> bool:
    try:
        last_log = MonitorLog.select().where(MonitorLog.monitor_name == monitor_name).order_by(MonitorLog.id.desc()).get()
        return not last_log.successed
    except Exception as e:
        AddRunLog(level=1, message=f"查询监控日志失败:{e}")
        return True  # 无法判断时假设任务一直处于失败状态

这时,使用 ORM 框架的好处就体现出来了,我们可以很方便地指定筛选条件和排序规则。

注意,在事件处理函数中,这一查询应先于本次任务结果的入库执行,否则本次任务的结果将会作为函数的判断依据,导致这一逻辑失效。

回到事件处理函数,我们使用这一函数判断告警信息是否发送过,然后根据不同状态执行对应逻辑:

为了便于调试,不要忘记在每个分支中加上合适等级的运行日志记录。

用相同的方式封装好任务失败事件的处理函数,这一函数的逻辑较为简单,只需要判断并执行上述逻辑的前两条即可。

程序主逻辑

对程序开发有所了解的小伙伴可能已经发现了:我们使用的是自底向上的开发方式。

其实,我们也可以先编写好空函数,使用高层函数实现主要逻辑,然后逐个实现函数内容。

主程序启动时,先导入第三方库和我们自己编写的模块:

from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from apscheduler.schedulers.background import BackgroundScheduler

from config_service import GetConfig
from event_handlers import JobExecutedFailure, JobExecutedSuccessfully
from log_service import AddRunLog
from message_service import SendFeishuSystemMessage
from monitors import init_monitors
from register import get_registered_funcs_info
from utils import CronToKwargs

为了保证导入语句符合规范,这里日志模块不是最先导入的,但如果其它模块的初始化过程中可能发生异常,建议变更导入顺序,将日志模块的导入提前,避免发生错误时无法记录日志。

之后,我们需要校验配置文件是否完整填写:

if not all((GetConfig()["message_service"]["app_id"],
           GetConfig()["message_service"]["app_secret"],
           GetConfig()["message_service"]["email"])):
    AddRunLog(1, "消息服务配置不完整")
    print("消息服务配置不完整,请检查配置文件")
    AddRunLog(0, "程序退出,原因:消息服务配置不完整")
    exit(1)

其实配置文件的读取其实还有待优化,以现有的逻辑,每调用一次这个函数,就会对磁盘进行一次访问,会拖慢程序的速度,但考虑到在程序中配置文件的读取不算频繁,而且服务器上使用的是固态硬盘,随机读取性能尚可,现阶段不需要优化。

使用一行代码来初始化调度器:

scheduler = BackgroundScheduler()

AddRunLog(4, "初始化调度器成功")

然后用我们封装好的接口获取已注册的函数,遍历数据,进行任务注册:

funcs_to_schedule = get_registered_funcs_info()

AddRunLog(3, f"已注册的监控任务数量:{len(funcs_to_schedule)}")

AddRunLog(4, "开始添加监控任务")
for task_name, (func, run_cron) in funcs_to_schedule.items():
    scheduler.add_job(func, "cron", **CronToKwargs(run_cron), id=task_name)
    AddRunLog(4, f"添加监控任务:{task_name} 成功,运行 cron 表达式:{run_cron}")

AddRunLog(4, "监控任务添加完成")

这里引入了一个函数 CronToKwargs,这一函数来自于我们编写的 utils 模块,作用为将任务注册装饰器中的 cron 表达式进行拆分,返回一个字典,我们在此处使用字典解包将其作为参数传入 scheduler.add_job() 方法。

然后,注册我们的事件处理函数:

AddRunLog(4, "开始注册事件回调")
scheduler.add_listener(JobExecutedSuccessfully, EVENT_JOB_EXECUTED)
scheduler.add_listener(JobExecutedFailure, EVENT_JOB_ERROR)
AddRunLog(4, "事件回调注册完成")

最后,启动调度器,并发送系统消息:

scheduler.start()
AddRunLog(3, "调度器启动成功")
SendFeishuSystemMessage(GetConfig()["message_service"]["app_id"],
                        GetConfig()["message_service"]["app_secret"],
                        GetConfig()["message_service"]["email"],
                        "调度器启动成功")
print("调度器启动成功")

最后,使用 while True 开始一个无限循环,使用 input() 函数阻塞获取用户输入,并提供命令行管理能力:

Debug

运行程序,通过填入错误监控地址的方式使监控任务失败,发现虽然监控日志中有相应的记录,但告警信息并没有发送。

查看运行日志,发现程序认为告警信息已经发送过,但事实并非如此。在相应判断逻辑位置打断点,并在此处停止调度器避免重复运行带来的干扰。

单步运行函数,可以发现,由于监控数据库为空,程序在数据库查询中抛出了异常,异常被 except 语句捕获,执行了备用逻辑,导致告警信息发送被跳过。

IsFailedUntilNow 函数的 try 语句块中加入如下代码:

if MonitorLog.select().where(MonitorLog.monitor_name == monitor_name).count() == 0:
            return False  # 数据库中没有对应的记录,第一次运行即出现错误

再次运行,问题解决。

至此,程序编写完成。来看看我们写了多少行代码:

代码行数统计

我们只用了不到四百行代码,就实现了一个服务监控系统。

实战

凑巧的是,就在我编写好这个系统的几小时后,简书又一次出现故障。在本次故障的过程中,我收到的消息推送如下:

消息推送记录

简单梳理一下时间线:

这里解释一下流程中的一些问题:

为什么程序完成之后没有立刻上线启动运行?

我本来打算完善几个功能点,所以没有直接上线。

为什么没有调度器停止的信息?

因为我没有使用常规的退出方式(命令行输入退出指令),所以没有发送退出信息。后续打算改成注册退出事件的方式,保证程序无论因为何种原因退出,均可以推送系统信息。

为什么系统重新上线之后简书主站的监控预警没有再发送一遍?

因为数据库中还有上次运行的失败记录,重复发送预警信息在这种场景下也没有必要。

后续优化

后记

程序已经以 MIT 协议开源:JianshuAvailabilityMonitor

希望大家学编程不是只学会语法和特性,而能够将编程作为一种工具,真正应用到特定场景中。

上一篇下一篇

猜你喜欢

热点阅读