uid-generator

2020-08-15  Q南南南Q

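UidGenerator is a unique ID generator implemented in Java and based on the Snowflake algorithm. It works as a component inside the application project and supports custom workerId bit widths and initialization strategies, which makes it suitable for virtualized environments such as docker where instances restart or drift automatically. In terms of implementation, UidGenerator borrows future time to overcome the concurrency limit inherent in the sequence field; it uses a RingBuffer to cache generated UIDs so that UID production and consumption run in parallel, and it pads to the CacheLine to avoid the hardware-level "false sharing" problem introduced by the RingBuffer. The resulting single-machine QPS can reach 6 million.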

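Snowflake algorithm

[Figure: snowflake]

The Snowflake algorithm rests on one observation: the combination of a specific machine, a specific moment, and a specific concurrent sequence number is unique. From this a unique 64-bit ID (a long) can be generated. By default the bits are allocated as in the figure above: 1 sign bit, 28 bits of delta seconds, 22 bits of worker id, and 13 bits of sequence.

All of these parameters can be customized through Spring.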
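To make the layout concrete, here is a minimal sketch that packs and unpacks the three fields with the default 28/22/13 split. The class SnowflakeLayout and its method names are hypothetical and exist only for illustration; they are not part of UidGenerator.

```java
/** Hypothetical illustration of the default layout: 1 sign bit + 28 + 22 + 13 bits. */
final class SnowflakeLayout {
    static final int TIME_BITS = 28;
    static final int WORKER_BITS = 22;
    static final int SEQ_BITS = 13;

    /** Packs the three fields; callers must keep each value within its bit width. */
    static long pack(long deltaSeconds, long workerId, long sequence) {
        return (deltaSeconds << (WORKER_BITS + SEQ_BITS)) | (workerId << SEQ_BITS) | sequence;
    }

    /** Extracts the delta-seconds field (the highest bits below the sign bit). */
    static long deltaSeconds(long uid) {
        return uid >>> (WORKER_BITS + SEQ_BITS);
    }

    /** Extracts the worker id field by shifting out the sequence and masking. */
    static long workerId(long uid) {
        return (uid >>> SEQ_BITS) & ((1L << WORKER_BITS) - 1);
    }

    /** Extracts the sequence field (the lowest bits). */
    static long sequence(long uid) {
        return uid & ((1L << SEQ_BITS) - 1);
    }

    public static void main(String[] args) {
        long uid = pack(100L, 4L, 9L);
        System.out.println(uid + " -> deltaSeconds=" + deltaSeconds(uid)
                + ", workerId=" + workerId(uid) + ", sequence=" + sequence(uid));
    }
}
```

Because the sign bit stays 0, every packed value is a positive long, and the fields can be recovered with shifts and masks alone.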

Quick integration

This section describes how to use UidGenerator in a Spring-based project. The steps are as follows:

Step 1: Install the dependency

<!-- uid-generator -->
<dependency>
    <groupId>com.saicmotor</groupId>
    <artifactId>sp-uid-generator</artifactId>
    <version>1.0.0</version>
</dependency>

Step 2: Create the WORKER_NODE table

Run the SQL script below to create the WORKER_NODE table:

DROP DATABASE IF EXISTS `xxxx`;
CREATE DATABASE `xxxx` ;
use `xxxx`;
DROP TABLE IF EXISTS WORKER_NODE;
CREATE TABLE WORKER_NODE
(
ID BIGINT NOT NULL AUTO_INCREMENT COMMENT 'auto increment id',
HOST_NAME VARCHAR(64) NOT NULL COMMENT 'host name',
PORT VARCHAR(64) NOT NULL COMMENT 'port',
TYPE INT NOT NULL COMMENT 'node type: ACTUAL or CONTAINER',
LAUNCH_DATE DATE NOT NULL COMMENT 'launch date',
MODIFIED TIMESTAMP NOT NULL COMMENT 'modified time',
CREATED TIMESTAMP NOT NULL COMMENT 'created time',
PRIMARY KEY(ID)
)
 COMMENT='DB WorkerID Assigner for UID Generator',ENGINE = INNODB;

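Edit jdbc.url, jdbc.username and jdbc.password in the mysql.properties configuration, and make sure the database address, name, port, username and password are correct.

Step 3: Modify the Spring configuration

Two generators are provided: DefaultUidGenerator and CachedUidGenerator. If UID generation performance matters, use CachedUidGenerator. The corresponding configuration is: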

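## Number of bits for the time delta: the current time relative to the epoch, in seconds
sp-uid-generator.time-bits=31
## Number of bits for the worker (machine) ID
sp-uid-generator.worker-bits=23
## Number of bits for the sequence
sp-uid-generator.seq-bits=9
## Epoch (time base point) in yyyy-MM-dd format, for example 2020-07-30
sp-uid-generator.epoch-str=2020-07-30
## Whether clock rollback is tolerated, default: true
sp-uid-generator.enable-backward=true
## Maximum clock rollback to tolerate, in seconds
sp-uid-generator.max-backward-seconds=1
## RingBuffer expansion exponent
sp-uid-generator.boost-power=3
## Padding threshold, as a percentage of RingBuffer slots
sp-uid-generator.padding-factor=50
## Interval of the scheduled RingBuffer padding task
sp-uid-generator.schedule-interval=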

Step 4: Run the sample test

@RestController
public class TestController {

    /**
     * To use DefaultUidGenerator, configure @Resource(name = "defaultUidGenerator")
     * To use CachedUidGenerator, configure @Resource(name = "cachedUidGenerator")
     */
    @Resource(name = "defaultUidGenerator")
    private UidGenerator uidGenerator;

    @RequestMapping("index")
    public String index() {
        // Generate UID
        long uid = uidGenerator.getUID();
        // Parse UID into [Timestamp, WorkerId, Sequence]
        // {"UID":"180363646902239241","parsed":{    "timestamp":"2017-01-19 12:15:46",    "workerId":"4",    "sequence":"9"        }}
        return uidGenerator.parseUID(uid);
    }
}

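DefaultUidGenerator walkthrough

DefaultUidGenerator is the default implementation of the Snowflake algorithm. The bit widths of delta seconds, worker node id and sequence are configurable, and the UID itself is assembled by the BitsAllocator class.

1. DefaultUidGenerator initialization

After a DefaultUidGenerator instance is created, DefaultUidGenerator#afterPropertiesSet is called to initialize it, as shown below: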

// 1 initialize bits allocator
bitsAllocator = new BitsAllocator(timeBits, workerBits, seqBits);
// 2 initialize worker id
// Assign the workerId, here backed by the database; the assigned workerId must not exceed the maximum workerId
workerId = workerIdAssigner.assignWorkerId();
if (workerId > bitsAllocator.getMaxWorkerId()) {
    throw new RuntimeException("Worker id " + workerId + " exceeds the max " + bitsAllocator.getMaxWorkerId());
}

Initializing DefaultUidGenerator mainly does two things:

  1. Initialize BitsAllocator
  2. Initialize workerId

1.1 Initializing BitsAllocator

/** Bits allocate */
protected int timeBits = 28;
protected int workerBits = 22;
protected int seqBits = 13;

/** Customer epoch, unit as second. For example 2016-05-20 (ms: 1463673600000)*/
protected String epochStr = "2016-05-20";
protected long epochSeconds = TimeUnit.MILLISECONDS.toSeconds(1463673600000L);

public BitsAllocator(int timestampBits, int workerIdBits, int sequenceBits) {
    // 1 make sure allocated 64 bits
    int allocateTotalBits = signBits + timestampBits + workerIdBits + sequenceBits;
    Assert.isTrue(allocateTotalBits == TOTAL_BITS, "allocate not enough 64 bits");

    // 2 initialize bits
    this.timestampBits = timestampBits;
    this.workerIdBits = workerIdBits;
    this.sequenceBits = sequenceBits;

    // 3 initialize max value (~n = -(n + 1))
    this.maxDeltaSeconds = ~(-1L << timestampBits);
    this.maxWorkerId = ~(-1L << workerIdBits);
    this.maxSequence = ~(-1L << sequenceBits);

    // 4 initialize shift
    this.timestampShift = workerIdBits + sequenceBits;
    this.workerIdShift = sequenceBits;
}

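BitsAllocator initialization has four steps:

  1. Verify that the UID adds up to exactly 64 bits
  2. Record the length of every field other than the sign bit
  3. Compute the maximum values of DeltaSeconds, WorkerId and Sequence using a bitwise shortcut: if sequenceBits = 13, then ~(-1L << sequenceBits) = ~(-8192) = 8191
  4. Compute the shift offsets of the timestamp and workerId fields, which the later bit operations build on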
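The bitwise shortcut from step 3 and the shifts from step 4 can be checked in isolation. The following is a small sketch with the default 28/22/13 widths; BitMathDemo is a hypothetical helper, not a class of the library.

```java
/** Hypothetical helper that repeats the bit arithmetic used during BitsAllocator initialization. */
final class BitMathDemo {
    /** Largest value representable in the given number of bits: ~(-1L << bits). */
    static long maxValue(int bits) {
        return ~(-1L << bits);
    }

    /** Shift of the timestamp field: everything to its right is workerId plus sequence. */
    static int timestampShift(int workerIdBits, int sequenceBits) {
        return workerIdBits + sequenceBits;
    }

    public static void main(String[] args) {
        int timestampBits = 28;
        int workerIdBits = 22;
        int sequenceBits = 13;
        System.out.println("maxDeltaSeconds = " + maxValue(timestampBits));
        System.out.println("maxWorkerId     = " + maxValue(workerIdBits));
        System.out.println("maxSequence     = " + maxValue(sequenceBits));
        System.out.println("timestampShift  = " + timestampShift(workerIdBits, sequenceBits));
        System.out.println("workerIdShift   = " + sequenceBits);
    }
}
```

The expression ~(-1L << bits) gives the same result as (1L << bits) - 1: shifting -1L left leaves the low bits cleared, and the complement turns exactly those low bits on.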

1.2 Initializing workerId

@Transactional
public long assignWorkerId() {
    // build worker node entity
    WorkerNodeEntity workerNodeEntity = buildWorkerNode();

    // add worker node for new (ignore the same IP + PORT)
    workerNodeDAO.addWorkerNode(workerNodeEntity);
    LOGGER.info("Add worker node:" + workerNodeEntity);

    return workerNodeEntity.getId();
}
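The method above appears to rely on the AUTO_INCREMENT primary key of WORKER_NODE: every start inserts a new row and uses the generated ID as the workerId, so each restart consumes one id. The sketch below imitates that behaviour in memory, under the assumption that an AtomicLong can stand in for the table. InMemoryWorkerIdAssigner is a hypothetical class, and the range check mirrors the one in afterPropertiesSet.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical in-memory stand-in for the database-backed worker id assignment. */
final class InMemoryWorkerIdAssigner {
    /** Plays the role of the AUTO_INCREMENT ID column of WORKER_NODE (assumption). */
    private final AtomicLong autoIncrement = new AtomicLong(0L);
    private final long maxWorkerId;

    InMemoryWorkerIdAssigner(int workerBits) {
        this.maxWorkerId = ~(-1L << workerBits);
    }

    /** Hands out the next id and fails once the id no longer fits into workerBits. */
    long assignWorkerId() {
        long workerId = autoIncrement.incrementAndGet();
        if (workerId > maxWorkerId) {
            throw new IllegalStateException("Worker id " + workerId + " exceeds the max " + maxWorkerId);
        }
        return workerId;
    }

    public static void main(String[] args) {
        // With 2 worker bits only ids 1..3 fit; the fourth "restart" is refused
        InMemoryWorkerIdAssigner assigner = new InMemoryWorkerIdAssigner(2);
        for (int start = 1; start <= 3; start++) {
            System.out.println("start " + start + " -> workerId " + assigner.assignWorkerId());
        }
    }
}
```

This is why the number of worker bits limits the total number of instance starts rather than the number of machines.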

2. Generating a UID

protected synchronized long nextId() {
    long currentSecond = getCurrentSecond();

    // Clock moved backwards, refuse to generate uid
    if (currentSecond < lastSecond) {
        long refusedSeconds = lastSecond - currentSecond;
        throw new UidGenerateException("Clock moved backwards. Refusing for %d seconds", refusedSeconds);
    }

    // At the same second, increase sequence
    if (currentSecond == lastSecond) {
        sequence = (sequence + 1) & bitsAllocator.getMaxSequence();
        // Exceed the max sequence, we wait the next second to generate uid
        if (sequence == 0) {
            currentSecond = getNextSecond(lastSecond);
        }

    // At the different second, sequence restart from zero
    } else {
        sequence = 0L;
    }

    lastSecond = currentSecond;

    // Allocate bits for UID
    return bitsAllocator.allocate(currentSecond - epochSeconds, workerId, sequence);
}

nextId() proceeds as follows:

  1. Get the current time, in seconds

    private long getCurrentSecond() {
        long currentSecond = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
        // Measured from epochSeconds: once the elapsed time exceeds the maximum DeltaSeconds, no more UIDs can be generated
        if (currentSecond - epochSeconds > bitsAllocator.getMaxDeltaSeconds()) {
            throw new UidGenerateException("Timestamp bits is exhausted. Refusing UID generate. Now: " + currentSecond);
        }
        return currentSecond;
    }
    
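  2. If the request falls in the same second as the previous one, it is a concurrent request within that second, so sequence is incremented. If sequence wraps around to 0, the capacity of that second is exhausted and the generator has to wait for the next second. With the default sequenceBits = 13, sequence ranges from 0 to 8191, so at most 2 ^ 13 = 8192 UIDs can be generated per second.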

    // At the same second, increase sequence
    if (currentSecond == lastSecond) {
        sequence = (sequence + 1) & bitsAllocator.getMaxSequence();
        // Exceed the max sequence, we wait the next second to generate uid
        if (sequence == 0) {
            currentSecond = getNextSecond(lastSecond);
        }
    // At the different second, sequence restart from zero
    } else {
        sequence = 0L;
    }
    
  3. Call BitsAllocator#allocate to assemble the UID

    public long allocate(long deltaSeconds, long workerId, long sequence) {
        return (deltaSeconds << timestampShift) | (workerId << workerIdShift) | sequence;
    }
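The three steps can be exercised end to end with a fake clock. The sketch below is a simplified re-creation of the flow, not the library class: MiniUidGenerator, its constructor parameters, and the injected LongSupplier clock are all assumptions made for illustration. The spin loop stands in for getNextSecond, whose body is not shown in this article.

```java
import java.util.function.LongSupplier;

/** Hypothetical, simplified re-creation of the nextId() flow with an injectable clock. */
final class MiniUidGenerator {
    private final LongSupplier clockSeconds; // assumption: injected so the flow is testable
    private final long epochSeconds;
    private final long workerId;
    private final int workerBits;
    private final int seqBits;
    private final long maxSequence;
    private long lastSecond = -1L;
    private long sequence = 0L;

    MiniUidGenerator(LongSupplier clockSeconds, long epochSeconds, long workerId, int workerBits, int seqBits) {
        this.clockSeconds = clockSeconds;
        this.epochSeconds = epochSeconds;
        this.workerId = workerId;
        this.workerBits = workerBits;
        this.seqBits = seqBits;
        this.maxSequence = ~(-1L << seqBits);
    }

    synchronized long nextId() {
        long currentSecond = clockSeconds.getAsLong();
        if (currentSecond < lastSecond) {
            throw new IllegalStateException("Clock moved backwards by " + (lastSecond - currentSecond) + "s");
        }
        if (currentSecond == lastSecond) {
            // Same second: take the next sequence value, wrapping to 0 when exhausted
            sequence = (sequence + 1) & maxSequence;
            if (sequence == 0) {
                // Sequence exhausted: spin until the clock reports a later second
                while (currentSecond <= lastSecond) {
                    currentSecond = clockSeconds.getAsLong();
                }
            }
        } else {
            // New second: the sequence restarts from zero
            sequence = 0L;
        }
        lastSecond = currentSecond;
        return ((currentSecond - epochSeconds) << (workerBits + seqBits)) | (workerId << seqBits) | sequence;
    }

    public static void main(String[] args) {
        int[] reads = {0};
        // Fake clock: reports second 10 for the first five reads, then second 11
        LongSupplier clock = () -> ++reads[0] <= 5 ? 10L : 11L;
        // 2 sequence bits allow only four UIDs per second, so the fifth call has to wait
        MiniUidGenerator generator = new MiniUidGenerator(clock, 0L, 5L, 3, 2);
        for (int i = 0; i < 5; i++) {
            System.out.println(generator.nextId());
        }
    }
}
```

With only 2 sequence bits the fifth call cannot be served within second 10, so it moves on to second 11 and restarts the sequence at 0.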
    

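CachedUidGenerator walkthrough

CachedUidGenerator borrows future time to overcome the concurrency limit inherent in the sequence field. It uses a RingBuffer to cache generated UIDs so that UID production and consumption run in parallel, and it pads to the CacheLine to avoid the hardware-level "false sharing" problem introduced by the RingBuffer. The resulting single-machine QPS can reach 6 million.

1. RingBuffer

The RingBuffer is a circular array in which each element is called a slot. Its capacity defaults to the number of sequence values in the Snowflake algorithm, which is a power of two (2^N). The capacity can be enlarged through the boostPower setting to raise the RingBuffer's read and write throughput.

The Tail pointer and the Cursor pointer are used to write and read slots on the circular array:

[Figure: ringbuffer]

CachedUidGenerator uses two RingBuffers: the Uid-RingBuffer stores the UIDs, and the Flag-RingBuffer stores the state of each UID slot (whether it can be filled, whether it can be consumed).

When the RingBuffer is filled

How is the concurrency limit inherent in the sequence field overcome?

Because the DeltaSeconds part is measured in seconds, one worker can generate at most 8192 UIDs per second with the default 13 sequence bits, so the maximum supported QPS is 8192; caching UIDs is what raises the throughput. When more than 8192 UIDs are taken in one second, the UIDs in the RingBuffer are used up quickly, and when the RingBuffer is refilled the DeltaSeconds part of the new UIDs can only use future time. (Because future time is used to generate UIDs, the lifetime for 28 bits of delta seconds is an upper bound: at most about 8.5 years, i.e. 2^28 seconds.)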
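A rough back-of-the-envelope model shows how quickly borrowed time eats into the lifetime. It assumes that the generator consumes one delta-seconds value per 2^seqBits UIDs and never falls behind wall-clock time; both are simplifying assumptions, and BorrowedTimeEstimate is a hypothetical helper rather than part of the library.

```java
/** Hypothetical estimate of how sustained load shortens the lifetime of the time bits. */
final class BorrowedTimeEstimate {
    /** Delta-seconds values consumed per wall-clock second at the given rate. */
    static double secondsConsumedPerSecond(long uidsPerSecond, int seqBits) {
        return (double) uidsPerSecond / (1L << seqBits);
    }

    /** Wall-clock days until timeBits are exhausted at a sustained rate. */
    static double daysUntilExhausted(int timeBits, int seqBits, long uidsPerSecond) {
        // Assumption: consumption never drops below one delta second per real second
        double consumption = Math.max(1.0, secondsConsumedPerSecond(uidsPerSecond, seqBits));
        return (double) (1L << timeBits) / consumption / 86_400.0;
    }

    public static void main(String[] args) {
        System.out.printf("at 8,192 UID/s:     %.1f days%n", daysUntilExhausted(28, 13, 8_192L));
        System.out.printf("at 6,000,000 UID/s: %.1f days%n", daysUntilExhausted(28, 13, 6_000_000L));
    }
}
```

Under these assumptions a sustained load far above 8192 UID/s uses up the 28 time bits in days rather than years, which is the trade-off behind the bit allocation advice later in this article.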

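2. The false sharing problem

Array elements are allocated contiguously in memory, which makes the best use of the CPU cache and improves performance. The downside is the "false sharing" (FalseSharing) problem, so the Tail and Cursor pointers and the Flag-RingBuffer are padded to the CacheLine.

[Figure: cacheline_padding]

/**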
 * Represents a padded {@link AtomicLong} to prevent the FalseSharing problem<p>
 * 
 * The CPU cache line commonly be 64 bytes, here is a sample of cache line after padding:<br>
 * 64 bytes = 8 bytes (object reference) + 6 * 8 bytes (padded long) + 8 bytes (a long value)
 * 
 * @author yutianbao
 */
public class PaddedAtomicLong extends AtomicLong {
    private static final long serialVersionUID = -3415778863941386253L;

    /** Padded 6 long (48 bytes) */
    public volatile long p1, p2, p3, p4, p5, p6 = 7L;

    /**
     * Constructors from {@link AtomicLong}
     */
    public PaddedAtomicLong() {
        super();
    }

    public PaddedAtomicLong(long initialValue) {
        super(initialValue);
    }

    /**
     * To prevent GC optimizations for cleaning unused padded references
     */
    public long sumPaddingToPreventOptimization() {
        return p1 + p2 + p3 + p4 + p5 + p6;
    }

}

3. RingBuffer initialization

private static final int START_POINT = -1;
/** Flag meaning a UID can be put into the slot */
private static final long CAN_PUT_FLAG = 0L;
/** Flag meaning a UID can be taken from the slot */
private static final long CAN_TAKE_FLAG = 1L;
/** Default padding threshold, as a percentage of RingBuffer slots */
public static final int DEFAULT_PADDING_PERCENT = 50;

/** The size of RingBuffer's slots, each slot hold a UID */
private final int bufferSize;
private final long indexMask;
private final long[] slots;
private final PaddedAtomicLong[] flags;

/** Tail: last position sequence to produce */
private final AtomicLong tail = new PaddedAtomicLong(START_POINT);

/** Cursor: current position sequence to consume */
private final AtomicLong cursor = new PaddedAtomicLong(START_POINT);

/**
 * Threshold for trigger padding buffer
 * Checks the number of available slots (tail - cursor); if it is below the threshold (paddingThreshold), the free slots are refilled
 * paddingThreshold = {@link #bufferSize} * paddingFactor / 100
 */
private final int paddingThreshold; 

public RingBuffer(int bufferSize, int paddingFactor) {
    this.bufferSize = bufferSize;
    this.indexMask = bufferSize - 1;
    this.slots = new long[bufferSize];
    // Initialize every flag to CAN_PUT_FLAG, meaning the slot can accept a UID
    this.flags = initFlags(bufferSize);
    this.paddingThreshold = bufferSize * paddingFactor / 100;
}

private PaddedAtomicLong[] initFlags(int bufferSize) {
    PaddedAtomicLong[] flags = new PaddedAtomicLong[bufferSize];
    for (int i = 0; i < bufferSize; i++) {
        flags[i] = new PaddedAtomicLong(CAN_PUT_FLAG);
    }
    return flags;
}
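The constructor arguments interact through a little arithmetic, sketched below. The buffer size formula (maxSequence + 1) << boostPower is my reading of the description in section 1 and should be treated as an assumption. RingIndexDemo and its methods are hypothetical; slotIndex shows what a helper such as calSlotIndex presumably does with indexMask.

```java
/** Hypothetical helper showing the sizing and indexing arithmetic of the RingBuffer. */
final class RingIndexDemo {
    /** Buffer size assumed to be (maxSequence + 1) << boostPower, always a power of two. */
    static int bufferSize(int seqBits, int boostPower) {
        return (1 << seqBits) << boostPower;
    }

    /** Maps an ever-growing position to a slot index; valid only for power-of-two sizes. */
    static int slotIndex(long position, int bufferSize) {
        return (int) (position & (bufferSize - 1));
    }

    /** Threshold below which a refill is triggered, as in the constructor above. */
    static int paddingThreshold(int bufferSize, int paddingFactor) {
        return bufferSize * paddingFactor / 100;
    }

    public static void main(String[] args) {
        // Values from the configuration in step 3: seq-bits=9, boost-power=3, padding-factor=50
        int size = bufferSize(9, 3);
        System.out.println("bufferSize       = " + size);
        System.out.println("paddingThreshold = " + paddingThreshold(size, 50));
        System.out.println("slot of position " + (size + 1) + " = " + slotIndex(size + 1, size));
    }
}
```

Masking with bufferSize - 1 is equivalent to taking the position modulo bufferSize, which is why the capacity has to stay a power of two.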

4. Filling the RingBuffer

4.1 Pre-filling at initialization

When the RingBuffer is initialized, the whole buffer is filled in advance by calling BufferPaddingExecutor#paddingBuffer():

public void paddingBuffer() {
    LOGGER.info("Ready to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);

    // is still running
    if (!running.compareAndSet(false, true)) {
        LOGGER.info("Padding buffer is still running. {}", ringBuffer);
        return;
    }

    // fill the rest slots until to catch the cursor
    boolean isFullRingBuffer = false;
    while (!isFullRingBuffer) {
        List<Long> uidList = uidProvider.provide(lastSecond.incrementAndGet());
        for (Long uid : uidList) {
            isFullRingBuffer = !ringBuffer.put(uid);
            if (isFullRingBuffer) {
                break;
            }
        }
    }

    // not running now
    running.compareAndSet(true, false);
    LOGGER.info("End to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);
}

BufferedUidProvider's provide() method generates the UIDs for the given second; for the concrete implementation see CachedUidGenerator#nextIdsForOneSecond:

protected List<Long> nextIdsForOneSecond(long currentSecond) {
    // Initialize result list size of (max sequence + 1)
    int listSize = (int) bitsAllocator.getMaxSequence() + 1;
    List<Long> uidList = new ArrayList<>(listSize);

    // Allocate the first sequence of the second, the others can be calculated with the offset
    long firstSeqUid = bitsAllocator.allocate(currentSecond - epochSeconds, workerId, 0L);
    for (int offset = 0; offset < listSize; offset++) {
        uidList.add(firstSeqUid + offset);
    }
    return uidList;
}
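Adding the offset to the first UID works because the sequence occupies the lowest bits and the offset never exceeds the maximum sequence, so the addition cannot carry into the workerId field. The sketch below checks this on a tiny layout; OneSecondBatch and its parameter lists are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical check that "first UID + offset" equals a full allocation per sequence value. */
final class OneSecondBatch {
    static long allocate(long deltaSeconds, long workerId, long sequence, int workerBits, int seqBits) {
        return (deltaSeconds << (workerBits + seqBits)) | (workerId << seqBits) | sequence;
    }

    /** Generates every UID of one second by offsetting the UID with sequence 0. */
    static List<Long> idsForOneSecond(long deltaSeconds, long workerId, int workerBits, int seqBits) {
        int listSize = 1 << seqBits;
        List<Long> ids = new ArrayList<>(listSize);
        long first = allocate(deltaSeconds, workerId, 0L, workerBits, seqBits);
        for (int offset = 0; offset < listSize; offset++) {
            ids.add(first + offset);
        }
        return ids;
    }

    public static void main(String[] args) {
        // Tiny layout for readability: 4 worker bits and 3 sequence bits
        List<Long> ids = idsForOneSecond(7L, 5L, 4, 3);
        for (int seq = 0; seq < ids.size(); seq++) {
            boolean same = ids.get(seq) == allocate(7L, 5L, seq, 4, 3);
            System.out.println(ids.get(seq) + " matches allocate(..., " + seq + "): " + same);
        }
    }
}
```

One shift-and-or per second plus cheap additions is enough to fill the buffer with a whole second's worth of UIDs.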

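4.2 On-demand filling

When RingBuffer#take() consumes a UID, it checks the number of available slots (tail - cursor) right away; if it is below the threshold (paddingThreshold), the free slots are refilled:

// If the number of available slots is below the threshold, call BufferPaddingExecutor.asyncPadding() to refill the free slots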
if (currentTail - nextCursor < paddingThreshold) {
    LOGGER.info("Reach the padding threshold:{}. tail:{}, cursor:{}, rest:{}", paddingThreshold, currentTail,
            nextCursor, currentTail - nextCursor);
    bufferPaddingExecutor.asyncPadding();
}

BufferPaddingExecutor#asyncPadding calls paddingBuffer() asynchronously; see 4.1 for the details.
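Two ideas combine here: the refill runs on a background thread so that take() never waits for it, and a compare-and-set guard (the running flag seen in 4.1) makes sure only one refill runs at a time. A minimal sketch of that pattern follows; AsyncPaddingSketch, its single-thread executor, and the completedRuns counter are assumptions for illustration, not the library's implementation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of an asynchronous refill protected by a compare-and-set guard. */
final class AsyncPaddingSketch {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicInteger completedRuns = new AtomicInteger();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Runnable fillTask;

    AsyncPaddingSketch(Runnable fillTask) {
        this.fillTask = fillTask;
    }

    /** Hands the refill to a background thread so the caller returns immediately. */
    void asyncPadding() {
        executor.submit(this::paddingBuffer);
    }

    /** Runs the refill unless one is already in progress; returns whether this call did the work. */
    boolean paddingBuffer() {
        if (!running.compareAndSet(false, true)) {
            return false; // another refill is still running
        }
        try {
            fillTask.run();
            completedRuns.incrementAndGet();
            return true;
        } finally {
            running.set(false);
        }
    }

    int completedRuns() {
        return completedRuns.get();
    }

    /** Lets submitted refills finish, then stops the background thread. */
    void shutdown() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        AsyncPaddingSketch padding = new AsyncPaddingSketch(() -> System.out.println("refilling..."));
        padding.asyncPadding();
        padding.shutdown();
        System.out.println("completed runs: " + padding.completedRuns());
    }
}
```

A request that arrives while a refill is in progress is simply dropped, which is safe because the running refill already fills the buffer until it is full.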

5. Putting a UID

public synchronized boolean put(long uid) {
    long currentTail = tail.get();
    long currentCursor = cursor.get();

    // tail catches the cursor, means that you can't put any cause of RingBuffer is full
    long distance = currentTail - (currentCursor == START_POINT ? 0 : currentCursor);
    // If the RingBuffer is full, the uid cannot be put; return false
    if (distance == bufferSize - 1) {
        rejectedPutHandler.rejectPutBuffer(this, uid);
        return false;
    }

    /* 1. pre-check whether the flag is CAN_PUT_FLAG */
    // Compute the index at which the next uid will be put
    int nextTailIndex = calSlotIndex(currentTail + 1);

    // If the flag of that slot is not CAN_PUT_FLAG, reject the uid
    if (flags[nextTailIndex].get() != CAN_PUT_FLAG) {
        rejectedPutHandler.rejectPutBuffer(this, uid);
        return false;
    }

    // 2. put UID in the next slot
    slots[nextTailIndex] = uid;
    // 3. update next slot' flag to CAN_TAKE_FLAG
    flags[nextTailIndex].set(CAN_TAKE_FLAG);
    // 4. publish tail with sequence increase by one
    tail.incrementAndGet();

    // The atomicity of operations above, guarantees by 'synchronized'. In another word,
    // the take operation can't consume the UID we just put, until the tail is published(tail.incrementAndGet())
    return true;
}

6. Taking a UID

public long take() {
    // spin get next available cursor
    long currentCursor = cursor.get();
    // Advance to the position of the next readable uid, i.e. the Cursor
    // If tail == cursor there is no uid to consume and nextCursor = currentCursor stays unchanged; otherwise nextCursor = currentCursor + 1
    long nextCursor = cursor.updateAndGet(old -> old == tail.get() ? old : old + 1);

    // check for safety consideration, it never occurs
    Assert.isTrue(nextCursor >= currentCursor, "Curosr can't move back");

    // trigger padding in an async-mode if reach the threshold
    long currentTail = tail.get();
    // If the number of available slots is below the threshold, call BufferPaddingExecutor.asyncPadding() to refill the free slots
    if (currentTail - nextCursor < paddingThreshold) {
        LOGGER.info("Reach the padding threshold:{}. tail:{}, cursor:{}, rest:{}", paddingThreshold, currentTail,
                nextCursor, currentTail - nextCursor);
        bufferPaddingExecutor.asyncPadding();
    }

    // cursor catch the tail, means that there is no more available UID to take
    // If nextCursor == currentCursor, the RingBuffer has no uid to consume; apply the RejectedTakeHandler rejection policy
    if (nextCursor == currentCursor) {
        rejectedTakeHandler.rejectTakeBuffer(this);
    }

    // Compute the index of nextCursor in the RingBuffer
    int nextCursorIndex = calSlotIndex(nextCursor);
    // check next slot flag is CAN_TAKE_FLAG
    Assert.isTrue(flags[nextCursorIndex].get() == CAN_TAKE_FLAG, "Curosr not in can take status");

    // 2. get UID from next slot
    long uid = slots[nextCursorIndex];
    // 3. set next slot flag as CAN_PUT_FLAG.
    flags[nextCursorIndex].set(CAN_PUT_FLAG);

    // Note that: Step 2,3 can not swap. If we set flag before get value of slot, the producer may overwrite the
    // slot with a new UID, and this may cause the consumer take the UID twice after walk a round the ring
    return uid;
}
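The interplay of tail, cursor and the per-slot flags is easier to follow on a stripped-down buffer. The sketch below is deliberately simplified and differs from the code above in three ways, all of them assumptions of this example: both operations share one lock instead of using a lock-free take, the full check is written as tail - cursor == bufferSize, and rejections are reported by a return value or an exception instead of handler callbacks. MiniRingBuffer is a hypothetical class.

```java
/** Hypothetical single-lock ring buffer illustrating tail, cursor and per-slot flags. */
final class MiniRingBuffer {
    private static final long START_POINT = -1L;
    private final long[] slots;
    private final boolean[] canTake; // true: slot holds an unconsumed UID
    private final long indexMask;
    private long tail = START_POINT;   // last produced position
    private long cursor = START_POINT; // last consumed position

    MiniRingBuffer(int bufferSize) {
        if (bufferSize <= 0 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of two");
        }
        this.slots = new long[bufferSize];
        this.canTake = new boolean[bufferSize];
        this.indexMask = bufferSize - 1;
    }

    /** Stores a UID in the next slot; returns false when the buffer is full. */
    synchronized boolean put(long uid) {
        if (tail - cursor == slots.length) {
            return false; // tail has caught up with the cursor
        }
        int index = (int) ((tail + 1) & indexMask);
        if (canTake[index]) {
            return false; // slot still holds an unconsumed UID
        }
        slots[index] = uid;
        canTake[index] = true;
        tail++;
        return true;
    }

    /** Returns the oldest UID; throws when the buffer is empty. */
    synchronized long take() {
        if (cursor == tail) {
            throw new IllegalStateException("RingBuffer is empty");
        }
        cursor++;
        int index = (int) (cursor & indexMask);
        long uid = slots[index];   // read the value first ...
        canTake[index] = false;    // ... then release the slot to the producer
        return uid;
    }

    public static void main(String[] args) {
        MiniRingBuffer buffer = new MiniRingBuffer(4);
        for (long uid = 1; uid <= 5; uid++) {
            System.out.println("put " + uid + ": " + buffer.put(uid));
        }
        System.out.println("take: " + buffer.take());
        System.out.println("put 5: " + buffer.put(5L));
    }
}
```

The order inside take() mirrors the note in the original code: the value is read before the flag is released, so a producer can never overwrite a slot that is still being read.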

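Recommendations on UID bit allocation

For applications with modest concurrency requirements that are expected to run for a long time, increase timeBits and decrease seqBits. For example, if nodes use the disposable (use-once) WorkerIdAssigner strategy and restart 12 times per day, then with {"workerBits":23,"timeBits":31,"seqBits":9} the configuration supports 28 nodes running continuously for 68 years at an overall rate of 14400 UID/s.

For applications whose nodes restart frequently and that are expected to run for a long time, increase workerBits and timeBits and decrease seqBits. For example, if nodes use the disposable WorkerIdAssigner strategy and restart 24*12 times per day, then with {"workerBits":27,"timeBits":30,"seqBits":6} the configuration supports 37 nodes running continuously for 34 years at an overall rate of 2400 UID/s.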
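The figures in both examples can be reproduced with a few lines of arithmetic. The sketch assumes a 365-day year and that every restart consumes exactly one worker id; BitBudget and its methods are hypothetical helpers.

```java
/** Hypothetical calculator for the capacity implied by a bit allocation. */
final class BitBudget {
    /** Years until the time bits run out, assuming one delta second per real second. */
    static double lifetimeYears(int timeBits) {
        return (1L << timeBits) / (365.0 * 24 * 3600);
    }

    /** Nodes that can restart restartsPerDay times a day for the whole lifetime. */
    static long supportedNodes(int workerBits, int timeBits, int restartsPerDay) {
        // worker ids available / restarts per node over the lifetime, in exact integer arithmetic
        return (1L << workerBits) * 86_400L / ((long) restartsPerDay * (1L << timeBits));
    }

    /** Combined UID rate of all nodes without borrowing future time. */
    static long totalUidsPerSecond(long nodes, int seqBits) {
        return nodes * (1L << seqBits);
    }

    public static void main(String[] args) {
        long nodesA = supportedNodes(23, 31, 12);
        System.out.printf("23/31/9: %d nodes, %.1f years, %d UID/s%n",
                nodesA, lifetimeYears(31), totalUidsPerSecond(nodesA, 9));
        long nodesB = supportedNodes(27, 30, 24 * 12);
        System.out.printf("27/30/6: %d nodes, %.1f years, %d UID/s%n",
                nodesB, lifetimeYears(30), totalUidsPerSecond(nodesB, 6));
    }
}
```

The exact rates come out as 28 * 512 = 14336 and 37 * 64 = 2368 UID/s, so the 14400 and 2400 quoted above appear to be rounded values.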

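Throughput test

The UID throughput of CachedUidGenerator (single instance) was measured on a MacBook Pro (2.7GHz Intel Core i5, 8G DDR3).

First, workerBits was fixed at an arbitrary value (e.g. 20) and the throughput was recorded while timeBits varied (e.g. from 25 to 32, corresponding to total lifetimes of about 1 year and 136 years respectively), as shown in the table below: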

timeBits     25         26         27         28         29         30         31         32
throughput   6,831,465  7,007,279  6,679,625  6,499,205  6,534,971  7,617,440  6,186,930  6,364,997

[Figure: throughput1]

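Next, timeBits was fixed at an arbitrary value (e.g. 31) and the throughput was recorded while workerBits varied (e.g. from 20 to 29, corresponding to about 1 million and 500 million total restarts respectively), as shown in the table below: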

workerBits   20         21         22         23         24         25         26         27         28         29
throughput   6,186,930  6,642,727  6,581,661  6,462,726  6,774,609  6,414,906  6,806,266  6,223,617  6,438,055  6,435,549

[Figure: throughput2]

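As these results show, whatever the configuration, CachedUidGenerator consistently delivers a stable throughput of about 6 million UIDs per second; the only cost is a somewhat shorter usable lifetime. That is really great.

Finally, workerBits and timeBits were fixed (e.g. at 23 and 31) and the throughput was recorded for different numbers of UID consumers (e.g. from 1 to 8; the test machine has 4 CPU cores):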

consumers    1          2          3          4          5          6          7          8
throughput   6,462,726  6,542,259  6,077,717  6,377,958  7,002,410  6,599,113  7,360,934  6,490,969

[Figure: throughput3]

References

False sharing

https://www.iteye.com/blog/coderplay-1486649

https://www.cnblogs.com/cyfonly/p/5800758.html

Memory footprint of Java objects

https://www.ibm.com/developerworks/java/library/j-codetoheap/index.html

Cache Line

https://www.jianshu.com/p/008cd09fcb67

uid generator source code analysis

https://www.cnblogs.com/yeyang/p/10226284.html
