Python全栈工程师

26.2-Queue使用和分发器实现

2019-10-22  本文已影响0人  BeautifulSoulpy

当你把希望,放在别人身上时,你会选择等待,当你把希望,放在自己身上时,你会选择奔跑!

总结

1.函数的栈跟线程相关;函数的每一次调用都是不同的;
2.时间应该跟timedelta相加减;

Queue 模块

它可用于在生产者(producer)和消费者(consumer)之间线程安全(thread-safe)地传递消息或其它数据,因此多个线程可以共用同一个Queue实例。Queue的大小(元素的个数)可用来限制内存的使用。

Queue 模块实现了三种类型的队列,它们的区别仅仅是队列中元素被取回的顺序。

在 FIFO 队列(先入先出)中,先添加的任务先取回;
在 LIFO 队列(后入先出)中,最近被添加的元素先取回(操作类似一个堆栈);
在优先级队列PriorityQueue中,元素将保持排序( 使用 heapq 模块 ) 并且最小值的条目第一个返回;

阻塞:卡住了,
超时:None 不超时; 5 等5秒 ;

Queue方法

常用方法 -
Queue.qsize() 返回队列的大小
Queue.empty() 如果队列为空,返回True,反之False
Queue.full() 如果队列满了,返回True,反之False
Queue.full 与 maxsize 大小对应
Queue.get([block[, timeout]]) 获取队列,timeout等待时间
Queue.get_nowait() 相当Queue.get(False)
Queue.put(item) 写入队列,timeout等待时间, item非可迭代对象
Queue.put_nowait(item) 相当Queue.put(item, False)
Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
Queue.join() 实际上意味着等到队列为空,再执行别的操作

从队列中移除元素并返回这个元素block 为 阻塞,timeout为超时。
如果block为True,是阻塞,timeout为None就是一直阻塞
如果block为True但是timeout有值,就阻塞到一定秒数抛出Empty异常。

如果block为False,是非阻塞,timeout将被忽略,要么成功返回一个元素,要么抛出empty异常。

1 queue 模块定义的类和异常

queue 模块定义了以下四种不同类型的队列,它们之间的区别在于数据入队列之后出队列的顺序不同。

1.1 queue.Queue(maxsize=0)

入参 maxsize 是一个整数,用于设置队列的最大长度。一旦队列达到上限,插入数据将会被阻塞,直到有数据出队列之后才可以继续插入。
如果 maxsize 设置为小于或等于零,则队列的长度没有限制。
block 阻塞

qsize()
获取队列的元素个数。

put(item [,block[, timeout]]): 
往queue中放一个item

get(item [,block[, timeout]]): 
从queue中取出一个item,并在队列中删除的这个item

import queue
q = queue.Queue()  # 创建 Queue 队列
for i in range(3):
    q.put(i)  # 在队列中依次插入0、1、2元素
for i in range(3):
    print(q.get())  # 依次从队列中取出插入的元素,数据元素输出顺序为0、1、2

q.get(True,None)# 一直等,直到有数据为止;
q.get(True,5)   # 等一会 Empty;
q.get(False,None) # 要么拿,要么Empty;
q.get(False,5)  #要么拿,要么直接Empty;

如果 block 为 True , timeout 为 None(也是默认的选项),那么get()/put()可能会阻塞,直到队列中出现可用的数据/位置。如果 timeout 是正整数,那么函数会阻塞直到超时N秒,然后抛出一个异常。

如果 block 为 False ,如果队列无数据,调用get()或者有无空余位置时调用put(),就立即抛出异常
有数据则拿,无数据则抛异常;timeout 将会被忽略)。

get 和 put _ 阻塞block 和 超时timeout
import queue

# 设置上限maxsize=10
q = queue.Queue(maxsize=10)

# 往队列加10个数据
for i in range(100):
    if q.qsize() >= 10:
        # 存放的数据达到上限maxsize,插入会导致阻塞
        break
    else:
        q.put(i)

# 从队列取值
while not q.empty():
    n = q.get()
    print("本次取出数据:%s" % n)
#----------------------------------------------------------------
本次取出数据:0
本次取出数据:1
本次取出数据:2
本次取出数据:3
本次取出数据:4
本次取出数据:5
本次取出数据:6
本次取出数据:7
本次取出数据:8
本次取出数据:9


# 一直提交数据;
from queue import Queue

q = Queue(5)
for i in range(5):
    q.put(i + 1)
print('-' * 30)

# 从对列中取值;
while True:
    a = input('<<<<')
    print('~~~~~~~~~')
    print(q.full())
    if not q.full():
        q.put_nowait(a)
    else:
        while not q.empty():
            print(q.get())
    print('~~~~~~~~~~~~~~~')
#--------------------------------------------
~~~~~~~~~
True
1
2
3
4
5
~~~~~~~~~~~~~~~
~~~~~~~~~

