spring精选&&收录spring框架源码相关技术文章分布式分布式

分布式ID增强篇--优化时钟回拨问题

2018-03-01  本文已影响771人  阿飞的博客

原生实现

本文承接sharding-jdbc源码之分布式ID,在这篇文章中详细介绍了sharding-jdbc的分布式ID是如何实现的;很遗憾的是sharding-jdbc只是基于snowflake算法实现了如何生成分布式ID,并没有解决snowflake算法的缺点:

  1. 时钟回拨问题;
  2. 趋势递增,而不是绝对递增;
  3. 不能在一台服务器上部署多个分布式ID服务;

第2点算不上缺点,毕竟如果绝对递增的话,需要牺牲不少的性能;第3点也算不上缺点,即使一台足够大内存的服务器,在部署一个分布式ID服务后,还有很多可用的内存,可以用来部署其他的服务,不一定非得在一台服务器上部署一个服务的多个实例;可以参考elasticsearch的主分片和副本的划分规则:某个主分片的副本是不允许和主分片在同一台节点上--因为这样意思不大,如果这个分片只拥有一个副本,且这个节点宕机后,服务状态就是"RED";

所以这篇文章的主要目的是解决第1个缺点:时钟回拨问题;先来看一下sharding-jdbc的DefaultKeyGenerator.javagenerateKey()方法的源码:

