生产者与消费者问题
在前一篇《实现进程互斥的几种方法》中最后提到了Peterson解法,还有硬件上的TSL解法,它们都是正确可用的方法,但是都有忙等待的缺点,浪费了CPU的资源。还有可能产生优先级反转的问题。
为了解决这些问题需要用到几条进程间的通信原语。例如sleep和wakeup。sleep将引起调用进程的阻塞,直到另外一个进程把它唤醒。wakeup有一个参数,即要唤醒的进程。
生产者与消费者问题
生产者与消费者问题也被称为有界缓冲区问题。两个进程共享一个固定大小的缓冲区。其中一个作为生产者,将信息放入缓冲区中;另外一个作为消费者,试图把缓冲区的信息取出来。
用C语言初步实现一个解决方案
#define N 100 //缓冲区的容量
int count = 0; //记录缓冲区中数据的项数
void producer(void)
{
int item;
while(TRUE){
item = produce_item(); //产生数据
if(count == N)sleep(); //缓冲区满的,就把生产者进程阻塞
insert_item(item); //把数据放进缓冲区中
count = count +1; //数据项数量加1
if(count == 1)wakeup(consumer) ; //count之前是0,说明消费者进程之前一定阻塞了自己
}
}
void consumer(void)
{
int item;
while(TRUE){
if(count == 0)sleep(); //如果缓冲区里没数据了,把消费者进程阻塞
item = remove_item(); //从缓冲区中取出数据
count = count - 1; //数据项数量减1
if(count == N-1)wakeup(producer); //count之前是N,说明生产者进程之前一定阻塞了自己
consume_item(item); //打印数据项
}
}
这种方法实现的解决方案由于对count的访问未加限制,存在竞争的问题,这里我们可以把count理解成一个锁变量,在count为0的时候,生产者进程与消费者进程在访问count时依然存在竞争,可能存在生产者发送唤醒请求时,消费者并没有沉睡,造成消费者沉睡后不能被唤醒,最终两个进程都被阻塞。
改进的方法:添加一个唤醒等待位,即在进程收到wakeup信号时将唤醒等待位置1,将wakeup信号记录下来。但是这种改进并没有从根本上解决问题。有可能存在唤醒等待位不够用的情况。
使用信号量解决生产者与消费者问题
首先明确信号量是什么:
信号量是一个变量(可以是整型变量),用来累计wakeup信号的次数。对信号量的操作一般有两种,down和up;对信号量执行down操作是检查信号量是否大于0,如果是,就将信号量减1,进程并不沉睡;如果为0,则进程沉睡(信号量如果不是整型的就可以为负数代表正在等待的进程)。up操作是给信号量的值加1,如果有一个或多个进程在信号量上休眠,此时信号量为0;up操作之后,有系统选择一个进程wakeup,此时信号量还是0。
对生产者与消费者问题
为了保证 检测信号量、修改信号量、sleep、wakeup是原子操作,操作系统需要在执行这些操作时短暂的关闭中断。
该解决方案需要三个信号量:
full记录缓冲区内已有的数据项数,初值为0
empty记录缓冲区中空的数据项数(即剩下的数据项数),初值为N。
mutex确保生产者和消费者不会同时访问缓冲区,初值为1确保只有一个进程可以进入缓冲区
#define N 100 //缓冲区的容量
typedef int semaphore; //整型信号量
semaphore mutex = 1; //控制对临界区的访问
semaphore empty = N; //记录缓冲区中空的数据槽数
semaphore full = 0; //记录缓冲区中满的数据槽数
void producer(void)
{
int item;
while(TRUE){
item = produce_item(); //产生数据项
down(&empty); //减少一个空槽
down(&mutex); //进入临界区,这里down操作会关闭中断
insert_item(item); //将数据插入缓冲区
up(&mutex);
up(&full);
}
}
void consumer(void)
{
int item;
while(TRUE){
down(&full);
down(&mutex);
item = remove_item();
up(&mutex);
up(&empty);
}
}
用消息传递解决生产者-消费者问题
使用两条原语send(destination, &message);
和receive(source, &message);
,前一个调用向一个给定的目标发送一条消息,后一个调用从给定的源接收一条信息。
#define N 100 //缓冲区的容量
void producer(void)
{
int item;
message m;
while(TRUE){
item = produce_item(); //产生数据项
receive(consumer, &m);
build_message(&m, item);
send(consumer, &m);
}
}
void consumer(void)
{
int item;
message m;
for(i = 0; i < N; i++)send(producer, &m);
while(TRUE){
receive(producer, &m);
item = extract_item(&m);
send(producer, &m);
consum
}
}