项目实战- rabbitmq 可靠性投递

SnowFlake 全局唯一 id 服务

2019-01-17  本文已影响178人  HmilyMing

项目是采用 dubbo + spring boot 2.1.1.RELEASE ,首先来看我们的依赖

注意:这里的 common 就是我们项目中的自定义 通用 common jar 包,用来放一些通用的类、接口、常量 等等

        <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        
        <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-test</artifactId>
           <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>com.hmily.dubbo</groupId>
          <artifactId>common</artifactId>
          <version>0.0.1-SNAPSHOT</version>
        </dependency>

看看 application.properties 的配置

# HTTP port and context path of the snowFlake id service
server.port=8020
server.servlet.context-path=/
# SnowFlake settings
##  server setting for LongID Gene
snowFlake.workerId = 0
snowFlake.datacenterId = 0
# Dubbo settings
# Dubbo Config properties
dubbo.application.id=rabbitmq-snowFlake
dubbo.application.name=rabbitmq-snowFlake
dubbo.application.qosPort=22212
dubbo.application.qosEnable=true
dubbo.scan.basePackages=com.hmily.dubbo.snowFlakeDemo.*
dubbo.protocol.id=dubbo
dubbo.protocol.name=dubbo
dubbo.protocol.port=12343
dubbo.registry.id=rabbitmq-snowFlake-registry
# ZooKeeper registry address (hard-coded host; adjust for your environment)
dubbo.registry.address=zookeeper://130.80.151.179:2181
# Enables Dubbo All Endpoints
management.endpoint.dubbo.enabled = true
management.endpoint.dubbo-shutdown.enabled = true
management.endpoint.dubbo-configs.enabled = true
management.endpoint.dubbo-services.enabled = true
management.endpoint.dubbo-references.enabled = true
management.endpoint.dubbo-properties.enabled = true
# Dubbo Health
## StatusChecker Name defaults (default : "memory", "load" )
management.health.dubbo.status.defaults = memory
## StatusChecker Name extras (default : empty )
management.health.dubbo.status.extras = load,threadpool

接着就是我们核心 SnowFlake 实现

/**
 * Snowflake-style generator of 64-bit ids that increase over time.
 *
 * <p>Bit layout of an id, from the most significant bits down:
 * <pre>
 *   (milliseconds since TWEPOCH) | 5-bit datacenter id | 5-bit worker id | 12-bit sequence
 * </pre>
 * Up to 4096 ids can be produced per millisecond by one generator; when that
 * budget is exhausted {@link #nextId()} waits for the next millisecond.
 *
 * <p>The worker and datacenter ids belong to each instance. The shared
 * generator behind {@link #getId()} takes its ids from the JVM system
 * properties {@code snowFlake.workerId} and {@code snowFlake.datacenterId}
 * (both default to {@code 0}) and can be replaced once at startup through
 * {@link #configure(long, long)}, for example from a Spring configuration
 * class that reads the same keys from {@code application.properties}.
 *
 * <p>{@link #nextId()} is {@code synchronized}, so a single instance can be
 * shared between threads.
 */
public class SnowFlake {

    protected static final Logger LOG = LoggerFactory.getLogger(SnowFlake.class);

    /** Custom epoch in milliseconds; subtracted from the wall clock before shifting. */
    private static final long TWEPOCH = 1288834974657L;
    /** Number of bits reserved for the worker id. */
    private static final long WORKER_ID_BITS = 5L;
    /** Number of bits reserved for the datacenter id. */
    private static final long DATACENTER_ID_BITS = 5L;
    /** Largest worker id that fits in {@link #WORKER_ID_BITS} bits (31). */
    private static final long MAX_WORKER_ID = -1L ^ (-1L << WORKER_ID_BITS);
    /** Largest datacenter id that fits in {@link #DATACENTER_ID_BITS} bits (31). */
    private static final long MAX_DATACENTER_ID = -1L ^ (-1L << DATACENTER_ID_BITS);
    /** Number of bits reserved for the per-millisecond sequence. */
    private static final long SEQUENCE_BITS = 12L;
    /** The worker id is shifted left by 12 bits. */
    private static final long WORKER_ID_SHIFT = SEQUENCE_BITS;
    /** The datacenter id is shifted left by 17 bits. */
    private static final long DATACENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS;
    /** The timestamp is shifted left by 22 bits. */
    private static final long TIMESTAMP_LEFT_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATACENTER_ID_BITS;
    /** Mask that keeps the sequence inside {@link #SEQUENCE_BITS} bits (4095). */
    private static final long SEQUENCE_MASK = -1L ^ (-1L << SEQUENCE_BITS);

