Python(三十)并发池
2021-12-05 本文已影响0人
Lonelyroots
1. 可重复使用的线程
1.1. 队列计数器
可通过
q.join()
判定队列是否为空,当计数器为0,队列里面无任务,才不会阻塞。
q.task_done()
进行减1操作。
import queue
q=queue.Queue(4)
q.join() # 这个时候是不会阻塞的
q.put('a') # put之后就会阻塞,因为队列内部有个计数器,每次put都会加1,给队列添加一个'a'
q.join() # 这个时候是会阻塞的
q.get() # get时计数器不会减1
q.join() # 会阻塞
q.task_done() # 调用这个方法后,队列元素才会减1
q.join() # 不会阻塞
1.2. 可重复利用线程
利用生产者与消费者模型,实现线程的重复利用。主进程可以看成生产者,主要产生任务线程作为消费者主要处理任务。
import time
import queue
import threading
class MyThread(threading.Thread):
def __init__(self):
super().__init__() # 调用父类的初始化方法
self.daemon = True # 守护模式开启
self.queue = queue.Queue(4) # 设置线程队列初始容量
self.start() # 实例化时直接开启线程
def run(self):
while True: # 无限循环,不停地接收和执行任务 消费者
func,args,kwargs = self.queue.get() # 通过拆包形式获取数据
func(*args,**kwargs) # 执行任务
self.queue.task_done() # 计数器减一操作,便于判断任务是否完成
def submit_tasks(self,func,args=(),kwargs={}): # 给外部提供一个提交任务的接口
self.queue.put((func,args,kwargs)) # 向队列中提交任务,相当于生产者,计数器会加1
def join(self): # 等待所有任务执行完毕
self.queue.join() # 查看计数器是否执行完毕
def func1():
time.sleep(2)
print('当前func1任务完成')
def func2(*args,**kwargs):
time.sleep(2)
print('当前func2任务完成',args,kwargs)
mt = MyThread()
mt.submit_tasks(func1)
mt.submit_tasks(func2,args=(1,2,'a'),kwargs={'q':11,'w':22})
mt.submit_tasks(func1)
mt.submit_tasks(func2,args=(1,2,'a'),kwargs={'q':11,'w':22})
mt.join() # 让主进程等待子进程
2. 线程池的实现
2.1. 线程池概念
主线程类似于生产者,用来生成任务;
线程池:类似消费者,用来处理任务。
2.2. 线程池实现
通过线程池来实现,多线程同时解决问题。
import time
import queue
import threading
class MyPool:
def __init__(self,n):
self.queue = queue.Queue(10)
for i in range(n): # 初始化时,直接开启多条可重复使用的线程
threading.Thread(target=self.func,daemon=True).start()
def func(self):
while True: # 无限循环,不停地接收和执行任务 消费者
func,args,kwargs = self.queue.get() # 通过拆包形式获取数据
func(*args,**kwargs) # 执行任务
self.queue.task_done() # 计数器减一操作,便于判断任务是否完成
def submit_tasks(self,func,args=(),kwargs={}): # 给外部提供一个提交任务的接口
self.queue.put((func,args,kwargs)) # 向队列中提交任务,相当于生产者,计数器会加1
def join(self): # 等待所有任务执行完毕
self.queue.join() # 查看计数器是否执行完毕
def func1():
time.sleep(2)
print('当前func1任务完成')
def func2(*args,**kwargs):
time.sleep(2)
print('当前func2任务完成',args,kwargs)
mp = MyPool(2)
mp.submit_tasks(func1)
mp.submit_tasks(func2,args=(1,2,'a'),kwargs={'q':11,'w':22})
mp.submit_tasks(func1)
mp.submit_tasks(func2,args=(1,2,'a'),kwargs={'q':11,'w':22})
mp.join()
3. 内置池
Python已经内置了线程池,提供了更多验证,更加安全。
3.1. 内置线程池
import queue
import time
from multiprocessing.pool import ThreadPool # 导入内置线程池
from multiprocessing.pool import Pool # 导入内置进程池
def func1():
time.sleep(2)
print('当前func1任务完成')
def func2(*args,**kwargs):
time.sleep(2)
print('当前func2任务完成',args,kwargs)
# tp = ThreadPool(3) # 内置线程池开3个
tp = Pool(3) # 内置进程池开3个
tp.apply_async(func1)
tp.apply_async(func2,args=('moran',18),kwds={'beidou':48})
tp.apply_async(func1)
tp.apply_async(func2,args=('moran',18),kwds={'beidou':48})
tp.apply_async(func1)
tp.apply_async(func2,args=('moran',18),kwds={'beidou':48})
tp.close() # 停止继续向队列提交任务
tp.join()
3.2. 内置进程池
import queue
import time
from multiprocessing.pool import ThreadPool # 导入内置线程池
from multiprocessing.pool import Pool # 导入内置进程池
def func1():
time.sleep(2)
print('当前func1任务完成')
def func2(*args,**kwargs):
time.sleep(2)
print('当前func2任务完成',args,kwargs)
# tp = ThreadPool(3) # 内置线程池开3个
tp = Pool(3) # 内置进程池开3个
tp.apply_async(func1)
tp.apply_async(func2,args=('moran',18),kwds={'beidou':48})
tp.apply_async(func1)
tp.apply_async(func2,args=('moran',18),kwds={'beidou':48})
tp.apply_async(func1)
tp.apply_async(func2,args=('moran',18),kwds={'beidou':48})
tp.close() # 停止继续向队列提交任务
tp.join()
4.使用池来实现并发服务器
4.1. 内置线程池
import socket
from multiprocessing import cpu_count
from multiprocessing.pool import ThreadPool
server = socket.socket()
server.bind(('127.0.0.1',8989))
server.listen(10)
def socket_recv(conn):
while True:
recv_data = conn.recv(1024)
if recv_data:
print(recv_data)
conn.send(recv_data)
else:
conn.close()
break
n = cpu_count() # 获取当前设备核心数
tp = ThreadPool(n)
while True:
conn,address = server.accept()
tp.apply_async(socket_recv,args=(conn,))
4.2. 内置进程池
'''进程池+线程池并发服务器'''
import time
import socket
from multiprocessing import Pool,cpu_count
from multiprocessing.pool import ThreadPool
server = socket.socket()
server.bind(('127.0.0.1',8989))
server.listen(10)
def socket_recv(conn):
while True:
recv_data = conn.recv(1024)
if recv_data:
print(recv_data)
conn.send(recv_data)
else:
conn.close()
break
def accept_process(server): # 创建对等连接套接字方法
tp = ThreadPool(cpu_count()*2) # 开启线程数一般是CPU核心的两倍
while True:
conn, address = server.accept()
tp.apply_async(socket_recv, args=(conn,))
n = cpu_count() # 获取当前设备核心数
p = Pool(n) # 开启对应进程数
p.apply_async(accept_process,args=(server,))
p.close()
p.join()
文章到这里就结束了!希望大家能多多支持Python(系列)!六个月带大家学会Python,私聊我,可以问关于本文章的问题!以后每天都会发布新的文章,喜欢的点点关注!一个陪伴你学习Python的新青年!不管多忙都会更新下去,一起加油!
Editor:Lonelyroots