散文

python多进程与多线程、互斥锁与信号量

2021-12-05  本文已影响0人  Cache_wood

@[toc]

并行与并发

进程与线程

多进程

在Windows中Process()必须放到if __name__ == '__main__'

<font color='blue'>eg: 通过Process实现

from multiprocessing import Process
print('main process start')
def run():
    pass
if __name__=='__main__':
    p=Process(target=run)#recursive
    p.start()
main process start
main process start

<font color='blue'>eg: 通过继承Process实现

import time
import random
from multiprocessing import Process

n=0

def task(name):
    print("task of {} starts".format(name))
    time.sleep(random.randrange(50,100))
    print("task of {} ends".format(name))

class Task(Process):
    def __init__(self,name):
        super().__init__()#不能忘了
        self._name=name

    @property
    def name(self):
        return self._name

    def run(self):#不能忘了
        print('task {} starts with pid {}...'.format(self.name,self.pid))
        global n
        n=random.random()
        print("n={} in process {}".format(n,self.name))
        #mem
        l=list(range(int(n*100000000)))#观察内存的不同
        time.sleep(random.randrange(20,50))
        print('task {} ends with pid {}...'.format(self.name,self.pid))

if __name__=='__main__':
    '''for i in range(0,4):
        p=Process(target=task,args=('task {}'.format(i),))
        p.start()
        print('pid:{}'.format(p.pid))'''
    plist=[]
    for i in range(0,4):
        p=Task('process-{}'.format(i+1))
        plist.append(p) 
    for p in plist:
        p.start()
    for p in plist:
        #注意一定是先子进程都启动后,再一一join,否则启动后马上join会变成“串行”
        #1. 是的,这样写join仍然会卡着等p1运行结束,但其他进程如p2, p3等仍在运行,等p1运行结束后,循环继续,p2,p3等可能也运行结束了,会迅速完成join的检验
        #2. join花费的总时间仍然是耗费时间最长的那个进程运行的时间,这样跟我们的目的是一致的。
        pass
        p.join()
    print('main')
    print("n={} in main".format(n))

首先用start函数启动所有的子进程,之后重新join所有的子进程,这样可能p1会等待运行结果但不影响其他进程的运行。

如果在start函数的后面直接join,在p1的时候所有的进程都被卡住,相当于“串行”

task process-1 starts with pid 13044...
n=0.17599680720372746 in process process-1
task process-2 starts with pid 3368...
n=0.2165234151520603 in process process-2
task process-3 starts with pid 11588...
task process-4 starts with pid 14920...
n=0.8302889804652497 in process process-3
n=0.7734786605773896 in process process-4
task process-4 ends with pid 14920...
task process-1 ends with pid 13044...
task process-2 ends with pid 3368...
task process-3 ends with pid 11588...
main
n=0 in main

关于join的使用

• 先在主进程中启动所有子进程
• 然后在主进程中对所有子进程进行join

同步和异步

临界区

多进程

互斥锁Lock
from multiprocessing import Process
from multiprocessing import Lock
import json
import random
import time

tf='ticks.json'

def info():
    ticks=json.load(open(tf))
    print("ticks from {} to {} left {}".format(ticks['origin'],ticks['dest'],ticks['count']))

def buy(pname):
    ticks=json.load(open(tf))
    time.sleep(random.random())
    if ticks['count']>0:
        ticks['count']-=1
        time.sleep(random.random())
        json.dump(ticks,open(tf,'w'))
        print('{} buy one tick from {} to {}!'.format(pname,ticks['origin'],ticks['dest']))
    else:
        print('oh....no ticks :(')

def task(pname):
    info()
    buy(pname)

#加锁
def lock_task(name,lock):
    lock.acquire()
    try:
        info()
        buy(name)
    except:
        raise
    finally:
        lock.release()

if __name__=='__main__':
    lock=Lock()
    clients=[]
    for i in range(20):
        name='client-{}'.format(i+1)
        #p=Process(target=task,name=name,args=(name,))
        p=Process(target=lock_task,name=name,args=(name,lock))
        clients.append(p)
    for p in clients:
        p.start()
    for p in clients:
        p.join()
    print("all clients finished...")

