Python

python3 -- 使用 filecmp 比对文件 + 多进程 multiprocessing

2021-06-12  本文已影响0人  w_dll

最近没有更新,是因为在写 FastAPI + Vue 的项目,准备写完后再放上来;
这次更新的内容是领导安排的任务:之前没用过 filecmp 这个比对库,索性记录下来;
任务是改进之前的文件比对脚本;
我的改进主要有两点:
1. 将多条 sql 合并为一次提交(同一个事务);
2. 将单线程检查的脚本改为多进程并行检查;
以此提高巡检速度。

以下是脚本

脚本

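需要注意的点:
之前用的是 threading,多个线程运行在同一个进程内、共享同一份内存,所以定义的变量(如普通 list)在线程间是共享的;
这次改用 multiprocessing 后,发现子进程往普通 list 里 append 的结果在父进程中读取不到——每个进程都有独立的内存空间,子进程修改的只是自己的那份副本;
写了测试用例才发现这个问题,以后要注意;
解决办法是使用 multiprocessing.Manager().list():列表由管理进程持有,各子进程通过代理对象读写同一个列表(见下面脚本中的 get_sql_list)。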

import os,sys
import filecmp
import pymysql
from datetime import datetime


def compareme(dir1, dir2, list):
    """Recursively collect paths under *dir1* that are missing from or differ in *dir2*.

    Uses :class:`filecmp.dircmp` with ``ignore=['_gsdata_']``. Passing
    ``ignore`` replaces filecmp's default ignore list rather than extending it.

    At each directory level it records:

    * entries that exist only in *dir1* (``left_only``), and
    * files present on both sides that compared as different
      (``diff_files``).

    The comparison is dircmp's shallow one: files whose ``os.stat``
    signatures (type, size, mtime) match are treated as equal without
    reading their contents. It then recurses into every sub-directory
    common to both sides.

    Args:
        dir1: Source (reference) directory.
        dir2: Mirror directory checked against *dir1*.
        list: Output list. Absolute paths, rooted at *dir1*, are appended
            in place. The name shadows the builtin but is kept so that
            keyword callers keep working.

    Returns:
        None. Results are delivered through *list*.
    """
    dircomp = filecmp.dircmp(dir1, dir2, ignore=['_gsdata_'])
    # Use extend() instead of list comprehensions evaluated only for their
    # append() side effects. Real lists (not generators) are passed so this
    # also works if a multiprocessing Manager list proxy is supplied.
    list.extend([os.path.abspath(os.path.join(dir1, name)) for name in dircomp.left_only])
    list.extend([os.path.abspath(os.path.join(dir1, name)) for name in dircomp.diff_files])
    for sub in dircomp.common_dirs:
        compareme(os.path.abspath(os.path.join(dir1, sub)),
                  os.path.abspath(os.path.join(dir2, sub)),
                  list)


def get_sql(**kwargs):
    """Compare one source/mirror directory pair and queue one INSERT for the outcome.

    Used as a ``multiprocessing.Process`` target by ``get_sql_list``.

    Keyword arguments:
        sour: source (reference) directory.
        dect: mirror directory. Its first character is treated as a drive
            letter and looked up in the node table below.
        tablename: table the generated INSERT targets. It is concatenated
            into the SQL, so it must be a trusted identifier.
        cur: value written into the ``cr_date`` column.
        sql_list: list-like object (e.g. a ``Manager().list()`` proxy) that
            the generated SQL string is appended to.

    Exactly one INSERT is appended per call:

    * ``'success'`` when no differences were found;
    * ``'fail'`` when differences were found, or when the mirror drive is
      not reachable and nothing could be verified.

    Raises:
        KeyError: if the drive letter of *dect* is not in the node table.
    """
    sour = kwargs.get('sour')
    dect = kwargs.get('dect')
    tablename = kwargs.get('tablename')
    cur = kwargs.get('cur')
    sql_list = kwargs.get('sql_list')

    # Drive letter -> "<sync name>_<ip>" of the node mounted on that drive.
    nodes = {
        'D': 'efb-master_192.168.110.93',
        'H': "efb-slave_192.168.110.94",
        'I': "pudongefb-bak_10.128.129.113",
        'G': 'telcom_192.168.190.89',
        'J': 'unicom_192.168.176.98',
        'K': 'pudongefb_10.128.129.112',
    }
    target = dect[:1]
    parts = nodes[target].split('_')
    sync_name, ip = parts[0], parts[1]
    # Values are internal constants and a timestamp, not untrusted input.
    sql = "insert into " + tablename + " values('%s', '%s', '%s', '%s')"

    if not os.path.exists(target + ":"):
        # The mirror drive is not mounted, so nothing was verified. This
        # must not be logged as a successful sync.
        sql_list.append(sql % (sync_name, ip, "fail", cur))
        print("Pls check %s mount" % target)
        return

    diffs = []
    compareme(sour, dect, diffs)
    # dircmp only ignores entries named exactly '_gsdata_'; additionally
    # drop any reported path that contains that marker anywhere.
    diffs = [path for path in diffs if '_gsdata_' not in path]

    if diffs:
        sql_list.append(sql % (sync_name, ip, "fail", cur))
        for number, path in enumerate(diffs, 1):
            print("Time %s number %s the ip %s file %s has not sync!" % (cur, number, ip, path))
        print("Time %s the %s ip %s sync-log has fail create!" % (cur, sync_name, ip))
    else:
        sql_list.append(sql % (sync_name, ip, "success", cur))
        print("Time %s the %s ip %s sync-log has success create!" % (cur, sync_name, ip))


def get_sql_list(dirs_dict, tablename, cur, task=6):
    """Build every SQL statement for one sync check, comparing pairs in parallel.

    The first statement is always ``CREATE TABLE IF NOT EXISTS`` for
    *tablename*. Each entry of *dirs_dict* then contributes one INSERT,
    produced by ``get_sql`` running in its own child process. At most
    *task* children run at a time.

    Args:
        dirs_dict: iterable of ``{'source': ..., 'dist': ...}`` dicts. It is
            not modified.
        tablename: name of the sync-log table.
        cur: timestamp passed through to ``get_sql``.
        task: maximum number of concurrent child processes (must be >= 1).

    Returns:
        A plain ``list`` of SQL strings: the CREATE statement first, then
        the INSERTs in no guaranteed order. A child process that crashes
        contributes no INSERT, which callers can detect by the length.

    Raises:
        ValueError: if *task* is smaller than 1.
    """
    import multiprocessing

    if task < 1:
        raise ValueError("task must be >= 1")

    create_sql = "create table if not exists %s" % tablename + "(syncname varchar(25),node varchar(25),result varchar(25),cr_date varchar(25))"
    pairs = list(dirs_dict)

    # Child processes do not share memory with the parent, so results go
    # through a Manager-backed list. The with-block guarantees the manager
    # process is shut down.
    with multiprocessing.Manager() as manager:
        shared = manager.list()
        shared.append(create_sql)
        for start in range(0, len(pairs), task):
            batch = [
                multiprocessing.Process(target=get_sql, kwargs={
                    'sour': pair['source'],
                    'dect': pair['dist'],
                    'tablename': tablename,
                    'cur': cur,
                    'sql_list': shared,
                })
                for pair in pairs[start:start + task]
            ]
            for proc in batch:
                proc.start()
            for proc in batch:
                proc.join()
        # Copy the results out before the manager is shut down.
        return list(shared)


def mysql_conn(**mysql_cfg):
    """Open a PyMySQL connection from the keyword settings in *mysql_cfg*.

    Recognised keys: ``host``, ``user``, ``passwd`` and ``db`` (each
    defaulting to None), ``port`` (default 3306) and ``charset``
    (default ``'utf8'``). Other keys are ignored.

    Returns:
        A ``(error, conn)`` tuple: ``(None, connection)`` on success, or
        ``(message, None)`` if ``pymysql.connect`` raised, where *message*
        is the exception's ``str()``.
    """
    settings = {
        'host': mysql_cfg.get('host', None),
        'user': mysql_cfg.get('user', None),
        'passwd': mysql_cfg.get('passwd', None),
        'db': mysql_cfg.get('db', None),
        'port': mysql_cfg.get('port', 3306),
        'charset': mysql_cfg.get('charset', 'utf8'),
    }
    try:
        connection = pymysql.connect(**settings)
    except Exception as exc:
        return str(exc), None
    return None, connection


def exec_sql(sql_list, conn):
    """Execute every statement in *sql_list* on *conn* and commit once at the end.

    All statements run on a single cursor. If any statement (or the commit)
    fails, the open transaction is rolled back and the error is returned
    instead of raised.

    NOTE: on MySQL, DDL such as ``CREATE TABLE`` commits implicitly, so
    the rollback cannot undo it.

    Args:
        sql_list: iterable of complete SQL strings.
        conn: DB-API connection (a PyMySQL connection in this script).

    Returns:
        None on success, otherwise the ``str()`` of the exception that
        aborted the batch.
    """
    cursor = None
    try:
        cursor = conn.cursor()
        for sql in sql_list:
            cursor.execute(sql)
        conn.commit()
    except Exception as e:
        try:
            conn.rollback()
        except Exception:
            # Rollback can fail too (e.g. the connection dropped). Keep
            # reporting the original error rather than masking it.
            pass
        return str(e)
    finally:
        # Close the cursor on both the success and the error path.
        if cursor is not None:
            cursor.close()
    return None


def main():
    """Run one sync check: compare every mirror, then write the results to MySQL.

    Steps:

    1. Build the SQL (CREATE TABLE plus one INSERT per directory pair) with
       ``get_sql_list``, and verify that every pair produced its INSERT.
    2. Connect with ``mysql_conn``.
    3. Execute all statements in one batch with ``exec_sql``.

    Any failure is printed and aborts the run. The connection is always
    closed once it has been opened.
    """
    # NOTE(review): credentials are hard-coded; consider loading them from
    # the environment or a config file.
    mysql_cfg = {
        'host': '192.168.1.111',
        'user': 'test',
        'passwd': '123456',
        'db': 'test',
    }
    dirs_dict = [
        {'source': r'D:\efbtomcat\webapps\ROOT\PackageData', 'dist': r'H:\efbtomcat\webapps\ROOT\PackageData'},
        {'source': r'D:\efbtomcat\webapps\ROOT\PackageData', 'dist': r'I:\efbtomcat\webapps\ROOT\PackageData'},
        {'source': r'D:\efbtomcat\webapps\ROOT\PackageData', 'dist': r'G:\efbtomcat\webapps\ROOT\PackageData'},
        {'source': r'D:\efbtomcat\webapps\ROOT\PackageData', 'dist': r'J:\efbtomcat\webapps\ROOT\PackageData'},
        {'source': r'D:\efbtomcat\webapps\ROOT\PackageData', 'dist': r'K:\efbtomcat\webapps\ROOT\PackageData'},
    ]
    table_name = "efb_synclog_test_by_021786"
    # Current time truncated to whole seconds.
    cur = datetime.now().replace(microsecond=0)
    # One CREATE TABLE plus one INSERT per directory pair.
    total_num = len(dirs_dict) + 1

    # Build the SQL from the directory comparisons.
    sql_list = get_sql_list(dirs_dict, table_name, cur)
    if len(sql_list) != total_num:
        print('get sql list error!\n')
        return
    for sql in sql_list:
        print(sql)
        print('=========================')
    print('已获取全部sql')

    # Connect to the database.
    error, conn = mysql_conn(**mysql_cfg)
    if error is not None:
        print('connect mysql error!\n' + error + '\n')
        return
    print('数据库连接成功')

    # Execute the SQL, and always release the connection afterwards.
    try:
        error = exec_sql(sql_list, conn)
    finally:
        conn.close()
    if error is not None:
        print('exec sql list error!\n' + error + '\n')
        return
    print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), '执行结束, 检查数据库')


if __name__ == "__main__":
    # Keep the entry point behind this guard: with the "spawn" start method
    # (the default on Windows), multiprocessing children re-import this
    # module and must not re-run main().
    main()

执行结果

命令行结果

数据库结果

上一篇 下一篇

猜你喜欢

热点阅读