Disruptor-04 消费之handleEventsWith

2020-06-09  本文已影响0人  rock_fish

1. 使用handleEventsWith 指定消费者

1.1非批量发布事件

1.2批量发布事件publishEvents

1.3 写入策略

1.4 批量消费数据

消费者在执行消费的时候,要查看一下已经有哪些数据可消费了,即获取可消费的事件的最大下标,然后就通过循环遍历的方式,将当前消费位置,到最大消费位置之间的数据连续的消费掉;而不是消费一个,查看一下下一个位置是不是可以消费.

源码

事件类

public class LongEvent {
    private int id;
    private long value;

    public LongEvent(int id) {
        this.id = id;
    }

    public int getId() {
        return id;
    }


    public long getValue() {
        return value;
    }

    public void set(long value)
    {
        this.value = value;
    }

    @Override
    public String toString() {
        return "LongEvent{" +
                "id=" + id +
                ", value=" + value +
                '}';
    }
}

事件工厂类

public class LongEventFactory implements EventFactory<LongEvent>
{

    private static int counter =0;
    public LongEvent newInstance()
    {
        LongEvent longEvent =  new LongEvent(counter++);
        System.out.println("new Event:" + longEvent.getId());
        return  longEvent;
    }
}

快消费

public class LongEventHandler implements EventHandler<LongEvent> {
    
    @Override
    public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
        //System.out.println("消费 第" + l + "个Event对象,其Id为:" + longEvent.getId() + "其内容为:" + longEvent.getValue());
        System.out.println(LocalDateTime.now()+ "  快消费线程" + Thread.currentThread().getId() + " 消费 第" + l+ " 个 Event对象,其id 为:"+longEvent.getId()+ " ,读取其值为:" + longEvent.getValue());
    }
}

慢消费

public class SlowEventHandler implements EventHandler<LongEvent> {
    
    @Override
    public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
        TimeUnit.SECONDS.sleep(3);
        System.out.println(LocalDateTime.now()+ "  慢消费线程" + Thread.currentThread().getId() + " 消费 第" + l+ " 个 Event对象,其id 为:"+longEvent.getId()+ " ,读取其值为:" + longEvent.getValue());
    }
}

事件转换器

/**
 * 复用对象,仅变更对象中的属性值
 */
public class LongEventTranslator  implements EventTranslatorOneArg<LongEvent,Long> {
    @Override
    public void translateTo(LongEvent longEvent, long l, Long aLong) {
        System.out.println(LocalDateTime.now()+ "  生产线程" + Thread.currentThread().getId() + " 发布 第" + l+ " 个 Event对象,其id 为:"+longEvent.getId()+ " ,设置其值为:" + aLong);
        longEvent.set(aLong);
    }
}

生产者

public class LongEventProducer {
    private RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }


    public void publishEvent(Long aLong) {

        EventTranslatorOneArg<LongEvent, Long> translator = new LongEventTranslator();
        /**
         * 生产者阻塞等待,直到有ringbuffer中有空闲位置可写入
         */
        ringBuffer.publishEvent(translator, aLong);
    }

    public void tryPublishEvent(Long aLong) {

        EventTranslatorOneArg<LongEvent, Long> translator = new LongEventTranslator();

        /**
         * 生产者尝试往ringbuffer写入:
         * 1. 若可写入,则写入并返回成功
         * 2. 若无空闲位置可写入,则直接返回false,不阻塞等待
         */
        boolean publishSucess = ringBuffer.tryPublishEvent(translator, aLong);
        if(!publishSucess){
            System.out.println("尝试发布失败...");
        }

    }

    public void publishEvents(Long[] aLongs) {

        /**
         * 批量发布
         */
        EventTranslatorOneArg<LongEvent, Long> translator = new LongEventTranslator();
        ringBuffer.publishEvents(translator, aLongs);

    }
}
public static void main(String[] args) throws InterruptedException {

        // Executor that will be used to construct new threads for consumers
        Executor executor = Executors.newCachedThreadPool();

        // 事件工厂,用于创建event
        LongEventFactory factory = new LongEventFactory();

        // 指定ringbuf的大小,必须是2的整数倍
        int bufferSize = 4;

        // 构建一个 Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, executor);

        System.out.println("..new Disruptor...");

        // 给disruptor中添加消费者
        disruptor.handleEventsWith(new LongEventHandler());//快消费,
        disruptor.handleEventsWith(new SlowEventHandler());//慢消费,
        System.out.println("..handleEventsWith...");

        // 启动disruptor
        disruptor.start();
        System.out.println("..start...");

        //-----------万事俱备,只欠消息(消息的生产者投递消息)

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        LongEventProducer producer = new LongEventProducer(ringBuffer);

        // 跟blockqueue 比对一下

        /*for (long l = 0; l<8; l++)
        {
            long startAt = System.currentTimeMillis();
            producer.publishEvent(l);
            long endAt = System.currentTimeMillis();
            //System.out.println(endAt-startAt);
            //Thread.sleep(1000);
        }*/


        /* 按照Ringbuffer大小,批发布
        producer.publishEvents(new Long[]{0L,1L,2L,3L});
        producer.publishEvents(new Long[]{4L,5L,6L,7L});
        producer.publishEvents(new Long[]{8L,9L,10L,11L});
        producer.publishEvents(new Long[]{12L,13L,14L,15L});*/
        
        // 小于Ringbuffer大小批写入
        producer.publishEvents(new Long[]{0L,1L});
        producer.publishEvents(new Long[]{2L,3L});
        producer.publishEvents(new Long[]{4L,5L});
        producer.publishEvents(new Long[]{6L,7L});

    }
上一篇下一篇

猜你喜欢

热点阅读