并发编程-进程池和multiprocess.Pool

2019-10-28  本文已影响0人  Yanl__

multiprocess.Pool模块

1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值 (一般设置为cpu个数+1)
2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None
3 initargs:是要传给initializer的参数组

  1. 返回值:ret = apply_async返回的对象,让用户可以通过ret.get()获得func的返回值。get会阻塞直到对应的func执行完毕拿到结果。
  2. 使用apply_async给进程池分配任务,需要先close后join来保持多进程和主进程代码的同步性

进程池的同步调用

# -*- coding: UTF-8 -*-

"""
# @Time    : 2019-10-23 17:02
# @Author  : yanlei
# @FileName: 进程池的同步调用.py
"""
import os, time
from multiprocessing import Pool

def work(n):
    print('%s run'%os.getpid())
    time.sleep(1)
    return n**2

p = Pool(3)
res_l = []
for i in range(10):
    res = p.apply(work, args=(i, )) # 同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞
    res_l.append(res)
print(res_l)

进程池的异步调用

import os, time
from multiprocessing import Pool

def work(n):
    print('%s run'%os.getpid())
    time.sleep(1)
    return n**2

p = Pool(3)
ret_l = []
for i in range(10):
    ret = p.apply_async(work, args=(i, )) #  异步调用
    ret_l.append(ret)
p.close()
p.join()
for ret in ret_l:
    print(ret.get())

进程池版socket并发聊天

server端

import os
from socket import *
from multiprocessing import Pool

server = socket(AF_INET, SOCK_STREAM)
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 8080))
server.listen(5)

def talk(conn):
    print('进程pid:%s'%os.getpid())
    while True:
        try:
            msg = conn.recv(1024)
            if not msg:break
            print(msg.decode('utf-8'))
            conn.send(msg.upper())
        except Exception:
            break

p = Pool(4)
while True:
    conn, *_ = server.accept()
    p.apply_async(talk, args=(conn, ))

client端

from socket import *
client = socket(AF_INET, SOCK_STREAM)
client.connect(('127.0.0.1', 8080))

while True:
    msg = input('>>>').strip()
    if not msg:break

    client.send(msg.encode('utf-8'))
    msg = client.recv(1024)
    print(msg.decode('utf-8'))

回调函数

p.apply_async(get_data, args=(url, ), callback=call_back)

# -*- coding: UTF-8 -*-

"""
# @Author  : yanlei
# @FileName: 回调函数_爬取数据.py
"""
import requests
from multiprocessing import Pool

def get_data(url):
    response = requests.get(url)
    if response.status_code == 200:
        return url, response.content.decode('utf-8')

def call_back(args):
    url, content = args
    print(url, len(content))

url_list = [
    'https://www.baidu.com',
    'https://www.sohu.com',
    'https://www.sogou.com',
    'https://www.runoob.com',
    'https://leetcode-cn.com',
    'https://cn.bing.com',
]

p = Pool(2)
for url in url_list:
    p.apply_async(get_data, args=(url, ), callback=call_back)
p.close()
p.join()



上一篇 下一篇

猜你喜欢

热点阅读