@Override
public synchronized Number generateKey() {
    long currentMillis = timeService.getCurrentMillis();
    Preconditions.checkState(lastTime <= currentMillis, "Clock is moving backwards, last time is %d milliseconds, current time is %d milliseconds", lastTime, currentMillis);
    if (lastTime == currentMillis) {
        if (0L == (sequence = ++sequence & SEQUENCE_MASK)) {
            currentMillis = waitUntilNextTime(currentMillis);
        }
    } else {
        sequence = 0;
    }
    lastTime = currentMillis;
    if (log.isDebugEnabled()) {
        log.debug("{}-{}-{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(lastTime)), workerId, sequence);
    }
    return ((currentMillis - EPOCH) << TIMESTAMP_LEFT_SHIFT_BITS) | (workerId << WORKER_ID_LEFT_SHIFT_BITS) | sequence;
}

说明:从这段代码可知,sharding-jdbc并没有尝试去解决snowflake算法时钟回拨的问题,只是简单判断如果lastTime <= currentMillis不满足就抛出异常:Preconditions.checkState(lastTime <= currentMillis, "Clock is moving backwards, last time is %d milliseconds, current time is %d milliseconds", lastTime, currentMillis);

改进思路

snowflake算法一个很大的优点就是不需要依赖任何第三方组件,笔者想继续保留这个优点:毕竟多一个依赖的组件,多一个风险,并增加了系统的复杂性。

snowflake算法给workerId预留了10位,即workId的取值范围为[0, 1023],事实上实际生产环境不大可能需要部署1024个分布式ID服务,所以:将workerId取值范围缩小为[0, 511],[512, 1023]这个范围的workerId当做备用workerId。workId为0的备用workerId是512,workId为1的备用workerId是513,以此类推……

说明:如果你的业务真的需要512个以上分布式ID服务才能满足需求,那么不需要继续往下看了,这个方案不适合你^^;

改进实现

generateKey()方法的改进如下:

// 修改处: workerId原则上上限为1024, 但是为了每台sequence服务预留一个workerId, 所以实际上workerId上限为512
private static final long WORKER_ID_MAX_VALUE = 1L << WORKER_ID_BITS >> 1;
    
/**
  * 保留workerId和lastTime, 以及备用workerId和其对应的lastTime
  */
 private static Map<Long, Long> workerIdLastTimeMap = new ConcurrentHashMap<>();
 
/**
 * Generate key. 考虑时钟回拨, 与sharding-jdbc源码的区别就在这里</br>
 * 缺陷: 如果连续两次时钟回拨, 可能还是会有问题, 但是这种概率极低极低
 * @return key type is @{@link Long}.
 * @Author 阿飞
 */
@Override
public synchronized Number generateKey() {
    long currentMillis = System.currentTimeMillis();

    // 当发生时钟回拨时
    if (lastTime > currentMillis){
        // 如果时钟回拨在可接受范围内, 等待即可
        if (lastTime - currentMillis < MAX_BACKWARD_MS){
            try {
                Thread.sleep(lastTime - currentMillis);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }else {
            // 如果时钟回拨太多, 那么换备用workerId尝试

        // 当前workerId和备用workerId的值的差值为512
            long interval = 512L;
            // 发生时钟回拨时, 计算备用workerId[如果当前workerId小于512,
            // 那么备用workerId=workerId+512; 否则备用workerId=workerId-512, 两个workerId轮换用]
            if (MyKeyGenerator.workerId >= interval) {
                MyKeyGenerator.workerId = MyKeyGenerator.workerId - interval;
            } else {
                MyKeyGenerator.workerId = MyKeyGenerator.workerId + interval;
            }

            // 取得备用workerId的lastTime
            Long tempTime = workerIdLastTimeMap.get(MyKeyGenerator.workerId);
            lastTime = tempTime==null?0L:tempTime;
            // 如果在备用workerId也处于过去的时钟, 那么抛出异常
            // [这里也可以增加时钟回拨是否超过MAX_BACKWARD_MS的判断]
            Preconditions.checkState(lastTime <= currentMillis, "Clock is moving backwards, last time is %d milliseconds, current time is %d milliseconds", lastTime, currentMillis);
            // 备用workerId上也处于时钟回拨范围内的逻辑还可以优化: 比如摘掉当前节点. 运维通过监控发现问题并修复时钟回拨
        }
    }

    // 如果和最后一次请求处于同一毫秒, 那么sequence+1
    if (lastTime == currentMillis) {
        if (0L == (sequence = ++sequence & SEQUENCE_MASK)) {
            currentMillis = waitUntilNextTime(currentMillis);
        }
    } else {
        // 如果是一个更近的时间戳, 那么sequence归零
        sequence = 0;
    }

    lastTime = currentMillis;
    // 更新map中保存的workerId对应的lastTime
    workerIdLastTimeMap.put(MyKeyGenerator.workerId, lastTime);

    if (log.isDebugEnabled()) {
        log.debug("{}-{}-{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(lastTime)), workerId, sequence);
    }
    System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(lastTime))
                +" -- "+workerId+" -- "+sequence+" -- "+workerIdLastTimeMap);
    return ((currentMillis - EPOCH) << TIMESTAMP_LEFT_SHIFT_BITS) | (workerId << WORKER_ID_LEFT_SHIFT_BITS) | sequence;
}

改进验证

Powered By 阿飞Javaer.png

第一个红色箭头的地方通过修改本地时间,通过启动了备用workerId从而避免了时钟回拨60s(window系统直接修改系统时间模拟)引起的问题;第二个红色箭头在60s内再次模拟时钟回拨,就会有问题,因为无论是workerId还是备用workerId都会有冲突;如果第二红色箭头是60s后模拟时钟回拨,依然可以避免问题,原因嘛,你懂得;

再次优化

每个workerId可以配置任意个备用workerId,由使用者去平衡sequence服务的性能及高可用,终极版代码如下:

public final class MyKeyGenerator implements KeyGenerator {

    private static final long EPOCH;
    
    private static final long SEQUENCE_BITS = 12L;
    
    private static final long WORKER_ID_BITS = 10L;
    
    private static final long SEQUENCE_MASK = (1 << SEQUENCE_BITS) - 1;
    
    private static final long WORKER_ID_LEFT_SHIFT_BITS = SEQUENCE_BITS;
    
    private static final long TIMESTAMP_LEFT_SHIFT_BITS = WORKER_ID_LEFT_SHIFT_BITS + WORKER_ID_BITS;

    /**
     * 每台workerId服务器有3个备份workerId, 备份workerId数量越多, 可靠性越高, 但是可部署的sequence ID服务越少
     */
    private static final long BACKUP_COUNT = 3;

    /**
     * 实际的最大workerId的值<br/>
     * workerId原则上上限为1024, 但是需要为每台sequence服务预留BACKUP_AMOUNT个workerId,
     */
    private static final long WORKER_ID_MAX_VALUE = (1L << WORKER_ID_BITS) / (BACKUP_COUNT + 1);

    /**
     * 目前用户生成ID的workerId
     */
    private static long workerId;
    
    static {
        Calendar calendar = Calendar.getInstance();
        calendar.set(2018, Calendar.NOVEMBER, 1);
        calendar.set(Calendar.HOUR_OF_DAY, 0);
        calendar.set(Calendar.MINUTE, 0);
        calendar.set(Calendar.SECOND, 0);
        calendar.set(Calendar.MILLISECOND, 0);
        // EPOCH是服务器第一次上线时间点, 设置后不允许修改
        EPOCH = calendar.getTimeInMillis();
    }
    
    private long sequence;
    
    private long lastTime;

    /**
     * 保留workerId和lastTime, 以及备用workerId和其对应的lastTime
     */
    private static Map<Long, Long> workerIdLastTimeMap = new ConcurrentHashMap<>();

    static {
        // 初始化workerId和其所有备份workerId与lastTime
        // 假设workerId为0且BACKUP_AMOUNT为4, 那么map的值为: {0:0L, 256:0L, 512:0L, 768:0L}
        // 假设workerId为2且BACKUP_AMOUNT为4, 那么map的值为: {2:0L, 258:0L, 514:0L, 770:0L}
        for (int i = 0; i<= BACKUP_COUNT; i++){
            workerIdLastTimeMap.put(workerId + (i * WORKER_ID_MAX_VALUE), 0L);
        }
        System.out.println("workerIdLastTimeMap:" + workerIdLastTimeMap);
    }

    /**
     * 最大容忍时间, 单位毫秒, 即如果时钟只是回拨了该变量指定的时间, 那么等待相应的时间即可;
     * 考虑到sequence服务的高性能, 这个值不易过大
     */
    private static final long MAX_BACKWARD_MS = 3;

    /**
     * Set work process id.
     * @param workerId work process id
     */
    public static void setWorkerId(final long workerId) {
        Preconditions.checkArgument(workerId >= 0L && workerId < WORKER_ID_MAX_VALUE);
        MyKeyGenerator.workerId = workerId;
    }
    
    /**
     * Generate key. 考虑时钟回拨, 与sharding-jdbc源码的区别就在这里</br>
     * 缺陷: 如果连续两次时钟回拨, 可能还是会有问题, 但是这种概率极低极低
     * @return key type is @{@link Long}.
     * @Author 阿飞
     */
    @Override
    public synchronized Number generateKey() {
        long currentMillis = System.currentTimeMillis();

        // 当发生时钟回拨时
        if (lastTime > currentMillis){
            // 如果时钟回拨在可接受范围内, 等待即可
            if (lastTime - currentMillis < MAX_BACKWARD_MS){
                try {
                    Thread.sleep(lastTime - currentMillis);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }else {
                tryGenerateKeyOnBackup(currentMillis);
            }
        }

        // 如果和最后一次请求处于同一毫秒, 那么sequence+1
        if (lastTime == currentMillis) {
            if (0L == (sequence = ++sequence & SEQUENCE_MASK)) {
                currentMillis = waitUntilNextTime(currentMillis);
            }
        } else {
            // 如果是一个更近的时间戳, 那么sequence归零
            sequence = 0;
        }

        lastTime = currentMillis;
        // 更新map中保存的workerId对应的lastTime
        workerIdLastTimeMap.put(MyKeyGenerator.workerId, lastTime);

        if (log.isDebugEnabled()) {
            log.debug("{}-{}-{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(lastTime)), workerId, sequence);
        }

        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(lastTime))
                +" -- "+workerId+" -- "+sequence+" -- "+workerIdLastTimeMap);
        return ((currentMillis - EPOCH) << TIMESTAMP_LEFT_SHIFT_BITS) | (workerId << WORKER_ID_LEFT_SHIFT_BITS) | sequence;
    }

    /**
     * 尝试在workerId的备份workerId上生成
     * @param currentMillis 当前时间
     */
    private long tryGenerateKeyOnBackup(long currentMillis){
        System.out.println("try GenerateKey OnBackup, map:" + workerIdLastTimeMap);

        // 遍历所有workerId(包括备用workerId, 查看哪些workerId可用)
        for (Map.Entry<Long, Long> entry:workerIdLastTimeMap.entrySet()){
            MyKeyGenerator.workerId = entry.getKey();
            // 取得备用workerId的lastTime
            Long tempLastTime = entry.getValue();
            lastTime = tempLastTime==null?0L:tempLastTime;

            // 如果找到了合适的workerId
            if (lastTime<=currentMillis){
                return lastTime;
            }
        }

        // 如果所有workerId以及备用workerId都处于时钟回拨, 那么抛出异常
        throw new IllegalStateException("Clock is moving backwards, current time is "
                +currentMillis+" milliseconds, workerId map = " + workerIdLastTimeMap);
    }
    
    private long waitUntilNextTime(final long lastTime) {
        long time = System.currentTimeMillis();
        while (time <= lastTime) {
            time = System.currentTimeMillis();
        }
        return time;
    }
}

核心优化代码在方法tryGenerateKeyOnBackup()中,BACKUP_COUNT即备份workerId数越多,sequence服务避免时钟回拨影响的能力越强,但是可部署的sequence服务越少,设置BACKUP_COUNT为3,最多可以部署1024/(3+1)即256个sequence服务,完全够用,抗时钟回拨影响的能力也得到非常大的保障。

改进总结

这种改进方案最大优点就是没有引入任何第三方中间件(例如redis,zookeeper等),但是避免时钟回拨能力得到极大的提高,而且时钟回拨本来就是极小概率。阿飞Javaer认为这种方案能够达到绝大部分sequence服务的需求了;

上一篇下一篇

猜你喜欢

热点阅读