Java线程操作实现生产者与消费者模型
2020-05-25 本文已影响0人
撸码小狂魔
基本模型
image.png
class Message { //描述公共空间
private String title;
private String content;
public void setContent(String content) {
this.content = content;
}
public void setTitle(String title) {
this.title = title;
}
public String getContent() {
return content;
}
public String getTitle() {
return title;
}
}
class ProducerThread implements Runnable {
private final Message message; //获得message的引用
public ProducerThread(Message message) {
this.message = message;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
this.message.setTitle("title: " + i);
this.message.setContent("content: " + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class ConsumerThread implements Runnable {
private final Message message; //获得message的引用
public ConsumerThread(Message message) {
this.message = message;
}
@Override
public void run(){
for (int i = 0; i < 100; i++) {
System.out.println("消费者:" + this.message.getTitle() + this.message.getContent());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Test {
public static void main(String[] args) {
Message message = new Message(); //实例化Message
new Thread(
new ProducerThread(message)
).start(); //启动生产者线程
new Thread(
new ConsumerThread(message)
).start(); // 启动消费者线程
}
}
此时的代码仅仅是是实现了两个线程彼此独立的操作交互,但是通过最终的执行结果可以清楚的发现当前的程序代码之中存在如下的几个设计问题:
- 在生产着没有完成向生产的时候哦虎消费者就已经获取数据了,获取的数据就属于一个半成品数据;
- 如果说现在消费者性能高于生产者,则会出现爱你同一个信息重复消费的问题。
解决数据同步问题
在java的程序之中,如果想要实现数据的同步处理,那么肯定要使用synchronized来完成,如果要完成的话那么肯定就需要同步代码块或者是同步方法,之所以现在出现了不同步的问题,主要是没有在生产的过程里面对数据的操作进行锁定。
image.png
范例:修改程序的结构代码
class Message { //描述公共空间
private String title;
private String content;
public synchronized void set(String title,String content){
this.content = content;
this.title = title;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public synchronized String get(){
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
return this.title + this.content;
}
}
class ProducerThread implements Runnable {
private final Message message; //获得message的引用
public ProducerThread(Message message) {
this.message = message;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
this.message.set(
"title: " + i,"content: "+i
);
}
}
}
class ConsumerThread implements Runnable {
private final Message message; //获得message的引用
public ConsumerThread(Message message) {
this.message = message;
}
@Override
public void run(){
for (int i = 0; i < 100; i++) {
System.out.println(this.message.get());
}
}
}
public class Test {
public static void main(String[] args) {
Message message = new Message(); //实例化Message
new Thread(
new ProducerThread(message)
).start(); //启动生产者线程
new Thread(
new ConsumerThread(message)
).start(); // 启动消费者线程
}
}
通过当前的程序的改进可以发现此时的代码已经完全可以正确的实现相关的内容的配置,并且没有任何的数据错位,所以解决了数据混乱的问题,但是另外一个问题还没有解决:重复生产数据和重复消费数据。
解决线程重复操作问题
而要想实现这样的功能就必须采用线程的同步、等待与唤醒的处理机制,而这样的处理机制的操作全部定义在了Object类里面,在Object类中提供有如下的几个与线程有关的操作方法。
image.png特别重要的提示:以上这几个处理方法是进行线程控制的,但是这些方法都是在synchronized方法中才去使用的,同时这些方法属于最原始的多线程协作控制。如果真的用这些操作进行开发,那么整个项目里面基本上对死锁的情况就会频发。
class Message { //描述公共空间
private String title;
private String content;
private boolean flag = true; //设置一个标志位(红绿灯)
//flag = true:表示可以生产,但是无法进行消费,
//flag = false:表示可以消费,但是无法生产
public synchronized void set(String title,String content){
if (this.flag == false) { //不允许生产,但是允许消费
try {
super.wait(); //等到消费者线程执行完毕后唤醒
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.content = content;
this.title = title;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.flag = false; //表示生产完成,可以消费
super.notify(); //唤醒其它等待线程
}
public synchronized String get(){
if(this.flag == true){ //不允许消费,只允许生产
try {
super.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.flag = true; //表示消费完成,可以生产
super.notify(); //唤醒其它等待线程
return this.title + this.content;
}
}
class ProducerThread implements Runnable {
private final Message message; //获得message的引用
public ProducerThread(Message message) {
this.message = message;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
this.message.set(
"title: " + i,"content: "+i
);
}
}
}
class ConsumerThread implements Runnable {
private final Message message; //获得message的引用
public ConsumerThread(Message message) {
this.message = message;
}
@Override
public void run(){
for (int i = 0; i < 100; i++) {
System.out.println(this.message.get());
}
}
}
public class Test {
public static void main(String[] args) {
Message message = new Message(); //实例化Message
new Thread(
new ProducerThread(message)
).start(); //启动生产者线程
new Thread(
new ConsumerThread(message)
).start(); // 启动消费者线程
}
}