    /**
     * Shared generator used by {@link #getId()}. Volatile so that a generator
     * installed by {@link #configure(long, long)} is visible to every thread.
     * Out-of-range system property values make class initialization fail.
     */
    static volatile SnowFlake instance = new SnowFlake(
            Long.getLong("snowFlake.workerId", 0L),
            Long.getLong("snowFlake.datacenterId", 0L));

    // Per-instance identity: previously static, so every constructor call
    // silently overwrote the ids used by all other generators.
    private final long workerId;
    private final long datacenterId;

    /** Counter within the current millisecond. */
    private long sequence = 0L;
    /** Timestamp of the most recently issued id; -1 until the first id is issued. */
    private long lastTimestamp = -1L;

    /**
     * Creates an independent generator.
     *
     * @param workerId     worker id, 0..31 inclusive
     * @param datacenterId datacenter id, 0..31 inclusive
     * @throws IllegalArgumentException if either id is outside its range
     */
    public SnowFlake(long workerId, long datacenterId) {
        if (workerId > MAX_WORKER_ID || workerId < 0) {
            throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", MAX_WORKER_ID));
        }
        if (datacenterId > MAX_DATACENTER_ID || datacenterId < 0) {
            throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", MAX_DATACENTER_ID));
        }
        this.workerId = workerId;
        this.datacenterId = datacenterId;
    }

    /**
     * Replaces the shared generator used by {@link #getId()}.
     *
     * <p>Intended to be called once during application startup, before any id
     * has been handed out: the new generator starts with a fresh sequence.
     *
     * @param workerId     worker id, 0..31 inclusive
     * @param datacenterId datacenter id, 0..31 inclusive
     * @throws IllegalArgumentException if either id is outside its range
     */
    public static void configure(long workerId, long datacenterId) {
        instance = new SnowFlake(workerId, datacenterId);
    }

    /**
     * Produces the next id of this generator.
     *
     * @return an id larger than any id previously returned by this instance
     * @throws RuntimeException if the clock reports a time earlier than the
     *                          time of the previously issued id
     */
    public synchronized long nextId() {
        long timestamp = timeGen();

        if (timestamp < lastTimestamp) {
            LOG.error(String.format("clock is moving backwards.  Rejecting requests until %d.", lastTimestamp));
            throw new RuntimeException(String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
        }

        if (lastTimestamp == timestamp) {
            sequence = (sequence + 1) & SEQUENCE_MASK;
            if (sequence == 0) {
                // All 4096 sequence values of this millisecond are used up.
                timestamp = tilNextMillis(lastTimestamp);
            }
        } else {
            sequence = 0L;
        }

        lastTimestamp = timestamp;

        return ((timestamp - TWEPOCH) << TIMESTAMP_LEFT_SHIFT)
                | (datacenterId << DATACENTER_ID_SHIFT)
                | (workerId << WORKER_ID_SHIFT)
                | sequence;
    }

    /**
     * Busy-waits until the clock moves past {@code lastTimestamp}.
     *
     * @param lastTimestamp the millisecond that has to be left behind
     * @return the first observed timestamp greater than {@code lastTimestamp}
     */
    protected long tilNextMillis(long lastTimestamp) {
        long timestamp = timeGen();
        while (timestamp <= lastTimestamp) {
            timestamp = timeGen();
        }
        return timestamp;
    }

    /**
     * Reads the clock used for id generation.
     *
     * @return the current time in milliseconds
     */
    protected long timeGen() {
        return System.currentTimeMillis();
    }

    /**
     * Produces the next id from the shared generator.
     *
     * @return the next id of the shared generator
     */
    public static long getId() {
        return instance.nextId();
    }
}

接着封装一下我们的 Service 方法

/**
 * Service-layer contract for obtaining SnowFlake ids.
 */
public interface ISnowFlakeService {

