写作与程序

java初入多线程17

2017-10-27  本文已影响4人  胖琪的升级之路

使用Disruptor 实现消费者和生产者

<!-- Maven dependency that pulls in the required jar: LMAX Disruptor 3.3.7 -->
<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.3.7</version>
</dependency>
// Main entry point
public class CusProMain {

    /**
     * Builds a multi-producer Disruptor backed by a cached thread pool, attaches a
     * worker pool of four {@code ConsumerTwo} handlers, starts it, and then feeds it
     * an ever-increasing counter through a {@code ProducerTwo}.
     *
     * <p>The publishing loop has no exit condition, so this method never returns
     * normally; it pauses 100 ms after each published value.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the sleep between publications is interrupted
     */
    public static void main(String[] args) throws Exception {
        final int ringSize = 1024;
        ExecutorService workerThreads = Executors.newCachedThreadPool();
        PCDataFactory factory = new PCDataFactory();

        Disruptor<PCdataTwo> pipeline = new Disruptor<>(
                factory,
                ringSize,
                workerThreads,
                ProducerType.MULTI,
                new BlockingWaitStrategy());

        // Worker pool: each published event is handled by exactly one of these handlers.
        pipeline.handleEventsWithWorkerPool(
                new ConsumerTwo(),
                new ConsumerTwo(),
                new ConsumerTwo(),
                new ConsumerTwo());
        pipeline.start();

        ProducerTwo publisher = new ProducerTwo(pipeline.getRingBuffer());

        // One 8-byte buffer is reused for every value; it is always written at index 0.
        ByteBuffer payload = ByteBuffer.allocate(8);
        long counter = 0;
        while (true) {
            payload.putLong(0, counter);
            publisher.pushDate(payload);
            Thread.sleep(100);
            System.out.println("add Data " + counter);
            counter++;
        }
    }
}

// Consumer
public class ConsumerTwo implements WorkHandler<PCdataTwo> {

    /**
     * Prints the id of the handling thread together with the square of the
     * event's value.
     *
     * @param event the event taken from the ring buffer
     * @throws Exception declared by the {@code WorkHandler} contract
     */
    @Override
    public void onEvent(PCdataTwo event) throws Exception {
        long threadId = Thread.currentThread().getId();
        StringBuilder line = new StringBuilder();
        line.append(threadId);
        line.append(":Event: --");
        line.append(event.getValue() * event.getValue());
        line.append("--");
        System.out.println(line.toString());
    }
}

// Producer
// NOTE(review): the enclosing `public class ProducerTwo {` declaration (and its closing
// brace) does not appear in this snippet — presumably dropped when the code was pasted.
// Ring buffer whose slots this producer claims, fills and publishes.
private final RingBuffer<PCdataTwo> ringBuffer ;
    
    /**
     * Creates a producer bound to the given ring buffer.
     *
     * @param ringBuffer the ring buffer this producer stores and later publishes into
     */
    public ProducerTwo(RingBuffer<PCdataTwo> ringBuffer){
        this.ringBuffer = ringBuffer ;
    }
    
    
    /**
     * Claims the next slot in the ring buffer, copies the long stored at index 0 of
     * {@code bb} into that slot's event, and publishes the slot.
     *
     * <p>The value is read with the absolute {@code getLong(0)} so the buffer's
     * position is never consumed. A relative {@code getLong()} would advance the
     * position by 8 on the first call and throw {@code BufferUnderflowException} on
     * every later call for a reused 8-byte buffer that is refilled with an absolute
     * put, leaving the event with whatever value the slot held before.
     *
     * @param bb buffer holding the value to publish in its first 8 bytes
     * @throws IndexOutOfBoundsException if {@code bb} holds fewer than 8 bytes; the
     *         claimed sequence is still published so consumers are not stalled
     */
    public void pushDate (ByteBuffer bb){
        long sequence = ringBuffer.next();
        try {
            PCdataTwo event = ringBuffer.get(sequence);
            event.setValue(bb.getLong(0));
        } finally {
            // A claimed sequence must always be published, even on failure,
            // otherwise the ring buffer stops making progress.
            ringBuffer.publish(sequence);
        }
    }
结果

提高消费者的响应时间:选择合适的等待策略

上一篇 下一篇

猜你喜欢

热点阅读