disruptor笔记之一:快速入门

2021-08-16  本文已影响0人  程序员欣宸

欢迎访问我的GitHub

https://github.com/zq2599/blog_demos

内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;

关于disruptor

disruptor是LMAX公司开发的一个高性能队列,其作用和阻塞队列(BlockingQueue)类似,都是在相同进程内、不同线程间传递数据(例如消息、事件),另外disruptor也有自己的一些特色:

  1. 以广播的形式发布事件,并且消费者之间存在依赖关系;
  2. 为事件提前分配内存;
  3. 无锁算法;

关于Ring Buffer(环形队列)

在这里插入图片描述

本篇概览

作为《disruptor笔记》系列的开篇,本篇有两个任务:

在这里插入图片描述

用disruptor实现消息的发布和消费的套路

  1. 事件的定义:一个普通的bean(StringEvent.java)
  2. 事件工厂:定义如何生产事件的内存实例,这个实例刚从内存中创建,还没有任何业务数据(StringEventFactory.java)
  3. 事件处理:封装了消费单个事件的具体逻辑(StringEventHandler.java)
  4. 事件生产者:定义了如何将业务数据设置到还没有业务数据的事件中,就是工厂创建出来的那种(StringEventProducer.java)
  5. 初始化逻辑:创建和启动disruptor对象,将事件工厂传给disruptor,创建事件生产者和事件处理对象,并分别与disruptor对象关联;
  6. 业务逻辑:也就是调用事件生产者的<font color="blue">onData</font>方法发布事件,本文的做法是在单元测试类中发布事件,然后检查消费的事件数和生产的事件数是否一致;
    7

环境信息

《Disruptor笔记》系列涉及的环境信息如下:

  1. 操作系统:64位win10
  2. JDK:1.8.0_281
  3. IDE:IntelliJ IDEA 2021.1.1 (Ultimate Edition)
  4. gradle:6.7.1
  5. springboot:2.3.8.RELEASE
  6. disruptor:3.4.4

源码下载

名称 链接 备注
项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议
在这里插入图片描述

创建父工程

import java.time.OffsetDateTime
import java.time.format.DateTimeFormatter

buildscript {
    repositories {
        maven {
            url 'https://plugins.gradle.org/m2/'
        }
        // 如果有私服就在此配置,如果没有请注释掉
        maven {
            url 'http://192.168.50.43:8081/repository/aliyun-proxy/'
        }
        // 阿里云
        maven {
            url 'http://maven.aliyun.com/nexus/content/groups/public/'
        }

        mavenCentral()
    }
    ext {
        // 项目版本
        projectVersion = '1.0-SNAPSHOT'

        // sprignboot版本 https://github.com/spring-projects/spring-boot/releases
        springBootVersion = '2.3.8.RELEASE'
    }
}

plugins {
    id 'java'
    id 'java-library'
    id 'org.springframework.boot' version "${springBootVersion}" apply false
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'net.nemerosa.versioning' version '2.14.0'
    id 'io.franzbecker.gradle-lombok' version '4.0.0' apply false
    id 'com.github.ben-manes.versions' version '0.36.0' // gradle dependencyUpdates
}

// If you attempt to build without the `--scan` parameter in `gradle 6.0+` it will cause a build error that it can't find
// a buildScan property to change. This avoids that problem.
if (hasProperty('buildScan')) {
    buildScan {
        termsOfServiceUrl = 'https://gradle.com/terms-of-service'
        termsOfServiceAgree = 'yes'
    }
}

wrapper {
    gradleVersion = '6.7.1'
}

def buildTimeAndDate = OffsetDateTime.now()

ext {
    // 构建时取得当前日期和时间
    buildDate = DateTimeFormatter.ISO_LOCAL_DATE.format(buildTimeAndDate)
    buildTime = DateTimeFormatter.ofPattern('HH:mm:ss.SSSZ').format(buildTimeAndDate)
    buildRevision = versioning.info.commit
}