   /**
    * Returns a single id.
    *
    * @return one generated id
    */
   long getSnowFlakeID();
   
   /**
    * Returns a batch of ids.
    *
    * @param size number of ids requested; the implementation shown in this
    *             article (SnowFlakeServiceImpl) rejects values below 1
    * @return an array holding the generated ids
    */
   long[] getSnowFlakeIDs(int size);
}

写一个 service 的实现类

/**
 * {@link ISnowFlakeService} implementation backed by the static
 * {@code SnowFlake.getId()} generator. Every id handed out is also written
 * to the log at INFO level.
 */
@Service
public class SnowFlakeServiceImpl implements ISnowFlakeService {

    private static final Logger log = LoggerFactory.getLogger(SnowFlakeServiceImpl.class);

    /**
     * Generates and logs a single id.
     *
     * @return the generated id
     */
    @Override
    public long getSnowFlakeID() {
        return nextLoggedId();
    }

    /**
     * Generates and logs {@code size} ids.
     *
     * @param size number of ids to generate; must be at least 1
     * @return the generated ids, in generation order
     * @throws SnowFlakeCustomException (code 500) when {@code size} is below 1
     */
    @Override
    public long[] getSnowFlakeIDs(int size) {
        if (size < 1) {
            throw new SnowFlakeCustomException(500, " size is illegal");
        }
        final long[] batch = new long[size];
        int filled = 0;
        while (filled < batch.length) {
            batch[filled] = nextLoggedId();
            filled++;
        }
        return batch;
    }

    /** Fetches one id from the generator and logs it before returning it. */
    private long nextLoggedId() {
        final long generated = SnowFlake.getId();
        log.info("id: {}", generated);
        return generated;
    }

}

写个 Controller 测试一下是否能生成 id

/**
 * Smoke-test endpoints of the snowFlake service.
 */
@RestController
public class TestController {

    private static final Logger log = LoggerFactory.getLogger(TestController.class);

    /**
     * Liveness probe.
     *
     * @return the fixed text {@code hello}
     */
    @GetMapping("/test")
    public String test() {
        return "hello";
    }

    /**
     * Generates five ids, logs each of them and answers with the first one.
     *
     * @return the first generated id as decimal text
     */
    @GetMapping("/test/longid")
    public String testId() {
        String firstId = null;
        int round = 0;
        while (round < 5) {
            final long generated = SnowFlake.getId();
            log.info("id: {}", generated);
            if (firstId == null) {
                // Only the id of the very first round is reported to the caller.
                firstId = Long.toString(generated);
            }
            round++;
        }
        return firstId;
    }
}

测试 /test/longid 接口,看看是否能正常获取 id。能正常获取 id 之后,下一步就是对外提供 dubbo RPC 服务。
首先,我们有一个通用的 common jar 包,里面定义了对外的 RPC 接口规范,例如这里定义的 ISnowFlakeServiceApi 接口,这样就能统一约束服务方的入参类型和返回值类型。

/**
 * RPC contract for the SnowFlake id service, shared between provider and
 * consumer through the common jar.
 */
public interface ISnowFlakeServiceApi {

    /**
     * Requests a single id.
     *
     * @return one generated id
     */
    long getSnowFlakeID();

    /**
     * Requests a batch of ids.
     *
     * @param size number of ids requested
     * @return an array holding the generated ids
     */
    long[] getSnowFlakeIDs(int size);

}

然后我们在 snowFlakeDemo ,雪花算法实现的 demo 里面,对外提供服务时要实现这个接口

/**
 * Exposes {@link ISnowFlakeServiceApi} as version {@code 1.0.0} and forwards
 * every call unchanged to the injected {@link ISnowFlakeService}.
 */
@Service(version = "1.0.0",
         application = "${dubbo.application.id}",
         protocol = "${dubbo.protocol.id}",
         registry = "${dubbo.registry.id}")
public class SnowFlakeProvider implements ISnowFlakeServiceApi {

    /** Local service that performs the actual id generation. */
    @Autowired
    private ISnowFlakeService snowFlakeService;

    /**
     * Forwards to {@link ISnowFlakeService#getSnowFlakeID()}.
     *
     * @return one generated id
     */
    @Override
    public long getSnowFlakeID() {
        final long generated = this.snowFlakeService.getSnowFlakeID();
        return generated;
    }

