大数据

DataX添加告警功能

2020-03-06  本文已影响0人  Rudolf_liu

前言

公司的DataX已经用了2年多了,性能以及基本功能上没有太大问题。但是有一个问题一直困扰着我,就是DataX的错误告警。DataX的日志问题,一直令人头疼。随着job的逐渐增多,一个调度程序或者脚本打印出来的日志实在太多,假如中途有哪个job执行报错,根本无法排查,虽然DataX自身会记录每个job执行的日志,在log目录下,命名规则:脚本名称+执行时间,但是文件数目过多排查起来还是挺困难的。

设计思路

有了需求,就可以开始设计,给DataX添加告警功能有两种办法,修改源码捕获日志,当时为了不侵入源码,采用了捕获日志的方式。

代码改动

修改DataX启动类,即bin/datax.py,将datax.py打印出来的日志放入pipe(管道)缓存中,根据最后执行返回的code判断任务成功还是失败,失败则需要调用告警功能。具体修改后的代码如下:

if __name__ == "__main__":
    printCopyright()
    parser = getOptionParser()
    options, args = parser.parse_args(sys.argv[1:])
    if options.reader is not None and options.writer is not None:
        generateJobConfigTemplate(options.reader,options.writer)
        sys.exit(RET_STATE['OK'])
    if len(args) != 1:
        parser.print_help()
        sys.exit(RET_STATE['FAIL'])

    startCommand = buildStartCommand(options, args)
    # print startCommand
    child_process = subprocess.Popen(startCommand, shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)

    register_signal()
    start=time.time()
    errmsg=[]
    while child_process.poll() == None:
        line=child_process.stdout.readline()
        print(line),  # 使用,是为了使打印出来的日志不换行
        errmsg.append(line)
        if time.time() - start >= 1800: # 超时时间30分钟
            # 可以添加一些超时的信息
            break

    if child_process.returncode != 0:
        # 执行具体的告警操作,可以从启动参数中获取到具体的脚本名称
        # 解析errmsg或者直接输出
        
    sys.exit(child_process.returncode)

总结

上一篇下一篇

猜你喜欢

热点阅读