任务编排

python multiprocessing模块实现多进程任务中

2019-03-13  本文已影响0人  鸟它鸟

python multiprocessing模块实现多进程任务中运行多进程子任务,并实现并发控制。
起因是想使用celery+ansible做任务执行与回收,代码写好后,发现卧槽 celery不允许他执行的任务再建立子进程,这就比较尴尬了,封装好的ansible接口不能用?? 那怎么做多任务自动执行呢?

研究celery 发现他的默认执行的多进程机制是multiprocessing模块的Pool,通过代码测试这个模块,发现他也不允许自己的任务再建立子进程,于是乎大概明白什么回事。 后边可以研究下如何修改celery worker的默认并发机制。

不过本次想要绕过他,于是测试了multiprocessing的Process模块,发现这个模块是允许任务中建立子进程的。
然后就开始自己写任务执行器,先拿mysql简单做个任务队列,测通后再换kafka或者其他。
测试完成后,任务可以并行执行了,但是节奏得控制啊,要不好几万并发 自己不就挂了?翻了半天文档发现multiprocessing的Semaphore模块可以做到,测试后,代码如下。

#!/bin/python

from multiprocessing import Pool
from multiprocessing import Process,Semaphore,current_process
import sys, os, time, random, json

project_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath('__file__'))))
sys.path.append(project_dir)
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "wk_api.settings")

import django

django.setup()

from wkexe.models import WorkTables
from wkexe.models import ExecuteTables
from utils.ansible_api import ANSRunner


def RunPlaybook(ip, ymldir):
    s.acquire()
    print(time.strftime('%H:%M:%S'), current_process().name + " 获得锁运行");
    time.sleep(random.random() * 5)
    rbt = ANSRunner(ip)
    rbt.run_playbook(playbook_path='%s' % (ymldir))
    result = json.dumps(rbt.get_playbook_result(), indent=4)
    print(result)
    s.release()
    print(time.strftime('%H:%M:%S'), current_process().name + " 释放锁结束");


def RunModel(ip, model, module_args):
    s.acquire()
    print(time.strftime('%H:%M:%S'), current_process().name + " 获得锁运行");
    rbt = ANSRunner(ip)
    rbt.run_model(model, module_args)
    result = json.dumps(rbt.get_model_result(), indent=4)
    print(result)
    s.release()
    print(time.strftime('%H:%M:%S'), current_process().name + " 释放锁结束");



if __name__ == '__main__':
    while True:
        p_list = []
        works = WorkTables.objects.all().filter(status=10)
        for work in works:
            concurrent = work.concurrent  # 并发参数
            executeInfo = work.executetables_set.all()  # 需要执行的设备
            taskType = work.taskname.tasktype  # 任务类型
            p = Pool(1)
            for ip in executeInfo:
                ipAdd = ip.ip
                ipId = ip.id
                if taskType == 0:
                    ymlDir = work.taskname.taskymldir
                    print(ymlDir)
                    p = Process(target=RunPlaybook, args=(ipAdd, ymlDir))
                    # p.start()
                    # p.join()
                    # p.apply_async(RunPlaybook, args=(ipAdd, ymlDir))
                    p_list.append(p)
                elif taskType == 1:
                    model = work.taskname.taskmodel
                    modelArgs = work.taskname.taskargs
                    p = Process(target=RunModel, args=(ipAdd, model, modelArgs))
                    # p.start()
                    # p.join()
                    # p.apply_async(RunModel, args=(ipAdd, model, modelArgs))
                    p_list.append(p)
                else:
                    print("未定义的任务类型")

        s = Semaphore(concurrent) #用来控制对共享资源的访问数量,可以控制同一时刻并发的进程数
        for p in p_list:
            p.start()

        #
        for p in p_list:
            # p.close()
            p.join()
        print("执行完成")
上一篇下一篇

猜你喜欢

热点阅读