Python42_多任务之进程

2019-09-28  本文已影响0人  jxvl假装

进程

ps:创建子进程时,会将父进程的代码拷贝一份,但是这个拷贝是“写时拷贝”,即,只有当子进程对数据进程修改时才会单独拷贝一份,不然就是共用一份代码

进程是拥有资源的最小单位,进程中任务的执行也是通过线程完成的

fork创建子进程(Linux)

for创建新进程

import os
os.fork()
print("haha")
#会发现输出两次“haah“

os.fork()有两个返回值

import os
ret = os.fork() #ps:因为os模块与系统相关,所以在windows中没有fork函数,与Linux不同
#当程序执行到上面一句代码时会创建一个新的进程,这个新的进程也从fork处往下开始走,主进程执行print(2),即返回值大于0(但是每次执行结果可能不一样,为所创建子进程的pid),子进程中的ret==0,执行print(1)
if ret == 0:
    print(1)
else:
    print(2)
#会发现输出结果为2和1,但是也可能输出1和2,(先后顺序问题,这个是由操作系统调度算法决定的)
#os中,os.getpid()可以获得当前进程的pid,os.getppid()可以获得当前进程父进程的pid(第一个p为parent)

父子进程调度的先后顺序不确定,哪怕是父进程结束后,子进程还是可以继续执行

全局变量在多进程中不共享

import os
import time

g_num = 100

ret = os.fork()
if ret == 0:
    print("---process 1---")
    g_num += 1
    print("---process 1 g_num = %d---"%g_num)
else:
    time.sleep(3)   #保证子进程先执行
    print("---process 2---")
    print("---process 2 g_num = %d---"%g_num)   #会发现g_num还是100,即全局变量在多进程中不共享

多次fork

case1

import os
import time

ret = os.fork()
if ret == 0:
    #子进程
    print("--1--")
else:
    #父进程
    pirnt("--2--")

#父子进程
ret = os.fork()
if ret == 0:
    #孙子进程(上面子进程的子进程)
    #2儿子(父进程又创建的子进程)
    print("--11--")
else:
    #子进程
    #父进程
    pirnt("--22--")
#运行结果:1和2各输出1次,11和22各自输出两次

case2

import os
import time

ret = os.fork()
if ret == 0:
    #子进程
    print("--1--")
else:
    #父进程
    pirnt("--2--")
    ret = os.fork()
    if ret == 0:
        #2儿子(父进程又创建的子进程)
        print("--11--")
    else:
        #父进程
        pirnt("--22--")

ps:fork炸弹

import os
while True:
    os.fork()

winodws上的多进程:multiprocessing

利用multiprocessing中提供的Process类实现,达到跨平台的多进程效果

multiprocessing:对于windows和Linux,其创建子进程的方法不一样(如:linux是用fork,而windows上不是),所以python提供了multiprocssing,根据安装环境的不同,其会调用对应的系统的方法进程子进程的创建

caution:用Process创建子进程,主进程要等所有的子进程结束后才结束,这与fork是不同的

import multiprocessing  #负责多进程的模块
import time

def run(name):
    print("task: %s"%name)
    time.sleep(2)
def main():
    p1 = multiprocessing.Process(target=run, args=("P1",))
    p2 = multiprocessing.Process(target=run, args=("P2",))
    p1.start()  #子进程开始执行
    p2.start()
    print(p1.pid, p2.pid)
    print(p1.name, p2.name)
    p1.join()   #等待子进程p1的结束后再往下走:阻塞,可加参数:timeout,超时时间(等待的最长时间),即如果时间在timeout内如果还没有结束,仍然继续向下结束
    p2.join()
    print("finished")

if __name__ == "__main__":  #因为启动一个进程的时候,会再次调用此文件,所以:如果不用此方法调用,则会不断反复调用此模块
    main()

进程的创建:Process([group[, target[, name[, args[, kwargs]]]]])

Process类的常用方法

Process常用属性

进程的创建-Process子类

创建新的进程还能够使用类的方法,可以自定义一个类,继承Process类,每次实例化这个类的时候,就等同于实例化一个进程对象

from multiprocessing import Process
import os,time
class Process_Class(Process):
    #因为Process类本省有__init__方法,这个子类相当于重写了这个方法
    #但这样就会带来一些问题,我们并没有完全初始化一个Process类,所以就不能使用从这个类继承的一些方法
    #所以需要调用父类的init方法
    def __init__(self, interval):
        super().__init__()
        self.interval = interval

    def run(self):  #进程要执行的部分(由start自动调用)
        #重写了Process类的run方法
        print("子进程(%s)开始执行,父进程(%s)"%(os.getpid(), os.getppid()))
        t_start = time.time()
        time.sleep((self.interval))
        t_stop = time.time()
        print("(%s)执行结束,耗时%0.2f秒"%(os.getppid(), t_stop - t_start))

