Python: Queue实现生产者及消费者模型

2019-04-10  本文已影响0人  圣地亚哥_SVIP

Python Queue模块是一个线程安全的队列。基于Queue实现生产者及消费者模型如下。参考Effective Python。
Queue相关操作

#!/usr/bin/python3

#Closeable Queeue

from threading import Thread
from queue import Queue


class Task(object):
    def __init__(self,count=-1):
        self.count = count
    def __repr__(self):
        return "Task Id: %d" % self.count
    def __str__(self):
        return "Task Id: %d" % self.count

class ClosableQueue(Queue):
    TERMINATOR = Task()

    def close(self):
        self.put(self.TERMINATOR)

    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.TERMINATOR:
                    return                #Exit Thread
                yield  item
            finally:
                self.task_done()

class Consumer(Thread):
    def __init__(self,func,work_queue,out_queue):
        super().__init__()
        self.worker = func
        self.task = work_queue
        self.out_queue = out_queue

    def run(self):
        for item in self.task:
            result = self.worker(item)
            self.out_queue.put(result)

def worker(arg):
    print("Worker Solve: %s" % arg)
    return arg


def producer():
    work_queue = ClosableQueue()
    out_queue = ClosableQueue()
    threads = [Consumer(worker, work_queue, out_queue)]
    #threads = [Consumer(worker,work_queue,out_queue),Consumer(worker,work_queue,out_queue)]
    for thread in threads:
        thread.start()
    for num in range(30):
        work_queue.put(Task(num))
    work_queue.close()
    #Num Consumers,then Call Num close.
    #work_queue.close()
    work_queue.join()

if __name__ == '__main__':
    producer()
上一篇 下一篇

猜你喜欢

热点阅读