Processes and Coroutines

2018-04-26  两分与桥

Processes use essentially the same API as threads; multiprocessing is used almost exactly like threading:

import multiprocessing
import time

def a(title):
    print(title,time.ctime())
    time.sleep(2)
    print(title, 'end')

if __name__ == '__main__':
    process = multiprocessing.Process(target=a,args=('python',))
    process.start()
    process.join()
    print('ending -----------------',time.ctime())

Output:
python Thu Apr 26 16:43:57 2018
python end
ending ----------------- Thu Apr 26 16:43:59 2018
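As a comparison, the same example written with threading differs only in the module and class name (a minimal sketch):

```python
import threading
import time

def a(title):
    print(title, time.ctime())
    time.sleep(2)
    print(title, 'end')

if __name__ == '__main__':
    # threading.Thread takes the same target/args parameters as multiprocessing.Process
    thread = threading.Thread(target=a, args=('python',))
    thread.start()
    thread.join()  # wait for the thread to finish, just like process.join()
    print('ending -----------------', time.ctime())
```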

A custom class can inherit from multiprocessing.Process and override run():

import multiprocessing
import time

class MyProcess(multiprocessing.Process):
    def __init__(self,title):
        multiprocessing.Process.__init__(self)
        self.title = title

    def run(self):
        print('begin to run')
        time.sleep(2)
        print('hello', self.title, self.name, time.ctime())

if __name__ == '__main__':
    process = MyProcess('world')
    process.start()
    process.join()
    print('ending -----------------',time.ctime())

Output:
begin to run
hello world MyProcess-1 Thu Apr 26 16:53:50 2018
ending ----------------- Thu Apr 26 16:53:50 2018

On a process's parent and child processes:

import multiprocessing
import os
import time

def info(title):
    print("title:", title)
    print('parent process:', os.getppid()) # pid of this process's parent
    print('process id:', os.getpid()) # pid of this process

if __name__ == '__main__':
    info('main process line')
    time.sleep(1)
    print("------------------")
    p = multiprocessing.Process(target=info, args=('hello world',))
    p.start()
    p.join()

Output:
title: main process line
parent process: 6164 # the parent here is PyCharm (the script was run from PyCharm)
process id: 6004 
------------------
title: hello world
parent process: 6004 # the child's parent is the main process, 6004
process id: 12220 # the child process

Inter-process communication uses a process queue, Queue. The main point is data synchronization: the data is passed to the other process as a copy:

import multiprocessing
import time

def aa(q):
    print('begin')
    time.sleep(2)
    q.put(123)
    q.put('hello')
    print('son end, ',time.ctime())


if __name__ == '__main__':
    q = multiprocessing.Queue() # process-safe queue
    process = multiprocessing.Process(target=aa,args=(q,))
    process.start()

    print(q.get())
    print(q.get())
    print('main end')

Output:
begin
son end,  Thu Apr 26 20:07:17 2018
123
hello
main end
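A common pattern on top of the same Queue is producer/consumer, where a sentinel value tells the consumer to stop (a minimal sketch; using None as the sentinel is my own choice):

```python
import multiprocessing

def producer(q):
    for i in range(3):
        q.put(i)    # each item is pickled and a copy is handed to the other process
    q.put(None)     # sentinel: no more data

def consumer(q):
    while True:
        item = q.get()
        if item is None:  # stop on the sentinel
            break
        print('got', item)

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=producer, args=(q,))
    c = multiprocessing.Process(target=consumer, args=(q,))
    p.start(); c.start()
    p.join(); c.join()
```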

Pipe communication:

import multiprocessing
import time

def aa(conn):
    print('son id  = ',id(conn))
    time.sleep(1)
    conn.send('hello')
    print(conn.recv())

if __name__ == '__main__':
    p_conn, c_conn = multiprocessing.Pipe() # duplex (two-way) pipe
    process = multiprocessing.Process(target=aa,args=(c_conn,))

    process.start()
    print('main id  = ',id(p_conn))
    print(p_conn.recv())
    p_conn.send('libai')
    print('main end')

Output:
main id  =  1768407138600
son id  =  1942925877768
hello
main end
libai

Queue and Pipe only implement data exchange; they do not implement data sharing. Data sharing means one process modifying another process's data, and for that there is Manager:

import multiprocessing

def aa(l, i):
    l.append(i)

if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        l = manager.list('hello') # a string is iterated character by character, so the list starts as ['h', 'e', 'l', 'l', 'o']
        process_list = []
        for i in range(5):
            process = multiprocessing.Process(target=aa, args=(l,i))
            process.start()
            process_list.append(process)
        for i in process_list:
            i.join()
        print(l)

Output:
['h', 'e', 'l', 'l', 'o', 0, 1, 2, 3, 4]
# the child processes modified the shared list

Process synchronization, analogous to locks in threading:

import multiprocessing
import time
def aa(lock, l,i):
    lock.acquire()
    time.sleep(1)
    print('hello')
    l.append(i)
    lock.release()

if __name__ == '__main__':
    lock = multiprocessing.Lock()
    l = multiprocessing.Manager().list()
    process_list = []
    for i in range(10):
        process = multiprocessing.Process(target=aa,args=(lock,l,i))
        process.start()
        process_list.append(process)
    for p in process_list:
        p.join()
    print(l)

Output:
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
[0, 2, 1, 3, 4, 5, 6, 7, 8, 9] # the append order can vary between runs

The process pool, Pool:

import multiprocessing
import time
def aa(i):
    time.sleep(1)
    print('hello',i)

if __name__ == '__main__':
    pool = multiprocessing.Pool(5) # with no argument, the pool size defaults to the CPU count
    for i in range(10):
        # pool.apply(func=aa, args=(i,)) # synchronous interface: tasks run serially
        pool.apply_async(func=aa, args=(i,)) # asynchronous interface: tasks run concurrently or in parallel

    pool.close()
    pool.join() # the order is fixed: close() must be called before join()

Output:
hello 0
hello 1
hello 2
hello 3
hello 4
hello 5
hello 6
hello 7
hello 8
hello 9
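For the common case of applying one function to a sequence, Pool.map is shorter than a loop of apply_async and returns the results in input order (a minimal sketch; square is a made-up example function):

```python
import multiprocessing

def square(i):
    return i * i

if __name__ == '__main__':
    # map() blocks until all tasks finish and preserves the input order
    with multiprocessing.Pool(5) as pool:
        results = pool.map(square, range(10))
    print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```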

Callback functions, callback. Sometimes certain operations should not run in parallel in the child processes, such as writing a log file or modifying shared data; hand them to a callback so that the main process performs them serially:

import multiprocessing
import time
import os
def aa(i):
    time.sleep(1)
    print('hello',i, os.getpid())

def Bar(arg):
    print('bar',os.getpid())

if __name__ == '__main__':
    pool = multiprocessing.Pool(5)
    print('main ',os.getpid())
    for i in range(10):
        # a callback is a function that runs after the target finishes successfully
        # the callback is invoked in the main process, not in a child process
        pool.apply_async(func=aa, args=(i,),callback=Bar)

    pool.close()
    pool.join()

Output:
main  11048
hello 0 1228
bar 11048
hello 1 9348
bar 11048
hello 2 10280
bar 11048
hello 3 8768
bar 11048
hello 4 8544
bar 11048
hello 5 1228
bar 11048
hello 6 9348
bar 11048
hello 7 10280
bar 11048
hello 8 8768
bar 11048
hello 9 8544
bar 11048
# as the pids show, the callback bar runs in the main process (11048)

Callback functions: receiving the return value:

import multiprocessing
import time
def aa(i):
    time.sleep(1)
    return i # the return value is received by the callback function Bar

def Bar(arg):
    print('bar get i = ',arg)

if __name__ == '__main__':
    pool = multiprocessing.Pool(5)
    for i in range(10):
        pool.apply_async(func=aa, args=(i,),callback=Bar)

    pool.close()
    pool.join()

Output:
bar get i =  0
bar get i =  1
bar get i =  2
bar get i =  3
bar get i =  4
bar get i =  5
bar get i =  6
bar get i =  7
bar get i =  8
bar get i =  9
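When only the return value matters, the callback can be skipped: apply_async returns an AsyncResult whose get() blocks until the worker's return value is ready (a minimal sketch):

```python
import multiprocessing
import time

def aa(i):
    time.sleep(1)
    return i

if __name__ == '__main__':
    pool = multiprocessing.Pool(5)
    # submit all tasks first; each apply_async call returns an AsyncResult immediately
    results = [pool.apply_async(func=aa, args=(i,)) for i in range(10)]
    pool.close()
    pool.join()
    print([r.get() for r in results])  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```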

Generators and yield: each call to next() runs until a yield is reached, then returns:

def test():
    print('one')
    yield 1
    print('two')
    yield


t = test() # calling the generator function returns a generator object
print(t)
res1 = t.__next__() # runs until 'yield 1' and pauses
print(res1)
print('------------------------------------------------------')
res2 = next(t) # resumes after 'yield 1'; the bare yield has no value, so it returns None
print(res2)

Output:
<generator object test at 0x00000202ECBDD6D0>
one
1
------------------------------------------------------
two
None

Generators: passing a value in through yield:

def test():
    print('one')
    s = yield 1
    print(s)
    print('two')
    yield

t = test()
res1 = t.__next__()
print(res1)
print('------------------------------------------------------')
t.send(150) # send() works like a next(), but it also delivers 150 as the value of the paused yield expression

Output:
one
1
------------------------------------------------------
150
two

Coroutines are cooperative, non-preemptive routines. The key is yield, and the main question is when to switch; like threads, what they address is IO.
A coroutine ultimately runs inside a single thread.
Advantages:

  1. No context-switching overhead
  2. No need for locks
  3. To use multiple cores, combine coroutines with multiple processes

import time

def getSend(name):
    while True:
        s = yield
        time.sleep(1)
        print('[%s] get the send is %s, time %s'%(name, s, time.ctime()))

if __name__ == '__main__':
    s1 = getSend('one')
    s2 = getSend('two')
    s1.__next__()
    s2.__next__()
    n = 0
    while n < 4:
        time.sleep(3)
        s1.send(n)
        s2.send(n+1)
        n = n + 2
    print('ending')

Output:
[one] get the send is 0, time Mon Apr 30 19:29:29 2018
[two] get the send is 1, time Mon Apr 30 19:29:30 2018
[one] get the send is 2, time Mon Apr 30 19:29:34 2018
[two] get the send is 3, time Mon Apr 30 19:29:35 2018
ending

The greenlet module: simple, but not very practical, since every switch is manual:

import greenlet

def test1():
    print('12')
    gr2.switch()
    print('56')
    gr2.switch()

def test2():
    print('34')
    gr1.switch()
    print('78')

gr1 = greenlet.greenlet(test1)
gr2 = greenlet.greenlet(test2)

gr1.switch()

Output:
12
34
56
78

The gevent module: it switches whenever it hits IO:

import time
import requests
import gevent

start = time.time()
url_list = ['https://www.baidu.com/',
            'http://www.sina.com.cn/',
            'https://www.jianshu.com/'
            ]
def req(url):
    requests.get(url)
    print('get url',url)

# serial version, for timing comparison:
# for i in url_list:
#     req(i)

gevent.joinall([
    gevent.spawn(req, url_list[0]),
    gevent.spawn(req, url_list[1]),
    gevent.spawn(req, url_list[2]),
])

print('cost time : ', time.time()-start)

Output:
get url https://www.baidu.com/
get url http://www.sina.com.cn/
get url https://www.jianshu.com/
cost time :  0.6288654804229736
# about the same time as running the requests serially.
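The timing is close to serial because requests uses the standard library's blocking sockets, which gevent cannot switch on by itself. Monkey-patching the standard library before importing requests makes that IO cooperative (a sketch; the actual speedup depends on the network):

```python
from gevent import monkey
monkey.patch_all()  # replace blocking stdlib IO (sockets, sleep, ...) with gevent-aware versions

import time
import gevent
import requests  # imported after patching so it uses the patched sockets

url_list = ['https://www.baidu.com/',
            'http://www.sina.com.cn/',
            'https://www.jianshu.com/']

def req(url):
    requests.get(url)
    print('get url', url)

start = time.time()
# the three requests now overlap: each greenlet yields while waiting on the network
gevent.joinall([gevent.spawn(req, url) for url in url_list])
print('cost time : ', time.time() - start)
```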

For more detail, see: https://www.cnblogs.com/yuanchenqi/articles/6248025.html
