数据蛙数据分析每周作业

Python -- 进程、线程

2019-03-17  本文已影响2人  2023开始学

多进程

由于Windows没有fork调用,难道在Windows上无法用Python编写多进程的程序?
multiprocessing模块是跨平台的多进程模块
1、multiprocessing模块提供了一个Process类来创建一个实例代表一个进程对象
2、创建子进程,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动,这样创建进程比fork( )还要简单
3、join( )方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。

from multiprocessing import Process  #从multiprocessing模块导入Process类
import  os

#子进程要执行的代码
def run_proc(name):
    print('Run child process: %s(%s)' %(name, os.getpid()))

if __name__=='__main__':
    print('Parent process %s.' %os.getpid())
    p=Process(target=run_proc,args=('test',))  
#创建一个实例,也即创建一个进程,执行run_proc函数
    print('Child process will start.')
    p.start()
 #可以看出创建实例Process那一行的代码,并未返回任何结果,
#上一行代码返回了结果,要通过start()启动执行该进程
    p.join()
    print('Child process end.')

运行结果:


image.png

Pool 进程池

如果要启动大量的子进程,可以用进程池的方式批量创建子进程:
将Pool对象调用join( )方法会等待所有子进程执行完毕,调用join( )之前必须先调用close( ),调用close( )之后就不能继续添加新的process了

from multiprocessing import Pool
import os ,time, random

def long_time_task(name):
    print('Run task %s(%s)……' % (name, os.getpid()))
    start=time.time()
    time.sleep(random.random()*3)
    end=time.time()
    print('Task %s runs %0.2f seconds.' %(name,(end-start)))

if __name__=='__main__':
    print('Parent process %s.' %os.getpid())
    p=Pool(4)
    for i in range(5):
        p.apply_async(func=long_time_task,args=(i,))  #这种创建子进程后,不会立即执行
    print('Waiting for all subprocesses done……')
    p.close()
    p.join()
    print('All subprocesses done.')

运行结果:


image.png

进程间通信

进程指间肯定是需要通信的,Python的multiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据。
下面以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

time.sleep()

功能:推迟调用线程的运行,单位:秒,s

from multiprocessing import Process,Queue
import os ,time, random

#写数据进程执行的代码
def write(q):
    print('Process to write:%s' % os.getpid())
    for value in ['A','B','C']:
        print('Put %s to queue……' % value)
        q.put(value)
        time.sleep(random.random())


#读数据进程执行的代码
def read(q):
    print('Process to read :%s' % os.getpid())
    while True:
        value=q.get(True)
        print('Get %s from queue.' % value)


if __name__=='__main__':
    #父进程创建Queue,并传给各个子进程
    q=Queue()
    pw=Process(target=write,args=(q,))
    pr = Process(target=read, args=(q,))
    #启动子进程pw,写入:
    pw.start()
    # 启动子进程pr,读取:
    pr.start()
    #等待pw结束
    pw.join()
    #pr进程里是死循环,无法等待其结束,只能强行终止
    pr.terminate()

运行结果:


image.png

由于Windows没有fork调用,因此,multiprocessing需要“模拟”出fork的效果,父进程所有Python对象都必须通过pickle序列化再传到子进程去,所以,如果multiprocessing在Windows下调用失败了,要先考虑是不是pickle失败了。

多线程

多任务可以由多进程完成,也可以由一个进程内的多线程完成
Python的标准库提供了两个模块:_thread和threading,_thread是低级模块,对_thread进行了封装。threading是高级模块

    绝大多数情况下,只需要使用threading这个高级模块

启动一个线程的方式:

threading.Thread(target= , name= )

这里的Thread( )是 threading模块中的类,创建一个实例,并把函数传入进去,
然后调用start( )开始执行

任何进程都会默认启动一个线程,把该线程称为主线程,主线程又可以启动新的线程。
threading模块中的current_thread( )函数:返回当前线程的实例
主线程实例的名字为MainThread
子线程的名字在创建时指定,没有其他意义,如果不起名字Python就自动给线程命名为Thread-1,Thread-2……

import time, threading #这两个是模块

#新线程执行的代码
def loop():
    print('Thread %s is running.'% threading.current_thread().name)
    n=0
    while n<5:
        n=n+1
        print('Thread %s >>> %s' % (threading.current_thread().name,n))
        time.sleep(1)#推迟调用线程的运行,0,1,3之间会有很大的差别
    print('Thread %s  ended.' %threading.current_thread().name)


print('Thread %s is running.'% threading.current_thread().name)
t=threading.Thread(target=loop,name='LoopThread')   
#这里的name属性是用来对子线程进行命名
#Thread()这是threading模块中的类,传入一个函数并创建一个实例
t.start()
t.join()
print('the last line:')
print('Thread %s  ended.' %threading.current_thread().name)

运行结果:


image.png

Lock

由于锁Lock只有一个,无论多少线程,同一时刻最多只有一个线程持有该锁,因此其他线程只能等待,直到锁被释放后,获得该锁以后才能修改,所以,不会造成修改的冲突。

创建一个锁的方式:

threading.Lock()
import time, threading

balance=0

def change_it(n):
    global balance
    balance = balance + n
    balance = balance - n


def run_thread(n):
    for i in range(100000):
        change_it(n)



t1=threading.Thread(target=run_thread,args=(5,))
t2=threading.Thread(target=run_thread,args=(8,))

t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

运行结果:


image.png

修改 for i in range(100000):这句 range( )里的参数多加一个0

import time, threading

balance=0

def change_it(n):
    global balance
    balance = balance + n
    balance = balance - n


def run_thread(n):
    for i in range(100000):
        change_it(n)



t1=threading.Thread(target=run_thread,args=(5,))
t2=threading.Thread(target=run_thread,args=(8,))

t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

运行结果:


image.png

设置锁Lock( )之后,无论range( )参数有多大,都返回0

import time, threading

balance=0
lock=threading.Lock()

def change_it(n):
    global balance
    balance = balance + n
    balance = balance - n


def run_thread(n):
    for i in range(1000000):
        lock.acquire()
        try:
            change_it(n)
        finally:
            lock.release()



t1=threading.Thread(target=run_thread,args=(5,))
t2=threading.Thread(target=run_thread,args=(8,))

t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

运行结果:

image.png
参考:https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/001431927781401bb47ccf187b24c3b955157bb12c5882d000
上一篇 下一篇

猜你喜欢

热点阅读