Java并发编程学习记录

Exchanger使用

2020-03-20  本文已影响0人  桐桑入梦

生成器:

public interface Generator<T> {
    T next();
}
public class BasicGenerator<T> implements Generator<T>{
    private Class<T> type;
    public BasicGenerator(Class<T> type){
        this.type = type;
    }
    public T next(){
        try{
            return type.getDeclaredConstructor().newInstance();
        }catch(Exception e){
            throw new RuntimeException(e);
        }
    }

    public static <T> Generator<T> create(Class<T> type){
        return new BasicGenerator<>(type);
    }
}
class ExchangerProducer<T> implements Runnable{
    //用于创建T类型对象
    private Generator<T> generator;
    //用于交换保存了多个T类型对象容器的Exchanger
    private Exchanger<List<T>> exchanger;
    //保存多个T类型对象的容器
    private List<T> holder;

    public ExchangerProducer(Exchanger<List<T>> exchanger, Generator<T> generator, List<T> holder){
        this.generator = generator;
        this.exchanger = exchanger;
        this.holder = holder;
    }

    @Override
    public void run(){
        try{
            while(!Thread.interrupted()){
                //生产n个对象
                for(int i = 0; i < ExchangerDemo.size; i++)
                    holder.add(generator.next());
                //交换对象的列表,用满的交换成空的容器
                holder = exchanger.exchange(holder);
            }
        }catch(InterruptedException e){
            //异常处理
        }
    }
}

class ExchangerConsumer<T> implements Runnable{
    //用于交换保存了多个T类型对象容器的Exchanger
    private Exchanger<List<T>> exchanger;
    private List<T> holder;
    private volatile T value;

    public ExchangerConsumer(Exchanger<List<T>> exchanger, List<T> holder){
        this.exchanger = exchanger;
        this.holder = holder;
    }

    @Override
    public void run(){
        try{
            while(!Thread.interrupted()){
                //进行交换,如果对方没有执行exchange,那么阻塞等待
                holder = exchanger.exchange(holder);
                //处理数据
                System.out.println(holder);
                for(T item : holder){
                    value = item;
                    holder.remove(item);
                }
            }
        }catch(InterruptedException e){
            //异常处理
        }
        System.out.println("Final value = " + value);
    }
}

public class ExchangerDemo {
    public static int size = 10;
    public static int delay = 5;

    public static void main(String[] args) throws InterruptedException{
        //设置存储对象的容器的大小和延迟时间
        if(args.length > 0)
            size = Integer.parseInt(args[0]);
        if(args.length > 1)
            delay = Integer.parseInt(args[1]);

        //创建线程池
        ExecutorService exec = Executors.newCachedThreadPool();
        //创建Exchanger,用来交换两个List<Fat>对象
        Exchanger<List<Fat>> xc = new Exchanger<List<Fat>>();
        //这种List允许在列表遍历的时候调用remove(),使用redis快照相同的copy on write技术
        List<Fat> producerList = new CopyOnWriteArrayList<Fat>();
        List<Fat> consumerList = new CopyOnWriteArrayList<Fat>();
        //提交生产者和消费者任务
        exec.execute(new ExchangerProducer<Fat>(xc, BasicGenerator.create(Fat.class), producerList));
        exec.execute(new ExchangerConsumer<Fat>(xc, consumerList));

        TimeUnit.SECONDS.sleep(delay);
        exec.shutdownNow();
    }
}

Copy-On-Write思想

上一篇 下一篇

猜你喜欢

热点阅读