allprojects {
    apply plugin: 'java'
    apply plugin: 'idea'
    apply plugin: 'eclipse'
    apply plugin: 'io.spring.dependency-management'
    apply plugin: 'io.franzbecker.gradle-lombok'

    compileJava {
        sourceCompatibility = JavaVersion.VERSION_1_8
        targetCompatibility = JavaVersion.VERSION_1_8
        options.encoding = 'UTF-8'
    }

    compileJava.options*.compilerArgs = [
            '-Xlint:all', '-Xlint:-processing'
    ]

    // Copy LICENSE
    tasks.withType(Jar) {
        from(project.rootDir) {
            include 'LICENSE'
            into 'META-INF'
        }
    }

    // 写入到MANIFEST.MF中的内容
    jar {
        manifest {
            attributes(
                    'Created-By': "${System.properties['java.version']} (${System.properties['java.vendor']} ${System.properties['java.vm.version']})".toString(),
                    'Built-By': 'travis',
                    'Build-Date': buildDate,
                    'Build-Time': buildTime,
                    'Built-OS': "${System.properties['os.name']}",
                    'Build-Revision': buildRevision,
                    'Specification-Title': project.name,
                    'Specification-Version': projectVersion,
                    'Specification-Vendor': 'Will Zhao',
                    'Implementation-Title': project.name,
                    'Implementation-Version': projectVersion,
                    'Implementation-Vendor': 'Will Zhao'
            )
        }
    }

    repositories {
        mavenCentral()

        // 如果有私服就在此配置,如果没有请注释掉
        maven {
            url 'http://192.168.50.43:8081/repository/aliyun-proxy/'
        }

        // 阿里云
        maven {
            url 'http://maven.aliyun.com/nexus/content/groups/public/'
        }

        jcenter()
    }

    buildscript {
        repositories {
            maven { url 'https://plugins.gradle.org/m2/' }
        }
    }
}

allprojects { project ->
    buildscript {
        dependencyManagement {
            imports {
                mavenBom "org.springframework.boot:spring-boot-starter-parent:${springBootVersion}"
                mavenBom "org.junit:junit-bom:5.7.0"
            }

            dependencies {
                dependency 'org.projectlombok:lombok:1.16.16'
                dependency 'org.apache.commons:commons-lang3:3.11'
                dependency 'commons-collections:commons-collections:3.2.2'
                dependency 'com.lmax:disruptor:3.4.4'
            }
        }

        ext {
            springFrameworkVersion = dependencyManagement.importedProperties['spring-framework.version']
        }
    }
}

group = 'bolingcavalry'
version = projectVersion

新建module

plugins {
    id 'org.springframework.boot'
}

dependencies {
    implementation 'org.projectlombok:lombok'
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'com.lmax:disruptor'

    testImplementation('org.springframework.boot:spring-boot-starter-test')
}
package com.bolingcavalry;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BasicEventApplication {
    public static void main(String[] args) {
        SpringApplication.run(BasicEventApplication.class, args);
    }
}

事件的定义

package com.bolingcavalry.service;

import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

@Data
@ToString
@NoArgsConstructor
public class StringEvent {

    private String value;
}

事件工厂

package com.bolingcavalry.service;

import com.lmax.disruptor.EventFactory;

public class StringEventFactory implements EventFactory<StringEvent> {

    @Override
    public StringEvent newInstance() {
        return new StringEvent();
    }
}

事件处理

package com.bolingcavalry.service;

import com.lmax.disruptor.EventHandler;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;

@Slf4j
public class StringEventHandler implements EventHandler<StringEvent> {

    public StringEventHandler(Consumer<?> consumer) {
        this.consumer = consumer;
    }

    // 外部可以传入Consumer实现类,每处理一条消息的时候,consumer的accept方法就会被执行一次
    private Consumer<?> consumer;

    @Override
    public void onEvent(StringEvent event, long sequence, boolean endOfBatch) throws Exception {
        log.info("sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event);
        
        // 这里延时100ms,模拟消费事件的逻辑的耗时
        Thread.sleep(100);

        // 如果外部传入了consumer,就要执行一次accept方法
        if (null!=consumer) {
            consumer.accept(null);
        }
    }
}

事件生产者

package com.bolingcavalry.service;

import com.lmax.disruptor.RingBuffer;

public class StringEventProducer {
    // 存储数据的环形队列
    private final RingBuffer<StringEvent> ringBuffer;

