Disruptor -->Demo1
2018-12-13 本文已影响0人
江南Ryan
package testForFun.demo20181210.demo02;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import javafx.scene.Parent;
import java.util.concurrent.ThreadFactory;
/**
* Created with IntelliJ IDEA
* User:Ryannn
* Date:2018/12/13
* Time:18:34
*/
public class Test02 {
public static void main(String[] args) throws InterruptedException {
System.out.println("位置1");
// 队列中的元素
class Element {
private int value;
public int get(){
return value;
}
public void set(int value){
this.value= value;
}
}
System.out.println("位置2");
//1,生产者工场
ThreadFactory threadFactory = new ThreadFactory() {
/**
* Constructs a new {@code Thread}. Implementations may also initialize
* priority, name, daemon status, {@code ThreadGroup}, etc.
*
* @param r a runnable to be executed by new thread instance
* @return constructed thread, or {@code null} if the request to
* create a thread is rejected
*/
@Override
public Thread newThread(Runnable r) {
System.out.println("位置3");
return new Thread(r,"simple Thread");
}
};
//2,RingBuffer生产工厂,初始化RingBuffer的时候使用
EventFactory<Element> factory = new EventFactory<Element>() {
@Override
public Element newInstance() {
return new Element();
}
};
//3,处理Event的handler
EventHandler<Element> handler = new EventHandler<Element>() {
@Override
public void onEvent(Element element, long sequence, boolean endOfBatch) throws Exception {
// System.out.println("位置4");
System.out.println("Element: " + element.get());
}
};
//4,阻塞策略
BlockingWaitStrategy strategy = new BlockingWaitStrategy();
//5,指定RingBuffer的大小
int bufferSize=16;
//6,创建disruptor线程,采用单生产者模式
Disruptor disruptor = new Disruptor<Element>(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);
//设置EventHandler
disruptor.handleEventsWith(handler);
//7,启动distruptor
disruptor.start();
RingBuffer<Element> ringBuffer = disruptor.getRingBuffer();
for (int i = 0; i<100 ; i++) {
//获取下一个可用的下标
long sequence = ringBuffer.next();
try{
// System.out.println("位置5"+i);
//返回可用位置的有元素
Element event = ringBuffer.get(sequence);
//设置该元素的值
event.set(i);
}catch (Exception e){
e.printStackTrace();
}finally {
ringBuffer.publish(sequence);
// System.out.println("位置6"+i);
}
Thread.sleep(10);
}
}
}
结果如下:
位置1
位置2
位置3
Element: 0
Element: 1
Element: 2
...