加锁之后同一时间只能有一个进程进行连接,当该进程连接结束之后其他进程才能继续连接。

前10个人买到票,后10个人没有票。同一时间只允许一个人买票。

ticks from bj to cq left 10
client-9 buy one tick from bj to cq!
ticks from bj to cq left 9
client-4 buy one tick from bj to cq!
ticks from bj to cq left 8
client-16 buy one tick from bj to cq!
ticks from bj to cq left 7
client-14 buy one tick from bj to cq!
ticks from bj to cq left 6
client-6 buy one tick from bj to cq!
ticks from bj to cq left 5
client-15 buy one tick from bj to cq!
ticks from bj to cq left 4
client-1 buy one tick from bj to cq!
ticks from bj to cq left 3
client-8 buy one tick from bj to cq!
ticks from bj to cq left 2
client-11 buy one tick from bj to cq!
ticks from bj to cq left 1
client-5 buy one tick from bj to cq!
ticks from bj to cq left 0
oh....no ticks :(
ticks from bj to cq left 0
oh....no ticks :(
ticks from bj to cq left 0
oh....no ticks :(
ticks from bj to cq left 0
oh....no ticks :(
ticks from bj to cq left 0
oh....no ticks :(
ticks from bj to cq left 0
oh....no ticks :(
ticks from bj to cq left 0
oh....no ticks :(
ticks from bj to cq left 0
oh....no ticks :(
ticks from bj to cq left 0
oh....no ticks :(
ticks from bj to cq left 0
oh....no ticks :(
all clients finished...

如果不设置互斥锁Lock,而使用在启动每个线程之后直接join的形式,也可以得到同样的效果。

from multiprocessing import Process
from multiprocessing import Lock
import json
import random
import time

tf='ticks.json'

def info():
    ticks=json.load(open(tf))
    print("ticks from {} to {} left {}".format(ticks['origin'],ticks['dest'],ticks['count']))

def buy(pname):
    ticks=json.load(open(tf))
    time.sleep(random.random())
    if ticks['count']>0:
        ticks['count']-=1
        time.sleep(random.random())
        json.dump(ticks,open(tf,'w'))
        print('{} buy one tick from {} to {}!'.format(pname,ticks['origin'],ticks['dest']))
    else:
        print('oh....no ticks :(')

def task(pname):
    info()
    buy(pname)

if __name__=='__main__':
    clients=[]
    for i in range(20):
        name='client-{}'.format(i+1)
        p=Process(target=task,name=name,args=(name,))
        #p=Process(target=lock_task,name=name,args=(name,lock))
        clients.append(p)
    for p in clients:
        p.start()
    #for p in clients:
        p.join()
    print("all clients finished...")
ticks from bj to cq left 10
client-1 buy one tick from bj to cq!
ticks from bj to cq left 9
client-2 buy one tick from bj to cq!
ticks from bj to cq left 8
client-3 buy one tick from bj to cq!
ticks from bj to cq left 7
client-4 buy one tick from bj to cq!
ticks from bj to cq left 6
client-5 buy one tick from bj to cq!
ticks from bj to cq left 5
client-6 buy one tick from bj to cq!
ticks from bj to cq left 4
client-7 buy one tick from bj to cq!
ticks from bj to cq left 3
client-8 buy one tick from bj to cq!
ticks from bj to cq left 2
client-9 buy one tick from bj to cq!
ticks from bj to cq left 1
client-10 buy one tick from bj to cq!
ticks from bj to cq left 0
oh....no ticks :(
ticks from bj to cq left 0
oh....no ticks :(
ticks from bj to cq left 0
oh....no ticks :(
ticks from bj to cq left 0
oh....no ticks :(
ticks from bj to cq left 0
oh....no ticks :(
ticks from bj to cq left 0
oh....no ticks :(
ticks from bj to cq left 0
oh....no ticks :(
ticks from bj to cq left 0
oh....no ticks :(
ticks from bj to cq left 0
oh....no ticks :(
ticks from bj to cq left 0
oh....no ticks :(
all clients finished...
信号量 Semaphore
from multiprocessing import Process
from multiprocessing import Semaphore
from multiprocessing import current_process
import time
import random

def get_connections(s):
    s.acquire()
    try:
        print(current_process().name+' acqiure a connection')
        time.sleep(random.randint(1,2))
        print(current_process().name+' finishes its job and return the connection')
    except:
        raise
    finally:
        s.release()

if __name__=='__main__':
    connections=Semaphore(5)
    workers=[]
    for i in range(20):
        p=Process(target=get_connections,args=(connections,),name='worker:'+str(i+1))
        workers.append(p)
    for p in workers:
        p.start()
    for p in workers:
        p.join()
    print("all workers exit")

信号量设置为5,表示5个子线程可以连接,当有子线程退出之后,新的线程才可以继续连接。
比如刚开始3、1、4、2、11线程接入,当3完成工作之后释放,新的线程7进行连接。以此类推。

worker:3 acqiure a connection
worker:1 acqiure a connection
worker:4 acqiure a connection
worker:2 acqiure a connection
worker:11 acqiure a connection
worker:3 finishes its job and return the connection
worker:7 acqiure a connection
worker:1 finishes its job and return the connection
worker:12 acqiure a connection
worker:2 finishes its job and return the connection
worker:17 acqiure a connection
worker:11 finishes its job and return the connection
worker:8 acqiure a connection
worker:12 finishes its job and return the connection
worker:5 acqiure a connection
worker:4 finishes its job and return the connection
worker:10 acqiure a connection
worker:7 finishes its job and return the connection
worker:14 acqiure a connection
worker:5 finishes its job and return the connection
worker:15 acqiure a connection
worker:10 finishes its job and return the connection
worker:18 acqiure a connection
worker:17 finishes its job and return the connection
worker:9 acqiure a connection
worker:8 finishes its job and return the connection
worker:13 acqiure a connection
worker:15 finishes its job and return the connection
worker:16 acqiure a connection
worker:9 finishes its job and return the connection
worker:6 acqiure a connection
worker:13 finishes its job and return the connection
worker:19 acqiure a connection
worker:14 finishes its job and return the connection
worker:20 acqiure a connection
worker:16 finishes its job and return the connection
worker:18 finishes its job and return the connection
worker:6 finishes its job and return the connection
worker:19 finishes its job and return the connection
worker:20 finishes its job and return the connection
all workers exit

如果把start函数和join函数写在一起,那么执行情况实际上变为串行

worker:1 acqiure a connection
worker:1 finishes its job and return the connection
worker:2 acqiure a connection
worker:2 finishes its job and return the connection
worker:3 acqiure a connection
worker:3 finishes its job and return the connection
worker:4 acqiure a connection
worker:4 finishes its job and return the connection
worker:5 acqiure a connection
worker:5 finishes its job and return the connection
worker:6 acqiure a connection
worker:6 finishes its job and return the connection
worker:7 acqiure a connection
worker:7 finishes its job and return the connection
worker:8 acqiure a connection
worker:8 finishes its job and return the connection
worker:9 acqiure a connection
worker:9 finishes its job and return the connection
worker:10 acqiure a connection
worker:10 finishes its job and return the connection
worker:11 acqiure a connection
worker:11 finishes its job and return the connection
worker:12 acqiure a connection
worker:12 finishes its job and return the connection
worker:13 acqiure a connection
worker:13 finishes its job and return the connection
worker:14 acqiure a connection
worker:14 finishes its job and return the connection
worker:15 acqiure a connection
worker:15 finishes its job and return the connection
worker:16 acqiure a connection
worker:16 finishes its job and return the connection
worker:17 acqiure a connection
worker:17 finishes its job and return the connection
worker:18 acqiure a connection
worker:18 finishes its job and return the connection
worker:19 acqiure a connection
worker:19 finishes its job and return the connection
worker:20 acqiure a connection
worker:20 finishes its job and return the connection
all workers exit
事件
from multiprocessing import Process
from multiprocessing import Event
import time
import random

def car(event,name):
    while True:
        if event.is_set():
            time.sleep(random.random())
            print("car {} passes...".format(name))
            break
        else:
            print("car {} waits...".format(name))
            event.wait()#阻塞直至事件状态发生变化

def light(event):
    while True:
        if event.is_set():
            event.clear()
            print("红灯")
            time.sleep(random.random())
        else:
            event.set()
            print("绿灯")
            time.sleep(random.random())

if __name__=='__main__':
    event=Event()
    event.clear()
    l=Process(target=light,args=(event,))
    l.daemon=True
    l.start()
    cars=[]
    for i in range(10):
        c=Process(target=car,args=(event,'c_'+str(i+1)))
        cars.append(c)
    for c in cars:
        c.start()
    for c in cars:
        c.join()
    print("all cars passed...")

每次运行得到的结果不尽相同。

car c_1 waits...
car c_2 waits...
绿灯
car c_3 passes...
car c_1 passes...
car c_4 passes...
car c_7 passes...
car c_8 passes...
car c_6 passes...
car c_10 passes...
car c_5 passes...
红灯
car c_9 passes...
绿灯
car c_2 passes...
all cars passed...
生产者-消费者模型
from multiprocessing import Process, Queue
import time
import random
import os

def produce(q):
    time.sleep(random.random())
    q.put('car_by_{}'.format(os.getpid()))
    print("{} produces a car...".format(os.getpid()))

def buy(q):
    car=q.get()
    if car is None:
        print("no car. {} ends.".format(os.getpid()))
        return
    else:
        time.sleep(random.random())
        print("{} buy the car {}".format(os.getpid(),car))

if __name__=='__main__':
    q=Queue()
    procucers=[]
    consumers=[]
    for i in range(0,10):
        p=Process(target=produce,args=(q,))
        procucers.append(p)
    for i in range(0,50):
        c=Process(target=buy,args=(q,))
        consumers.append(c)
    for p in procucers:
        p.start()
    for p in procucers:
        p.join()
    for c in consumers:
        c.start()
    for c in consumers:
        q.put(None)#主进程发信号结束,但要给每一个consumer准备
    print('main')
14764 produces a car...
17672 produces a car...
8124 produces a car...
18736 produces a car...
17312 produces a car...
1460 produces a car...
6200 produces a car... 
18568 produces a car...
14964 produces a car...
18560 produces a car...
main
7740 buy the car car_by_14764
15340 buy the car car_by_14964
no car. 16420 ends.
no car. 8736 ends.
no car. 11540 ends.
15176 buy the car car_by_17672
13416 buy the car car_by_8124
no car. 1208 ends.
no car. 14524 ends.
no car. 8036 ends.
no car. 12192 ends.
no car. 10984 ends.
no car. 13016 ends.
7540 buy the car car_by_17312
10980 buy the car car_by_1460
no car. 12132 ends.
no car. 17976 ends.
no car. 12704 ends.
no car. 18572 ends.
20156 buy the car car_by_18568
no car. 17216 ends.
no car. 2492 ends.
no car. 17628 ends.
no car. 8396 ends.
no car. 13528 ends.
no car. 12440 ends.
16108 buy the car car_by_18736
no car. 16200 ends.
14096 buy the car car_by_6200
no car. 9296 ends.
no car. 2464 ends.
no car. 19956 ends.
13672 buy the car car_by_18560
no car. 11244 ends.
no car. 2832 ends.
no car. 5484 ends.
no car. 11108 ends.
no car. 1648 ends.
no car. 14492 ends.
no car. 11720 ends.
no car. 18240 ends.
no car. 18936 ends.
no car. 7676 ends.
no car. 5408 ends.
no car. 12232 ends.
no car. 18080 ends.
no car. 14404 ends.
no car. 15296 ends.
no car. 19800 ends.
no car. 4744 ends.
上一篇 下一篇

猜你喜欢

热点阅读