    public StringEventProducer(RingBuffer<StringEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(String content) {
        // ringBuffer是个队列,其next方法返回的是下最后一条记录之后的位置,这是个可用位置
        long sequence = ringBuffer.next();

        try {
            // sequence位置取出的事件是空事件
            StringEvent stringEvent = ringBuffer.get(sequence);
            // 空事件添加业务信息
            stringEvent.setValue(content);
        } finally {
            // 发布
            ringBuffer.publish(sequence);
        }
    }
}

初始化逻辑

package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.*;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

@Service
@Slf4j
public class BasicEventServiceImpl implements BasicEventService {

    private static final int BUFFER_SIZE = 16;

    private Disruptor<StringEvent> disruptor;

    private StringEventProducer producer;

    /**
     * 统计消息总数
     */
    private final AtomicLong eventCount = new AtomicLong();

    @PostConstruct
    private void init() {
        Executor executor = Executors.newCachedThreadPool();

        // 实例化
        disruptor = new Disruptor<>(new StringEventFactory(),
                BUFFER_SIZE,
                new CustomizableThreadFactory("event-handler-"));

        // 准备一个匿名类,传给disruptor的事件处理类,
        // 这样每次处理事件时,都会将已经处理事件的总数打印出来
        Consumer<?> eventCountPrinter = new Consumer<Object>() {
            @Override
            public void accept(Object o) {
                long count = eventCount.incrementAndGet();
                log.info("receive [{}] event", count);
            }
        };

        // 指定处理类
        disruptor.handleEventsWith(new StringEventHandler(eventCountPrinter));

        // 启动
        disruptor.start();

        // 生产者
        producer = new StringEventProducer(disruptor.getRingBuffer());
    }

    @Override
    public void publish(String value) {
        producer.onData(value);
    }

    @Override
    public long eventCount() {
        return eventCount.get();
    }
}
  1. publish方法给外部调用,用于发布一个事件;
  2. eventCountPrinter是Consumer的实现类,被传给了StringEventHandler,这样StringEventHandler消费消息的时候,eventCount就会增加,也就记下了已经处理的事件总数;
  3. Disruptor的构造方法中,BUFFER_SIZE表示环形队列的大小,这里故意设置为16,这样可以轻易的将环形队列填满,此时再发布事件会不会导致环形队列上的数据被覆盖呢?稍后咱们可以测一下;
  4. 记得调用start方法;

web接口

再写一个web接口类,这样就可以通过浏览器验证前面的代码了:

package com.bolingcavalry.controller;

import com.bolingcavalry.service.BasicEventService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;

import java.time.LocalDateTime;

@RestController
public class BasicEventController {

    @Autowired
    BasicEventService basicEventService;

    @RequestMapping(value = "/{value}", method = RequestMethod.GET)
    public String publish(@PathVariable("value") String value) {
        basicEventService.publish(value);
        return "success, " + LocalDateTime.now().toString();
    }
}

业务逻辑

package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.BasicEventService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import static org.junit.Assert.assertEquals;

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class BasicEventServiceImplTest {

    @Autowired
    BasicEventService basicEventService;

    @Test
    public void publish() throws InterruptedException {
        log.info("start publich test");

        int count = 100;

        for(int i=0;i<count;i++) {
            log.info("publich {}", i);
            basicEventService.publish(String.valueOf(i));
        }

        // 异步消费,因此需要延时等待
        Thread.sleep(1000);
        // 消费的事件总数应该等于发布的事件数
        assertEquals(count, basicEventService.eventCount());
    }
}
在这里插入图片描述 在这里插入图片描述

你不孤单,欣宸原创一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 数据库+中间件系列
  6. DevOps系列

欢迎关注公众号:程序员欣宸

微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界...
https://github.com/zq2599/blog_demos

上一篇 下一篇

猜你喜欢

热点阅读