初见-码农工具文计算机语言

Python(三十)并发池

2021-12-05  本文已影响0人  Lonelyroots

1. 可重复使用的线程

1.1. 队列计数器

可通过q.join()判定队列是否为空,当计数器为0,队列里面无任务,才不会阻塞。
q.task_done()进行减1操作。

import queue
q=queue.Queue(4)
q.join()        # 这个时候是不会阻塞的
q.put('a')      # put之后就会阻塞,因为队列内部有个计数器,每次put都会加1,给队列添加一个'a'
q.join()        # 这个时候是会阻塞的
q.get()         # get时计数器不会减1
q.join()        # 会阻塞
q.task_done()   # 调用这个方法后,队列元素才会减1
q.join()        # 不会阻塞

1.2. 可重复利用线程

利用生产者与消费者模型,实现线程的重复利用。主进程可以看成生产者,主要产生任务线程作为消费者主要处理任务。

import time
import queue
import threading

class MyThread(threading.Thread):
    def __init__(self):
        super().__init__()      # 调用父类的初始化方法
        self.daemon = True      # 守护模式开启
        self.queue = queue.Queue(4)     # 设置线程队列初始容量
        self.start()            # 实例化时直接开启线程

    def run(self):
        while True:     # 无限循环,不停地接收和执行任务   消费者
            func,args,kwargs = self.queue.get()     # 通过拆包形式获取数据
            func(*args,**kwargs)       # 执行任务
            self.queue.task_done()        # 计数器减一操作,便于判断任务是否完成

    def submit_tasks(self,func,args=(),kwargs={}):     # 给外部提供一个提交任务的接口
        self.queue.put((func,args,kwargs))      # 向队列中提交任务,相当于生产者,计数器会加1

    def join(self):     # 等待所有任务执行完毕
        self.queue.join()       # 查看计数器是否执行完毕

def func1():
    time.sleep(2)
    print('当前func1任务完成')

def func2(*args,**kwargs):
    time.sleep(2)
    print('当前func2任务完成',args,kwargs)

mt = MyThread()
mt.submit_tasks(func1)
mt.submit_tasks(func2,args=(1,2,'a'),kwargs={'q':11,'w':22})
mt.submit_tasks(func1)
mt.submit_tasks(func2,args=(1,2,'a'),kwargs={'q':11,'w':22})
mt.join()       # 让主进程等待子进程

2. 线程池的实现

2.1. 线程池概念

主线程类似于生产者,用来生成任务;

线程池:类似消费者,用来处理任务。

2.2. 线程池实现

通过线程池来实现,多线程同时解决问题。

import time
import queue
import threading

class MyPool:
    def __init__(self,n):
        self.queue = queue.Queue(10)
        for i in range(n):       # 初始化时,直接开启多条可重复使用的线程
            threading.Thread(target=self.func,daemon=True).start()

    def func(self):
        while True:     # 无限循环,不停地接收和执行任务   消费者
            func,args,kwargs = self.queue.get()     # 通过拆包形式获取数据
            func(*args,**kwargs)       # 执行任务
            self.queue.task_done()        # 计数器减一操作,便于判断任务是否完成

    def submit_tasks(self,func,args=(),kwargs={}):     # 给外部提供一个提交任务的接口
        self.queue.put((func,args,kwargs))      # 向队列中提交任务,相当于生产者,计数器会加1

    def join(self):     # 等待所有任务执行完毕
        self.queue.join()       # 查看计数器是否执行完毕

def func1():
    time.sleep(2)
    print('当前func1任务完成')

def func2(*args,**kwargs):
    time.sleep(2)
    print('当前func2任务完成',args,kwargs)

mp = MyPool(2)
mp.submit_tasks(func1)
mp.submit_tasks(func2,args=(1,2,'a'),kwargs={'q':11,'w':22})
mp.submit_tasks(func1)
mp.submit_tasks(func2,args=(1,2,'a'),kwargs={'q':11,'w':22})
mp.join()

3. 内置池

Python已经内置了线程池,提供了更多验证,更加安全。

3.1. 内置线程池

import queue
import time
from multiprocessing.pool import ThreadPool     # 导入内置线程池
from multiprocessing.pool import Pool    # 导入内置进程池

def func1():
    time.sleep(2)
    print('当前func1任务完成')

def func2(*args,**kwargs):
    time.sleep(2)
    print('当前func2任务完成',args,kwargs)

# tp = ThreadPool(3)      # 内置线程池开3个
tp = Pool(3)      # 内置进程池开3个
tp.apply_async(func1)
tp.apply_async(func2,args=('moran',18),kwds={'beidou':48})
tp.apply_async(func1)
tp.apply_async(func2,args=('moran',18),kwds={'beidou':48})
tp.apply_async(func1)
tp.apply_async(func2,args=('moran',18),kwds={'beidou':48})
tp.close()      # 停止继续向队列提交任务
tp.join()

3.2. 内置进程池

import queue
import time
from multiprocessing.pool import ThreadPool     # 导入内置线程池
from multiprocessing.pool import Pool    # 导入内置进程池

def func1():
    time.sleep(2)
    print('当前func1任务完成')

def func2(*args,**kwargs):
    time.sleep(2)
    print('当前func2任务完成',args,kwargs)

# tp = ThreadPool(3)      # 内置线程池开3个
tp = Pool(3)      # 内置进程池开3个
tp.apply_async(func1)
tp.apply_async(func2,args=('moran',18),kwds={'beidou':48})
tp.apply_async(func1)
tp.apply_async(func2,args=('moran',18),kwds={'beidou':48})
tp.apply_async(func1)
tp.apply_async(func2,args=('moran',18),kwds={'beidou':48})
tp.close()      # 停止继续向队列提交任务
tp.join()

4.使用池来实现并发服务器

4.1. 内置线程池

import socket
from multiprocessing import cpu_count
from multiprocessing.pool import ThreadPool

server = socket.socket()
server.bind(('127.0.0.1',8989))
server.listen(10)

def socket_recv(conn):
    while True:
        recv_data = conn.recv(1024)
        if recv_data:
            print(recv_data)
            conn.send(recv_data)
        else:
            conn.close()
            break

n = cpu_count()     # 获取当前设备核心数
tp = ThreadPool(n)
while True:
    conn,address = server.accept()
    tp.apply_async(socket_recv,args=(conn,))

4.2. 内置进程池

'''进程池+线程池并发服务器'''
import time
import socket
from multiprocessing import Pool,cpu_count
from multiprocessing.pool import ThreadPool

server = socket.socket()
server.bind(('127.0.0.1',8989))
server.listen(10)

def socket_recv(conn):
    while True:
        recv_data = conn.recv(1024)
        if recv_data:
            print(recv_data)
            conn.send(recv_data)
        else:
            conn.close()
            break

def accept_process(server):     # 创建对等连接套接字方法
    tp = ThreadPool(cpu_count()*2)     # 开启线程数一般是CPU核心的两倍
    while True:
        conn, address = server.accept()
        tp.apply_async(socket_recv, args=(conn,))

n = cpu_count()     # 获取当前设备核心数
p = Pool(n)        # 开启对应进程数
p.apply_async(accept_process,args=(server,))
p.close()
p.join()

文章到这里就结束了!希望大家能多多支持Python(系列)!六个月带大家学会Python,私聊我,可以问关于本文章的问题!以后每天都会发布新的文章,喜欢的点点关注!一个陪伴你学习Python的新青年!不管多忙都会更新下去,一起加油!

Editor:Lonelyroots

上一篇下一篇

猜你喜欢

热点阅读