通过阻塞队列实现线程间的通信

2019-05-04  本文已影响0人  溪水散人

话不多说,直接上代码:

/**
 * 资源类
 */
class DataSource{
    private volatile boolean FLAG=true;
    private AtomicInteger atomicInteger=new AtomicInteger();

    BlockingQueue<String> blockingDeque=null;

    public DataSource(BlockingQueue blockingDeque){
        this.blockingDeque=blockingDeque;
        System.out.println(blockingDeque.getClass().getName());
    }

    public void produce() throws  Exception{
        String data=null;
        boolean retValue;
        while (FLAG) {
            data=atomicInteger.incrementAndGet()+"";
            retValue=blockingDeque.offer(data,2L, TimeUnit.SECONDS);
            if (retValue){
                System.out.println(Thread.currentThread().getName()+"\t插入队列"+data+"成功");
            }else {
                System.out.println(Thread.currentThread().getName()+"\t插入队列"+data+"失败");
            }
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println(Thread.currentThread().getName()+"\t叫停:表示FLAG=false,生产动作结束");
    }

    public void consumer() throws Exception{
        String result=null;
        while (true){
            result=blockingDeque.poll(2L,TimeUnit.SECONDS);
            if (result ==null || result.equalsIgnoreCase("")){
                System.out.println(Thread.currentThread().getName()+"\t超过2S没有取到,退出");
                return;
            }
            System.out.println(Thread.currentThread().getName()+"\t消费队列"+result+"成功");
        }

    }

    public void stop(){
        FLAG=false;
    }
public class ProducerConsumerBlockQueue {
    public static void main(String[] args) {
      DataSource dataSource= new DataSource(new SynchronousQueue(true));
      new Thread(() ->{
          System.out.println(Thread.currentThread().getName()+"\t生产线程启动");
          try {
              dataSource.produce();
          } catch (Exception e) {
              e.printStackTrace();
          }
      },"producer").start();

        new Thread(() ->{
            System.out.println(Thread.currentThread().getName()+"\t消费线程启动");
            try {
                dataSource.consumer();
            } catch (Exception e) {
                e.printStackTrace();
            }
        },"consumer").start();

        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("5s时间到,停止主线程");
        dataSource.stop();
    }
}
java.util.concurrent.SynchronousQueue
producer    生产线程启动
consumer    消费线程启动
consumer    消费队列1成功
producer    插入队列1成功
producer    插入队列2成功
consumer    消费队列2成功
producer    插入队列3成功
consumer    消费队列3成功
consumer    消费队列4成功
producer    插入队列4成功
producer    插入队列5成功
consumer    消费队列5成功
5s时间到,停止主线程
producer    叫停:表示FLAG=false,生产动作结束
consumer    超过2S没有取到,退出
DataSource dataSource= new DataSource(new ArrayBlockingQueue(10));
上一篇下一篇

猜你喜欢

热点阅读