并发编程之Disruptor-1.核心简介
2020-05-30 本文已影响0人
笨鸡
1.简介
-
DIsruptor核心-RIngBuffer、Disroptor
-
Disruptor核心-Sequence、Sequence Barrier
-
Disruptor核心-WaitStrategy
-
Disruptor核心-Event、EventProcessor
-
Disruptor核心-EventHandler消费者处理器
-
Disruptor核心-WorkProcessor核心工作器
2.Disruptor Quick Start
- 建立一个工厂Event类,用于创建Event类实例对象
@Data public class OrderEvent { private long value; }
public class OrderEventFactory implements EventFactory<OrderEvent> { @Override public OrderEvent newInstance() { return new OrderEvent(); } }
- 需要有一个监听事件类,用于处理数据(Event类)
@Slf4j public class OrderEventHandler implements EventHandler<OrderEvent> { @Override public void onEvent(OrderEvent orderEvent, long l, boolean b) throws Exception { log.info("消费者:" + orderEvent.getValue()); } }
-
实例化Disruptor实例,配置一系列参数,编写Disruptor核心组件
public class Main { public static void main(String[] args) { OrderEventFactory orderEventFactory = new OrderEventFactory(); int ringBufferSize = 1024 * 1024; ThreadFactory threadFactory = r -> new Thread(r); ProducerType producerType = ProducerType.SINGLE; WaitStrategy waitStrategy = new BlockingWaitStrategy(); /** * 1.eventFactory: 消息(event)工厂对象 * 2.ringBufferSize: 容器的长度 * 3.threadFactory: 线程工厂 * 4.producerType: 单(多)生产者 * 5.waitStrategy: 等待策略 */ // 1.实例化disruptor对象 Disruptor<OrderEvent> disruptor = new Disruptor<>( orderEventFactory, ringBufferSize, threadFactory, producerType, waitStrategy ); // 2.添加消费者的监听(构建disruptor与消费者的一个关联关系) disruptor.handleEventsWith(new OrderEventHandler()); // 3.启动disruptor disruptor.start(); // 4.获取实际存储数据的容器: RingBuffer RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer(); OrderEventProducer producer = new OrderEventProducer(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8); for (long i = 0; i < 100; i++) { bb.putLong(0, i); producer.sendData(bb); } disruptor.shutdown(); } }
-
编写生产者组件,向Disruptor容器中去投递数据
/** * Copyright (C), 2015-2020, XXX有限公司 * FileName: OrderEventProducer * Author: CT * Date: 2020/5/30 15:24 * Description: * History: * <author> <time> <version> <desc> * 作者姓名 修改时间 版本号 描述 */ package org.ctgu.game.gameserver.disruptor; import com.lmax.disruptor.RingBuffer; import java.nio.ByteBuffer; /** * 〈一句话功能简述〉<br> * 〈〉 * * @author CT * @create 2020/5/30 * @since 1.0.0 */ public class OrderEventProducer { private RingBuffer<OrderEvent> ringBuffer; public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void sendData(ByteBuffer data){ // 1.在生产者发送消息的时候,首先需要从我们的ringBuffer里面获取一个可用的序号 long sequence = ringBuffer.next(); try { // 2.根据这个序号,找到具体的"OrderEvent" 元素 // 此时的OrderEvent对象是一个没有被赋值的"空对象" OrderEvent event = ringBuffer.get(sequence); // 3.进行实际的赋值处理 event.setValue(data.getLong(0)); } finally { // 4.提交发布操作 ringBuffer.publish(sequence); } } }