并发编程-进程池和multiprocess.Pool
2019-10-28 本文已影响0人
Yanl__
multiprocess.Pool模块
-
Pool([numprocess [,initializer [, initargs]]])
:
1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值 (一般设置为cpu个数+1)
2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None
3 initargs:是要传给initializer的参数组
-
p.apply(func [, args [, kwargs]])
:在一个池工作进程中执行func(args,*kwargs),然后返回结果。
ret = p.apply(func, args=()) 返回值就是func的return
'''需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()''' -
p.apply_async(func [, args [, kwargs]])
:在一个池工作进程中执行func(args,*kwargs),然后返回结果。
- 返回值:ret = apply_async返回的对象,让用户可以通过ret.get()获得func的返回值。get会阻塞直到对应的func执行完毕拿到结果。
- 使用apply_async给进程池分配任务,需要先close后join来保持多进程和主进程代码的同步性
-
p.close()
:关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成 -
p.join()
:等待所有工作进程退出。此方法只能在close()或teminate()之后调用
进程池的同步调用
# -*- coding: UTF-8 -*-
"""
# @Time : 2019-10-23 17:02
# @Author : yanlei
# @FileName: 进程池的同步调用.py
"""
import os, time
from multiprocessing import Pool
def work(n):
print('%s run'%os.getpid())
time.sleep(1)
return n**2
p = Pool(3)
res_l = []
for i in range(10):
res = p.apply(work, args=(i, )) # 同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞
res_l.append(res)
print(res_l)
进程池的异步调用
import os, time
from multiprocessing import Pool
def work(n):
print('%s run'%os.getpid())
time.sleep(1)
return n**2
p = Pool(3)
ret_l = []
for i in range(10):
ret = p.apply_async(work, args=(i, )) # 异步调用
ret_l.append(ret)
p.close()
p.join()
for ret in ret_l:
print(ret.get())
进程池版socket并发聊天
server端
- 进程池开几个进程,就能同时和几个客户端聊天。其他的需要在后面排队等待前面的进程结束归还进程池才能拿到。
import os
from socket import *
from multiprocessing import Pool
server = socket(AF_INET, SOCK_STREAM)
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 8080))
server.listen(5)
def talk(conn):
print('进程pid:%s'%os.getpid())
while True:
try:
msg = conn.recv(1024)
if not msg:break
print(msg.decode('utf-8'))
conn.send(msg.upper())
except Exception:
break
p = Pool(4)
while True:
conn, *_ = server.accept()
p.apply_async(talk, args=(conn, ))
client端
from socket import *
client = socket(AF_INET, SOCK_STREAM)
client.connect(('127.0.0.1', 8080))
while True:
msg = input('>>>').strip()
if not msg:break
client.send(msg.encode('utf-8'))
msg = client.recv(1024)
print(msg.decode('utf-8'))
回调函数
p.apply_async(get_data, args=(url, ), callback=call_back)
# -*- coding: UTF-8 -*-
"""
# @Author : yanlei
# @FileName: 回调函数_爬取数据.py
"""
import requests
from multiprocessing import Pool
def get_data(url):
response = requests.get(url)
if response.status_code == 200:
return url, response.content.decode('utf-8')
def call_back(args):
url, content = args
print(url, len(content))
url_list = [
'https://www.baidu.com',
'https://www.sohu.com',
'https://www.sogou.com',
'https://www.runoob.com',
'https://leetcode-cn.com',
'https://cn.bing.com',
]
p = Pool(2)
for url in url_list:
p.apply_async(get_data, args=(url, ), callback=call_back)
p.close()
p.join()