看!源码Redis之listpack(stream消息存储结构)
2023-01-15 本文已影响0人
starskye
此结构在Stream中扮演着存储消息的角色。
//listpack 头部大小为 6 其中total bytes 为4个字节 numelement 为2个字节
#define LP_HDR_SIZE 6
//结束符
#define LP_EOF 0xFF
//设置total值
//p自然是 listpack的指针 而他的前四个字节存储的便是其字节的总数
//所以此处需要将一个四个字节的 int 赋值给四个byte 此处需要注意 存储方式为小端
//因为将低位存储到了 0字节 依次类推
#define lpSetTotalBytes(p,v) do { \
(p)[0] = (v)&0xff; \
(p)[1] = ((v)>>8)&0xff; \
(p)[2] = ((v)>>16)&0xff; \
(p)[3] = ((v)>>24)&0xff; \
} while(0)
//设置listpack的元素数
//与上方相同 存储为小端
#define lpSetNumElements(p,v) do { \
(p)[4] = (v)&0xff; \
(p)[5] = ((v)>>8)&0xff; \
} while(0)
//创建一个listpack
unsigned char *lpNew(void) {
//声明一个7字节的空间 其中+1 是因为 listpack有个结束符 0xFF占用一个字节
unsigned char *lp = lp_malloc(LP_HDR_SIZE+1);
//如果声明失败返回null
if (lp == NULL) return NULL;
//设置lp的total bytes初始化数据为 7
lpSetTotalBytes(lp,LP_HDR_SIZE+1);
//设置默认element数为 0
lpSetNumElements(lp,0);
//设置结束符为 0xFF
lp[LP_HDR_SIZE] = LP_EOF;
return lp;
}
//listpack中 entry的最大编码长度为 9
#define LP_MAX_INT_ENCODING_LEN 9
//listpack中 entry长度描述最大为5个字节
#define LP_MAX_BACKLEN_SIZE 5
//前插
#define LP_BEFORE 0
//后插
#define LP_AFTER 1
//替换
#define LP_REPLACE 2
//listpack 插入数据、删除数据、替换数据 根据参数调用来确定
// lp listpack的指针 ele 新数据 size 数据长度 p 如果是插入则此数据为需要插如到的那个数据,如果是删除则为需要删除的数据
// where 需要操作的位置LP_BEFORE(前插) 、 LP_AFTER(后插) 、LP_REPLACE(替换) newp 操作完数据的下一个数据指针
//如果是删除 则ele为NULL
unsigned char *lpInsert(unsigned char *lp, unsigned char *ele, uint32_t size, unsigned char *p, int where, unsigned char **newp) {
//首先声明当前entry的 需要编码大小 和 长度描述大小 此处backlen用于后往前遍历 所以此处长度只包含 编码长度+数据长度
unsigned char intenc[LP_MAX_INT_ENCODING_LEN];
unsigned char backlen[LP_MAX_BACKLEN_SIZE];
//计算当前编码的长度
uint64_t enclen;
//如果ele为null说明需要删除 而此处的删除是将当前p占用的字节替换为空字节
if (ele == NULL) where = LP_REPLACE;
//如果是在之后插入 则将其转换为向前插入 这样做少些逻辑代码 值得学习
if (where == LP_AFTER) {
//首先跳转到下一个节点
p = lpSkip(p);
//设置where为before
where = LP_BEFORE;
}
//计算当前p 在lp的偏移值
unsigned long poff = p-lp;
//根据ele 计算需要的编码类型 共 9中类型
//1、 7 位的无符号整型 加上内容共占 1个字节 结构为 0xxx xxxx x是存储数据的地方 0标示位
//2、 63长度以下的字符串 结构为 10LL LLLL L是存储字符串长度的地方 10 标识位 后续便是内容
//3、 13位的有符号整型 加上内容共占 2个字节 结构为 110x xxxx xxxx xxxx x是存储数据的地方 110 标示位
//4、 4095长度以下 63 长度以上的字符串 结构为 1110 LLLL LLLL LLLL L存储字符串长度的地方 1110 标识位 后续便是L计算出长度大小的内容
//5、 4294967295 长度以下 4095 长度以上的字符串 结构为 1111 0000 后续四个字节存储 字符串长度 1111 0000 标示位
//6、 16位整型 结构为 1111 0001 也是为标示位 后续两个字节为 数据值 共 3个字节大小
//7、 24位整型 结构为 1111 0010 也是为标示位 后续三个字节为 数据值 共 4个字节大小
//8、 32位整型 结构为 1111 0100 也是为标示位 后续四个字节为 数据值 共 5个字节大小
//9、 64位整型 结构为 1111 1000 也是为标示位 后续八个字节为 数据值 共 9个字节大小 然而也是已知占用最多的类型所以intenc为9
int enctype;
if (ele) {
//便是使用上方的规则计算当前ele的类型编码 存储到 intenc中 在根据编码的类型和数据获取对应的长度 enclen
enctype = lpEncodeGetType(ele,size,intenc,&enclen);
} else {
//如果为NULL 则设置type为-1 长度为 0
enctype = -1;
enclen = 0;
}
//计算当前backlen需要占用的大小 根据enclen计算 规则如下
//1、enclen <= 127 占用 1 个字节
//2、enclen < 16383 占用 2 个字节
//3、enclen < 2097151 占用 3 个字节
//4、enclen < 268435455 占用4个字节
//5、如果不满足上方的条件则占用五个字节 并且将enclen 写入到backlen中
unsigned long backlen_size = ele ? lpEncodeBacklen(backlen,enclen) : 0;
//获取当前的字节数
uint64_t old_listpack_bytes = lpGetTotalBytes(lp);
//如果是替换则计算需要需要替换的字节数 即 encoding size + content size + backlen size 便是一个entry的总共大小
uint32_t replaced_len = 0;
if (where == LP_REPLACE) {
//解析当前p的 encoding size + content size
replaced_len = lpCurrentEncodedSize(p);
//根据获取到len 计算下backlen占用的size
replaced_len += lpEncodeBacklen(NULL,replaced_len);
}
//计算一下最新listpack需要的字节数 当前字节数 + 新ele的字节数 - 需要替换元素的字节数
uint64_t new_listpack_bytes = old_listpack_bytes + enclen + backlen_size
- replaced_len;
//获取到最新的字节数后 判断当前字节数是否超过了限制 即 4294967295 因为listpack的长度描述 total bytes 是四个字节 无符号的最大数便是4294967295
//如果超过那么不在执行 返回NULL 因为程序出错
if (new_listpack_bytes > UINT32_MAX) return NULL;
//获取需要被替换的目标地址
unsigned char *dst = lp + poff;
//如果上方计算的最新大小超过了原有的大小
if (new_listpack_bytes > old_listpack_bytes) {
//那么遍进行realloc 重新分配内存空间
if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
//分配完成后再次计算需要操作的目标位置
dst = lp + poff;
}
//判断当前位置 因为之前将after修改为了before所以此处只有 before和replace
if (where == LP_BEFORE) {
//如果是before操作 那么此处将 dst 位置的数据 移动到 dst+enclen+backlen_size的位置
//这样做变空出了当前ele大小的位置方便后续操作
memmove(dst+enclen+backlen_size,dst,old_listpack_bytes-poff);
} else {
//如果是替换 那么计算当前元素大小和 原先元素大小的差 如果是正数 则往后移 如果是负数则向前移
long lendiff = (enclen+backlen_size)-replaced_len;
//dst + replacelen 便是 当前元素的下一个元素地址 然后在加上上方计算的差 这样便计算出 下一个元素需要位移的具体位置
//然后设置需要位移的元素首地址 再次计算此元素的长度 总长度-当前替换元素的偏移-当前替换元素的大小 便是后面所有元素的占用大小
memmove(dst+replaced_len+lendiff,
dst+replaced_len,
old_listpack_bytes-poff-replaced_len);
}
//如果当前listpack的大小小于了原先的大小 则需要丢弃后面空出来的字节
//因为上方已经进行了插入/替换的操作所以此处尾部的字节数 必然是多余的
if (new_listpack_bytes < old_listpack_bytes) {
if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
//根据偏移 再次计算当前元素的首地址 因为上方经过了插入 和 替换所以此处指针一定是新元素的地址
dst = lp + poff;
}
//如果需要回传新元素的地址 则设置为dst
if (newp) {
*newp = dst;
//如果是删除操作并且当前删除的元素是最后一个 那么 不存在新元素所以为NULL
if (!ele && dst[0] == LP_EOF) *newp = NULL;
}
//如果ele不为空 要么是替换 要么是 添加 但是之前都是对原先内存的移动并没有赋值
//所以此处是将新元素的具体数据赋值到内存中
if (ele) {
//如果是整型 那么直接copy即可
if (enctype == LP_ENCODING_INT) {
memcpy(dst,intenc,enclen);
} else {
//否则需要将字符串编码为指定的encoding 然后在复制
//此方法的操作是根据size编码为上方的对应类型然后memcpy拷贝到dst中
lpEncodeString(dst,ele,size);
}
//将dst偏移指定的长度
dst += enclen;
//将backlen也拷贝进dst中
memcpy(dst,backlen,backlen_size);
//dst back的长度
dst += backlen_size;
}
//如果当前操作类型不是replace 或者 ele == null
//都是需要对元素个数进行操作的 因为不是replace说明是插入 需要 ++ 如果ele为null 说明是删除需要 --
if (where != LP_REPLACE || ele == NULL) {
//获取当前元素个数
uint32_t num_elements = lpGetNumElements(lp);
//如果当前长度不是65535 即 short的最大值 无符号 那么就对num进行设置
if (num_elements != LP_HDR_NUMELE_UNKNOWN) {
if (ele)
//如果存在ele说明是添加则 +1
lpSetNumElements(lp,num_elements+1);
else
//如果不存在说明是删除则 --
lpSetNumElements(lp,num_elements-1);
}
}
//将当前最新的bytes大小设置到lp中
lpSetTotalBytes(lp,new_listpack_bytes);
//到此处可以总结 下newp 如果是插入或者替换 那么此值为当前p的偏移首地址 如果是删除则为当前p的下一个元素的地址
return lp;
}
//获取当前listpack的第一个元素
unsigned char *lpFirst(unsigned char *lp) {
//在listpack的首地址基础上跳过头部 便是第一个元素的首地址
lp += LP_HDR_SIZE;
//如果首地址的值是0xff 那么说明list为空 直接返回null
if (lp[0] == LP_EOF) return NULL;
//否则返回首地址
return lp;
}
//获取当前listpack的最后一个元素
unsigned char *lpLast(unsigned char *lp) {
//根据当前listpack的首地址 加上 当前listpack的总字节数 便超过了当前listpack的内存空间 进行-1 回到EOF 0xff的地址
unsigned char *p = lp+lpGetTotalBytes(lp)-1;
//根据最后一个字节向前根据backlen算出前一个字节的长度 根据长度进行 p - backlen 便是最后一个元素的首地址
return lpPrev(lp,p);
}
//根据当前元素的首地址 计算前一个元素的首地址
unsigned char *lpPrev(unsigned char *lp, unsigned char *p) {
//如果当前元素的地址 减去 listpack的地址 等于6 说明当前lp为空 则不进行计算
if (p-lp == LP_HDR_SIZE) return NULL;
//不为空则跳过当前元素的首地址 到达上一个元素的backlen的位置
p--;
//根据backlen向前计算 获取到元素的长度
uint64_t prevlen = lpDecodeBacklen(p);
//当前获取到的元素长度是编码的长度+数据长度 还缺少了backlen的长度所以此处根据上方解析出的backlen在计算下backlen的长度
//从而获取到当前p 到上一个元素长度
prevlen += lpEncodeBacklen(NULL,prevlen);
//但是之前为了计算backlen所以p进行了--已经不是原来的那个位置 但是我们计算出的长度是根据原先的位置计算的
//所以此处需要+1 才是上一个元素的真正位置
return p-prevlen+1;
}
//如果需要获取下一个元素那么必须需要当前的元素
//lp listpack的指针地址 p 需要查询下个元素的指针地址
unsigned char *lpNext(unsigned char *lp, unsigned char *p) {
//此处暂时没有使用到了lp 但是在编译是会提示未使用 所以此处使用此方法欺骗编译器
((void) lp);
//next使用的是skip跳过当前p
p = lpSkip(p);
//如果跳过的结果是EOF 结束符 那么返回null 否则返回跳过结果
if (p[0] == LP_EOF) return NULL;
return p;
}
//跳过当前p 获取到下一个元素的地址
unsigned char *lpSkip(unsigned char *p) {
//首先计算当前元素占用的长度
unsigned long entrylen = lpCurrentEncodedSize(p);
//根据长度计算出back的长度
entrylen += lpEncodeBacklen(NULL,entrylen);
//根据计算出的 backlen的长度 + entrylen的长度便是下一个元素的长度
//所以此处p加上上方的计算结果便可
p += entrylen;
return p;
}
//获取当前元素的数据
// p 为元素的首地址 count 如果元素存储的是整型并且复合之前编码的 那么count为返回值 但是此处返回值优势char* 所以需要一个参数做为结果
//inbuff如果想获取到当前整型的buff数据 那么此值为返回值 如果需要inbuff 则count语义改成为写入到inbuff的字节个数
unsigned char *lpGet(unsigned char *p, int64_t *count, unsigned char *intbuf) {
//如果为整型 此处便是计算 count的临时变量
int64_t val;
//uval 根据char计算出的值 negstart存储的值是否为负数根据此值进行判别
//negmax 如果是负数 需要此值将当前正数计算为负数
uint64_t uval, negstart, negmax;
//如果当前是无符号整型 那么直接计算uval即可
if (LP_ENCODING_IS_7BIT_UINT(p[0])) {
negstart = UINT64_MAX;
negmax = 0;
//当前char & 一个0x7f 便是具体值
uval = p[0] & 0x7f;
//如果当前是六位的字符串 那么此处 p +1 就是数据体 因为encoding站1个字节 之前编码有详细说明过
} else if (LP_ENCODING_IS_6BIT_STR(p[0])) {
*count = LP_ENCODING_6BIT_STR_LEN(p);
return p+1;
//如果当前是13位的有符号整型
} else if (LP_ENCODING_IS_13BIT_INT(p[0])) {
//根据&0x1f 获取到高位值 在位移 8位 设置为高位数据 然后 或 | p1 低位值
uval = ((p[0]&0x1f)<<8) | p[1];
//他的最大值不能超过的 4096 如果超过那么便是负数
negstart = (uint64_t)1<<12;
//计算负数的基数 基数减去 uval -1 便是最终的结果
negmax = 8191;
//与13同理
} else if (LP_ENCODING_IS_16BIT_INT(p[0])) {
uval = (uint64_t)p[1] |
(uint64_t)p[2]<<8;
negstart = (uint64_t)1<<15;
negmax = UINT16_MAX;
//如果13同理
} else if (LP_ENCODING_IS_24BIT_INT(p[0])) {
uval = (uint64_t)p[1] |
(uint64_t)p[2]<<8 |
(uint64_t)p[3]<<16;
negstart = (uint64_t)1<<23;
negmax = UINT32_MAX>>8;
//与13同理
} else if (LP_ENCODING_IS_32BIT_INT(p[0])) {
uval = (uint64_t)p[1] |
(uint64_t)p[2]<<8 |
(uint64_t)p[3]<<16 |
(uint64_t)p[4]<<24;
negstart = (uint64_t)1<<31;
negmax = UINT32_MAX;
//与13同理
} else if (LP_ENCODING_IS_64BIT_INT(p[0])) {
uval = (uint64_t)p[1] |
(uint64_t)p[2]<<8 |
(uint64_t)p[3]<<16 |
(uint64_t)p[4]<<24 |
(uint64_t)p[5]<<32 |
(uint64_t)p[6]<<40 |
(uint64_t)p[7]<<48 |
(uint64_t)p[8]<<56;
negstart = (uint64_t)1<<63;
negmax = UINT64_MAX;
//如果是12位的字符串 那么需要p+2 encoding长度为 2
} else if (LP_ENCODING_IS_12BIT_STR(p[0])) {
*count = LP_ENCODING_12BIT_STR_LEN(p);
return p+2;
//如果是32位的字符串 那么需要p+5 encoding长度为 5
} else if (LP_ENCODING_IS_32BIT_STR(p[0])) {
*count = LP_ENCODING_32BIT_STR_LEN(p);
return p+5;
} else {
//如果都不是那么设置默认值 方便调试
uval = 12345678900000000ULL + p[0];
negstart = UINT64_MAX;
negmax = 0;
}
//如果当前值超过了限定的值 则说明 到达了负数
if (uval >= negstart) {
//之前设置的基数 减去当前值
uval = negmax-uval;
val = uval;
//取反-1 就是当时存储的具体值
val = -val-1;
} else {
//否则将读出的数据存储到val中
val = uval;
}
//如果需要字符串的buff那么snpringf存储进去
if (intbuf) {
//此时count为写入到inbuff中的字节长度
*count = snprintf((char*)intbuf,LP_INTBUF_SIZE,"%lld",(long long)val);
//然后返回intbuff
return intbuf;
} else {
//否则设置count 此为具体value的值
*count = val;
return NULL;
}
}
//追加元素 其实就是添加最后一个元素
//lp listpack 指针 ele 当前元素的指针 size 当前元素的大小
unsigned char *lpAppend(unsigned char *lp, unsigned char *ele, uint32_t size) {
//首先获取当前元素的占用字节数
uint64_t listpack_bytes = lpGetTotalBytes(lp);
//当前lp首地址加上字节数 减去 1 就是 eof的地址
unsigned char *eofptr = lp + listpack_bytes - 1;
//只要在调用insert的时候 指定位before 设置需要插入的元素为eof便为append
return lpInsert(lp,ele,size,eofptr,LP_BEFORE,NULL);
}
//元素的删除 之前在insert中讲解过 lp listpack 指针 p 元素指针 newp如果需要删除元素的下一个指针则不为空
unsigned char *lpDelete(unsigned char *lp, unsigned char *p, unsigned char **newp) {
//设置插入值为NULL 类型为REPLACE 便是删除
return lpInsert(lp,NULL,0,p,LP_REPLACE,newp);
}