2 Queue、LifoQueue、PriorityQueue 和 SimpleQueue 对象的基本使用方法

Queue、LifoQueue、PriorityQueue 和 SimpleQueue 四种队列定义的对象均提供了以下函数使用方法,下面以 Queue 队列为例进行介绍。

2.1 Queue.qsize()

返回队列中数据元素的个数。注意并发情况;

import queue
q = queue.Queue()
q.put('python-100')  # 在队列中插入元素 'python-100'
print(q.qsize())  # 输出队列中元素个数为1
2.2 Queue.put(item, block=True, timeout=None)

item,放入队列中的数据元素。
timeout,设置超时时间。

block,当队列中元素个数达到上限继续往里放数据时:
如果 block=False,直接引发 queue.Full 异常;
如果 block=True,且 timeout=None,则一直等待直到有数据出队列后可以放入数据;
如果 block=True,且 timeout=N,N 为某一正整数时,则等待 N 秒,如果队列中还没有位置放入数据就引发 queue.Full 异常。

import queue
try:
    q = queue.Queue(2)  # 设置队列上限为2
    q.put('python')  # 在队列中插入字符串 'python'
    q.put('-') # 在队列中插入字符串 '-'
    q.put('100', block = True, timeout = 5) # 队列已满,继续在队列中插入字符串 '100',等待5秒后会引发 queue.Full 异常
except queue.Full:
    print('queue.Full')
2.3 Queue.get(block=True, timeout=None)

从队列中取出数据并返回该数据内容。

block,当队列中没有数据元素继续取数据时:
如果 block=False,直接引发 queue.Empty 异常;
如果 block=True,且 timeout=None,则一直等待直到有数据入队列后可以取出数据;
如果 block=True,且 timeout=N,N 为某一正整数时,则等待 N 秒,如果队列中还没有数据放入的话就引发 queue.Empty 异常。

import queue
try:
    q = queue.Queue()
    q.get(block = True, timeout = 5) # 队列为空,往队列中取数据时,等待5秒后会引发 queue.Empty 异常
except queue.Empty:
    print('queue.Empty')

2.4 Queue.get_nowait()

相当于 Queue.get(block=False)block,当队列中没有数据元素继续取数据时直接引发 queue.Empty 异常。

import queue
try:
    q = queue.Queue()
    q.get_nowait() # 队列为空,往队列中取数据时直接引发 queue.Empty 异常
except queue.Empty:
    print('queue.Empty')
2.5 Queue.put_nowait(item)

相当于 Queue.put(item, block=False),当队列中元素个数达到上限继续往里放数据时直接引发 queue.Full 异常。

import queue
try:
    q = queue.Queue(2)  # 设置队列上限为2
    q.put_nowait('python')  # 在队列中插入字符串 'python'
    q.put_nowait('-') # 在队列中插入字符串 '-'
    q.put_nowait('100') # 队列已满,继续在队列中插入字符串 '100',直接引发 queue.Full 异常
except queue.Full:
    print('queue.Full')
2.6 Queue.full()

如果队列中元素个数达到上限,返回 True,否则返回 False。

import queue
q = queue.Queue(3)  # 定义一个长度为3的队列
print(q.full())  # 元素个数未达到上限,返回 False
q.put('python')  # 在队列中插入字符串 'python'
q.put('-') # 在队列中插入字符串 '-'
q.put('100') # 在队列中插入字符串 '100'
print(q.full())  # 元素个数达到上限,返回 True
2.7 Queue.empty()

如果队列为空,返回 True,否则返回 False。

import queue
q = queue.Queue()
print(q.empty())  # 对列为空,返回 True
q.put('python-100')  # 在队列中插入元素 'python-100'
print(q.empty())  # 对列不为空,返回 False

命令分发器实现

生产者(数据源)生产数据,缓冲到消息队列中
数据处理流程: 数据加载 -》 提取 -》 分析(滑动窗口函数)

处理大量数据的时候,对于一个数据源来说,需要多个消费者处理。但是如何分配数据就是个问题了。
需要一个分发器(调度器),把数据分发给不同的消费者处理。
每一个消费者拿到数据后,有自己的处理函数。所以要有一种注册机制
数据加载 --》 提取 --》 分发 ---》 分析函数1|----》 分析函数2

分析1和分析2是不同的handler,不同的窗口宽度、间隔时间

如何分发?

这里就简单一点,轮询策略。
一对多的副本发送,一个数据通过分发器,发送到n个消费者。

消息队列

在生产者和消费者之间使用消息队列,那么所有消费者共用一个消息队列,还是各自拥有一个消息队列呢?
共用一个消息队列也可以,但是需要解决争抢的问题。相对来说每一个消费者自己拥有一个队列,较为容易。

如何注册?

在调度器内部记录有哪些消费者,每一个消费者拥有自己的队列。

线程
由于一条数据会被多个不同的注册过的handler处理,所以最好的方式是多线程。

线程

