2018-12-25 线程生产消费者模式

2018-12-28  本文已影响0人  转头就成空

原理: 建立一个公共的容器用来存放产品(产品池),生产者和消费者分别为单独的线程对产品池进行操作。为了保证线程的安全需要利用synchronized来使得产品池来实现阻塞的功能(互锁)。

建立产品池(容器):

public class Container<T> {                       //建立产品池的泛型
    private T[] arr;                              //泛型数组
    private  int top=0;                          //数组指针,指示数组的元素序号          
    public Container() {                        //确定容器的容量
        this(5);
    }

    public Container(int size) {                    //容器的构造方法
        this.arr = (T[])new Object[size];
    }

    public synchronized void save(T o){              //生产者生产产品的同步方法
        while (top==arr.length){                     //判定数组内没有元素
            try {
                this.wait();                          //等待
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
}
        arr[top]=o;                               //添加元素到数组
        for (int i=0;i<arr.length;i++)
        {
         top++;                                    //指针移动
        System.out.println("做了"+top+"个包子");
}
        notify();                                  //通知等待的对象
        System.out.println("有包子了");
    }

测试:

public class Test {
    public static void main(String[] args) {     //主线程
        Container<String> container = new Container<>(); //创建容器对象
        new Thread("做包子") {                      //创建生产者的线程(匿名类)
            @Override
            public void run() {
                for (int i = 0; i < 10; i++)         //生产者生产出的产品数量
                    container.save("包子");
                System.out.println("最后存量:" + container.count());
            }
        }.start();
        new Thread("取包子1") {   //消费者1
            @Override
            public void run() {
                for (int i = 0; i < 100; i++) //保持有消费者进行购买
                    container.fetch();
            }
        }.start();
        new Thread("取包子2") {
            @Override
            public void run() {
                container.fetch();//消费者购买一次
            }
        }.start();
    }
}

此功能实现了在最大存货量达成后,在有消费者需求将不再回应,此时消费者保持等待(过程结束)。这类方法是利用了数组建立栈模型来存放产品,实际中利用队列的模型(或者是)更合适。

为了使主程序简洁,可以单独提出产品类、消费者类、生产者类使得整个程序结构更加优化

产品类:

public class BaoZi {
    private Integer id;
    private String name;

    public BaoZi() {

    }

    public BaoZi(Integer id, String name) {
        this.id = id;
        this.name = name;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "BaoZi{" +
                "id=" + id +
                ", name='" + name + '\'' +
                '}';
    }
}

生产者类:

public class Productor extends Thread{
    private Container<BaoZi> container;
    private Integer sum;

    public Productor(Container<BaoZi> container,Integer sum,String name) {
        super(name);
        this.container=container;
        this.sum = sum;
    }

    @Override
    public void run() {
        String[] names={"鲜肉包","香菇包","豆沙包"};
        Random random=new Random();
        for(int i=0;i<sum;i++){
             BaoZi baozi=new BaoZi(i+1,names[random.nextInt(3)]);
            System.out.println(Thread.currentThread().getName()+"做一个"+baozi);
            container.save(baozi);
        }
    }
}

消费者类:

public class Customer extends Thread{
    private Container<BaoZi>container;
    private Integer sum;

    public Customer(Container<BaoZi>container,Integer sum,String name) {
        super(name);
        this.container=container;
        this.sum = sum;
    }

    @Override
    public void run() {

        Random random=new Random();
        for(int i=0;i<sum;i++){
            BaoZi baoZi=container.fetch();
            System.out.println(Thread.currentThread().getName()+"买了一个"+baoZi);
            try {
                Thread.sleep(1000+random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

产品池:

public class Container<T> {
    private T[] arr;
    private int top = 0;
    public Container() {
        this(5);
    }

    public Container(int size) {
        this.arr = (T[]) new Object[size];
    }

    public synchronized void save(T o) {
        while (top == arr.length) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
}
      arr[top] = o;
          top++;
     notify();
  }
public synchronized T fetch() {
        while (top == 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        try {
            Thread.sleep(400);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        top--;
        T t = arr[top];
        notify();
         return t;
    }
 public int count() {
        return top;
    }
}
上一篇下一篇

猜你喜欢

热点阅读