if __name__ == "__main__":
    t_start = time.time()
    print("当前程序进程(%s)"%os.getpid())
    p1 = Process_Class(2)
    p1.start()  #进程开始执行,start会自动调用run方法
    p1.join()
    t_stop = time.time()
    print("(%s)执行结束,耗时%0.2f秒" % (os.getppid(), t_stop - t_start))

进程池

当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态生成多个进程,但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到multiprocessing中的Pool方法

池的作用:起到一个缓冲的作用。进程池:先创建一堆进程在那里放着,而不用等到用的时候再去创建

ps:进程池中进程执行如果发生了异常,不会有提示

from multiprocessing import Pool
import os, time, random

def worker(msg):
    t_start = time.time()
    print("%s开始执行,进程号为%d" % (msg,os.getpid()))
    # random.random()随机生成0~1之间的浮点数
    time.sleep(random.random()*2)
    t_stop = time.time()
    print(msg,"执行完毕,耗时%0.2f" % (t_stop-t_start))

if __name__ == "__main__":
    po = Pool(3)  # 定义一个进程池,最大进程数3
    print("----start----")
    for i in range(0,10):
        po.apply_async(worker,(i,))  # Pool().apply_async(要调用的目标,(传递给目标的参数元祖,))。apply_async:非阻塞方式(异步)
    po.close()  # 关闭进程池,关闭后po不再接收新的请求
    po.join()  # 等待po中所有子进程执行完成,再执行下面的代码,可以设置超时时间join(timeout=)
    print("-----end-----")
'''运行结果如下:
----start----
0开始执行,进程号为15312
1开始执行,进程号为2020
2开始执行,进程号为3148
1 执行完毕,耗时1.62
3开始执行,进程号为2020
2 执行完毕,耗时1.63
4开始执行,进程号为3148
0 执行完毕,耗时1.84
5开始执行,进程号为15312
4 执行完毕,耗时0.37
6开始执行,进程号为3148
5 执行完毕,耗时0.56
7开始执行,进程号为15312
7 执行完毕,耗时0.13
8开始执行,进程号为15312
8 执行完毕,耗时0.02
9开始执行,进程号为15312
9 执行完毕,耗时0.27
6 执行完毕,耗时1.12
3 执行完毕,耗时1.78
-----end-----
''' 

多种创建进程方式的比较

# 方式一:不建议使用(太底层)
ret = os.fork()
if ret == 0:
    #子进程
else:
    #父进程

# 方式二:主进程不会在子进程之前结束,所以可以用主进程做一些其他的事情
p1 = Process(target = xxx)
p1.start()

# 方式三:主进程一般用来等待(因为主进程可能在子进程之前结束,所以需要用join),真正的任务都在子进程中执行
pool = Pool(3)  #进程池中的任务不是越多越好,因为任务数越多,意味着轮一圈的周长越长。根据硬件、系统版本等,得到压力测试的值才合理
pool.apply_async(xxx)

apply阻塞式添加任务

基本不用

需要等到上一个任务执行完毕之后才会继续运行,往池里添加新任务,但是如果这样的话,就不能让多个进程一起执行

from multiprocessing import Pool
import os, time, random

def worker(msg):
    t_start = time.time()
    print("%s开始执行,进程号为%d" % (msg,os.getpid()))
    # random.random()随机生成0~1之间的浮点数
    time.sleep(random.random()*2)
    t_stop = time.time()
    print(msg,"执行完毕,耗时%0.2f" % (t_stop-t_start))

if __name__ == "__main__":
    po = Pool(3)  # 定义一个进程池,最大进程数3
    print("----start----")
    for i in range(0,10):
        po.apply(worker,(i,))  # Pool().apply_async(要调用的目标,(传递给目标的参数元祖,))。apply_async:非阻塞方式(异步),当一个进程执行完毕之后才继续往里面添加任务
    po.close()
    po.join() join(timeout=)
    print("-----end-----")
'''#运行结果如下
----start----
0开始执行,进程号为16136
0 执行完毕,耗时0.38
1开始执行,进程号为10228
1 执行完毕,耗时1.02
2开始执行,进程号为2536
2 执行完毕,耗时0.89
3开始执行,进程号为16136
3 执行完毕,耗时1.69
4开始执行,进程号为10228
4 执行完毕,耗时0.56
5开始执行,进程号为2536
5 执行完毕,耗时1.57
6开始执行,进程号为16136
6 执行完毕,耗时1.29
7开始执行,进程号为10228
7 执行完毕,耗时0.44
8开始执行,进程号为2536
8 执行完毕,耗时1.33
9开始执行,进程号为16136
9 执行完毕,耗时1.79
-----end-----
'''

进程间的通信:Queue

queue(队列):先进先出
栈:先进后出

常用方法:

from multiprocessing import Queue