线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位;
守护线程是一个在后台运行,且不用费心去关闭它的线程,因为它会随程序自动关闭。

如果程序运行的线程是非守护线程,那么程序将等待所有线程结束后再终止。
如果运行的是守护线程,当程序退出时,守护线程会被自动杀死。

# 线程
import threading, time
def add(x, y):
    print('enter this thread!')
    time.sleep(3)   # 等3秒的时候运行~~~~~~~~~,在再继续运行函数;
    print(x + y)
    print('===========end')

# 创建线程:target是函数名,args是一个元组类型
t = threading.Thread(target=add, args=(4, 5))   #Thread类很好地封装了有关线程的子类
t.start()    #调用.start()方法:
print('\n~~~~~~~~~~~~~~~~~~~~~')
#----------------------------------------------------------
enter this thread!
~~~~~~~~~~~~~~~~~~~~~   

9
===========end

# 线程
import threading,time
from queue import Queue
q = Queue()
def add(x,y):
    while True:
        print('enter this thread!')
        cmd = q.get() # 卡住了;
        print("~~~~~~~~~~~~~~~~")
    print(x+y)
    
t = threading.Thread(target=add,args=(4,5))
t.start()
print('-----------------------')
while True:
    cmd = input('<<<')
    if cmd == '':
        break
    else:
        q.put(cmd)
#-----------------------------------
enter this thread!-----------------------

<<<1
~~~~~~~~~~~~~~~~
enter this thread!
<<<


# 生成器;
import random #产生随机数;
import time  # 休息一会
import datetime # 时间

def source(seconds=1):
    while True:
        yield {'datetime':datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))),'value':random.randint(1,100)}
        time.sleep(seconds)
               
s = source()
# collecting date
items = [next(s) for _ in range(3)]
print(items)

def avg_handler(iterable):
    return sum(map(lambda item:item['value'],iterable)) / len(iterable)

ret = avg_handler(items)
print('{:.2f}'.format(ret))

import re,datetime,time,threading
from queue import Queue

def window(q:Queue,handler,width:int,interval:int):
    buf = []
    start =datetime.datetime.strptime('19700101 00:00:01 +0800','%Y%m%d %H:%M:%S %z')
    current =datetime.datetime.strptime('19700101 00:00:01 +0800','%Y%m%d %H:%M:%S %z')
    delta = datetime.timedelta(seconds=width - interval)  
    
    while True:
        data = q.get() # 阻塞的 next(iterator)
        if data:
            buf.append(data)
            current =data['datetime']
        print(current,start)
        
        if (current - start).total_seconds() > interval:
            print('~~~~~~~~~~~~~')
            ret = handler(buf)
            print('{:.2f}'.format(ret))
            print(threading.current_thread())
            start = current
            
            # clean old_date
            buf = [x for x in buf if x['datetime'] > current - delta ]
            
            
def dispatcher(src):
    handlers = []
    queues = []
    
    def reg(handler,width,interval):    # 数据谁,handler、width、interval  ;
        q = Queue()
        t = threading.Thread(target=window,args=(q,handler,width,interval))
        
        queues.append(q)
        handlers.append(t)
        
    def run():
        for t in handlers:
            t.start()
            
        while True:
            data = next(src)
            for q in queues:
                q.put(data)
            
    return reg,run

reg,run = dispatcher(s)

reg(avg_handler,10,5)
# reg(avg_handler,10,5)            


# window(s,avg_handler,10,5)           
# run()
print(threading.current_thread())
run()
#---------------------------------------------------------
<_MainThread(MainThread, started 2488)>
2019-10-22 22:04:31.513446+08:00 1970-01-01 00:00:01+08:00
~~~~~~~~~~~~~
36.00
<Thread(Thread-12, started 1960)>
2019-10-22 22:04:32.513764+08:00 2019-10-22 22:04:31.513446+08:00
2019-10-22 22:04:33.514088+08:00 2019-10-22 22:04:31.513446+08:00
2019-10-22 22:04:34.514438+08:00 2019-10-22 22:04:31.513446+08:00
2019-10-22 22:04:35.514738+08:00 2019-10-22 22:04:31.513446+08:00
2019-10-22 22:04:36.515061+08:00 2019-10-22 22:04:31.513446+08:00
~~~~~~~~~~~~~
54.83
<Thread(Thread-12, started 1960)>
2019-10-22 22:04:37.515387+08:00 2019-10-22 22:04:36.515061+08:00
2019-10-22 22:04:38.515513+08:00 2019-10-22 22:04:36.515061+08:00
2019-10-22 22:04:39.515842+08:00 2019-10-22 22:04:36.515061+08:00
2019-10-22 22:04:40.516410+08:00 2019-10-22 22:04:36.515061+08:00
2019-10-22 22:04:41.517063+08:00 2019-10-22 22:04:36.515061+08:00

上一篇下一篇

猜你喜欢

热点阅读