一个基于多进程主从模式的简单实现(Python)

2019-03-27  本文已影响0人  slords

这里以一种简单展示的在Python下的主从模式的多进程实现,并对一些实现过程中可能遇到的坑做了简单的说明

import uuid
import time
from multiprocessing import Process, Queue, Value
from queue import Empty


class Master(object):

    def __init__(self, job_func, servant_count=5):
        """
        御主类型
        :param job_func: 任务函数
        :type job_func: function
        :param servant_count: 侍从数量
        :type servant_count: int
        """
        self.job_queue = Queue()
        self.job_func = job_func
        self.servant_list = [
            Servant(str(i), job_func, self.job_queue)
            for i in range(servant_count)
        ]

    def start(self):
        """
        开始运行
        :return: None
        """
        for servant in self.servant_list:
            servant.start()

    def end(self):
        """
        结束运行, 关闭所有侍从
        :return: None
        """
        for servant in self.servant_list:
            servant.inactive()

    def add_job(self, *args, **kwargs):
        """
        添加任务
        :param args: 任务函数的参数
        :param kwargs: 任务函数的参数
        :return: 任务id
        :rtype str
        """
        job_id = str(uuid.uuid1())
        self.job_queue.put((job_id, args, kwargs))
        return job_id


class Servant(Process):

    time_out = 1

    def __init__(self, servant_name, job_func, job_queue):
        """
        侍从类型
        :param servant_name: 侍从名称(进程名称)
        :type servant_name: str
        :param job_func: 任务函数
        :type job_func: function
        :param job_queue: 任务队列
        :type job_queue: Queue
        """
        self.job_func = job_func
        self.job_queue = job_queue
        # 侍从状态, Value('b', 0)是C类型里的bite型
        # 这里0表示关闭,1表示开启
        # 该类型可以用于进程共享,效率比基于pickle的共享内存更具效率
        self.active = Value('b', 0)
        super(Servant, self).__init__(name=servant_name)

    def inactive(self):
        """
        关闭侍从
        :return: None
        """
        self.active.value = 0

    def start(self):
        """
        侍从启动
        :return: None
        """
        self.active.value = 1
        super(Servant, self).start()

    def run(self):
        """
        子进程函数
        :return: None
        """
        # 如果仆从一直处于激活状态则持续完成任务
        while self.active.value:
            try:
                # 获取任务
                job_id, args, kwargs = self.job_queue.get(timeout=self.time_out)
            except Empty:
                # 如果没有获取到任务重复循环
                continue
            try:
                # 执行任务获取结果
                result = self.job_func(*args, **kwargs)
            except Exception as e:
                # 处理任务函数异常
                error = str(e)
                result = None
            else:
                error = None
            # 处理任务结果,这里直接打印
            print(
                '<{servant_id}|{job_id}> result:{result},error:{error}'.format(
                    job_id=job_id,
                    result=result,
                    error=error,
                    servant_id=self.name
                )
            )


# 用于测试的函数
# 这里没有把该函数定义在 if __name__ == '__main__' 中,是为了兼容window
# 因为window不支持fork,在window平台执行时,
# 子进程创建运行时,会需要将父子进程共享的变量以pickle的方式复制一份,
# 如果定义在if __name__ == '__main__'中,
# 则会由于在子进程中引入当前模块时 __name__ != '__main__'
# 导致函数未被定义,最终pickle信息对应的对象路径不存在,无法还原对应函数
def test_func(wait_time=1, return_value=None):
    """
    测试任务函数
    :param wait_time: 等待的时间
    :type wait_time: int / float
    :param return_value: 返回值
    :type return_value: object
    :return: return_value
    :rtype: object
    """
    time.sleep(wait_time)
    return return_value


if __name__ == '__main__':
    import random

    master = Master(job_func=test_func, servant_count=3)
    master.start()
    for x in range(20):
        # 随机等待0~1秒,返回x
        master.add_job(random.random(), x)
    # 等待2秒关闭任务运行
    time.sleep(2)
    master.end()
    # 这个时候正在执行的任务不受影响没执行的任务将不再被执行
    
# 执行结果
"""
<0|98bd686c-5077-11e9-acec-1002b543289d> result:0,error:None
<1|98bdb683-5077-11e9-8494-1002b543289d> result:2,error:None
<2|98bdb682-5077-11e9-9ffb-1002b543289d> result:1,error:None
<0|98bdb684-5077-11e9-9133-1002b543289d> result:3,error:None
<1|98bdb685-5077-11e9-9541-1002b543289d> result:4,error:None
<0|98bdb687-5077-11e9-afed-1002b543289d> result:6,error:None
<2|98bdb686-5077-11e9-9753-1002b543289d> result:5,error:None
<1|98bdb688-5077-11e9-85aa-1002b543289d> result:7,error:None
<2|98bdb68a-5077-11e9-8121-1002b543289d> result:9,error:None
<0|98bdb689-5077-11e9-8e73-1002b543289d> result:8,error:None
"""
上一篇 下一篇

猜你喜欢

热点阅读