Disruptor框架用作生产者消费者模型
2020-10-19 本文已影响0人
Chermack
package com.example.disruptor;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
public class DisruptorTest {
public static void main(String[] args) throws InterruptedException {
MessageReporter reporter = MessageReporter.INSTANCE;
ExecutorService pool = Executors.newFixedThreadPool(3);
pool.submit(() -> reporter.sendMessage(MessageType.A, 120000));
pool.submit(() -> reporter.sendMessage(MessageType.B, 240000));
pool.submit(() -> reporter.sendMessage(MessageType.C, 360000));
pool.shutdown();
while (!pool.isTerminated()) {
pool.awaitTermination(1, TimeUnit.MILLISECONDS);
}
reporter.shutdown();
}
}
@FunctionalInterface
interface ShutdownHook {
void onShutdown();
}
/**
* 枚举单例模式
* shutdownHook是关闭时的回调函数,用于打印验证结果。
*/
enum MessageReporter {
INSTANCE;
private final Disruptor<MessageEvent> disruptor;
private ShutdownHook shutdownHook;
MessageReporter() {
disruptor = new Disruptor<>(MessageEvent::new, 1 << 16
, DaemonThreadFactory.INSTANCE, ProducerType.MULTI, new YieldingWaitStrategy());
// multiConsumerOnce(); //多个消费者,不重复消费
multiHandlerSelf(); //多个消费者,重复消费
disruptor.start();
}
/**
* 多个消费者,不重复消费
*/
private void multiConsumerOnce() {
LongAdder total = new LongAdder();
Map<MessageType, LongAdder> map = new ConcurrentHashMap<>();
MessageEventConsumer[] messageEventConsumers = new MessageEventConsumer[3];
for (int i = 0; i < 3; i++) {
messageEventConsumers[i] = new MessageEventConsumer(total, map);
}
disruptor.handleEventsWithWorkerPool(messageEventConsumers);
shutdownHook = () -> {
System.out.println(total);
System.out.println(map);
};
}
/**
* 多个消费者,重复消费
*/
private void multiHandlerSelf() {
MessageEventHandlerA handlerA = new MessageEventHandlerA();
MessageEventHandlerB handlerB = new MessageEventHandlerB();
disruptor.handleEventsWith(handlerA, handlerB);
shutdownHook = () -> {
};
}
/**
* 尝试发送messageType的消息n次
*
* @param messageType 消息类型
* @param n 重复次数
*/
void sendMessage(MessageType messageType, int n) {
for (int i = 0; i < n; i++) {
Message message = new Message(messageType, i + "");
while (!disruptor.getRingBuffer().tryPublishEvent(
(messageEvent, sequence, m) -> messageEvent.setMessage(m), message)) {
Thread.yield(); //publish不成功,是因为队列满了,可以让当前发送线程等待一下再重试(调整生产者消费者数量来达到平衡)
}
}
}
void shutdown() {
disruptor.shutdown();
shutdownHook.onShutdown();
}
}
/**
* 多个消费者不重复消费
*/
class MessageEventConsumer implements WorkHandler<MessageEvent>, LifecycleAware {
LongAdder total;
Map<MessageType, LongAdder> map;
public MessageEventConsumer(LongAdder total, Map<MessageType, LongAdder> map) {
this.total = total;
this.map = map;
}
@Override
public void onEvent(MessageEvent messageEvent) {
System.out.println(Thread.currentThread().getName() + " 消费消息 " + messageEvent.getMessage());
LongAdder count = map.computeIfAbsent(messageEvent.getMessage().getType(),
messageType -> new LongAdder());
count.increment();
total.increment();
}
@Override
public void onStart() {
System.out.println(this + "创建了!");
}
@Override
public void onShutdown() {
System.out.println(this + "要关闭了!");
}
}
/**
* 并行处理者A
*/
class MessageEventHandlerA implements EventHandler<MessageEvent>, LifecycleAware {
long total;
Map<MessageType, AtomicInteger> map = new ConcurrentHashMap<>();
@Override
public void onEvent(MessageEvent messageEvent, long l, boolean b) {
System.out.println(Thread.currentThread().getName() + " 消费消息 " + messageEvent.getMessage());
AtomicInteger count = map.computeIfAbsent(messageEvent.getMessage().getType(),
messageType -> new AtomicInteger(0));
count.incrementAndGet();
total++;
}
@Override
public void onStart() {
System.out.println(this + "创建了!");
}
@Override
public void onShutdown() {
System.out.println(this + "处理了" + total);
System.out.println(this + map.toString());
System.out.println(this + "要关闭了!");
}
}
/**
* 并行处理者B
*/
class MessageEventHandlerB implements EventHandler<MessageEvent>, LifecycleAware {
long total;
Map<MessageType, AtomicInteger> map = new ConcurrentHashMap<>();
@Override
public void onEvent(MessageEvent messageEvent, long l, boolean b) {
System.out.println(Thread.currentThread().getName() + "消费消息" + messageEvent.getMessage());
AtomicInteger count = map.computeIfAbsent(messageEvent.getMessage().getType(),
messageType -> new AtomicInteger());
count.incrementAndGet();
total++;
}
@Override
public void onStart() {
System.out.println(this + "创建了!");
}
@Override
public void onShutdown() {
System.out.println(this + "处理了" + total);
System.out.println(this + map.toString());
System.out.println(this + "要关闭了!");
}
}
enum MessageType {
A, B, C
}
class Message {
private final MessageType type;
private final String message;
public Message(MessageType type, String message) {
this.type = type;
this.message = message;
}
public MessageType getType() {
return type;
}
@Override
public String toString() {
return "Message{" +
"type=" + type +
", message='" + message + '\'' +
'}';
}
}
class MessageEvent {
private Message message;
public Message getMessage() {
return message;
}
public void setMessage(Message message) {
this.message = message;
}
}
多消费者重复消费结果(multiHandlerSelf方法)
![](https://img.haomeiwen.com/i5312099/7432bb8ccb210833.png)
多消费者不重复消费(multiConsumerOnce方法)
![](https://img.haomeiwen.com/i5312099/cf9e4537809c56c1.png)