Java高并发高性能编程(多线程,协程,Actor,RxJava、Akka、Reactor)

高性能队列disruptor

2018-07-22  本文已影响0人  划水者

disruptor是一个高性能的内存队列,之所以高性能,因为有以下几个特点:

1 整个disruptor的实现在并发处理中没有使用锁,而是使用cas操作(这也是disruptor被称为无锁队列的原因)

2 disruptor的内部实现采用预先分配的环形数组(RingBuffer),数组中的事件对象被循环复用,这样可以避免频繁创建对象而导致jvm频繁垃圾回收

3 通过缓存行填充解决了伪共享问题,避免不同线程各自修改的变量落在同一个缓存行上而互相导致缓存失效

disruptor的使用场景:

disruptor的内部设计基于生产者-消费者模型,目前log4j2的异步日志就是基于disruptor实现的,storm等很多开源项目也依赖disruptor。先来一波demo:

package com.guoxiong.disruptor;

/**
 * Mutable payload carried through the Disruptor demos: a numeric id plus a text value.
 *
 * <p>Created 2018/7/15 2:42 PM.
 *
 * @author Jungler
 */
public class MyData {

    private int id;

    private String value;

    /**
     * Creates a payload with both fields set.
     *
     * @param id    numeric identifier of this payload
     * @param value text carried by this payload
     */
    public MyData(int id, String value) {
        this.id = id;
        this.value = value;
    }

    /**
     * @return the numeric identifier
     */
    public int getId() {
        return id;
    }

    /**
     * @param id the new numeric identifier
     */
    public void setId(int id) {
        this.id = id;
    }

    /**
     * @return the text value
     */
    public String getValue() {
        return value;
    }

    /**
     * @param value the new text value
     */
    public void setValue(String value) {
        this.value = value;
    }

    /**
     * Renders the payload as {@code MyData{id=<id>, value='<value>'}}.
     *
     * @return a human-readable representation of both fields
     */
    @Override
    public String toString() {
        return "MyData{" +
                "id=" + id +
                ", value='" + value + '\'' +
                '}';
    }
}


package com.guoxiong.disruptor;

/**

* 2018/7/15 下午2:42

*

* @author Jungler

* @since

*/

public class MyDataEvent {

public MyDataEvent(){

}

private MyDatadata;

  public MyDatagetData() {

return data;

  }

public void setData(MyData data) {

this.data = data;

  }

}


package com.guoxiong.disruptor;

import com.lmax.disruptor.EventFactory;

/**

* 2018/7/15 下午2:43

*

* @author Jungler

* @since

*/

public class MyDataEventFactoryimplements EventFactory {

@Override

  public MyDataEventnewInstance() {

return new MyDataEvent();

  }

}


package com.guoxiong.disruptor;

import com.lmax.disruptor.*;

/**

* 2018/7/22 下午3:18

*

* 消费者处理

*

* @author Jungler

* @since

*/

public class MsgBatchConsumerimplements EventHandler {

private Stringname;

  public MsgBatchConsumer(String name){

this.name = name;

  }

@Override

  public void onEvent(MyDataEvent myDataEvent, long l, boolean b)throws Exception {

System.out.println("name = " +name +" data = " + myDataEvent.getData().toString());

  }

}


package com.guoxiong.disruptor;

import com.lmax.disruptor.WorkHandler;

/**

* 2018/7/22 下午3:42

*

* @author Jungler

* @since

*/

public class MsgWorkConsumerimplements WorkHandler {

private Stringname;

  public MsgWorkConsumer(String name){

this.name = name;

  }

@Override

  public void onEvent(MyDataEvent myDataEvent)throws Exception {

System.out.println("name = " +name +" data = " + myDataEvent.getData().toString());

  }

}


package com.guoxiong.disruptor;

import com.lmax.disruptor.RingBuffer;

import com.lmax.disruptor.dsl.Disruptor;

import java.util.List;

/**

* 2018/7/22 下午3:22

*

* @author Jungler

* @since

*/

public class MsgProducer {

private Disruptordisruptor;

  public MsgProducer(Disruptor disruptor){

this.disruptor = disruptor;

  }

public void send(MyData data){

RingBuffer ringBuffer =this.disruptor.getRingBuffer();

      long next = ringBuffer.next();

      try{

MyDataEvent event = ringBuffer.get(next);

        event.setData(data);

      }finally {

if(next ==5){

return;

        }

ringBuffer.publish(next);

      }

}

public void send(List dataList){

for(MyData data : dataList){

this.send(data);

      }

}

}


package com.guoxiong.disruptor;

import com.lmax.disruptor.dsl.Disruptor;

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.Executors;

/**

* 2018/7/22 下午3:25

*

* 单生产者,多消费者,每一个消费者消费全部的数据

*

* @author Jungler

* @since

*/

