Python多线程示例 -- 有限调度器处理队列数据

2020-04-04  本文已影响0人  打出了枫采

下面代码模拟了一种经典的多线程调度处理场景:4个并行的调度器处理总长度为100的队列中的数据。

# -*- coding: utf-8 -*-
"""
待处理数据队列 长度N,M个在运行调度器处理
"""

import logging
import os
import time
from queue import Queue, Empty
from threading import Lock, Thread, BoundedSemaphore, current_thread, activeCount, active_count


class AppLogger:
    """Configure a named logger with a file handler and a console handler.

    The wrapped logger's level is set to INFO and both handlers share one
    format (timestamp, level, source file, line number, message).  The
    logger's ``warning``/``error``/``info``/``debug`` methods are re-exported
    as attributes of the wrapper so callers can write ``app_logger.info(...)``.

    Args:
        moduleName: Name passed to ``logging.getLogger``.
        logfile: Path of the log file handed to ``logging.FileHandler``.
    """

    def __init__(self, moduleName, logfile):
        self._logger = logging.getLogger(moduleName)
        fmt = "%(asctime)-15s %(levelname)s %(filename)s %(lineno)d %(message)s"
        formatter = logging.Formatter(fmt)

        # logging.getLogger returns the same object for the same name, so
        # attaching handlers unconditionally would emit every record twice
        # once a second AppLogger is built for that name.  Only add the
        # handlers that are not attached yet.
        log_path = os.path.abspath(os.fspath(logfile))
        has_file_handler = any(
            isinstance(h, logging.FileHandler) and h.baseFilename == log_path
            for h in self._logger.handlers
        )
        if not has_file_handler:
            handler = logging.FileHandler(logfile)
            handler.setFormatter(formatter)
            self._logger.addHandler(handler)
        self._logger.setLevel(logging.INFO)

        # FileHandler subclasses StreamHandler, hence the explicit exclusion
        # when looking for an already-attached console handler.
        has_console = any(
            isinstance(h, logging.StreamHandler)
            and not isinstance(h, logging.FileHandler)
            for h in self._logger.handlers
        )
        if not has_console:
            console = logging.StreamHandler()
            console.setLevel(logging.INFO)
            console.setFormatter(formatter)
            self._logger.addHandler(console)

        self.warning = self._logger.warning
        # Misspelled alias kept so existing callers of ``warnning`` still work.
        self.warnning = self.warning
        self.error = self._logger.error
        self.info = self._logger.info
        self.debug = self._logger.debug


# Sizing: how many items the queue holds and how many schedulers may run
# at the same time.
dataQueueMaxLen = 100
schedulerMaxCount = 4

# Bounded queue of pending work items.
dataQueue = Queue(maxsize=dataQueueMaxLen)

# One slot per concurrently running scheduler; acquired before a worker
# thread is started and released by the worker.
scheduleSources = BoundedSemaphore(value=schedulerMaxCount)

# Shared logger writing to test.log and to the console.
myLogger = AppLogger(moduleName="myapp", logfile="test.log")

class Scheduler:
    """Run ``func(*data)`` on its own thread and keep a printable description.

    Args:
        func: Callable executed by the thread.
        data: Iterable of positional arguments passed to ``func``.

    Attributes:
        record: Text of the form ``Scheduler(<func name>, <arg>, ...)``,
            also returned by ``str()``.
    """

    def __init__(self, func, data):
        args = tuple(data)
        self._thread = Thread(target=func, args=args)
        # Build the description with a join so that any number of arguments
        # (including none) is accepted; a fixed two-placeholder format string
        # only works when exactly one argument is supplied.
        func_name = getattr(func, "__name__", repr(func))
        self.record = "Scheduler(%s)" % ", ".join([func_name, *map(str, args)])

    def __str__(self):
        return self.record

    def start(self):
        """Start the underlying thread."""
        self._thread.start()

# Work items to process: the integers 0 .. dataQueueMaxLen - 1.
dataList = range(dataQueueMaxLen)

# Fill the queue at module level, before any scheduler thread is started.
for data in dataList:
    dataQueue.put(data)

def testFunc(data, delay=5):
    """Simulated task: log, sleep for ``delay`` seconds, free a scheduler slot.

    Intended to run on a worker thread whose slot was acquired from
    ``scheduleSources`` before the thread was started.

    Args:
        data: Work item being processed; only used in the log messages.
        delay: Seconds to sleep to simulate work (default 5).
    """
    thd = current_thread()
    try:
        myLogger.info(
            "Thread %s start testFunc %s sleep %s seconds", thd, data, delay
        )
        time.sleep(delay)
    finally:
        # Always hand the slot back, even if the task fails; otherwise the
        # dispatcher would eventually block forever on acquire().
        scheduleSources.release()
    myLogger.info("Thread %s exit testFunc %s", thd, data)

def startScheduler(func, data):
    """Wait for a free scheduler slot, then run ``func(data)`` on a new thread.

    Blocks until ``scheduleSources`` can be acquired.  The slot is expected
    to be released by ``func`` itself once it has finished its work.

    Args:
        func: Callable to execute on the worker thread.
        data: Single work item passed to ``func``.

    Returns:
        The started ``Scheduler``.
    """
    # A blocking acquire() without timeout always succeeds once it returns.
    scheduleSources.acquire()
    try:
        proc = Scheduler(func, [data])
        proc.start()
    except BaseException:
        # The worker never ran, so nobody else will give the slot back.
        scheduleSources.release()
        raise
    return proc

if __name__ == "__main__":
    # Dispatch every queued item; startScheduler blocks while all scheduler
    # slots are in use, which caps the number of concurrent workers.
    while True:
        try:
            # Single-step "take or stop" instead of empty() followed by a
            # blocking get(), which could hang if the queue were drained
            # between the two calls.
            data = dataQueue.get_nowait()
        except Empty:
            break
        startScheduler(testFunc, data)
        # active_count() replaces the deprecated activeCount() alias.
        myLogger.info("current running thread count %d ", active_count())

最终运行打印结果如下,可以看出每隔5秒启动4个线程处理,如此往复处理队列中数据 image.png
上一篇下一篇

猜你喜欢

热点阅读