Python多进程

2019-04-27  本文已影响0人  千鸟月读

Python多进程使用与总结

1.概要

​ 众所周知,由于GIL锁的存在,Python多线程并不是真正意义上的多线程,不能很好的利用多核CPU,为了充分的利用系统资源,py提供了multiprocessing多进程库,其支持子进程、通信和数据共享、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。[参考](https://www.cnblogs.com/tkqasn/p/5701230.html

  • 在UNIX平台上,当某个进程终结之后,该进程需要被其父进程调用wait,否则进程成为僵尸进程(Zombie)。所以,有必要对每个Process对象调用join()方法 (实际上等同于wait)。对于多线程来说,由于只有一个进程,所以不存在此必要性。
  • multiprocessing提供了threading包中没有的IPC(比如Pipe和Queue),效率上更高。应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (因为它们占据的不是用户进程的资源)。
  • 多进程应该避免共享资源。在多线程中,我们可以比较容易地共享资源,比如使用全局变量或者传递参数。在多进程情况下,由于每个进程有自己独立的内存空间,以上方法并不合适。此时我们可以通过共享内存和Manager的方法来共享资源。但这样做提高了程序的复杂度,并因为同步的需要而降低了程序的效率。
  • Process.PID中保存有PID,如果进程还没有start(),则PID为None。

注意:window下启动子进程必须在if name == "main"语句后面写相关语句,这个坑踩过

注意:window下启动子进程必须在if name == "main"语句后面写相关语句,这个坑踩过

2.使用实例

2.1 创建多进程

方式一:直接使用Process

import multiprocessing
import queue
import time
import random
import os

def out(msg):
    s = random.randint(1, 3) #随机延时
    time.sleep(s)
    print("msg:{}  time:{}   pid:{} ppid:{}".format(msg, time.asctime(time.localtime(time.time())), os.getpid(), os.getppid()))


def que():
    Queue = queue.Queue()#创建队列 先进先出
    for q in range(10):
        Queue.put(q)
    return Queue


if __name__ == "__main__":

    Que = que() 
    for i in range(10):
        p = multiprocessing.Process(target=out, args=(Que.get(), ))
        p.start()
        # p.join() # location1
    # p.join() #location2
    print("我是主进程")

方式二:重写run()方法

from multiprocessing import Process
import queue
import time
import random
import os

class Myprocess(Process):

    def __init__(self, msg):
        super(Myprocess, self).__init__()# 或者Process.__init__(self)
        self.msg = msg

    def run(self):
        s = random.randint(1, 3)  # 随机延时
        time.sleep(s)
        print("msg:{}  time:{}   pid:{} ppid:{}".format(self.msg, time.asctime(time.localtime(time.time())), os.getpid(),
                                                        os.getppid()))

def que():
    Queue = queue.Queue()  # 创建队列 先进先出
    for q in range(10):
        Queue.put(q)
    return Queue

if __name__ == "__main__":

    Que = que()
    for i in range(10):
        p = Myprocess(msg=Que.get())
        p.start()
    p.join()
    print("我是主进程")
2.2 Process类

  • 构造方法:

    Process([group [, target [, name [, args [, kwargs]]]]])

    group: 线程组,目前还没有实现,库引用中提示必须是None;
      target: 要执行的方法;
      name: 进程名;
      args/kwargs: 要传入方法的参数。。

  • 实例方法:

    is_alive():返回进程是否在运行。

    join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。

    start():进程准备就绪,等待CPU调度

    run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。

    terminate():不管任务是否完成,立即停止工作进程,但是子进程如果存在没完成,就会变成僵尸进程

  • 属性:

    authkey

    daemon:和线程的setDeamon功能一样,守护进程,父进程死了子进程也死了

    exitcode(进程在运行时为None、如果为–N,表示被信号N结束)

    name:进程名字。

    pid:进程号。

    ..........
    p = Myprocess(msg=10086)
    p.start()
    print("is_alive() :{}".format(p.is_alive()))
    print("p.exitcode: {}".format(p.exitcode))
    p.join()
    print("is_alive() :{}".format(p.is_alive()))
    print("p.exitcode: {}".format(p.exitcode))
    print("p.name :{}".format(p.name))
    print("p.pid :{}".format(p.pid))
    print("我是主进程")


    .......
    p = Myprocess(msg=10086)
    p.daemon = True
    p.start()
    # p.join()
    print("我是主进程")
2.3 Pool类

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。进程池设置最好等于CPU核心数量

构造方法:

Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

processes :使用的工作进程的数量,如果processes是None那么使用 os.cpu_count()返回的数量。
initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
context: 用在制定工作进程启动时的上下文,一般使用 multiprocessing.Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context

实例方法:

apply(func[, args[, kwds]]):同步进程池

apply_async(func[, args[, kwds[, callback[, error_callback]]]]) :异步进程池

close() : 关闭进程池,阻止更多的任务提交到pool,待任务完成后,工作进程会退出。

terminate() : 结束工作进程,不在处理未完成的任务

join() : wait工作线程的退出,在调用join()前,必须调用close() or terminate()。这样是因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程。pool.join()必须使用在

2.4 数据共享

进程各自持有一份数据,默认无法共享数据

2.5 Lock互斥锁

当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突

进程之间数据隔离,但是共享一套文件系统,因而可以通过文件来实现进程直接的通信,但问题是必须自己加锁处理。

注意:加锁的目的是为了保证多个进程修改同一块数据时,同一时间只能有一个修改,即串行的修改,没错,速度是慢了,牺牲了速度而保证了数据安全

2.6 Semaphore

用来控制对共享资源的访问数量

2.7 Event

用于进程间通信,即程序中的其一个线程需要通过判断某个进程的状态来确定自己下一步的操作,就用到了event对象

event对象默认为假(Flase),即遇到event对象在等待就阻塞线程的执行

可以通过is_set()查看当前状态

2.8 Queue

Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。

get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。Queue的一段示例代码:

2.9 Pipe

Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息。

send和recv方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。close方法表示关闭管道,当消息接收结束以后,关闭管道。

2.10 subprocess三方模块

很多时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出。

subprocess模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出。

参考链接:https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/001431927781401bb47ccf187b24c3b955157bb12c5882d000

https://www.cnblogs.com/kaituorensheng/p/4445418.html

https://www.cnblogs.com/UncleYong/p/6987112.html

https://www.cnblogs.com/haiyan123/p/7429568.html

https://www.cnblogs.com/lidagen/p/7252247.html

https://blog.51cto.com/286577399/2051155


2019/04/01 22:26

补充:多进程的map使用:

import multiprocessing
import requests
import time
import random


def get(_):
    time.sleep(random.randint(0, 5))
    url = "http://www.baidu.com"
    r = requests.get(url)
    print(r.status_code)


if __name__ == "__main__":
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    pool.map(get, range(20)) 

2019/04/10 22:58

上一篇 下一篇

猜你喜欢

热点阅读