我爱编程java技术分享

生产者-消费者模式实现

2018-03-26  本文已影响0人  BUG弄潮儿

生产者是指:生产数据的线程

消费者是指:使用数据的线程

生产者和消费者是不同的线程,他们处理数据的速度是不一样的,一般在二者之间还要加个“桥梁参与者”,用于缓冲二者之间处理数据的速度差。

下面用代码来说明:

//生产者 

public class MakerThread extends Thread {  

    private final Random random;  

    private final Table table;  

    private static int id = 0;   

    public MakerThread(String name, Table table, long seed) {  

        super(name);  

        this.table = table;//table就是桥梁参与者 

        this.random = new Random(seed);  

    }  

    public void run() {  

        try {  

            while (true) {  

                Thread.sleep(random.nextInt(1000));//生产数据要耗费时间 

//生产数据

                String cake = "[ Cake No." + nextId() + " by " + getName() + " ]";   

                table.put(cake);//将数据存入桥梁参与者 

            }  

        } catch (InterruptedException e) {  

        }  

    }  

    private static synchronized int nextId() {  

        return id++;  

    }  

}  

 再来看看消费者:

//消费者线程

public classEaterThread extends Thread {

    private final Random random;

    private final Table table;

    public EaterThread(String name, Table table,long seed) {

        super(name);

        this.table = table;

        this.random = new Random(seed);

    }

    public void run() {

        try {

            while (true) {

                String cake = table.take();//从桥梁参与者中取数据

               Thread.sleep(random.nextInt(1000));//消费者消费数据要花时间

            }

        } catch (InterruptedException e) {

        }

    }

}

看来在这个模式里table是个很重要的角色啊,让我们来看看他吧(这里只给出个简单的):

public class Table {  

    private final String[] buffer;  

    private int tail;  //下一个放put(数据)的地方    

    private int head;  //下一个取take(数据)的地方 

    private int count; // buffer内的数据数量 

    public Table(int count) {  

        this.buffer = new String[count];//总量是确定的 

        this.head = 0;  

        this.tail = 0;  

        this.count = 0;  

    }  

    // 放置数据 

    public synchronized void put(String cake) throws InterruptedException {  

        System.out.println(Thread.currentThread().getName() + " puts " + cake);  

        while (count >= buffer.length) {//数据放满了就只能等待 

            wait();  

        }  

        buffer[tail] = cake;  

        tail = (tail + 1) % buffer.length;  

        count++;  

        notifyAll();//有数据了,唤醒线程去取数据 

    }  

    // 取得数据 

    public synchronized String take() throws InterruptedException {  

        while (count <= 0) {//没有数据就只能等待 

            wait();  

        }  

        String cake = buffer[head];  

        head = (head + 1) % buffer.length;  

        count--;  

        notifyAll();//有位置可以放数据了,唤醒线程,不等了 

        System.out.println(Thread.currentThread().getName() + " takes " + cake);  

        return cake;  

    }  

}

 好了我们来实验吧:

public class Main {  

    public static void main(String[] args) {  

        Table table = new Table(3);     // 建立可以放置数据的桥梁参与者,3是他所能放置的最大数量的数据。

        new MakerThread("MakerThread-1", table, 31415).start();//生产数据 

        new MakerThread("MakerThread-2", table, 92653).start();  

        new MakerThread("MakerThread-3", table, 58979).start();  

        new EaterThread("EaterThread-1", table, 32384).start();//消费数据 

        new EaterThread("EaterThread-2", table, 62643).start();  

        new EaterThread("EaterThread-3", table, 38327).start();  

    }  

}  

上一篇下一篇

猜你喜欢

热点阅读