Java线程操作实现生产者与消费者模型

2020-05-25  本文已影响0人  撸码小狂魔
基本模型
class Message {  //描述公共空间
    private String title;
    private String content;

    public void setContent(String content) {
        this.content = content;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public String getContent() {
        return content;
    }

    public String getTitle() {
        return title;
    }

}

class ProducerThread implements Runnable {
    private final Message message;    //获得message的引用

    public ProducerThread(Message message) {
        this.message = message;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            this.message.setTitle("title: " + i);
            this.message.setContent("content: " + i);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

class ConsumerThread implements Runnable {

    private final Message message;    //获得message的引用

    public ConsumerThread(Message message) {
        this.message = message;
    }

    @Override
    public void run(){

        for (int i = 0; i < 100; i++) {
            System.out.println("消费者:" + this.message.getTitle() + this.message.getContent());

            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }

    }
}

public class Test {

    public static void main(String[] args) {

        Message message = new Message();        //实例化Message
        new Thread(
                new ProducerThread(message)
        ).start();                  //启动生产者线程

        new Thread(
                new ConsumerThread(message)
        ).start();                  // 启动消费者线程

    }

}

此时的代码仅仅是是实现了两个线程彼此独立的操作交互,但是通过最终的执行结果可以清楚的发现当前的程序代码之中存在如下的几个设计问题:

解决数据同步问题

在java的程序之中,如果想要实现数据的同步处理,那么肯定要使用synchronized来完成,如果要完成的话那么肯定就需要同步代码块或者是同步方法,之所以现在出现了不同步的问题,主要是没有在生产的过程里面对数据的操作进行锁定。


image.png

范例:修改程序的结构代码

class Message {  //描述公共空间
    private String title;
    private String content;


    public synchronized void set(String title,String content){
        this.content = content;
        this.title = title;
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public synchronized  String get(){
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return  this.title + this.content;

    }


}

class ProducerThread implements Runnable {
    private final Message message;    //获得message的引用

    public ProducerThread(Message message) {
        this.message = message;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            this.message.set(
                    "title: " + i,"content: "+i
            );
        }
    }

}

class ConsumerThread implements Runnable {

    private final Message message;    //获得message的引用

    public ConsumerThread(Message message) {
        this.message = message;
    }

    @Override
    public void run(){

        for (int i = 0; i < 100; i++) {
            System.out.println(this.message.get());
        }

    }
}

public class Test {

    public static void main(String[] args) {

        Message message = new Message();        //实例化Message
        new Thread(
                new ProducerThread(message)
        ).start();                  //启动生产者线程

        new Thread(
                new ConsumerThread(message)
        ).start();                  // 启动消费者线程

    }

}

通过当前的程序的改进可以发现此时的代码已经完全可以正确的实现相关的内容的配置,并且没有任何的数据错位,所以解决了数据混乱的问题,但是另外一个问题还没有解决:重复生产数据和重复消费数据。

解决线程重复操作问题

而要想实现这样的功能就必须采用线程的同步、等待与唤醒的处理机制,而这样的处理机制的操作全部定义在了Object类里面,在Object类中提供有如下的几个与线程有关的操作方法。

image.png

特别重要的提示:以上这几个处理方法是进行线程控制的,但是这些方法都是在synchronized方法中才去使用的,同时这些方法属于最原始的多线程协作控制。如果真的用这些操作进行开发,那么整个项目里面基本上对死锁的情况就会频发。

class Message {  //描述公共空间
    private String title;
    private String content;
    private boolean flag = true;    //设置一个标志位(红绿灯)

    //flag = true:表示可以生产,但是无法进行消费,
    //flag = false:表示可以消费,但是无法生产

    public synchronized void set(String title,String content){
        if (this.flag == false) {   //不允许生产,但是允许消费
            try {
                super.wait();   //等到消费者线程执行完毕后唤醒
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        this.content = content;
        this.title = title;
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.flag = false;      //表示生产完成,可以消费
        super.notify();         //唤醒其它等待线程
    }

    public synchronized  String get(){

        if(this.flag == true){  //不允许消费,只允许生产
            try {
                super.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        this.flag = true;      //表示消费完成,可以生产
        super.notify();         //唤醒其它等待线程

        return  this.title + this.content;
    }


}

class ProducerThread implements Runnable {
    private final Message message;    //获得message的引用

    public ProducerThread(Message message) {
        this.message = message;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            this.message.set(
                    "title: " + i,"content: "+i
            );
        }
    }

}

class ConsumerThread implements Runnable {

    private final Message message;    //获得message的引用

    public ConsumerThread(Message message) {
        this.message = message;
    }

    @Override
    public void run(){

        for (int i = 0; i < 100; i++) {
            System.out.println(this.message.get());
        }

    }
}

public class Test {

    public static void main(String[] args) {

        Message message = new Message();        //实例化Message
        new Thread(
                new ProducerThread(message)
        ).start();                  //启动生产者线程

        new Thread(
                new ConsumerThread(message)
        ).start();                  // 启动消费者线程

    }

}
上一篇下一篇

猜你喜欢

热点阅读