初入python

多进程简解

2018-08-20  本文已影响0人  言烬

多进程
进程:正在执行的应用程序
多进程:多个运行的应用程序

Python多进程开发:内建标准模块multiprocessing
可以通过该模块的Process进程类型,可以很方便的创建和管理多个进程

常见的multiprocessing属性和模块
multiprocessing.Process 进程类型,用于创建和管理进程
multiprocessing.Lock/RLock 进程互斥锁/重用锁, 用于进程同步,数据锁
multiprocessing.Event 进程事件类型,用于进程同步,数据信号通信
multiprocessing.Condition 进程条件类型, 用于进程同步, 数据信号通信
multiprocessing.Queue 进程队列类型,用于多进程数据共享
multiprocessing.Manager 进程管理类型,专门用于多进程数据共享
multiprocessing.Listener/Client 进程监听客户端,基于网络多进程之间的数据共享

多进程的基础操作及面向对象的实现

#coding:utf-8
import multiprocessing, os

#基于函数
def my_proc():
    print("第一个独立的进程,程序的进程编号:", os.getpid(), os.getppid())

def my_proc2(name):
    print(name, "第二个独立的进程,程序的进程编号:", os.getpid(), os.getppid())


#基于类型的
class MyProc(multiprocessing.Process):

    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        print(self.name, "基于类型的进程执行", os.getpid(),self.ident, os.getppid())


if __name__ == "__main__":
    #创建基于函数的多进程
    p1 = multiprocessing.Process(target=my_proc)
    p2 = multiprocessing.Process(target=my_proc)
    #启动进程
    p1.start()
    p2.start()

    #基于函数的,添加参数之后的多进程
    p11 = multiprocessing.Process(target=my_proc2, args=('tom',))#元组中若只有一个元素,则必须带逗号!
    p12 = multiprocessing.Process(target=my_proc2, args=('jerry',))#元组中若只有一个元素,则必须带逗号!
    #启动进程
    p11.start()
    p12.start()
    
    #创建基于类型的多个进程
    p1 = MyProc("进程A")
    p2 = MyProc("进程B")
    #启动进程
    p1.start()
    p2.start()

多进程的简化:内置进程池
多进程的操作在实际应用中也是非常多的,但是纯底层的代码开发控制并发也是一件非常繁 琐的事情,所以就出现了面向过程多进程并发的优化操作方式:进程池 Pool

通过进程池 Pool 可以快速创建多个进程执行指定函数,完成高并发处理操作

"""
进程池
"""
#引入需要的模块
import multiprocessing, os, time

#创建程序函数
def my_proc():
    print(multiprocessing.current_process().name, "一个进程正在执行", os.getpid(), os.getppid())
    time.sleep(1)


if __name__ == "__main__":
    #创建一个进程池
    pool = multiprocessing.Pool(2)  #同时工作的任务数

    #循环任务
    for i in range(20):  #安排的任务数
        pool.apply_async(my_proc)

    #停止提交任务
    pool.close()

    #独占执行
    pool.join()

简单案例:多进程下载 有了进程池,可以简单完成一个多进程任务下载的操作处理

#模拟下载:通过多进程的方式执行
import time
from multiprocessing import current_process, Pool

def download(url):
    #下载函数
    print(current_process().name, "开始下载数据》》》》", url)
    time.sleep(1)
    print(current_process().name, "下载数据完成《《《")
    time.sleep(1)
    return "下载号的数据"


def sava_data(data):
    print("保存下载的数据:", data)


if __name__ == "__main__":
    #创建一个进程池
    pool = Pool(5)

    #循环下载数据
    for i in range(20):
        pool.apply_async(download, args=("http://baudu.com",), callback=sava_data)

    #停止提交任务给进程池
    pool.close()
    #独占
    pool.join()

多个进程通信:multiprocessing.Manager
不同线程之间的数据通信,涉及到核心的数据共享问题,主要由 PYTHON 中提供的内建模 块 multiprocessing.Manager 类型实现,该类型内置了大量的用于数据共享的操作

multiprocessing.Manager 常见属性和方法

Array 内置进程间共享数组类型
Queue 内置进程间共享队列类型
list() 内置进程间共享列表类型
dict() 内置进程间共享字典类型
Value 内置进程间共享值类型
Barrier 进程同步类型
BoundedSemaphore| Semaphore 进程信号量类型
Lock|RLock 进程互斥锁/重用锁
Event 进程同步事件类型
Condition 进程同步条件类型

import multiprocessing, time, random

#创建一个条件对象
con = multiprocessing.Condition()

#创建篮子
basket = list()

def product():
    while True:
        time.sleep(1)
        #上锁
        con.acquire()

        if len(basket) > 20:
            print(":篮子满了, 快来吃吧")
            con.wait()
        else:
            #生产一个包子
            _no = random.randint(1, 10)
            print("蒸好了一个包子", _no)
            basket.append(_no)
            con.notify()

        #解锁
        con.release()

def consumer():
    while True:
        time.sleep(0.5)
        #上锁
        con.acquire()

        if len(basket) <= 0:
            print("吃光了,快上菜吧")
            con.wait()
        else:
            _no = basket.pop()
            print("吃掉了一个包子", _no)
            con.notify()
        #解锁
        con.release()


if __name__ == "__main__":

    for i in range(5):
        p = multiprocessing.Process(name="生产者" + str(i) + "号", target=product)
        p.start()

    for j in range(5):
        c = multiprocessing.Process(name="消费者" + str(j) + "号", target=consumer)
        c.start()

多个进程通信:multiprocessing.Queue
多个进程之间的通信操作,数据的传递在 PYTHON 中的 multiprocessing 模块中提供了一个 专门用于多进程之间进行数据传递的队列:Queue

multiprocessing.Queue 常见属性和方法
put(data [, timeout=None]) 添加一个数据到队列中
put_nowait(data) 添加一个数据到队列中,非阻塞模式
get([timeout=None]) 从队列中获取一个数据
get_nowait() 从队列中获取一个数据,非阻塞模式
full() 判断队列是否已满
empty() 判断队列是否已空
close() 关闭队列
qsize() 获取队列中的元素数量

引入需要的模块

import multiprocessing, time, random

#定义一个队列储存数据
basket = multiprocessing.Queue(20)

def product():

    while True:
        time.sleep(1)
        _no = random.randint(0, 10)
        try:
            basket.put(_no, timeout=1)
            print("生产者生产了一个数据", _no)

        except:
            basket.full()
            print("数据框满了")


def consumer():

    while True:
        time.sleep(0.5)
        try:
            _no = basket.get(timeout=1)
            print("消费者取走了一个数据", _no)

        except:
            basket.empty()
            print("数据框空了")


if __name__ == "__main__":
    for i in range(2):
        p = multiprocessing.Process(target=product)
        p.start()

    for j in range(2):
        c = multiprocessing.Process(target=consumer)
        c.start()
上一篇下一篇

猜你喜欢

热点阅读