Redis数据结构使用场景及优化示例
Redis 学习路径
image基础数据类型和底层数据结构对应关系
image一、Hash在什么时候使用压缩列表
Hash 类型底层结构什么时候使用压缩列表,什么时候使用哈希表呢?
其实,Hash 类型设置了用压缩列表保存数据时的两个阈值,一旦超过了阈值,Hash 类型就会用哈希表来保存数据了。
这两个阈值分别对应以下两个配置项:
hash-max-ziplist-entries:表示用压缩列表保存时哈希集合中的最大元素个数。
hash-max-ziplist-value:表示用压缩列表保存时哈希集合中单个元素的最大长度。
// 设置 hash-max-ziplist-entries
config set hash-max-ziplist-entries 512
// 获取 hash-max-ziplist-entries
config get hash-max-ziplist-entries
// 设置 hash-max-ziplist-value
config set hash-max-ziplist-value 64
// 获取 hash-max-ziplist-value
config get hash-max-ziplist-value
如果我们往 Hash 集合中写入的元素个数超过了 hash-max-ziplist-entries,或者写入的单个元素大小超过了 hash-max-ziplist-value,Redis 就会自动把 Hash 类型的实现结构由压缩列表转为哈希表。
一旦从压缩列表转为了哈希表,Hash 类型就会一直用哈希表进行保存,而不会再转回压缩列表了。在节省内存空间方面,哈希表就没有压缩列表那么高效了。
优化示例,存储格式为 key: 1000000000 value: 2000000000
使用String存储1万个键值对,平均每个键值对占用内存情况如下
数据结构 | key类型 | value类型 | 平均占用/B |
---|---|---|---|
String | String | Long | 69.1072 |
String | String | String | 85.1072 |
使用Hash且底层为压缩列表
在保存单值的键值对时,可以采用基于 Hash 类型的二级编码方法。
即将key: 1000000000 拆分为两部分,前7位做为key,后3位做为field
因为field为三位数,范围为0~999共1000个,所以将hash-max-ziplist-entries设置为1000后,按照以上规格进行存储,就可以一直使用Hash且底层为压缩列表来存储数据,存储占用内存情况如下
数据结构 | key类型 | field类型 | value类型 | 平均占用/B |
---|---|---|---|---|
Hash | String | String | Long | 10.2952 |
Hash | String | String | String | 20.5336 |
综上所述,我们以前总认为 String 是“万金油”,什么场合都适用,但是,在保存的键值对本身占用的内存空间不大时(例如这里的例子),String 类型的元数据开销就占据主导了,这里面包括了 RedisObject 结构、SDS 结构、dictEntry 结构的内存开销。针对这种情况,我们可以使用压缩列表保存数据。当然,使用 Hash 这种集合类型保存单值键值对的数据时,我们需要将单值数据拆分成两部分,分别作为 Hash 集合的键和值,希望你能把这个方法用到自己的场景中。
如果你想知道键值对采用不同类型保存时的内存开销,可以在这个http://www.redis.cn/redis_memory/网址里输入你的键值对长度和使用的数据类型,这样就能知道实际消耗的内存大小了。建议你把这个小工具用起来,它可以帮助你充分地节省内存。
二、Set的聚合统计
现在有一个需求,统计每天的新增用户数和第二天的留存用户数。
我们可以使用Set的聚合统计功能,所谓的聚合统计,就是指统计多个集合元素的聚合结果,包括:
1.统计多个集合的共有元素(交集统计);
2.把两个集合相比,统计其中一个集合独有的元素(差集统计);
3.统计多个集合的所有元素(并集统计)。
Java实现代码
@SpringBootTest
public class RedisTest {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Test
void test() {
// 1. 记录 "2020-08-03登录用户" = [1,2,3,4,5,6,7,8,9,10]
for (int i = 1; i <= 10; i++) {
redisTemplate.opsForSet().add("user:id:20200803", i);
}
// 2. 2020-08-04 00:00:00的时候,设置 "累计用户" = "2020-08-03登录用户" 和 "累计用户" 的并集
redisTemplate.opsForSet().unionAndStore("user:id", "user:id:20200803", "user:id");
// 3. 记录 "2020-08-04登录用户" = [6,7,8,9,10,11,12,13,14,15]
for (int i = 6; i <= 15; i++) {
redisTemplate.opsForSet().add("user:id:20200804", i);
}
// 4. 2020-08-05 00:00:00的时候,计算并存储 "20200804的新增用户" = [11,12,13,14,15]
redisTemplate.opsForSet().differenceAndStore("user:id:20200804", "user:id", "user:id:20200804:add");
// 5. 2020-08-05 00:00:00的时候,计算并存储 "20200804的留存用户" = [6,7,8,9,10]
redisTemplate.opsForSet().intersectAndStore("user:id:20200804", "user:id:20200803", "user:id:20200804:keep");
}
}
运行以上代码后,可在可视化工具中看到以下结果
20200804的新增用户
image
20200804的留存用户
image
当你需要对多个集合进行聚合计算时,Set 类型会是一个非常不错的选择。不过,我要提醒你一下,这里有一个潜在的风险。
Set 的差集、并集和交集的计算复杂度较高,在数据量较大的情况下,如果直接执行这些计算,会导致 Redis 实例阻塞。
所以,我给你分享一个小建议:你可以从主从集群中选择一个从库,让它专门负责聚合计算,或者是把数据读取到客户端,在客户端来完成聚合统计,这样就可以规避阻塞主库实例和其他从库实例的风险了。
三、List和ZSet的排序统计
接下来,我们再来聊一聊应对集合元素排序需求的方法。我以在电商网站上提供最新评论列表的场景为例,进行讲解。
最新评论列表包含了所有评论中的最新留言,这就要求集合类型能对元素保序,也就是说,集合中的元素可以按序排列,这种对元素保序的集合类型叫作有序集合。
在 Redis 常用的 4 个集合类型中(List、Hash、Set、Sorted Set),List 和 Sorted Set 就属于有序集合。
List 是按照元素进入 List 的顺序进行排序的,而 Sorted Set 可以根据元素的权重来排序,我们可以自己来决定每个元素的权重值。比如说,我们可以根据元素插入 Sorted Set 的时间确定权重值,先插入的元素权重小,后插入的元素权重大。
Java实现代码
@SpringBootTest
public class RedisTest {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// list和zset数据初始化
private static final String key = "commentList";
private static final String key2 = "commentZset";
@Test
void set() {
for (int i = 1; i <= 1000; i++) {
// 根据时间戳生成唯一的score,13位时间戳+3位序列,zset的score超过16位会丢失精度
Double score = ScoreGenerator.nextScore();
Comment comment = Comment.builder()
.score(new BigDecimal(score).toString())
.comment(String.valueOf(i))
.build();
redisTemplate.opsForList().leftPush(key, comment);
redisTemplate.opsForZSet().add(key2, comment, new BigDecimal(score).doubleValue());
}
}
// list分页方法
private List<Comment> listPage(Long current, Long size) {
Long start = (current - 1) * size;
Long end = current * size - 1;
List<Object> objs = redisTemplate.opsForList().range(key, start, end);
List<Comment> comments = objs.stream().map(p -> (Comment) p).collect(Collectors.toList());
return comments;
}
// zset分页方法
private List<Comment> sortedSetPage(Long current, Long size) {
Long start = (current - 1) * size;
Set<Object> objects = redisTemplate.opsForZSet().reverseRangeByScore(key2, 0, 9999999999999999.0, start, size);
List<Comment> comments = objects.stream().map(p -> (Comment) p).collect(Collectors.toList());
return comments;
}
// zset分页方法2
private List<Comment> sortedSetPage2(Double maxScore, Long size) {
Set<Object> objects = redisTemplate.opsForZSet().reverseRangeByScore(key2, 0, maxScore, 0, size);
List<Comment> comments = objects.stream().map(p -> (Comment) p).collect(Collectors.toList());
return comments;
}
// list查询打印
@Test
void getListPage(Long current, Long size) {
long l = System.currentTimeMillis();
List<Comment> comments = listPage(current, size);
System.out.println("list查询耗时: " + (System.currentTimeMillis() - l) + "ms");
System.out.println(JSON.toJSONString(comments));
}
// zset查询打印
@Test
void getSortedSetPage(Long current, Long size) {
long l = System.currentTimeMillis();
List<Comment> comments = sortedSetPage(current, size);
System.out.println("zset查询耗时: " + (System.currentTimeMillis() - l) + "ms");
System.out.println(JSON.toJSONString(comments));
}
// zset查询打印
@Test
double getSortedSetPage2(Double maxScore, Long size) {
long l = System.currentTimeMillis();
List<Comment> comments = sortedSetPage2(maxScore, size);
System.out.println("zset查询耗时: " + (System.currentTimeMillis() - l) + "ms");
System.out.println(JSON.toJSONString(comments));
return new BigDecimal(comments.get(comments.size() - 1).getScore()).doubleValue();
}
@Test
void test() {
// 遍历查询出所有数据,每页10条
Long size = 10L;
Double maxScore = 9999999999999999.0;
for (int i = 1; i <= 100; i++) {
System.out.println("查询第" + i + "页");
// 这两种方法在翻页的时候,如果新插入一条数据都会造成,上一页最后一条和下一页第一条数据重复
getListPage(0L + i, size);
getSortedSetPage(0L + i, size);
// 这种根据maxScore 来查询的方法能避免以上数据重复问题
// 查询第一页,给一个足够大的maxScore,第二页的maxScore等于第一页最后一项的score-1
maxScore = getSortedSetPage2(maxScore, size) - 1;
System.out.println();
}
}
}
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
class Comment {
private String score;
private String comment;
}
class ScoreGenerator {
public static void main(String[] args) {
HashSet<Double> doubles = new HashSet<>();
long l = System.currentTimeMillis();
for (int i = 0; i < 1000000; i++) {
Double aDouble = ScoreGenerator.nextScore();
// 校验是否会重复
if (!doubles.contains(aDouble)) {
doubles.add(aDouble);
} else {
throw new RuntimeException("重复");
}
}
System.out.println((System.currentTimeMillis() - l) + "ms");
}
private static long lastTimestamp = -1L;
private static long sequence = 1L;
public synchronized static Double nextScore() {
long timestamp = System.currentTimeMillis();
//获取当前时间戳如果小于上次时间戳,则表示时间戳获取出现异常
if (timestamp < lastTimestamp) {
System.err.printf("clock is moving backwards. Rejecting requests until %d.", lastTimestamp);
throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds",
lastTimestamp - timestamp));
}
//获取当前时间戳如果等于上次时间戳(同一毫秒内),则在序列号加一;否则序列号赋值为0,从0开始。
if (lastTimestamp == timestamp) {
sequence = sequence + 1;
if (sequence > 999) {
timestamp = tilNextMillis(lastTimestamp);
sequence = 1;
}
} else {
sequence = 1;
}
//将上次时间戳值刷新
lastTimestamp = timestamp;
Long score = timestamp * 1000 + sequence;
return new BigDecimal(score).doubleValue();
}
//获取时间戳,并与上次时间戳比较
private static long tilNextMillis(long lastTimestamp) {
long timestamp = System.currentTimeMillis();
while (timestamp <= lastTimestamp) {
timestamp = System.currentTimeMillis();
}
return timestamp;
}
}
执行List和ZSet初始化方法后,数据如下
imageimage
执行分页查询方法后,结果如下
imageimage
image
可以看到以上三种查询方式,效率上差别不大。所以,在面对需要展示最新列表、排行榜等场景时,可以采用以上方法。如果数据更新频繁,不能接受翻页时可能出现重复数据的问题,建议你优先考虑使用 Sorted Set。
四、Bitmap的二值状态统计
现在,我们再来分析下第三个场景:二值状态统计。这里的二值状态就是指集合元素的取值就只有 0 和 1 两种。在签到打卡的场景中,我们只用记录签到(1)或未签到(0),所以它就是非常典型的二值状态。
在签到统计时,每个用户一天的签到用 1 个 bit 位就能表示,一个月(假设是 31 天)的签到情况用 31 个 bit 位就可以,而一年的签到也只需要用 365 个 bit 位,根本不用太复杂的集合类型。这个时候,我们就可以选择 Bitmap。这是 Redis 提供的扩展数据类型。我来给你解释一下它的实现原理。
Bitmap 本身是用 String 类型作为底层数据结构实现的一种统计二值状态的数据类型。String 类型是会保存为二进制的字节数组,所以,Redis 就把字节数组的每个 bit 位利用起来,用来表示一个元素的二值状态。你可以把 Bitmap 看作是一个 bit 数组。
Java实现代码
@SpringBootTest
public class RedisTest {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// 计算用户(1000)在2021年1月签到总天数
@Test
void bitCount() {
String key = "uid:sign:1000:202101";
// 代表用户1000在2021年1月1日签到了
redisTemplate.opsForValue().setBit(key, 0, true);
// 代表用户1000在2021年1月6日签到了
redisTemplate.opsForValue().setBit(key, 5, true);
// 查询用户1000在2021年1月6日是否签到了
Boolean bit = redisTemplate.opsForValue().getBit(key, 5);
System.out.println(bit); // true
// 查询用户1000在2021年1月的签到次数
Long count = redisTemplate.execute((RedisCallback<Long>) con -> con.bitCount(key.getBytes()));
System.out.println(count); // 2次
}
// 计算用户连续签到天数,假设某个7天的活动连续签到7天有奖品
@Test
void bitOp() {
// 假设5个用户参加活动,每一天的每一位是一个用户
// day1每个用户都签到了
for (int i = 0; i < 5; i++) {
redisTemplate.opsForValue().setBit("day1", i, true);
}
// day1前4个用户都签到了
for (int i = 0; i < 4; i++) {
redisTemplate.opsForValue().setBit("day2", i, true);
}
// day1前3个用户都签到了
for (int i = 0; i < 3; i++) {
redisTemplate.opsForValue().setBit("day3", i, true);
}
// day1前2个用户都签到了
for (int i = 0; i < 2; i++) {
redisTemplate.opsForValue().setBit("day4", i, true);
}
// 之后只有用户0坚持了下来
redisTemplate.opsForValue().setBit("day5", 0, true);
redisTemplate.opsForValue().setBit("day6", 0, true);
redisTemplate.opsForValue().setBit("day7", 0, true);
// 对day1~day7 按位与,存储到destination中。RedisStringCommands.BitOperation还包含其他逻辑操作
Long bitAnd = redisTemplate.execute((RedisCallback<Long>) con ->
con.bitOp(RedisStringCommands.BitOperation.AND,
"destination".getBytes(), "day1".getBytes(),
"day2".getBytes(), "day3".getBytes(), "day4".getBytes(),
"day5".getBytes(), "day6".getBytes(), "day7".getBytes()));
System.out.println(bitAnd);
// 查询结果,对应位数上的用户为ture则代表连续签到了七天
ArrayList<Boolean> destination = new ArrayList<>();
for (int i = 0; i < 5; i++) {
destination.add(redisTemplate.opsForValue().getBit("destination", i));
}
System.out.println(destination); // [true, false, false, false, false]
}
// bitpos 是用来查询一个二进制串里第一个0或者1的位置。可设置查询的位范围
@Test
void bitPos() {
redisTemplate.opsForValue().setBit("bitPos", 5, true);
Long execute = redisTemplate.execute((RedisCallback<Long>) con -> con.bitPos("bitPos".getBytes(), true));
System.out.println(execute); // 5
}
}
如果只需要统计数据的二值状态,例如商品有没有、用户在不在等,就可以使用 Bitmap,因为它只用一个 bit 位就能表示 0 或 1。在记录海量数据时,Bitmap 能够有效地节省内存空间。
五、HyperLogLog的基数统计
我们再来看一个统计场景:基数统计。基数统计就是指统计一个集合中不重复的元素个数。对应到我们刚才介绍的场景中,就是统计网页的 UV。网页 UV 的统计有个独特的地方,就是需要去重,一个用户一天内的多次访问只能算作一次。
在 Redis 的集合类型中,Set 类型默认支持去重,所以看到有去重需求时,我们可能第一时间就会想到用 Set 类型。但是,如果 UV 达到了千万,这个时候,一个页面的 Set 就要记录千万个用户 ID, 如果每个页面都用这样的一个 Set,就会消耗很大的内存空间。
HyperLogLog 是一种用于统计基数的数据集合类型,它的最大优势就在于,当集合元素数量非常多时,它计算基数所需的空间总是固定的,而且还很小。在 Redis 中,每个 HyperLogLog 只需要花费 12 KB 内存,就可以计算接近 2^64 个元素的基数。
Java实现代码
@SpringBootTest
public class RedisTest {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Test
void test() {
// 假设一万个用户访问了page1
long l = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
redisTemplate.opsForHyperLogLog().add("page1:uv", i);
}
System.out.println("添加完成,耗时" + (System.currentTimeMillis() - l) + "ms");
Long size = redisTemplate.opsForHyperLogLog().size("page1:uv");
System.out.println(size); // 9987个,存在标准误算 0.81%
}
}
不过,有一点需要你注意一下,HyperLogLog 的统计规则是基于概率完成的,所以它给出的统计结果是有一定误差的,标准误算率是 0.81%。这也就意味着,你使用 HyperLogLog 统计的 UV 是 100 万,但实际的 UV 可能是 101 万。虽然误差率不算大,但是,如果你需要精确统计结果的话,最好还是继续用 Set 或 Hash 类型。
六、GEO 支持 LBS 应用
在日常生活中,我们越来越依赖搜索“附近的餐馆”、在打车软件上叫车,这些都离不开基于位置信息服务(Location-Based Service,LBS)的应用。LBS 应用访问的数据是和人或物关联的一组经纬度信息,而且要能查询相邻的经纬度范围,GEO 就非常适合应用在 LBS 服务的场景中。
Java实现代码
这里模拟叫车业务,查询用户附近十公里内的车辆信息
@SpringBootTest
public class RedisTest {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Test
void test() {
String key = "cars:locations";
// 添加车
redisTemplate.opsForGeo().add(key, new Point(114.417265, 30.453276), "car1");
redisTemplate.opsForGeo().add(key, new Point(114.403287, 30.455175), "car2");
redisTemplate.opsForGeo().add(key, new Point(100, 20), "car3");
// 人的位置
Point people = new Point(114.408031, 30.447827);
// 查10公里内最近的10辆车
List<Result> nearCars = this.getNearCars(key, people,
new Distance(10000, RedisGeoCommands.DistanceUnit.METERS), // 10公里内
10L); // 十条记录
// 打印查询结果
nearCars.forEach(p -> System.out.println(p));
}
// 获取附近车、车坐标以及人与车的距离
public List<Result> getNearCars(String key, Point point, Distance distance, Long limit) {
// 获取附近车的集合
GeoResults<RedisGeoCommands.GeoLocation<Object>> radiusGeo = redisTemplate.opsForGeo().radius(key,
new Circle(point, distance),
RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs()
.includeDistance() // 包含距离
.includeCoordinates() // 包含坐标
.sortAscending() // 由近到远排序
.limit(limit) // 获取记录数
);
List<Result> cars = new ArrayList<>();
// 遍历附近车的集合
Iterator<GeoResult<RedisGeoCommands.GeoLocation<Object>>> result = radiusGeo.iterator();
while (result.hasNext()) {
GeoResult<RedisGeoCommands.GeoLocation<Object>> geoLocation = result.next();
// 车
String c = (String) geoLocation.getContent().getName();
// 车坐标
Point p = geoLocation.getContent().getPoint();
// 人与车的距离
Distance d = geoLocation.getDistance();
cars.add(Result.builder()
.carId(c)
.longitude(new BigDecimal(p.getX()).setScale(6, BigDecimal.ROUND_HALF_UP))
.latitude(new BigDecimal(p.getY()).setScale(6, BigDecimal.ROUND_HALF_UP))
.distance((int) d.getValue())
.build()
);
}
return cars;
}
}
@Data
@Builder
class Result {
private String carId;
private BigDecimal longitude;
private BigDecimal latitude;
private Integer distance;
}
添加车辆后,可以在下图redis中看到,车辆坐标信息是用ZSET存储的,说明GEO是用ZSET做为底层实现的
image
查询附近车辆的结果如下图,可以看出只有两个距离小于10公里的车car1和car2被查了出来
image
七、List实现消息队列
https://www.jianshu.com/p/bd70702cda00