Disruptor-04 消费之handleEventsWith
2020-06-09 本文已影响0人
rock_fish
1. 使用handleEventsWith 指定消费者
- handleEventsWith 可配置多个EventHandler,为其中的每个EventHandler创建一个线程.
- 对于消费者来说,只要有元素可以消费,就进行消费
- 多消费者模式下,不是等到1个元素被所有消费者都消费完了,才继续一起同时开始消费下一个;消费者互相之间是不等待的,只要当前元素消费完了,还有元素可以消费,当前消费者就继续消费下一个元素
- 多个消费者的情况下,每个消费者都顺序消费RingBuffer的消息,只是消费速度不同
1.1非批量发布事件
- 当生产者把RingBuffer放满时,生产者进入等待状态,直到最慢的一个消费者消费掉一个元素(即等到至少有一个位置可写入).
1.2批量发布事件publishEvents
-
批量写入的数据个数,不能超过RingBuffer的大小;如RingBuffer的大小是4,则这一批的数据只能<=4 ,否则报错.
-
批量发布事件对象,消费者 也 按照发布的批次数据来消费;这个批次的数据作为一个整体来作为是否有可写入空间;比如一次批写入是2个事件,那么就需要有2个可写入位置的时候才写入;同样如果一次批写入是4个事件,那么就需要有4个可写入位置的时候才写入.
1.3 写入策略
- 对于
publishEvent
当消费速度慢,导致没有位置可写入的时候,生产者要干巴巴的等待着 - 有没有写入控制策略比如写入超时?放弃写入,尝试写入? 答案是:tryxxx,通过这个
tryPublishEvent
,tryPublishEvents
我们可以自己实现一个超时写入策略:失败丢弃,限时等待,多次尝试...
1.4 批量消费数据
消费者在执行消费的时候,要查看一下已经有哪些数据可消费了,即获取可消费的事件的最大下标,然后就通过循环遍历的方式,将当前消费位置,到最大消费位置之间的数据连续的消费掉;而不是消费一个,查看一下下一个位置是不是可以消费.
源码
事件类
public class LongEvent {
private int id;
private long value;
public LongEvent(int id) {
this.id = id;
}
public int getId() {
return id;
}
public long getValue() {
return value;
}
public void set(long value)
{
this.value = value;
}
@Override
public String toString() {
return "LongEvent{" +
"id=" + id +
", value=" + value +
'}';
}
}
事件工厂类
public class LongEventFactory implements EventFactory<LongEvent>
{
private static int counter =0;
public LongEvent newInstance()
{
LongEvent longEvent = new LongEvent(counter++);
System.out.println("new Event:" + longEvent.getId());
return longEvent;
}
}
快消费
public class LongEventHandler implements EventHandler<LongEvent> {
@Override
public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
//System.out.println("消费 第" + l + "个Event对象,其Id为:" + longEvent.getId() + "其内容为:" + longEvent.getValue());
System.out.println(LocalDateTime.now()+ " 快消费线程" + Thread.currentThread().getId() + " 消费 第" + l+ " 个 Event对象,其id 为:"+longEvent.getId()+ " ,读取其值为:" + longEvent.getValue());
}
}
慢消费
public class SlowEventHandler implements EventHandler<LongEvent> {
@Override
public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
TimeUnit.SECONDS.sleep(3);
System.out.println(LocalDateTime.now()+ " 慢消费线程" + Thread.currentThread().getId() + " 消费 第" + l+ " 个 Event对象,其id 为:"+longEvent.getId()+ " ,读取其值为:" + longEvent.getValue());
}
}
事件转换器
/**
* 复用对象,仅变更对象中的属性值
*/
public class LongEventTranslator implements EventTranslatorOneArg<LongEvent,Long> {
@Override
public void translateTo(LongEvent longEvent, long l, Long aLong) {
System.out.println(LocalDateTime.now()+ " 生产线程" + Thread.currentThread().getId() + " 发布 第" + l+ " 个 Event对象,其id 为:"+longEvent.getId()+ " ,设置其值为:" + aLong);
longEvent.set(aLong);
}
}
生产者
public class LongEventProducer {
private RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void publishEvent(Long aLong) {
EventTranslatorOneArg<LongEvent, Long> translator = new LongEventTranslator();
/**
* 生产者阻塞等待,直到有ringbuffer中有空闲位置可写入
*/
ringBuffer.publishEvent(translator, aLong);
}
public void tryPublishEvent(Long aLong) {
EventTranslatorOneArg<LongEvent, Long> translator = new LongEventTranslator();
/**
* 生产者尝试往ringbuffer写入:
* 1. 若可写入,则写入并返回成功
* 2. 若无空闲位置可写入,则直接返回false,不阻塞等待
*/
boolean publishSucess = ringBuffer.tryPublishEvent(translator, aLong);
if(!publishSucess){
System.out.println("尝试发布失败...");
}
}
public void publishEvents(Long[] aLongs) {
/**
* 批量发布
*/
EventTranslatorOneArg<LongEvent, Long> translator = new LongEventTranslator();
ringBuffer.publishEvents(translator, aLongs);
}
}
public static void main(String[] args) throws InterruptedException {
// Executor that will be used to construct new threads for consumers
Executor executor = Executors.newCachedThreadPool();
// 事件工厂,用于创建event
LongEventFactory factory = new LongEventFactory();
// 指定ringbuf的大小,必须是2的整数倍
int bufferSize = 4;
// 构建一个 Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, executor);
System.out.println("..new Disruptor...");
// 给disruptor中添加消费者
disruptor.handleEventsWith(new LongEventHandler());//快消费,
disruptor.handleEventsWith(new SlowEventHandler());//慢消费,
System.out.println("..handleEventsWith...");
// 启动disruptor
disruptor.start();
System.out.println("..start...");
//-----------万事俱备,只欠消息(消息的生产者投递消息)
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
LongEventProducer producer = new LongEventProducer(ringBuffer);
// 跟blockqueue 比对一下
/*for (long l = 0; l<8; l++)
{
long startAt = System.currentTimeMillis();
producer.publishEvent(l);
long endAt = System.currentTimeMillis();
//System.out.println(endAt-startAt);
//Thread.sleep(1000);
}*/
/* 按照Ringbuffer大小,批发布
producer.publishEvents(new Long[]{0L,1L,2L,3L});
producer.publishEvents(new Long[]{4L,5L,6L,7L});
producer.publishEvents(new Long[]{8L,9L,10L,11L});
producer.publishEvents(new Long[]{12L,13L,14L,15L});*/
// 小于Ringbuffer大小批写入
producer.publishEvents(new Long[]{0L,1L});
producer.publishEvents(new Long[]{2L,3L});
producer.publishEvents(new Long[]{4L,5L});
producer.publishEvents(new Long[]{6L,7L});
}