public class DisruptorDemo1 {

public static void main(String[] args) {

MyDataEventFactory myDataEventFactory =new MyDataEventFactory();

      Disruptor disruptor =new Disruptor(myDataEventFactory, 1024, Executors.defaultThreadFactory());

      //定义消费者

      MsgBatchConsumer msg1 =new MsgBatchConsumer("1");

      MsgBatchConsumer msg2 =new MsgBatchConsumer("2");

      MsgBatchConsumer msg3 =new MsgBatchConsumer("3");

      disruptor.handleEventsWith(msg1, msg2, msg3);

      disruptor.start();

      // 定义要发送的数据

      MsgProducer msgProducer =new MsgProducer(disruptor);

      List myDataList =new ArrayList();

      myDataList.add(new MyData(2,"2222"));

      myDataList.add(new MyData(3,"3333"));

      myDataList.add(new MyData(1,"1111"));

      myDataList.add(new MyData(4,"4444"));

      myDataList.add(new MyData(5,"5555"));

      msgProducer.send(myDataList);

  }

}


package com.guoxiong.disruptor;

import com.lmax.disruptor.dsl.Disruptor;

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.Executors;

/**

* 2018/7/22 下午3:45

*

* 单生产者,多消费者,分组消费(每一个分组合并消费全部数据)

*

* @author Jungler

* @since

*/

public class DisruptorDemo2 {

public static void main(String[] args) {

MyDataEventFactory myDataEventFactory =new MyDataEventFactory();

      Disruptor disruptor =new Disruptor(myDataEventFactory, 16, Executors.defaultThreadFactory());

      MsgWorkConsumer consumer1 =new MsgWorkConsumer("aa");

      //MsgWorkConsumer consumer2 = new MsgWorkConsumer("bb");

//MsgWorkConsumer consumer3 = new MsgWorkConsumer("cc");

//MsgWorkConsumer consumer4 = new MsgWorkConsumer("dd");

      disruptor.handleEventsWithWorkerPool(consumer1);

      //disruptor.handleEventsWithWorkerPool(consumer3,consumer4);

      disruptor.start();

      MsgProducer msgProducer1 =new MsgProducer(disruptor);

      MsgProducer msgProducer2 =new MsgProducer(disruptor);

      List myDataList1 =new ArrayList();

      List myDataList2 =new ArrayList();

      for(int i =1; i <6; i++){

myDataList1.add(new MyData(i,"data" + i));

      }

for(int i =6; i <11; i++){

myDataList2.add(new MyData(i,"data" + i));

      }

msgProducer1.send(myDataList1);

      msgProducer2.send(myDataList2);

      System.out.println(disruptor.getRingBuffer());

      try {

Thread.sleep(5000);

      }catch (Exception e){

}

disruptor.getRingBuffer().publish(5);

      System.out.println(disruptor.getRingBuffer());

  }

}


package com.guoxiong.disruptor;

import com.lmax.disruptor.dsl.Disruptor;

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.Executors;

/**

* 2018/7/22 下午4:05

*

* 多个消费者顺序消费

*

* @author Jungler

* @since

*/

public class DisruptorDemo3 {

public static void main(String[] args) {

MyDataEventFactory myDataEventFactory =new MyDataEventFactory();

      Disruptor disruptor =new Disruptor(myDataEventFactory, 1024, Executors.defaultThreadFactory());

      //定义消费者

      MsgBatchConsumer msg1 =new MsgBatchConsumer("1");

      MsgBatchConsumer msg2 =new MsgBatchConsumer("2");

      MsgBatchConsumer msg3 =new MsgBatchConsumer("3");

      disruptor.handleEventsWith(msg1, msg3).then(msg2);

      disruptor.start();

      // 定义要发送的数据

      MsgProducer msgProducer =new MsgProducer(disruptor);

      List myDataList =new ArrayList();

      myDataList.add(new MyData(2,"2222"));

      myDataList.add(new MyData(3,"3333"));

      myDataList.add(new MyData(1,"1111"));

      myDataList.add(new MyData(4,"4444"));

      myDataList.add(new MyData(5,"5555"));

      msgProducer.send(myDataList);

  }

}


package com.guoxiong.disruptor;

import com.lmax.disruptor.dsl.Disruptor;

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.Executors;

/**

* 2018/7/22 下午4:13

*

* @author Jungler

* @since

*/

public class DisruptorDemo4 {

public static void main(String[] args) {

MyDataEventFactory myDataEventFactory =new MyDataEventFactory();

      Disruptor disruptor =new Disruptor(myDataEventFactory, 1024, Executors.defaultThreadFactory());

      //定义消费者

      MsgBatchConsumer msg1 =new MsgBatchConsumer("1");

      MsgBatchConsumer msg2 =new MsgBatchConsumer("2");

      MsgBatchConsumer msg3 =new MsgBatchConsumer("3");

      MsgBatchConsumer msg4 =new MsgBatchConsumer("4");

      MsgBatchConsumer msg5 =new MsgBatchConsumer("5");

      disruptor.handleEventsWith(msg1, msg3);

      disruptor.handleEventsWith(msg2, msg4);

      disruptor.after(msg3,msg4).handleEventsWith(msg5);

      disruptor.start();

      // 定义要发送的数据

      MsgProducer msgProducer =new MsgProducer(disruptor);

      List myDataList =new ArrayList();

      myDataList.add(new MyData(2,"2222"));

      myDataList.add(new MyData(3,"3333"));

      myDataList.add(new MyData(1,"1111"));

      myDataList.add(new MyData(4,"4444"));

      myDataList.add(new MyData(5,"5555"));

      msgProducer.send(myDataList);

  }

}

上一篇 下一篇

猜你喜欢

热点阅读