q = Queue(3)    #创建一个队列对象,容量的大小为3,可以不指定大小,则可以往添加任意多的数据
q.qsize()   #获得当前队列中的数据数量
q.put("xxx")    #往队列里面放数据,如果队列已满,默认阻塞。可以往队列里面添加任何形式的数据,哪怕是一个对象也可以
q.get() #从队列里面取数据,如果队列已空,默认阻塞
q.empty()   #判断队列是否已空
q.full()    #判断队列是否已满
q.get_nowait()  #如果队列已空,不阻塞,但是已异常的方式提示队列已空
q.put_nowait()  #如果队列已满,不阻塞,但是以异常的方式提示

实例:

from multiprocessing import Process,Queue
import os, time,random

def write(q):
    for value in ["A","B","C","D"]:
        print("Put {} to queue".format(value))
        q.put(value)
        time.sleep(random.random())

def read(q):
    while True:
        if not q.empty():
            value = q.get(True)
            print("Get {} from queue".format(value))
            time.sleep(random.random())
        else:
            break

if __name__ == "__main__":
    #父进程创建Queeu,并传递给各个子进程
    q = Queue() #不限制大小
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))

    pw.start()   #启动子进程,写入
    pw.join()   #等待子进程结束

    pr.start()  #启动pr子进程,读出
    pr.join()

'''运行结果如下:
Put A to queue
Put B to queue
Put C to queue
Put D to queue
Get A from queue
Get B from queue
Get C from queue
Get D from queue
'''

进程池中的Queue

直接从multiprocessing里面导入的Queue用于直接用Process创建的子进程间的通信,如果是进程池里面进程的通信,则用multiprocessing里面的Manager().Queue(),两种方式得到的队列的使用方式完全一样

import os
# 进程池中的Queeu
from multiprocessing import Manager, Pool


def reader(q):
    print("reader启动{},父进程为{}".format(os.getpid(), os.getppid()))
    for _ in range(q.qsize()):
        print("reader从Queue获取信息:{}".format(q.get()))


def writer(q):
    print("writer启动{},父进程为{}".format(os.getpid(), os.getppid()))
    for i in "Iterable":
        q.put(i)


if __name__ == "__main__":
    print("{} start".format(os.getpid()))
    q = Manager().Queue()  # 用manager里面的queue来完成进程池里面进程的通信
    pl = Pool()
    pl.apply(writer, (q,))  # 阻塞式添加进程,以让进程先往队列里面添加,再取出
    pl.apply(reader, (q,))
    pl.close()  # 关闭进程池,禁止再往里面添加任务
    pl.join()  # 等待进程结束
    print("{} end".format(os.getpid()))

'''运行结果如下:
9932 start
writer启动15896,父进程为9932
reader启动3432,父进程为9932
reader从Queue获取信息:I
reader从Queue获取信息:t
reader从Queue获取信息:e
reader从Queue获取信息:r
reader从Queue获取信息:a
reader从Queue获取信息:b
reader从Queue获取信息:l
reader从Queue获取信息:e
9932 end
'''

多进程的应用:文件拷贝

import os
from multiprocessing import Process


def copyFileTask(old_file_name, new_file_name):
    """
    仅负责将旧文件复制到新文件
    :param old_file_name: 旧文件名,包括路径
    :param new_file_path: 新文件名,包括路径
    :return: None
    """

    print("复制文件{}中...".format(old_file_name))
    with open(old_file_name, "rb+") as fr:
        content = fr.read()
    with open(new_file_name, "wb+") as fw:
        fw.write(content)
    print("复制文件{}完成!".format(old_file_name))


def copyFolderTask(old_folder_name, new_folder_name):
    """
    负责将旧文件夹内的东西复制到新文件夹
    :param old_folder_name: 旧文件夹路径
    :param new_folder_name: 新文件夹路径
    :return: None
    """
    if not os.path.exists(new_folder_name):
        print("目标文件夹不存在,创建中...")
        os.mkdir(new_folder_name)
        print("目标文件夹创建完成!")
    files = os.listdir(old_folder_name)

    for file in files:
        old_file = os.path.join(old_folder_name, file)
        new_file = os.path.join(new_folder_name, file)


        if os.path.isfile(old_file):
            print(file)
            pro = Process(target=copyFileTask, args=(old_file, new_file))   #为了不在主进程中join,使用Process而不是进程池
            pro.start()
        elif os.path.isdir(old_file):
            copyFolderTask(old_file, new_file)


if __name__ == "__main__":
    #注意:以下路径均为绝对路径
    old_folder_path = input("请输入要拷贝文件夹路径:")
    new_folder_path = input("请输入新文件夹路径:")

    if not os.path.exists(old_folder_path):
        print("目标文件夹不存在,请重试")
        exit()
    else:
        copyFolderTask(old_folder_path, new_folder_path)

ps:print("\r要输出的内容")在行首输出内容(同时会删除本行已有内容),如果要对以上复制文件的程序改进使其显示进度,可以用到

上一篇下一篇

猜你喜欢

热点阅读