删除azkaban的执行历史

2021-06-12  本文已影响0人  飞有飞言
image.png

azkaban是一款工作流调度工具,由Linkedin开发并开源给社区。

azkaban保留了task,flow的执行历史。每个任务的标准输出(stdout)、标准错误输出(stderr)都会先暂时存到日志文件里,同时也存储到后端数据库中,一般是mysql。

azkaban通常用在大数据任务调度场景,把任务提交之后,如果任务是spark,hive,hadoop,flink等任务的话,会产生大量的日志输出。这些日志如果长时间存在mysql中,会让mysql的数据库过大,占用大量的磁盘空间。
所以就需要能够定时清除mysql中存储的执行历史的功能。

为此单独开发一个python脚本来清除执行历史。azkaban中有3个表和执行历史有关execution_logs, execution_jobs, execution_flows

因为最近的日志还是要看的,在任务执行有问题的时候还需要查看,所以保留最近执行的历史。

mysql的连接信息在azkaban的properties文件中保存的有,所以直接读了azkaban的properties文件。

#!/usr/bin/env python
# coding:utf-8


import sys
import pymysql
import time


# 清理30天之前的,每次清理3000条
sql_find_max_exec_id = '''select exec_id, from_unixtime(cast(submit_time/1000 as int)) from execution_flows 
where submit_time < unix_timestamp(date_sub(current_timestamp(), interval 30 day)) * 1000
order by exec_id asc limit 1 offset 3000
'''

tables_to_clear = ['execution_logs', 'execution_jobs', 'execution_flows']

sql_clear_execution = 'delete from %s where exec_id < %s'

def read_props(filepath):
    d = {}
    with open(filepath) as fp:
        for line in fp:
            line = line.strip()
            if not line or line.startswith('#'): continue
            t = line.split('=')
            d[t[0].strip()] = t[1].strip()

    return d


'''
mysql.port=3306
mysql.host=xxxx
mysql.database=azkaban
mysql.user=azkaban
mysql.password=aaaaaaaaaa
'''
props = read_props(sys.argv[1])


dbconn = pymysql.connect(host=props['mysql.host'], user=props['mysql.user'], 
    password=props['mysql.password'], database=props['mysql.database'],
    port=int(props.get('mysql.port', 3306)))


cursor = dbconn.cursor()
print('\n\n%s start' % time.strftime('%Y-%m-%d %H:%M'))


try:
    cursor.execute(sql_find_max_exec_id)
    max_exec_id, submit_time = cursor.fetchone()

    print('clear logs, max exec_id %d submit_time %s' % (max_exec_id, submit_time))

    for tbl in tables_to_clear:
        sql = sql_clear_execution % (tbl, max_exec_id)
        affected = cursor.execute(sql)
        print('delete from table %20s, %8d records deleted' % (tbl, affected))
    dbconn.commit()
except Exception, e:
    dbconn.rollback()
    raise e
finally:
    cursor.close()
    dbconn.close()

上一篇下一篇

猜你喜欢

热点阅读