uid-generator
UidGenerator 是 Java 实现的, 基于 Snowflake 算法的唯一ID生成器。UidGenerator 以组件形式工作在应用项目中,支持自定义workerId位数和初始化策略, 从而适用于 docker 等虚拟化环境下实例自动重启、漂移等场景。在实现上,UidGenerator 通过借用未来时间来解决 sequence 天然存在的并发限制; 采用 RingBuffer 来缓存已生成的 UID, 并行化 UID 的生产和消费,同时对 CacheLine 补齐,避免了由 RingBuffer 带来的硬件级「伪共享」问题. 最终单机 QPS 可达 <font color=red>600万</font>。
Snowflake 算法
snowflakeSnowflake算法描述:指定机器 & 同一时刻 & 某一并发序列,是唯一的。据此可生成一个64 bits的唯一ID(long)。默认采用上图字节分配方式:
-
sign(1bit)
固定1bit符号标识,即生成的UID为正数。 -
delta seconds (28 bits)
当前时间,相对于时间基点"2016-05-20"的增量值,单位:秒,最多可支持约8.7年 -
worker id (22 bits)
机器id,最多可支持约420w次机器启动。内置实现为在启动时由数据库分配,默认分配策略为用后即弃,后续可提供复用策略。 -
sequence (13 bits)
每秒下的并发序列,13 bits可支持每秒8192个并发。
以上参数均可通过Spring进行自定义
快速集成
这里介绍如何在基于Spring的项目中使用UidGenerator, 具体流程如下:
步骤1: 安装依赖
<!-- uid-generator -->
<dependency>
<groupId>com.saicmotor</groupId>
<artifactId>sp-uid-generator</artifactId>
<version>1.0.0</version>
</dependency>
步骤2: 创建表WORKER_NODE
运行sql脚本以导入表 WORKER_NODE, 脚本如下:
DROP DATABASE IF EXISTS `xxxx`;
CREATE DATABASE `xxxx` ;
use `xxxx`;
DROP TABLE IF EXISTS WORKER_NODE;
CREATE TABLE WORKER_NODE
(
ID BIGINT NOT NULL AUTO_INCREMENT COMMENT 'auto increment id',
HOST_NAME VARCHAR(64) NOT NULL COMMENT 'host name',
PORT VARCHAR(64) NOT NULL COMMENT 'port',
TYPE INT NOT NULL COMMENT 'node type: ACTUAL or CONTAINER',
LAUNCH_DATE DATE NOT NULL COMMENT 'launch date',
MODIFIED TIMESTAMP NOT NULL COMMENT 'modified time',
CREATED TIMESTAMP NOT NULL COMMENT 'created time',
PRIMARY KEY(ID)
)
COMMENT='DB WorkerID Assigner for UID Generator',ENGINE = INNODB;
修改 mysql.properties 配置中, jdbc.url, jdbc.username和jdbc.password, 确保库地址, 名称, 端口号, 用户名和密码正确.
步骤3: 修改Spring配置
提供了两种生成器: DefaultUidGenerator、CachedUidGenerator。如对UID生成性能有要求, 请使用CachedUidGenerator,对应配置为:
## 时间增量值占用位数。当前时间相对于时间基点的增量值,单位为秒
sp-uid-generator.time-bits=31
## 工作机器 ID 占用的位数
sp-uid-generator.worker-bits=23
## 序列号占用的位数
sp-uid-generator.seq-bits=9
## 时间基点. 例如 2020-08-01 (毫秒: 1550592000000)
sp-uid-generator.epoch-str=2020-07-30
## 是否容忍时钟回拨, 默认:true
sp-uid-generator.enable-backward=true
## 时钟回拨最长容忍时间
sp-uid-generator.max-backward-seconds=1
## RingBuffer 扩容指数
sp-uid-generator.boost-power=3
## 填充 RingBuffer solt 的百分比数
sp-uid-generator.padding-factor=50
## 定时填充 RingBuffer 任务时间间隔
sp-uid-generator.schedule-interval=
步骤4: 运行示例单测
@RestController
public class TestController {
/**
* 若使用 DefaultUidGenerator,则配置 @Resource(name = "defaultUidGenerator")
* 若使用 CachedUidGenerator,则配置 @Resource(name = "cachedUidGenerator")
*/
@Resource(name = "defaultUidGenerator")
private UidGenerator uidGenerator;
@RequestMapping("index")
public String index() {
// Generate UID
long uid = uidGenerator.getUID();
// Parse UID into [Timestamp, WorkerId, Sequence]
// {"UID":"180363646902239241","parsed":{ "timestamp":"2017-01-19 12:15:46", "workerId":"4", "sequence":"9" }}
return uidGenerator.parseUID(uid);
}
}
DefaultUidGenerator 解析
DefaultUidGenerator 是 Snowflake 算法的默认实现,可以自定义 delta seconds、worker nodeid 和 sequence 的位数,并由 BitsAllocator 类生成 UID
一、DefaultUidGenerator 初始化
DefaultUidGenerator 实例生成后调用 DefaultUidGenerator#afterPropertiesSet 方法进行初始化工作,代码如下所示:
// 1 initialize bits allocator
bitsAllocator = new BitsAllocator(timeBits, workerBits, seqBits);
// 2 initialize worker id
// 分配 workId,此处由数据库实现,分配的 workId 不能超过 workId 的最大值
workerId = workerIdAssigner.assignWorkerId();
if (workerId > bitsAllocator.getMaxWorkerId()) {
throw new RuntimeException("Worker id " + workerId + " exceeds the max " + bitsAllocator.getMaxWorkerId());
}
DefaultUidGenerator 的初始化工作主要做以下两件事:
- 初始化 BitsAllocator
- 初始化 workerId
1.1 初始化 BitsAllocator
/** Bits allocate */
protected int timeBits = 28;
protected int workerBits = 22;
protected int seqBits = 13;
/** Customer epoch, unit as second. For example 2016-05-20 (ms: 1463673600000)*/
protected String epochStr = "2016-05-20";
protected long epochSeconds = TimeUnit.MILLISECONDS.toSeconds(1463673600000L);
public BitsAllocator(int timestampBits, int workerIdBits, int sequenceBits) {
// 1 make sure allocated 64 bits
int allocateTotalBits = signBits + timestampBits + workerIdBits + sequenceBits;
Assert.isTrue(allocateTotalBits == TOTAL_BITS, "allocate not enough 64 bits");
// 2 initialize bits
this.timestampBits = timestampBits;
this.workerIdBits = workerIdBits;
this.sequenceBits = sequenceBits;
// 3 initialize max value (~n = -(n + 1))
this.maxDeltaSeconds = ~(-1L << timestampBits);
this.maxWorkerId = ~(-1L << workerIdBits);
this.maxSequence = ~(-1L << sequenceBits);
// 4 initialize shift
this.timestampShift = workerIdBits + sequenceBits;
this.workerIdShift = sequenceBits;
}
BitsAllocator 的初始化工作总共分为四步:
- 校验 UID 是否满足 64 bits
- 初始化除 sign 位的其他标记位的长度
- 计算 DeltaSeconds、WorkerId 和 Sequence 的最大值,此处应用位运算进行快速计算,若 sequenceBits = 13,则 ~(-1L << sequenceBits) = ~(-8192)= 8191
- 计算 timestampBits 和 workerIdBits 的偏移量,为后面的位运算做基础
1.2 初始化 workerId
@Transactional
public long assignWorkerId() {
// build worker node entity
WorkerNodeEntity workerNodeEntity = buildWorkerNode();
// add worker node for new (ignore the same IP + PORT)
workerNodeDAO.addWorkerNode(workerNodeEntity);
LOGGER.info("Add worker node:" + workerNodeEntity);
return workerNodeEntity.getId();
}
二、生成 UID
protected synchronized long nextId() {
long currentSecond = getCurrentSecond();
// Clock moved backwards, refuse to generate uid
if (currentSecond < lastSecond) {
long refusedSeconds = lastSecond - currentSecond;
throw new UidGenerateException("Clock moved backwards. Refusing for %d seconds", refusedSeconds);
}
// At the same second, increase sequence
if (currentSecond == lastSecond) {
sequence = (sequence + 1) & bitsAllocator.getMaxSequence();
// Exceed the max sequence, we wait the next second to generate uid
if (sequence == 0) {
currentSecond = getNextSecond(lastSecond);
}
// At the different second, sequence restart from zero
} else {
sequence = 0L;
}
lastSecond = currentSecond;
// Allocate bits for UID
return bitsAllocator.allocate(currentSecond - epochSeconds, workerId, sequence);
}
nextId() 方法具体处理流程如下:
-
获取当前时间,单位是秒
private long getCurrentSecond() { long currentSecond = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()); // 以 epochSeconds 为基准,若当前时间超过 DeltaSeconds 的最大值,则无法再生成 UID if (currentSecond - epochSeconds > bitsAllocator.getMaxDeltaSeconds()) { throw new UidGenerateException("Timestamp bits is exhausted. Refusing UID generate. Now: " + currentSecond); } return currentSecond; }
-
若在同一秒内,表示在同一秒内的并发请求,则增加 sequence 的值,若 sequence 的值为 0,表示该秒内的并发请求已达到最大值(默认 sequence 的最大值为 2 ^ 13 = 8192,即当 sequenceBits = 13 时,同一秒内只能生成 8192 个 UID),只能等待下一秒再生成 UID
// At the same second, increase sequence if (currentSecond == lastSecond) { sequence = (sequence + 1) & bitsAllocator.getMaxSequence(); // Exceed the max sequence, we wait the next second to generate uid if (sequence == 0) { currentSecond = getNextSecond(lastSecond); } // At the different second, sequence restart from zero } else { sequence = 0L; }
-
调用 BitsAllocator#allocate 方法生成 UID
public long allocate(long deltaSeconds, long workerId, long sequence) { return (deltaSeconds << timestampShift) | (workerId << workerIdShift) | sequence; }
CachedUidGenerator 解析
CachedUidGenerator 通过借用未来时间来解决 sequence 天然存在的并发限制; 采用 RingBuffer 来缓存已生成的 UID, 并行化 UID 的生产和消费, 同时对CacheLine 补齐,避免了由 RingBuffer 带来的硬件级「伪共享」问题. 最终单机 QPS 可达 600万
一、RingBuffer
RingBuffer 环形数组,数组每个元素成为一个slot 。RingBuffer 容量默认为 Snowflake 算法中 sequence 最大值,且为2^N。可通过boostPower
配置进行扩容,以提高RingBuffer 读写吞吐量。
Tail 指针、Cursor 指针用于环形数组上读写 slot:
-
Tail 指针
表示 Producer 生产的最大序号(此序号从0开始,持续递增)。Tail 不能超过 Cursor,即生产者不能覆盖未消费的 slot。当 Tail 已赶上 curosr,此时可通过rejectedPutBufferHandler
指定 PutRejectPolicy -
Cursor指针
表示 Consumer 消费到的最小序号(序号序列与 Producer 序列相同)。Cursor 不能超过 Tail,即不能消费未生产的 slot。当 Cursor 已赶上 tail,此时可通过rejectedTakeBufferHandler
指定 TakeRejectPolicy
CachedUidGenerator 采用了双 RingBuffer,Uid-RingBuffer 用于存储Uid、Flag-RingBuffer 用于存储Uid状态(是否可填充、是否可消费)
RingBuffer填充时机
- 初始化预填充
RingBuffer 初始化时,预先填充满整个 RingBuffer - 即时填充
Take 消费时,及时检查剩余可用 slot 量 (tail - cursor),如小于设定阈值 ( paddingThreshold ),则补全空闲 slots - 周期填充
通过 Schedule 线程,定时补全空闲 slots。可通过scheduleInterval
配置,以应用定时填充功能,并指定 Schedule 时间间隔
如何解决 sequence 天然存在的并发限制?
因为 DeltaSeconds 部分是以秒为单位的,所以 1 个 worker 1 秒内最多生成的 UID 数为 8192个。从上可知,支持的最大 QPS 为 8192,所以通过缓存 UID 来提高吞吐量。当 1 秒获取 UID 数多于 8192 时,RingBuffer 中的 UID 很快消耗完毕,在填充 RingBuffer 时,生成的 UID 的 DeltaSeconds 部分只能使用未来的时间。(因为使用了未来的时间来生成 UID,所以上面说的是,当 delta seconds 为 28 bits 时,最多可支持约 8.7年)
二、伪共享问题
由于数组元素在内存中是连续分配的,可最大程度利用CPU cache以提升性能。但同时会带来「伪共享」FalseSharing问题,为此在Tail、Cursor指针、Flag-RingBuffer中采用了 CacheLine 补齐方式。
cacheline_padding**
* Represents a padded {@link AtomicLong} to prevent the FalseSharing problem<p>
*
* The CPU cache line commonly be 64 bytes, here is a sample of cache line after padding:<br>
* 64 bytes = 8 bytes (object reference) + 6 * 8 bytes (padded long) + 8 bytes (a long value)
*
* @author yutianbao
*/
public class PaddedAtomicLong extends AtomicLong {
private static final long serialVersionUID = -3415778863941386253L;
/** Padded 6 long (48 bytes) */
public volatile long p1, p2, p3, p4, p5, p6 = 7L;
/**
* Constructors from {@link AtomicLong}
*/
public PaddedAtomicLong() {
super();
}
public PaddedAtomicLong(long initialValue) {
super(initialValue);
}
/**
* To prevent GC optimizations for cleaning unused padded references
*/
public long sumPaddingToPreventOptimization() {
return p1 + p2 + p3 + p4 + p5 + p6;
}
}
三、RingBuffer 初始化
private static final int START_POINT = -1;
/** 表示 slot 可以放置一个 UID */
private static final long CAN_PUT_FLAG = 0L;
/** 表示 solt 可以消费一个 UID */
private static final long CAN_TAKE_FLAG = 1L;
/** 默认填充 RingBuffer solt 的百分比数 */
public static final int DEFAULT_PADDING_PERCENT = 50;
/** The size of RingBuffer's slots, each slot hold a UID */
private final int bufferSize;
private final long indexMask;
private final long[] slots;
private final PaddedAtomicLong[] flags;
/** Tail: last position sequence to produce */
private final AtomicLong tail = new PaddedAtomicLong(START_POINT);
/** Cursor: current position sequence to consume */
private final AtomicLong cursor = new PaddedAtomicLong(START_POINT);
/**
* Threshold for trigger padding buffer
* 检查剩余可用 slot 量 (tail - cursor),如小于设定阈值 ( paddingThreshold ),则补全空闲 slots
* paddingThreshold = {@link #bufferSize} * paddingFactor / 100
*/
private final int paddingThreshold;
public RingBuffer(int bufferSize, int paddingFactor) {
this.bufferSize = bufferSize;
this.indexMask = bufferSize - 1;
this.slots = new long[bufferSize];
// 初始化标记位为 CAN_PUT_FLAG,表示该 solt 可以放置 UID
this.flags = initFlags(bufferSize);
this.paddingThreshold = bufferSize * paddingFactor / 100;
}
private PaddedAtomicLong[] initFlags(int bufferSize) {
PaddedAtomicLong[] flags = new PaddedAtomicLong[bufferSize];
for (int i = 0; i < bufferSize; i++) {
flags[i] = new PaddedAtomicLong(CAN_PUT_FLAG);
}
return flags;
}
四、RingBuffer 填充
4.1 初始化预填充
RingBuffer 初始化时,预先填充满整个 RingBuffer,调用 BufferPaddingExecutor#paddingBuffer() 方法
public void paddingBuffer() {
LOGGER.info("Ready to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);
// is still running
if (!running.compareAndSet(false, true)) {
LOGGER.info("Padding buffer is still running. {}", ringBuffer);
return;
}
// fill the rest slots until to catch the cursor
boolean isFullRingBuffer = false;
while (!isFullRingBuffer) {
List<Long> uidList = uidProvider.provide(lastSecond.incrementAndGet());
for (Long uid : uidList) {
isFullRingBuffer = !ringBuffer.put(uid);
if (isFullRingBuffer) {
break;
}
}
}
// not running now
running.compareAndSet(true, false);
LOGGER.info("End to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);
}
BufferedUidProvider 调用 provide() 方法生成当前秒下的 UID,具体实现方法参见 CachedUidGenerator#nextIdsForOneSecond
protected List<Long> nextIdsForOneSecond(long currentSecond) {
// Initialize result list size of (max sequence + 1)
int listSize = (int) bitsAllocator.getMaxSequence() + 1;
List<Long> uidList = new ArrayList<>(listSize);
// Allocate the first sequence of the second, the others can be calculated with the offset
long firstSeqUid = bitsAllocator.allocate(currentSecond - epochSeconds, workerId, 0L);
for (int offset = 0; offset < listSize; offset++) {
uidList.add(firstSeqUid + offset);
}
return uidList;
}
4.2 即时填充
RingBuffer#take() 方法消费 UID 时,及时检查剩余可用 slot 量 (tail - cursor),如小于设定阈值 ( paddingThreshold ),则补全空闲 slots
// 若剩余可用 slot 量小于设定阈值,则调用 BufferPaddingExecutor.asyncPadding() 方法补全空闲 slots
if (currentTail - nextCursor < paddingThreshold) {
LOGGER.info("Reach the padding threshold:{}. tail:{}, cursor:{}, rest:{}", paddingThreshold, currentTail,
nextCursor, currentTail - nextCursor);
bufferPaddingExecutor.asyncPadding();
}
BufferPaddingExecutor#asyncPadding 异步调用 paddingBuffer() 方法,具体过程参见 4.1
五、插入 UID
public synchronized boolean put(long uid) {
long currentTail = tail.get();
long currentCursor = cursor.get();
// tail catches the cursor, means that you can't put any cause of RingBuffer is full
long distance = currentTail - (currentCursor == START_POINT ? 0 : currentCursor);
// 如果 RingBuffer 已满,则不能插入 uid,返回 false
if (distance == bufferSize - 1) {
rejectedPutHandler.rejectPutBuffer(this, uid);
return false;
}
/* 1. pre-check whether the flag is CAN_PUT_FLAG */
// 获取下一个 uid 的插入位置 index
int nextTailIndex = calSlotIndex(currentTail + 1);
// 若 RingBuffer 对应 slot 的 flag 不为 CAN_PUT_FLAG,则拒绝插入 uid
if (flags[nextTailIndex].get() != CAN_PUT_FLAG) {
rejectedPutHandler.rejectPutBuffer(this, uid);
return false;
}
// 2. put UID in the next slot
slots[nextTailIndex] = uid;
// 3. update next slot' flag to CAN_TAKE_FLAG
flags[nextTailIndex].set(CAN_TAKE_FLAG);
// 4. publish tail with sequence increase by one
tail.incrementAndGet();
// The atomicity of operations above, guarantees by 'synchronized'. In another word,
// the take operation can't consume the UID we just put, until the tail is published(tail.incrementAndGet())
return true;
}
六、消费 UID
public long take() {
// spin get next available cursor
long currentCursor = cursor.get();
// 获取可以读取的 uid 位置,即 Cursor
// 若 tail == cursor,则表示没有可以消费的 uid,nextCursor = currentCursor,保持不变;否则 nextCursor = currentCursor + 1
long nextCursor = cursor.updateAndGet(old -> old == tail.get() ? old : old + 1);
// check for safety consideration, it never occurs
Assert.isTrue(nextCursor >= currentCursor, "Curosr can't move back");
// trigger padding in an async-mode if reach the threshold
long currentTail = tail.get();
// 若剩余可用 slot 量小于设定阈值,则调用 BufferPaddingExecutor.asyncPadding() 方法补全空闲 slots
if (currentTail - nextCursor < paddingThreshold) {
LOGGER.info("Reach the padding threshold:{}. tail:{}, cursor:{}, rest:{}", paddingThreshold, currentTail,
nextCursor, currentTail - nextCursor);
bufferPaddingExecutor.asyncPadding();
}
// cursor catch the tail, means that there is no more available UID to take
// 若 nextCursor == currentCursor,则表示当前 RingBuffer 没有可消费的 uid,执行 RejectedTakeHandler 的拒绝策略
if (nextCursor == currentCursor) {
rejectedTakeHandler.rejectTakeBuffer(this);
}
// 计算 nextCursor 在 RingBuffer 中的 index
int nextCursorIndex = calSlotIndex(nextCursor);
// check next slot flag is CAN_TAKE_FLAG
Assert.isTrue(flags[nextCursorIndex].get() == CAN_TAKE_FLAG, "Curosr not in can take status");
// 2. get UID from next slot
long uid = slots[nextCursorIndex];
// 3. set next slot flag as CAN_PUT_FLAG.
flags[nextCursorIndex].set(CAN_PUT_FLAG);
// Note that: Step 2,3 can not swap. If we set flag before get value of slot, the producer may overwrite the
// slot with a new UID, and this may cause the consumer take the UID twice after walk a round the ring
return uid;
}
关于UID比特分配的建议
对于并发数要求不高、期望长期使用的应用, 可增加timeBits
位数, 减少seqBits
位数. 例如节点采取用完即弃的WorkerIdAssigner策略, 重启频率为12次/天,
那么配置成{"workerBits":23,"timeBits":31,"seqBits":9}
时, 可支持28个节点以整体并发量14400 UID/s的速度持续运行68年.
对于节点重启频率频繁、期望长期使用的应用, 可增加workerBits
和timeBits
位数, 减少seqBits
位数. 例如节点采取用完即弃的WorkerIdAssigner策略, 重启频率为24*12次/天,
那么配置成{"workerBits":27,"timeBits":30,"seqBits":6}
时, 可支持37个节点以整体并发量2400 UID/s的速度持续运行34年.
吞吐量测试
在MacBook Pro(2.7GHz Intel Core i5, 8G DDR3)上进行了CachedUidGenerator(单实例)的UID吞吐量测试.
首先固定住workerBits为任选一个值(如20), 分别统计timeBits变化时(如从25至32, 总时长分别对应1年和136年)的吞吐量, 如下表所示:
timeBits | 25 | 26 | 27 | 28 | 29 | 30 | 31 | 32 |
---|---|---|---|---|---|---|---|---|
throughput | 6,831,465 | 7,007,279 | 6,679,625 | 6,499,205 | 6,534,971 | 7,617,440 | 6,186,930 | 6,364,997 |
再固定住timeBits为任选一个值(如31), 分别统计workerBits变化时(如从20至29, 总重启次数分别对应1百万和500百万)的吞吐量, 如下表所示:
workerBits | 20 | 21 | 22 | 23 | 24 | 25 | 26 | 27 | 28 | 29 |
---|---|---|---|---|---|---|---|---|---|---|
throughput | 6,186,930 | 6,642,727 | 6,581,661 | 6,462,726 | 6,774,609 | 6,414,906 | 6,806,266 | 6,223,617 | 6,438,055 | 6,435,549 |
由此可见, 不管如何配置, CachedUidGenerator总能提供600万/s的稳定吞吐量, 只是使用年限会有所减少. 这真的是太棒了.
最后, 固定住workerBits和timeBits位数(如23和31), 分别统计不同数目(如1至8,本机CPU核数为4)的UID使用者情况下的吞吐量,
workerBits | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 |
---|---|---|---|---|---|---|---|---|
throughput | 6,462,726 | 6,542,259 | 6,077,717 | 6,377,958 | 7,002,410 | 6,599,113 | 7,360,934 | 6,490,969 |
参考文章
伪共享
https://www.iteye.com/blog/coderplay-1486649
https://www.cnblogs.com/cyfonly/p/5800758.html
Java 对象占用内存
https://www.ibm.com/developerworks/java/library/j-codetoheap/index.html
Cache Line
https://www.jianshu.com/p/008cd09fcb67