并发通信

2018-09-19  本文已影响0人  Python野路子

多进程之间通信的限制

a = 1  #定义全局变量
def func():
    global a
    a=2     #修改全局变量值
    print(a)

func()
print(a)

运行结果:


image.png

再看利用进程运行的例子:

import multiprocessing

a = 1  #定义全局变量

def func():
    global a
    a=2     #修改全局变量值
    print(a)

process = multiprocessing.Process(target=func)
process.start()
process.join() #等待子进程执行完再继续执行
print(a)
image.png

通过上面2个例子运行结果分析:
按通常应该都是2,应该修改了全局变量值,但是这里只有子进程是2,主进程是1。
这是因为进程之间是独立的,互不干扰的内存空间,故子进程修改的,不影响主进程的。

进程间通信的解决方案

image.png
print('--------------进程间通信的解决方案--------------')

manager = multiprocessing.Manager()  #创建一个服务器进程,并返回与其通信的管理器
list_proxy = manager.list()  #通过管理器在服务器进程中开辟一个列表空间,并返回一个代理
print(list_proxy)   #用法和list一样


def func2(list):
    list.append('a')
    print(list)

#把代理传给子进程,子进程里就可以通过这个代理,来操作共享空间来进行通信
process2 = multiprocessing.Process(target=func2, args=(list_proxy,))
process2.start()
process2.join() #等待子进程执行完再继续执行
print(list_proxy)
运行结果: image.png

多线程之间通信的限制

注意:因为线程属于同一个进程,因此它们之间共享内存区域,因此全局变量是公共的。

import threading

a = 1
def func3():
    global a
    a = 2
    print(a)
thread = threading.Thread(target=func3)
thread.start()
thread.join()
print(a)
运行结果: image.png

但是多线程间共享内存间存在竞争问题。

print('--------------多线程共享内存间存在竞争问题--------------')
import threading

data = 0
n = 100000

def add(n):
    global data
    for i in range(n):
        data +=i

def sub(n):
    global data

    for i in range(n):
        data -=i

t_add = threading.Thread(target=add, args=(n,))
t_sub = threading.Thread(target=sub, args=(n,))
t_add.start()
t_sub.start()
t_add.join()
t_sub.join()   #这2个地方加join阻塞目的是为了让子进程执行完,最后能在主进程看到data,所以用join来阻塞

print(data)
image.png
加了n次减了n次,结果却为负数,按正常应该为0。
使用锁来控制共享资源的访问。
print('--------------使用锁来控制共享资源的访问--------------')

import threading

data = 0
n = 1000000

lock = threading.Lock() #生成一把锁

def add(n):
    global data
    for i in range(n):
        # lock.acquire()   #加锁
        # data +=i
        # lock.release()   #释放锁
        #可以写生上下文格式
        with lock:
            data +=i

def sub(n):
    global data

    for i in range(n):
        # lock.acquire()   #加锁
        # data -=i
        # lock.release()   #释放锁
        with lock:
            data -=i

t_add = threading.Thread(target=add, args=(n,))
t_sub = threading.Thread(target=sub, args=(n,))
t_add.start()
t_sub.start()
t_add.join()
t_sub.join()   #这2个地方加join阻塞目的是为了让子进程执行完,最后能在主进程看到data,所以用join来阻塞

print(data)
运行结果: image.png

这样才达到目的,就像去银行存钱取钱,存取不多不少!

线程与进程的安全队列

队列:先进先出,一个入口,一个出口。 image.png

进程比线程少了task_done()和 join()方法。

生产者和消费者模型

所谓,生产者与消费者模型,本质上是把进程通信的问题分开考虑生产只需要往队列里面丢东西(生产者不需要关心消费者)消费者,只需要从队列里面拿东西(消费者也不需要关心生产者)。


image.png
image.png

线程实现生产者-消费者模型


print('--------------生产者与消费者模型--------------')
'''
所谓,生产者与消费者模型,本质上是把进程通信的问题分开考虑
生产者,只需要往队列里面丢东西(生产者不需要关心消费者)
消费者,只需要从队列里面拿东西(消费者也不需要关心生产者)
'''

print('--------------多线程的消费者与生产者模式--------------')
'''
生产者:没满,则生产,只关心队列是否已满。满了就阻塞。
消费者:只关心队列是否为空。不为空,则消费,为空则阻塞。

'''
import threading
import queue
import random
import time

class Producer(threading.Thread):  #生产者
    def __init__(self, queue):
        super().__init__()
        self.queue = queue
    def run(self):
        while True:
            item = random.randint(0, 10) #创建0~99
            #只要队列没满,就向队列中添加数据
            self.queue.put(item)
            print('生产者-->生产:%s'%item)
            time.sleep(1)

class Customer(threading.Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue
    def run(self):
        while True:
            #只要队列不为空,就从队列中取数据
            itme = self.queue.get()
            print('消费者-->消费:%s'%itme)
            time.sleep(1)

q =queue.Queue(5)  #长度为5
producer = Producer(q)
custormer = Customer(q)
producer.start()
custormer.start()
producer.join()
运行结果: image.png

进程实现生产者-消费者模型


import multiprocessing
import queue
import random
import time

class Producer(multiprocessing.Process):  #生产者
    def __init__(self, queue):
        super().__init__()
        self.queue = queue
    def run(self):
        while True:
            item = random.randint(0, 10) #创建0~99
            #只要队列没满,就向队列中添加数据
            self.queue.put(item)
            print('生产者-->生产:%s'%item)
            time.sleep(1)

class Customer(multiprocessing.Process):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue
    def run(self):
        while True:
            #只要队列不为空,就从队列中取数据
            itme = self.queue.get()
            print('消费者-->消费:%s'%itme)
            time.sleep(1)

manager = multiprocessing.Manager()  #创建一个服务器进程,并返回与其通信的管理器

q =manager.Queue(5)  #长度为5
producer = Producer(q)
custormer = Customer(q)
producer.start()
custormer.start()
producer.join()
运行结果: image.png
上一篇 下一篇

猜你喜欢

热点阅读