    /**
     * Forwards to {@link ISnowFlakeService#getSnowFlakeIDs(int)}.
     *
     * @param size number of ids requested
     * @return the generated ids
     */
    @Override
    public long[] getSnowFlakeIDs(final int size) {
        final long[] generated = this.snowFlakeService.getSnowFlakeIDs(size);
        return generated;
    }
}

注意:这里的 @Service 是 Dubbo 的注解,需要 import com.alibaba.dubbo.config.annotation.Service;(而不是 Spring 的 org.springframework.stereotype.Service)。

我们的调用方 rabbitmq-common 项目里面,也是要依赖 common 的

<dependency>
    <groupId>com.hmily.dubbo</groupId>
    <artifactId>common</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

看看 application.properties 的配置

# HTTP port and context path of the calling application (rabbitmq-common)
server.port=8030
server.servlet.context-path=/

# HTTP encoding and Jackson serialization defaults
spring.http.encoding.charset=UTF-8
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=NON_NULL

# Dubbo Config properties
# NOTE(review): the consumer's @Reference looks up snowFlakeServiceApi.version,
# which this listing does not define; the provider publishes version 1.0.0.
dubbo.application.id=rabbitmq-common
dubbo.application.name=rabbitmq-common
# NOTE(review): qosPort and protocol.port below carry the same values as in the
# snowFlake service listing; presumably they clash when both applications run
# on one host - confirm.
dubbo.application.qosPort=22212
dubbo.application.qosEnable=true
dubbo.scan.basePackages=com.hmily.rabbitmq.rabbitmqcommon.*
dubbo.protocol.id=dubbo
dubbo.protocol.name=dubbo
dubbo.protocol.port=12343
dubbo.registry.id=rabbitmq-common-registry
# ZooKeeper registry address (hard-coded host; adjust for your environment)
dubbo.registry.address=zookeeper://130.80.151.179:2181

# Enables Dubbo All Endpoints
management.endpoint.dubbo.enabled = true
management.endpoint.dubbo-shutdown.enabled = true
management.endpoint.dubbo-configs.enabled = true
management.endpoint.dubbo-services.enabled = true
management.endpoint.dubbo-references.enabled = true
management.endpoint.dubbo-properties.enabled = true

# Dubbo Health
## StatusChecker Name defaults (default : "memory", "load" )
management.health.dubbo.status.defaults = memory
## StatusChecker Name extras (default : empty )
management.health.dubbo.status.extras = load,threadpool

写一个测试接口,看看能否调用 RPC 接口成功

/**
 * Test endpoint of the calling application: fetches an id from the remote
 * SnowFlake service over Dubbo RPC.
 */
@RestController
public class TestController {

    private static final Logger log = LoggerFactory.getLogger(TestController.class);

    /**
     * Remote SnowFlake id service.
     *
     * <p>The version is read from {@code snowFlakeServiceApi.version} and falls
     * back to {@code 1.0.0} when that property is not configured, so the
     * placeholder always resolves to a usable version. Calls time out after
     * 1000 ms and are not retried; {@code check = false} lets this application
     * start even when no provider is registered yet.
     */
    @Reference(version = "${snowFlakeServiceApi.version:1.0.0}",
            application = "${dubbo.application.id}",
            interfaceName = "com.hmily.dubbo.common.service.ISnowFlakeServiceApi",
            check = false,
            timeout = 1000,
            retries = 0
    )
    private ISnowFlakeServiceApi snowFlakeServiceApi;

    /**
     * Requests one id through RPC, logs it and returns it.
     *
     * @return the id as decimal text
     */
    @GetMapping("/test/longid/rpc")
    public String testIdByRPC() {
        // The API returns a primitive long; no boxing is needed to render it.
        long id = snowFlakeServiceApi.getSnowFlakeID();
        log.info("id: {}", id);
        return String.valueOf(id);
    }
}

就这样,SnowFlake 全局唯一 id 生成服务 完成了。

完整代码:
https://github.com/hmilyos/common.git 

https://github.com/hmilyos/snowFlakeDemo.git

https://github.com/hmilyos/rabbitmq-common.git       available 分支
上一篇下一篇

猜你喜欢

热点阅读