Exchanger使用
2020-03-20 本文已影响0人
桐桑入梦
生成器:
public interface Generator<T> {
T next();
}
public class BasicGenerator<T> implements Generator<T>{
private Class<T> type;
public BasicGenerator(Class<T> type){
this.type = type;
}
public T next(){
try{
return type.getDeclaredConstructor().newInstance();
}catch(Exception e){
throw new RuntimeException(e);
}
}
public static <T> Generator<T> create(Class<T> type){
return new BasicGenerator<>(type);
}
}
class ExchangerProducer<T> implements Runnable{
//用于创建T类型对象
private Generator<T> generator;
//用于交换保存了多个T类型对象容器的Exchanger
private Exchanger<List<T>> exchanger;
//保存多个T类型对象的容器
private List<T> holder;
public ExchangerProducer(Exchanger<List<T>> exchanger, Generator<T> generator, List<T> holder){
this.generator = generator;
this.exchanger = exchanger;
this.holder = holder;
}
@Override
public void run(){
try{
while(!Thread.interrupted()){
//生产n个对象
for(int i = 0; i < ExchangerDemo.size; i++)
holder.add(generator.next());
//交换对象的列表,用满的交换成空的容器
holder = exchanger.exchange(holder);
}
}catch(InterruptedException e){
//异常处理
}
}
}
class ExchangerConsumer<T> implements Runnable{
//用于交换保存了多个T类型对象容器的Exchanger
private Exchanger<List<T>> exchanger;
private List<T> holder;
private volatile T value;
public ExchangerConsumer(Exchanger<List<T>> exchanger, List<T> holder){
this.exchanger = exchanger;
this.holder = holder;
}
@Override
public void run(){
try{
while(!Thread.interrupted()){
//进行交换,如果对方没有执行exchange,那么阻塞等待
holder = exchanger.exchange(holder);
//处理数据
System.out.println(holder);
for(T item : holder){
value = item;
holder.remove(item);
}
}
}catch(InterruptedException e){
//异常处理
}
System.out.println("Final value = " + value);
}
}
public class ExchangerDemo {
public static int size = 10;
public static int delay = 5;
public static void main(String[] args) throws InterruptedException{
//设置存储对象的容器的大小和延迟时间
if(args.length > 0)
size = Integer.parseInt(args[0]);
if(args.length > 1)
delay = Integer.parseInt(args[1]);
//创建线程池
ExecutorService exec = Executors.newCachedThreadPool();
//创建Exchanger,用来交换两个List<Fat>对象
Exchanger<List<Fat>> xc = new Exchanger<List<Fat>>();
//这种List允许在列表遍历的时候调用remove(),使用redis快照相同的copy on write技术
List<Fat> producerList = new CopyOnWriteArrayList<Fat>();
List<Fat> consumerList = new CopyOnWriteArrayList<Fat>();
//提交生产者和消费者任务
exec.execute(new ExchangerProducer<Fat>(xc, BasicGenerator.create(Fat.class), producerList));
exec.execute(new ExchangerConsumer<Fat>(xc, consumerList));
TimeUnit.SECONDS.sleep(delay);
exec.shutdownNow();
}
}