这就是一篇专门写Redis的文章!
Redis是什么?
是完全开源免费的,用c语言编写的,是一个单线程,高性能的(key/value)内存数据库,基于内存运行并支持持久化的nosql数据库。
能干嘛?
主要是用来做缓存,但不仅仅只能做缓存,比如:redis的计数器生成分布式唯一主键,redis实现分布式锁,队列,点赞,统计网站访问量。
去哪下?
官网,也可以通过Linux yum直接下载安装
怎么玩?
-
安装
-
redis数据类型(api操作) redis5.0 新加一个数据类型
-
redis配置文件解析
-
redis的持久化
-
redis的事务
-
redis的发布订阅
-
java客户端操作(jedis)
redis的安装
-
解压
-
make 如果make报错的话,大家就可以看一下是不是报没有gcc的错,如果是报没有gcc的错,那就要先安装一个gcc,yum install gcc-c++ 安装好gcc之后执行一下make distclean 因为前面make的时候它执行了一些东西,要先把他清掉。
-
make install 查看redis默认安装位置 /usr/local/bin
redis设置外网访问
1.注释bind并且把protected-mode no
2.使用bind
3.设置密码
protected-mode它启用的条件有两个,第一是没有使用bind,第二是没有设置访问密码。
redis数据类型及api操作(http://redisdoc.com/)
key
keys *
scan 0 match * count 1
exists key:判断某个key是否存在
move key db:当前库就没有了,到指定的库中去了
expire key:为给定的key设置过期时间
ttl key:查看还有多少时间过期 -1表示永不过期 -2表示已过期
type key:查看key是什么类型
1.string
string是redis最基本的类型,你可以理解成与Memcached一模一样的类型,一个key对应一个value。
string类型是二进制安全的。意思是redis的string可以包含任何数据。比如jpg图片或者序列化的对象 。
string类型是Redis最基本的数据类型,一个redis中字符串value最多可以是512M。
set key value:设置key value
get key:查看当前key的值
del key:删除key
append key value:如果key存在,则在指定的key末尾添加,如果key存在则类似set
strlen key:返回此key的长度
以下几个命令只有在key值为数字的时候才能正常操作。
incr key:为指定key的值加一
decr key:为指定key的值减一
incrby key 数值:为指定key的值增加数值
decrby key 数值:为指定key的值减数值
getrange key 0(开始位置) -1(结束位置) 获取指定区间范围内的值,类似between......and的关系 (0 -1)表示全部
setrange key 1(开始位置,从哪里开始设置) 具体值 设置(替换)指定区间范围内的值
setex 键 秒值 真实值 设置带过期时间的key,动态设置。
setnx key value 只有在 key 不存在时设置 key 的值。
mset key1 value key2 value 同时设置一个或多个 key-value 对。
mget key1 key 2 获取所有(一个或多个)给定 key 的值。
msetnx key1 value key2 value 同时设置一个或多个 key-value 对,当且仅当所有给定 key 都不存在。
getset key value 将给定 key 的值设为 value ,并返回 key 的旧值(old value)。
2.list
它是一个字符串链表,left、right都可以插入添加;如果键不存在,创建新的链表;如果键已存在,新增内容;如果值全移除,对应的键也就消失了。链表的操作无论是头和尾效率都极高,但假如是对中间元素进行操作,效率就很惨淡了。
Redis 列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边)。它的底层实际是个链表
lpush key value1 value2 将一个或多个值加入到列表头部
rpush key value1 value2 将一个或多个值加入到列表底部
lrange key start end 获取列表指定范围的元素 (0 -1)表示全部
lpop key 移出并获取列表第一个元素
rpop key 移出并获取列表最后一个元素
lindex key index 通过索引获取列表中的元素
llen 获取列表长度
lrem key 0(数量) 值,表示删除全部给定的值。零个就是全部值 从left往right删除指定数量个值等于指定值的元素,返回的值为实际删除的数量。
ltrim key start(从哪里开始截) end(结束位置) 截取指定索引区间的元素,格式是ltrim list的key 起始索引结束索引。
3.set
Redis的Set是string类型的无序,不能重复的集合。·
sadd key value1 value 2 向集合中添加一个或多个成员
smembers key 返回集合中所有成员
sismembers key member 判断member元素是否是集合key的成员
scard key 获取集合里面的元素个数
srem key value 删除集合中指定元素
srandmember key 数值 从set集合里面随机取出指定数值个元素 如果超过最大数量就全部取出
spop key 随机移出并返回集合中某个元素
smove key1 key2 value(key1中某个值) 作用是将key1中指定的值移除 加入到key2集合中
sdiff key1 key2 在第一个set里面而不在后面任何一个set里面的项(差集)
sinter key1 key2 在第一个set和第二个set中都有的 (交集)
sunion key1 key2 两个集合所有元素(并集)
4.hash
Redis hash 是一个键值对集合。Redis hash是一个string类型的field和value的映射表,hash特别适合用于存储对象。
kv模式不变,但v是一个键值对
类似Java里面的Map
hset key (key value) 向hash表中添加一个元素
hget key key 向hash表中获取一个元素
hmset key key1 value1 key2 value2 key3 value3 向集合中添加一个或多个元素
hmget key key1 key2 key3 向集合中获取一个或多个元素
hgetall key 获取在hash列表中指定key的所有字段和值
hdel key key1 key2 删除一个或多个hash字段
hlen key 获取hash表中字段数量
hexits key key 查看hash表中,指定key(字段)是否存在
hkeys key 获取指定hash表中所有key(字段)
hvals key 获取指定hash表中所有value(值)
hincrdy key key1 数量(整数) 指定hash表中某个字段加 数量 ,和incr一个意思
hincrdyfloat key key1 数量(浮点数,小数) 指定hash表中某个字段加 数量 ,和incr一个意思
hsetnx key key1 value1 与hset作用一样,区别是不存在赋值,存在了无效。
5.zset
Redis zset 和 set 一样也是string类型元素的集合,且不允许重复的成员。不同的是每个元素都会关联一个double类型的分数。redis正是通过分数来为集合中的成员进行从小到大的排序。zset的成员是唯一的,但分数(score)却可以重复。
zadd key score 值 score 值 向集合中添加一个或多个成员
zrange key 0 -1 表示所有 返回指定集合中所有value
zrange key 0 -1 withscores 返回指定集合中所有value和score
zrangebyscore key 开始score 结束score 返回指定score间的值
zrem key score某个对应值(value),可以是多个值 删除元素
zcard key 获取集合中元素个数
zcount key 开始score 结束score 获取分数区间内元素个数
zrank key vlaue 获取value在zset中的下标位置(根据score排序)
zscore key value 按照值获得对应的分数
redis事务
什么是redis的事务?
redis事务就是一个命令执行的队列,将一系列预定义命令包装成一个整体,就是一个队列。当执行的时候,一次性按照添加顺序依次执行,中间不会被打断或者干扰。
能干嘛?
一个队列中,一次性,顺序性,排他性的执行一系列命令
redis事务基本操作
开启事务:multi 设置事务的开始位置,这个指令开启后,后面所有的指令都会加入事务中
执行事务:exec 设置事务的结束位置,同时执行事务,与multi成对出现,成对使用
取消事务:discard 终止当前事务,取消multi后,exec前的所有指令
注意:加入事务的命令并没有立马执行,而且加入队列中,exec命令后才执行
加入和执行事务有错误会怎么办?
加入事务语法报错,事务则取消
执行事务报错,则成功的返回成功,失败的返回失败,不影响报错后面的指令
注意: 已经执行完毕的命令对应的数据不会自动回滚,需要程序员自己实现
监控key
watch:对key进行监控,如果在exec执行前,监控的key发生了变化,终止事务执行
unwatch:取消对所有的key进行监控
redis发布订阅
publish:发布消息 语法:publish channel名称 “消息内存”
subscribe:订阅消息 语法:subscribe channel名称
subscribe:使用通配符订阅消息 语法:pubscribe channel*名称
punsubscribe:使用通配符退订消息。语法:punsubscribe channel*名称
unsubscribe:退订消息 语法:unsubscribe channel名称
删除策略
定时删除-->以CPU内存换redis内存
惰性删除-->以redis内存换CPU内存
定期删除
redis使用:惰性删除+定期删除
1.redis在启动的时候读取配置文件hz的值,默认为10
2.每秒执行hz次serverCron()-->databasesCron()--->actveEXpireCyle()
3.actveEXpireCyle()对每个expires[*]进行逐一检测,每次执行250ms/hz
4.对某个expires[*]检测时,随机挑选N个key检查
如果key超时,删除key
如果一轮中删除的key的数量>N*25%,循环该过程
如果一轮中删除的key的数量小于等于N*25%,检查下一个expires[ * ]
currentdb用于记录actveEXpireCyle()进入哪个expires[ * ] 执行,如果时间到了,那么下次根据currentdb继续执行。
逐出算法
相关配置:
maxmemory:最大可使用内存,占用物理内存的比例,默认值为0,表示不限制。生产环境一般根据需求设置,通常50%以上
maxmemory-policy:达到最大内存后,对挑选出来的数据进行删除策略
maxmemory-samples:每次选取待删除数据的个数,选取数据时并不会全库扫描,采用随机获取数据的方式作为待检测删除数据
redis的持久化机制
说白了,就是在指定的时间间隔内,将内存当中的数据集快照写入磁盘,它恢复时是将快照文件直接读到内存。
什么意思呢?
我们都知道,内存当中的数据,如果我们一断电,那么数据必然会丢失,但是玩过redis的同学应该都知道,我们一关机之后再启动的时候数据是还在的,所以它必然是在redis启动的时候重新去加载了持久化的文件。
redis提供两种方式进行持久化:一种是RDB持久化默认,另外一种是AOF(append only file)持久化。
1.RDB
是什么?
原理是redis会单独创建(fork)一个与当前进程一模一样的子进程来进行持久化,这个子进程的所有数据(变量。环境变量,程序程序计数器等)都和原进程一模一样,会先将数据写入到一个临时文件中,待持久化结束了,再用这个临时文件替换上次持久化好的文件,整个过程中,主进程不进行任何的io操作,这就确保了极高的性能。
1.这个持久化文件在哪里
2.他什么时候fork子进程,或者什么时候触发rdb持久化机制
shutdown时,如果没有开启aof,会触发 配置文件中默认的快照配置
执行命令save或者bgsave save是只管保存,其他不管,全部阻塞,使用主进程进行持久化 bgsave:redis会在后台异步进行快照操作,同时可以响应客户端的请求
执行flushall命令 但是里面是空的,无意义
2.aof(--fix) ls -l --block-size=M
是什么?
原理是将Reids的操作日志以追加的方式写入文件,读操作是不记录的
1.这个持久化文件在哪里
2.触发机制(根据配置文件配置项)
no:表示等操作系统进行数据缓存同步到磁盘(快,持久化没保证) always:同步持久化,每次发生数据变更时,立即记录到磁盘(慢,安全) everysec:表示每秒同步一次(默认值,很快,但可能会丢失一秒以内的数据)
3.aof重写机制
当AOF文件增长到一定大小的时候Redis能够调用 bgrewriteaof对日志文件进行重写 。当AOF文件大小的增长率大于该配置项时自动开启重写(这里指超过原大小的100%)。auto-aof-rewrite-percentage 100
当AOF文件增长到一定大小的时候Redis能够调用 bgrewriteaof对日志文件进行重写 。当AOF文件大小大于该配置项时自动开启重写
auto-aof-rewrite-min-size 64mb
4.redis4.0后混合持久化机制 (rdb+aof)对重写的优化
开启混合持久化
4.0版本的混合持久化默认关闭的,通过aof-use-rdb-preamble配置参数控制,yes则表示开启,no表示禁用,5.0之后默认开启。
混合持久化是通过bgrewriteaof完成的,不同的是当开启混合持久化时,fork出的子进程先将共享的内存副本全量的以RDB方式写入aof文件,然后在将重写缓冲区的增量命令以AOF方式写入到文件,写入完成后通知主进程更新统计信息,并将新的含有RDB格式和AOF格式的AOF文件替换旧的的AOF文件。简单的说:新的AOF文件前半段是RDB格式的全量数据后半段是AOF格式的增量数据。
优点:混合持久化结合了RDB持久化 和 AOF 持久化的优点, 由于绝大部分都是RDB格式,加载速度快,同时结合AOF,增量的数据以AOF方式保存了,数据更少的丢失。
缺点:兼容性差,一旦开启了混合持久化,在4.0之前版本都不识别该aof文件,同时由于前部分是RDB格式,阅读性较差
小总结:
1.redis提供了rdb持久化方案,为什么还要aof?
优化数据丢失问题,rdb会丢失最后一次快照后的数据,aof丢失不会超过2秒的数据
2.如果aof和rdb同时存在,听谁的?
aof
3.rdb和aof优势劣势
rdb 适合大规模的数据恢复,对数据完整性和一致性不高 , 在一定间隔时间做一次备份,如果redis意外down机的话,就会丢失最后一次快照后的所有操作 aof 根据配置项而定。
1.官方建议 两种持久化机制同时开启,如果两个同时开启 优先使用aof持久化机制
性能建议(这里只针对单机版redis持久化做性能建议):
因为RDB文件只用作后备用途,只要15分钟备份一次就够了,只保留save 900 1这条规则。
如果Enalbe AOF,好处是在最恶劣情况下也只会丢失不超过两秒数据,启动脚本较简单只load自己的AOF文件就可以了。代价一是带来了持续的IO,二是AOF rewrite的最后将rewrite过程中产生的新数据写到新文件造成的阻塞几乎是不可避免的。只要硬盘许可,应该尽量减少AOF rewrite的频率,AOF重写的基础大小默认值64M太小了,可以设到5G以上。默认超过原大小100%大小时重写可以改到适当的数值。
redis集群专题
Redis主从复制
1.是什么
1.单机有什么问题:
单机故障
容量瓶颈
qps瓶颈
主机数据更新后根据配置和策略,自动同步到备机的master/slaver机制,mester已写为主,slaver已读为主
2.能干嘛
1.读写分离
2.容灾备份
3.怎么玩
玩法原则:
1.配从不配主
2.使用命令 SLAVEOF 动态指定主从关系 ,如果设置了密码,关联后使用 config set masterauth 密码
3.配置文件和命令混合使用时,如果混合使用,动态指定了主从,请注意一定要修改对应的配置文件
1.新建redis8000,redis8001,redis8002文件夹
2.将redis.conf文件复制在redis8000下
3.分别修改目录下的redis.conf文件
redis8000/redis.conf
1.bind 192.168.0.104 指定本机ip
2.port 8000
3.daemonize yes
4.pidfile /var/run/redis_8000.pid
5.dir /myredis/redis8000
6.requirepass 123456
4.把redis8000/redis.conf文件复制到redis8001,redis8002下
redis8001/redis.conf
1. :%s/8000/8001/g 批量替换
2. replicaof 192.168.0.104 8000
3. masterauth 123456
redis8002/redis.conf
1. `1.:%s/8000/8002/g 批量替换`
2. `2. replicaof 192.168.0.1048000`
3. `3. masterauth 123456`
5.分别启动8000.8001,8002实例
[root@localhost myredis]# /usr/local/bin/redis-server /myredis/redis8000/redis.conf [root@localhost myredis]# /usr/local/bin/redis-server /myredis/redis8001/redis.conf [root@localhost myredis]# /usr/local/bin/redis-server /myredis/redis8002/redis.conf
6.客户端连接
/usr/local/bin/redis-cli -h 192.168.0.104 -p 8000 -a 123456
/usr/local/bin/redis-cli -h 192.168.0.104 -p 8001 -a 123456
/usr/local/bin/redis-cli -h 192.168.0.104 -p 8002 -a 123456
工作流程
总体分为大三步:
建立连接
1.设置master的地址和端口,发送slaveof ip port指令,master会返回响应客户端,根据响应信息保存master ip port信息 (连接测试)
2.根据保存的信息创建连接master的socket
3.周期性发送ping,master会响应pong
4.发送指令 auth password(身份验证),master验证身份
5.发送slave端口信息,master保存slave的端口号
数据同步
1.slave发送指令 psyn2
2.master 执行bgsave
3.在第一个salve连接时,创建命令缓存区
4.生成RDB文件,通过socket发送给slave
5.slave接收RDB,清空数据,执行RDB文件恢复过程
6.发送命令告知RDB恢复已经完成(告知全量复制完成)
7.master发送复制缓冲区信息
8.slave接收信息,执行重写后恢复数据
注意: master会保存slave从我这里拿走了多少数据,保存salve的偏移量
命令传播
slave心跳:replconf ack {offset} 汇报slave自己的offset,获取最新数据指令
命令传播阶段出现断网:
-
网络闪断闪连 忽略
-
短时间断网 增量
-
长时间断网 全量
全量复制核心三个要素
-
服务器运行id
用于服务器之间通信验证身份,master首次连接slave时,会将自己的run_id发送给slave,slave保存此ID
-
主服务器积压的命令缓冲区
先进先出队列
-
主从服务器的复制偏移量
用于比对偏移量,然后判断出执行全量还是增量
4.全量复制消耗
1.bgsave时间 2.rdb文件网络传输 3.从节点请求请求数据时间 4.从节点加载rdb的时间 5.可能的aof重写时间
5.缺点
1.由于所有的写操作都是先在Master上操作,然后同步更新到Slave上,所以从Master同步到Slave机器有一定的延迟,当系统很繁忙的时候,延迟问题会更加严重,Slave机器数量的增加也会使这个问题更加严重。
2.当主机宕机之后,将不能进行写操作,需要手动将从机升级为主机,从机需要重新制定master
简单总结:
一个master可以有多个Slave
一个slave只能有一个master
数据流向是单向的,只能从主到从。
redis哨兵模式
1.是什么,能干嘛?
在Redis 2.8版本开始引入。哨兵的核心功能是主节点的自动故障转移。
通俗来讲哨兵模式的出现是就是为了解决我们主从复制模式中需要我们人为操作的东西变为自动版,并且它比人为要更及时
2.哨兵主要功能(做了哪些事)
监控(Monitoring):哨兵会不断地检查主节点和从节点是否运作正常。
自动故障转移(Automatic Failover):当主节点不能正常工作时,哨兵会开始自动故障转移操作,它会将失效主节点的其中一个从节点升级为新的主节点,并让其他从节点改为复制新的主节点。
配置提供者(Configuration Provider):客户端在初始化时,通过连接哨兵来获得当前Redis服务的主节点地址。
通知(Notification):哨兵可以将故障转移的结果发送给客户端。
其中,监控和自动故障转移功能,使得哨兵可以及时发现主节点故障并完成转移;而配置提供者和通知功能,则需要在与客户端的交互中才能体现。
3.架构
哨兵节点:哨兵系统由一个或多个哨兵节点组成,哨兵节点是特殊的Redis节点,不存储数据。
数据节点:主节点和从节点都是数据节点。
4.怎么玩(实战)?
1.部署主从节点
哨兵系统中的主从节点,与普通的主从节点配置是一样的,并不需要做任何额外配置。下面分别是主节点(port=8000)和2个从节点(port=8001/8002)的配置文件;
我们刚才搭建的主从复制就是主从节点
2.部署哨兵节点
哨兵节点本质上是特殊的Redis节点。
3个哨兵节点的配置几乎是完全一样的,主要区别在于端口号的不同(26379 / 26380 / 26381)下面以26379节点为例介绍节点的配置和启动方式;配置部分尽量简化:
sentinel-26379.conf
port 26379
daemonize yes
logfile "26379.log"
sentinel monitor mymaster 192.168.0.104 6379 2
其中,sentinel monitor mymaster 192.168. 92.128 6379 2配置的含义是:该哨兵节点监92.168.0.104 6379这个主节点,该主节点的名称是mymaster,最后的2的含义与主节点的故障判定有关:至少需要2个哨兵节点同意,才能判定主节点故障并进行故障转移。
哨兵节点的启动有两种方式,二者作用是完全相同的:
redis-sentinel sentinel-26379.conf
redis-server sentinel-26379.conf --sentinel
5.故障转移演示(哨兵的监控和自动故障转移功能)
使用kill命令杀掉主节点
6.客户端(jedis)访问哨兵系统(自动故障转移功能)
1. `publicstaticvoid main(String[] args) {`
2. `Logger logger= LoggerFactory.getLogger(TestJedisSentinel.class);`
3. `Set<String> set=newHashSet<>();`
4. `set.add("192.168.0.104:8000");`
5. `set.add("192.168.0.104:8001");`
6. `set.add("192.168.0.104:8002");`
7. `JedisSentinelPool jedisSentinelPool=newJedisSentinelPool("mymaster",set,"123456");`
8. `while(true) {`
9. `Jedis jedis=null;`
10. `try{`
11. `jedis = jedisSentinelPool.getResource();`
12. `String s = UUID.randomUUID().toString();`
13. `jedis.set("k"+ s, "v"+ s);`
14. `System.out.println(jedis.get("k"+ s));`
15. `Thread.sleep(1000);`
16. `}catch(Exception e){`
17. `logger.error(e.getMessage());`
18. `}finally{`
19. `if(jedis!=null){`
20. `jedis.close();`
21. `}`
22. `}`
23. `}`
24. `}`
7.基本原理
关于哨兵的原理,关键是了解以下几个概念:
主观下线:在心跳检测的定时任务中,如果其他节点超过一定时间没有回复,哨兵节点就会将其进行主观下线。顾名思义,主观下线的意思是一个哨兵节点“主观地”判断下线;与主观下线相对应的是客观下线。
客观下线:哨兵节点在对主节点进行主观下线后,会通过sentinel is-master-down-by-addr命令询问其他哨兵节点该主节点的状态;如果判断主节点下线的哨兵数量达到一定数值,则对该主节点进行客观下线。
需要特别注意的是,客观下线是主节点才有的概念;如果从节点和哨兵节点发生故障,被哨兵主观下线后,不会再有后续的客观下线和故障转移操作。
定时任务:每个哨兵节点维护了3个定时任务。定时任务的功能分别如下:
1.每10秒通过向主从节点发送info命令获取最新的主从结构;
发现slave节点
确定主从关系
2.每2秒通过发布订阅功能获取其他哨兵节点的信息;SUBSCRIBE c2 PUBLISH c2 hello-redis
交互对节点的“看法”和自身情况
3.每1秒通过向其他节点发送ping命令进行心跳检测,判断是否下线(monitor)。
心跳检测,失败判断依据
选举领导者哨兵节点:当主节点被判断客观下线以后,各个哨兵节点会进行协商,选举出一个领导者哨兵节点,并由该领导者节点对其进行故障转移操作。
监视该主节点的所有哨兵都有可能被选为领导者,选举使用的算法是Raft算法;Raft算法的基本思路是先到先得:即在一轮选举中,哨兵A向B发送成为领导者的申请,如果B没有同意过其他哨兵,则会同意A成为领导者。选举的具体过程这里不做详细描述,一般来说,哨兵选择的过程很快,谁先完成客观下线,一般就能成为领导者。
故障转移:选举出的领导者哨兵,开始进行故障转移操作,该操作大体可以分为3个步骤:
在从节点中选择新的主节点:选择的原则是,
1.首先过滤掉不健康的从节点;
2.过滤响应慢的节点
3.过滤与master断开时间最久的
4.优先原则
先选择优先级最高的从节点(由replica-priority指定);如果优先级无法区分,
1. `则选择复制偏移量最大的从节点;如果仍无法区分,`
则选择runid最小的从节点。
更新主从状态:通过slaveof no one命令,让选出来的从节点成为主节点;并通过slaveof命令让其他节点成为其从节点。
将已经下线的主节点(即6379)保持关注,当6379从新上线后设置为新的主节点的从节点
8.实践建议
哨兵节点的数量应不止一个。一方面增加哨兵节点的冗余,避免哨兵本身成为高可用的瓶颈;另一方面减少对下线的误判。此外,这些不同的哨兵节点应部署在不同的物理机上。
哨兵节点的数量应该是奇数,便于哨兵通过投票做出“决策”:领导者选举的决策、客观下线的决策等。
各个哨兵节点的配置应一致,包括硬件、参数等;此外应保证时间准确、一致。
9.总结
在主从复制的基础上,哨兵引入了主节点的自动故障转移,进一步提高了Redis的高可用性;但是哨兵的缺陷同样很明显:哨兵无法对从节点进行自动故障转移,在读写分离场景下,从节点故障会导致读服务不可用,需要我们对从节点做额外的监控、切换操作。此外,哨兵仍然没有解决写操作无法负载均衡、及存储能力受到单机限制的问题
redis cluster高可用集群
1.redis cluster集群是什么?
redis cluster集群是一个由多个主从节点群组成的分布式服务器群,它具有复制、高可用和分片特 性。Redis cluster集群不需要sentinel哨兵也能完成节点移除和故障转移的功能。需要将每个节点 设置成集群模式,这种集群模式没有中心节点,可水平扩展,据官方文档称可以线性扩展到 1000节点。redis cluster集群的性能和高可用性均优于之前版本的哨兵模式,且集群配置非常简单。
2.redis cluster集群搭建
/usr/local/bin/redis-cli --cluster help
1.原生搭建
1.配置开启cluster节点
cluster-enabled yes(启动集群模式)
cluster-config-file nodes-8001.conf(这里800x最好和port对应上)
2.meet
cluster meet ip port
3.指派槽
查看crc16 算法算出key的槽位命令 cluster keyslot key
16384/3 0-5461 5462-10922 10923-16383 16384/4 4096
cluster addslots slot(槽位下标)
4.分配主从
cluster replicate node-id
2.使用redis提供的rb脚本
redis cluster集群需要至少要三个master节点,我们这里搭建三个master节点,并且给每个 master再搭建一个slave节点,总共6个redis节点,由于节点数较多,这里采用在一台机器 上创建6个redis实例,并将这6个redis实例配置成集群模式,所以这里搭建的是伪集群模 式,当然真正的分布式集群的配置方法几乎一样,搭建伪集群的步骤如下:
第一步:在/usr/local下创建文件夹redis-cluster,然后在其下面分别创建6个文件夾如下
(1)mkdir -p /usr/local/redis-cluster
(2)mkdir 8001、 mkdir 8002、 mkdir 8003、 mkdir 8004、 mkdir 8005、 mkdir 8006
第二步:把之前的redis.conf配置文件copy到8001下,修改如下内容:
(1)daemonize yes
(2)port 8001(分别对每个机器的端口号进行设置)
(3)bind 127.0.0.1(如果只在本机玩则可以指定为127.0.0.1 如果需要外网访问则需要指定本机真实ip) 定可能会出现循环查找集群节点机器的情况)
(4)dir /usr/local/redis-cluster/8001/(指定数据文件存放位置,必须要指定不同的目 录位置,不然会丢失数据)
(5)cluster-enabled yes(启动集群模式)
(6)cluster-config-file nodes-8001.conf(这里800x最好和port对应上)
(7)cluster-node-timeout 5000
(8)appendonly yes
第三步:把修改后的配置文件,分别 copy到各个文夹下,注意每个文件要修改第2、4、6 项里的端口号,可以用批量替换::%s/源字符串/目的字符串/g
第四步:由于 redis集群需要使用 ruby命令,所以我们需要安装 ruby(redis5.0之后省略) (1)yum install ruby
(2)yum install rubygems
(3)gem install redis --version 3.0.0(安装redis和 ruby的接囗)
第五步:分别启动6个redis实例,然后检查是否启动成功
(1)/usr/local/redis/bin/redis-server /usr/local/redis-cluster/800*/redis.conf
(2)ps -ef | grep redis 查看是否启动成功
第六步:在redis3的安装目录下执行 redis-trib.rb命令创建整个redis集群
(1)cd /usr/local/redis3/src
(2)./redis-trib.rb create --replicas 1 127.0.0.1:9000 127.0.0.1:9001 127.0.0.1:9002 127.0.0.1:9003 127.0.0.1:9004 127.0.0.1:9005
redis5.0使用/usr/local/bin/redis-cli --cluster create 192.168.0.104:7000 192.168.0.104:7001 192.168.0.104:7002 192.168.0.104:7003 192.168.0.104 :7004 192.168.0.104:7005 --cluster-replicas 1
第七步:验证集群:
(1)连接任意一个客户端即可:./redis-cli -c -h -p (-c表示集群模式,指定ip地址和端口 号)如:/usr/local/redis/bin/redis-cli -c -h 127.0.0.1 -p 800*
(2)进行验证:cluster info(查看集群信息)、cluster nodes(查看节点列表)
(3)进行数据操作验证
(4)关闭集群则需要逐个进行关闭,使用命令:/usr/local/redis/bin/redis-cli -c -h 127.0.0.1 -p 800* shutdown
3.集群伸缩
1.扩容集群
1.准备新节点
2.加入集群
使用redis-cli 语法:add-node 新节点ip 端口 已存在节点ip 端口
使用原生命令 语法:cluster meet ip port
指定主从
使用redis-cli 语法(加入时指定):add-node 新节点ip 端口 已存在节点ip 端口 --cluster-slave --cluster-master-id masterID
使用原生命令 语法:cluster replicate node-id
3.迁移槽和数据
1.槽迁移计划
语法:/redis-cli --cluster reshard 已存在节点ip :端口
/usr/local/bin/redis-cli --cluster reshard 192.168.204.188:7000
2.迁移数据
执行流程:提示要分配多少槽-》接收节点ID-》all/done
3.添加从节点
2.缩容集群
1.下线迁移槽
语法:redis-cli --cluster reshard --cluster-from 要迁出节点ID --cluster-to 接收槽节点ID --cluster-slots 迁出槽数量 已存在节点ip 端口
/usr/local/bin/redis-cli --cluster reshard --cluster-from a2fdd1359d03acacf2a6e558acbc006639445d53 --cluster-to 1794864d5f8af79e88cfc0f699f02b6341c78b5c --cluster-slots 1366 192.168.0.104 7000
2.忘记节点.关闭节点
语法:redis-cli --cluster del-node 已存在节点IP:端口 要删除的节点ID
/usr/local/bin/redis-cli --cluster del-node 192.168.0.104:7000 8de55e2a7419983184cede9daab5d36ee9da1fa3
4.cluster客户端
1.moved重定向:指我们发送命令时,会对发送的key进行crc16算法,得到一个数字,然而我们连接的客户端并不是管理这个数字的范围,所以会返回错误并告诉你此key应该对应的槽位,然后客户端需要捕获此异常,重新发起请求到对应的槽位
2.asx重定向:指在我们送发命令时,对应的客户端正在迁移槽位中,所以此时我们不能确定这个key是还在旧的节点中还是新的节点中
3.smart客户端
1.从集群中选取一个可运行节点,使用cluster slots初始化槽和节点映射。
2.将cluster slots的结果映射到本地,为每个节点创建jedispool
3.准备执行命令
5.故障转移(与哨兵相似)
1.故障发现:通过ping/pong消息实现故障发现(不依赖sentinel)
2.故障恢复
1.检查资格
1.每个从节点检查与主节点的断开时间
超过cluster-node-timeout * cluster-replica-validity-factor 时间取消资格
2.选择偏移量最大的
替换主节点
1.当前从节点取消复制变为主节点(slaveof no one)
2.撤销以前主节点的槽位,给新的主节点
3.向集群广播消息,表明已经替换了故障节点
总结
redis集群演变过程
1.单机版
核心技术:持久化
持久化是最简单的高可用方法(有时甚至不被归为高可用的手段),主要作用是数据备份,即将数据存储在硬盘,保证数据不会因进程退出而丢失。
2.主从复制
复制是高可用Redis的基础,哨兵和集群都是在复制基础上实现高可用的。复制主要实现了数据的多机备份,以及对于读操作的负载均衡和简单的故障恢复。缺陷是故障恢复无法自动化;写操作无法负载均衡;存储能力受到单机的限制。
3.哨兵
在复制的基础上,哨兵实现了自动化的故障恢复。缺陷是写操作无法负载均衡;存储能力受到单机的限制。
4.cluster集群
通过集群,Redis解决了写操作无法负载均衡,以及存储能力受到单机限制的问题,实现了较为完善的高可用方案
redis5.x新特性 Streams
基础知识
介绍:Stream是Redis的数据类型中最复杂的,尽管数据类型本身非常简单,它实现了额外的非强制性的特性:提供了一组允许消费者以阻塞的方式等待生产者向Stream中发送的新消息,此外还有一个名为消费者组的概念。
消费者组最早是由名为Kafka(TM)的流行消息系统引入的。Redis用完全不同的术语重新实现了一个相似的概念,但目标是相同的:允许一组客户端相互配合来消费同一个Stream的不同部分的消息。
写: 因为Streams是只附加数据结构,基本的写命令,叫XADD,向指定的Stream追加一个新的条目。一个Stream条目不是简单的字符串,而是由一个或多个键值对组成的。这样一来,Stream的每一个条目就已经是结构化的,就像以CSV格式写的只附加文件一样,每一行由多个逗号割开的字段组成。
1. `XADD mystream * sensor-id 1234 temperature 19.8`
2. `返回`
3. `1518951480106-0`
上面的例子中,调用了XADD命令往名为 mystream
的Stream中添加了一个条目 sensor-id:123,temperature:19.8
,使用了自动生成的条目ID,也就是命令返回的值,具体在这里是 1518951480106-0
。命令的第一个参数是key的名称 mystream
,第二个参数是用于唯一确认Stream中每个条目的条目ID。然而,在这个例子中,我们传入的参数值是 *
,因为我们希望由Redis服务器为我们自动生成一个新的ID。每一个新的ID都会单调增长,简单来讲就是,每次新添加的条目都会拥有一个比其它所有条目更大的ID。由服务器自动生成ID几乎总是我们所想要的,需要显式指定ID的情况非常少见。
获取数量: 使用XLEN命令来获取一个Stream的条目数量:
1. `XLEN mystream`
2. `返回`
3. `1`
条目 ID:
条目ID由XADD命令返回,并且可以唯一的标识给定Stream中的每一个条目,由两部分组成:
1. `<millisecondsTime>-<sequenceNumber>`
毫秒时间部分实际是生成Stream ID的Redis节点的服务器本地时间,但是如果当前毫秒时间戳比以前的条目时间戳小的话,那么会使用以前的条目时间,所以即便是服务器时钟向后跳,单调增长ID的特性仍然会保持不变。序列号用于以相同毫秒创建的条目。由于序列号是64位的,所以实际上对于在同一毫秒内生成的条目数量是没有限制的。
这样的ID格式也许最初看起来有点奇怪,也许温柔的读者会好奇为什么时间会是ID的一部分。其实是因为Redis Streams支持按ID进行范围查询。由于ID与生成条目的时间相关,因此可以很容易地按时间范围进行查询。我们在后面讲到XRANGE命令时,很快就能明白这一点。
如果由于某些原因,用户需要与时间无关但实际上与另一个外部系统ID关联的增量ID,就像前面所说的,XADD命令可以带上一个显式的ID,而不是使用通配符 *
来自动生成,如下所示:
1. `> XADD somestream 0-1 field value`
2. `0-1`
3. `> XADD somestream 0-2 foo bar`
4. `0-2`
请注意,在这种情况下,最小ID为0-1,并且命令不接受等于或小于前一个ID的ID
按范围查询: XRANGE 和 XREVRANGE
1. `XRANGE mystream - +`
2. `1) 1) "1584001577343-0"`
3. `2) 1) "sensor-id"`
4. `2) "1234"`
5. `3) "temperature"`
6. `4) "19.8"`
返回的每个条目都是有两个元素的数组:ID和键值对列表。
例如,我可能想要查询两毫秒时间,可以这样使用:
1. `> XRANGE mystream 15189514801061518951480107`
2. `1) 1) 1518951480106-0`
3. `2) 1) "sensor-id"`
4. `2) "1234"`
5. `3) "temperature"`
6. `4) "19.8"`
利用XRANGE分页查询:
XRANGE命令支持在最后放一个可选的COUNT选项。通过指定一个count,我可以只获取前面N个项目。
1. `> XRANGE mystream - + COUNT 2`
2. `1) 1) 1519073278252-0`
3. `2) 1) "foo"`
4. `2) "value_1"`
5. `2) 1) 1519073279157-0`
6. `2) 1) "foo"`
7. `2) "value_2"`
如果我想要更多,我可以拿返回的最后一个ID,在序列号部分加1,然后再次查询。
1. `> XRANGE mystream 1519073279157-1+ COUNT 2`
2. `1) 1) 1519073280281-0`
3. `2) 1) "foo"`
4. `2) "value_3"`
5. `2) 1) 1519073281432-0`
6. `2) 1) "foo"`
7. `2) "value_4"`
XREVRANGE命令与XRANGE相同
请注意:XREVRANGE命令以相反的顺序获取start 和 stop参数。
使用XREAD监听新项目
当我们不想按照Stream中的某个范围访问项目时,我们通常想要的是订阅到达Stream的新项目。这个概念可能与Redis中你订阅频道的Pub/Sub或者Redis的阻塞列表有关,在这里等待某一个key去获取新的元素,但是这跟你消费Stream有着根本的不同:
-
一个Stream可以拥有多个客户端(消费者)在等待数据。默认情况下,对于每一个新项目,都会被分发到等待给定Stream的数据的每一个消费者。这个行为与阻塞列表不同,每个消费者都会获取到不同的元素。但是,扇形分发到多个消费者的能力与Pub/Sub相似。
-
虽然在Pub/Sub中的消息是fire and forget并且从不存储,以及使用阻塞列表时,当一个客户端收到消息时,它会从列表中弹出(有效删除),Stream从跟本上以一种不同的方式工作。所有的消息都被无限期地附加到Stream中(除非用户明确地要求删除这些条目):不同的消费者通过记住收到的最后一条消息的ID,从其角度知道什么是新消息。
-
Streams 消费者组提供了一种Pub/Sub或者阻塞列表都不能实现的控制级别,同一个Stream不同的群组,显式地确认已经处理的项目,检查待处理的项目的能力,申明未处理的消息,以及每个消费者拥有连贯历史可见性,单个客户端只能查看自己过去的消息历史记录。
提供监听到达Stream的新消息的能力的命令称为XREAD。
1. `> XREAD COUNT 2 STREAMS mystream 0`
2. `1) 1) "mystream"`
3. `2) 1) 1) 1519073278252-0`
4. `2) 1) "foo"`
5. `2) "value_1"`
6. `2) 1) 1519073279157-0`
7. `2) 1) "foo"`
8. `2) "value_2"`
以上是XREAD的非阻塞形式。注意COUNT选项并不是必需的,实际上这个命令唯一强制的选项是STREAMS,指定了一组key以及调用者已经看到的每个Stream相应的最大ID,以便该命令仅向客户端提供ID大于我们指定ID的消息。
在上面的命令中,我们写了 STREAMS mystream0
,所以我们想要流 mystream
中所有ID大于 0-0
的消息。正如你在上面的例子中所看到的,命令返回了键名,因为实际上可以通过传入多个key来同时从不同的Stream中读取数据。我可以写一下,例如: STREAMS mystream otherstream00
。注意在STREAMS选项后面,我们需要提供键名称,以及之后的ID。因此,STREAMS选项必须始终是最后一个。
除了XREAD可以同时访问多个Stream这一事实,以及我们能够指定我们拥有的最后一个ID来获取之后的新消息,在个简单的形式中,这个命令并没有做什么跟XRANGE有太大区别的事情。然而,有趣的部分是我们可以通过指定BLOCK参数,轻松地将XREAD 变成一个 阻塞命令:
1. `XREAD BLOCK 0 STREAMS mystream <pre style="margin: 0px; padding: 8px 0px 6px; max-width: 100%; box-sizing: border-box; word-wrap: break-word !important; background: rgb(27, 25, 24); border-radius: 0px; overflow-y: auto; color: rgb(80, 97, 109); text-align: start; font-size: 10px; line-height: 12px; font-family: consolas, menlo, courier, monospace, "Microsoft Yahei" !important; border-width: 1px !important; border-style: solid !important; border-color: rgb(226, 226, 226) !important;"
我指定了新的BLOCK选项,超时时间为0毫秒(意味着永不超时)。此外,我并没有给流 mystream
传入一个常规的ID,而是传入了一个特殊的ID $
。这个特殊的ID意思是XREAD应该使用流 mystream
已经存储的最大ID作为最后一个ID。以便我们仅接收从我们开始监听时间以后的新消息。这在某种程度上相似于Unix命令 tail-f
请注意当使用BLOCK选项时,我们不必使用特殊ID $
。我们可以使用任意有效的ID。如果命令能够立即处理我们的请求而不会阻塞,它将执行此操作,否则它将阻止。通常如果我们想要从新的条目开始消费Stream,我们以 $
开始,接着继续使用接收到的最后一条消息的ID来发起下一次请求,依此类推。
XREAD的阻塞形式同样可以监听多个Stream,只需要指定多个键名即可。如果请求可以同步提供,因为至少有一个流的元素大于我们指定的相应ID,则返回结果。否则,该命令将阻塞并将返回获取新数据的第一个流的项目(根据提供的ID)。
跟阻塞列表的操作类似,从等待数据的客户端角度来看,阻塞流读取是公正的,由于语义是FIFO样式。阻塞给定Stream的第一个客户端是第一个在新项目可用时将被解除阻塞的客户端。
XREAD命令没有除了COUNT 和 BLOCK以外的其他选项,因此它是一个非常基本的命令,具有特定目的来攻击消费者一个或多个流。使用消费者组API可以用更强大的功能来消费Stream,但是通过消费者组读取是通过另外一个不同的命令来实现的,称为XREADGROUP。下面将介绍。
消费者组
当手头的任务是从不同的客户端消费同一个Stream,那么XREAD已经提供了一种方式可以扇形分发到N个客户端,还可以使用从节点来提供更多的读取可伸缩性。然而,在某些问题中,我们想要做的不是向许多客户端提供相同的消息流,而是从同一流向许多客户端提供不同的消息子集。这很有用的一个明显的例子是处理消息的速度很慢:能够让N个不同的客户端接收流的不同部分,通过将不同的消息路由到准备做更多工作的不同客户端来扩展消息处理工作。
实际上,假如我们想象有三个消费者C1,C2,C3,以及一个包含了消息1, 2, 3, 4, 5, 6, 7的Stream,我们想要按如下图表的方式处理消息:
1. `1-> C1`
2. `2-> C2`
3. `3-> C3`
4. `4-> C1`
5. `5-> C2`
6. `6-> C3`
7. `7-> C1`
为了获得这个效果,Redis使用了一个名为消费者组的概念。非常重要的一点是,从实现的角度来看,Redis的消费者组与Kafka (TM) 消费者组没有任何关系,它们只是从实施的概念上来看比较相似,所以我决定不改变最初普及这种想法的软件产品已有的术语。
消费者组就像一个伪消费者,从流中获取数据,实际上为多个消费者提供服务,提供某些保证:
-
每条消息都提供给不同的消费者,因此不可能将相同的消息传递给多个消费者。
-
消费者在消费者组中通过名称来识别,该名称是实施消费者的客户必须选择的区分大小写的字符串。这意味着即便断开连接过后,消费者组仍然保留了所有的状态,因为客户端会重新申请成为相同的消费者。然而,这也意味着由客户端提供唯一的标识符。
-
每一个消费者组都有一个第一个ID永远不会被消费的概念,这样一来,当消费者请求新消息时,它能提供以前从未传递过的消息。
-
消费消息需要使用特定的命令进行显式确认,表示:这条消息已经被正确处理了,所以可以从消费者组中逐出。
-
消费者组跟踪所有当前所有待处理的消息,也就是,消息被传递到消费者组的一些消费者,但是还没有被确认为已处理。由于这个特性,当访问一个Stream的历史消息的时候,每个消费者将只能看到传递给它的消息。
如果你把消费者组看成Redis Stream的辅助数据结构,很明显单个Stream可以拥有多个消费者组,每个消费者组都有一组消费者。实际上,同一个Stream甚至可以通过XREAD让客户端在没有消费者组的情况下读取,同时有客户端通过XREADGROUP在不同的消费者组中读取。
消费者组命令,具体如下:
-
XGROUP 用于创建,摧毁或者管理消费者组。
-
XREADGROUP 用于通过消费者组从一个Stream中读取。
-
XACK 是允许消费者将待处理消息标记为已正确处理的命令。
创建一个消费者组
假设我已经存在类型流的 mystream
,为了创建消费者组,我只需要做:
1. `> XGROUP CREATE mystream mygroup <pre style="margin: 0px; padding: 8px 0px 6px; max-width: 100%; box-sizing: border-box; word-wrap: break-word !important; background: rgb(27, 25, 24); border-radius: 0px; overflow-y: auto; color: rgb(80, 97, 109); text-align: start; font-size: 10px; line-height: 12px; font-family: consolas, menlo, courier, monospace, "Microsoft Yahei" !important; border-width: 1px !important; border-style: solid !important; border-color: rgb(226, 226, 226) !important;"
2. `OK`
请注意:目前还不能为不存在的Stream创建消费者组,但有可能在不久的将来我们会给XGROUP命令增加一个选项,以便在这种场景下可以创建一个空的Stream。
如你所看到的上面这个命令,当创建一个消费者组的时候,我们必须指定一个ID,在这个例子中ID是 $
。这是必要的,因为消费者组在其他状态中必须知道在第一个消费者连接时接下来要服务的消息,即消费者组创建完成时的最后消息ID是什么?如果我们就像上面例子一样,提供一个 $
,那么只有从现在开始到达Stream的新消息才会被传递到消费者组中的消费者。如果我们指定的消息ID是 0
,那么消费者组将会开始消费这个Stream中的所有历史消息。当然,你也可以指定任意其他有效的ID。你所知道的是,消费者组将开始传递ID大于你所指定的ID的消息。因为 $
表示Stream中当前最大ID的意思,指定 $
会有只消费新消息的效果。
现在消费者组创建好了,我们可以使用XREADGROUP命令立即开始尝试通过消费者组读取消息。我们会从消费者那里读到,假设指定消费者分别是Alice和Bob,来看看系统会怎样返回不同消息给Alice和Bob。
XREADGROUP和XREAD非常相似,并且提供了相同的BLOCK选项,除此以外还是一个同步命令。但是有一个强制的选项必须始终指定,那就是GROUP,并且有两个参数:消费者组的名字,以及尝试读取的消费者的名字。选项COUNT仍然是支持的,并且与XREAD命令中的用法相同。
在开始从Stream中读取之前,让我们往里面放一些消息:
1. `192.168.204.188:6379> XADD mystream * message apple`
2. `"1584343935899-0"`
3. `192.168.204.188:6379> XADD mystream * message orange`
4. `"1584343943844-0"`
5. `192.168.204.188:6379> XADD mystream * message strawberry`
6. `"1584343952864-0"`
7. `192.168.204.188:6379> XADD mystream * message apricot`
8. `"1584343960108-0"`
9. `192.168.204.188:6379> XADD mystream * message banana`
10. `"1584343967223-0"`
请注意:在这里消息是字段名称,水果是关联的值,记住Stream中的每一项都是小字典。
XREADGROUP的响应内容就像XREAD一样。但是请注意上面提供的 GROUP<group-name><consumer-name>
,这表示我想要使用消费者组 mygroup
从Stream中读取,我是消费者 Alice
。每次消费者使用消费者组中执行操作时,都必须要指定可以这个消费者组中唯一标识它的名字。
在以上命令行中还有另外一个非常重要的细节,在强制选项STREAMS之后,键 mystream
请求的ID是特殊的ID >
。这个特殊的ID只在消费者组的上下文中有效,其意思是:消息到目前为止从未传递给其他消费者。
这几乎总是你想要的,但是也可以指定一个真实的ID,比如 0
或者任何其他有效的ID,在这个例子中,我们请求XREADGROUP只提供给我们历史待处理的消息,在这种情况下,将永远不会在组中看到新消息。所以基本上XREADGROUP可以根据我们提供的ID有以下行为:
如果ID是特殊ID >
,那么命令将会返回到目前为止从未传递给其他消费者的新消息,这有一个副作用,就是会更新消费者组的最后ID。如果ID是任意其他有效的数字ID,那么命令将会让我们访问我们的历史待处理消息。即传递给这个指定消费者(由提供的名称标识)的消息集,并且到目前为止从未使用XACK进行确认。
我们可以立即测试此行为,指定ID为0,不带任何COUNT选项:我们只会看到唯一的待处理消息,即关于apples的消息:
1. `> XREADGROUP GROUP mygroup Alice STREAMS mystream 0`
2. `1) 1) "mystream"`
3. `2) 1) 1) 1526569495631-0`
4. `2) 1) "message"`
5. `2) "apple"`
但是,如果我们确认这个消息已经处理,它将不再是历史待处理消息的一部分,因此系统将不再报告任何消息:
1. `> XACK mystream mygroup 1526569495631-0`
2. `(integer) 1`
3. `> XREADGROUP GROUP mygroup Alice STREAMS mystream 0`
4. `1) 1) "mystream"`
5. `2) (empty list orset)`
如果你还不清楚XACK是如何工作的,请不用担心,这个概念只是已处理的消息不再是我们可以访问的历史记录的一部分。
现在轮到Bob来读取一些东西了:
1. `> XREADGROUP GROUP mygroup Bob COUNT 2 STREAMS mystream >`
2. `1) 1) "mystream"`
3. `2) 1) 1) 1526569498055-0`
4. `2) 1) "message"`
5. `2) "orange"`
6. `2) 1) 1526569506935-0`
7. `2) 1) "message"`
8. `2) "strawberry"`
Bob要求最多两条消息,并通过同一消费者组 mygroup
读取。所以发生的是Redis仅报告新消息。正如你所看到的,消息”apple”未被传递,因为它已经被传递给Alice,所以Bob获取到了orange和strawberry,以此类推。
这样,Alice,Bob以及这个消费者组中的任何其他消费者,都可以从相同的Stream中读取到不同的消息,读取他们尚未处理的历史消息,或者标记消息为已处理。这允许创建不同的拓扑和语义来从Stream中消费消息。
有几件事需要记住:
-
消费者是在他们第一次被提及的时候自动创建的,不需要显式创建。
-
即使使用XREADGROUP,你也可以同时从多个key中读取,但是要让其工作,你需要给每一个Stream创建一个名称相同的消费者组。这并不是一个常见的需求,但是需要说明的是,这个功能在技术上是可以实现的。
-
XREADGROUP命令是一个写命令,因为当它从Stream中读取消息时,消费者组被修改了,所以这个命令只能在master节点调用。
这里的想法是开始消费历史消息,即我们的待处理消息列表。这很有用,因为消费者可能已经崩溃,因此在重新启动时,我们想要重新读取那些已经传递给我们但还没有确认的消息。通过这种方式,我们可以多次或者一次处理消息(至少在消费者失败的场景中是这样,但是这也受到Redis持久化和复制的限制,请参阅有关此主题的特定部分)。消耗历史消息后,我们将得到一个空的消息列表,我们可以切换到 >
,使用特殊ID来消费新消息。
从永久性失败中恢复
上面的例子允许我们编写多个消费者参与同一个消费者组,每个消费者获取消息的一个子集进行处理,并且在故障恢复时重新读取各自的待处理消息。然而在现实世界中,消费者有可能永久地失败并且永远无法恢复。由于任何原因停止后,消费者的待处理消息会发生什么呢?
Redis的消费者组提供了一个专门针对这种场景的特性,用以认领给定消费者的待处理消息,这样一来,这些消息就会改变他们的所有者,并且被重新分配给其他消费者。这个特性是非常明确的,消费者必须检查待处理消息列表,并且必须使用特殊命令来认领特定的消息,否则服务器将把待处理的消息永久分配给旧消费者,这样不同的应用程序就可以选择是否使用这样的特性,以及使用它的方式。
这个过程的第一步是使用一个叫做XPENDING的命令,这个命令提供消费者组中待处理条目的可观察性。这是一个只读命令,它总是可以安全地调用,不会改变任何消息的所有者。在最简单的形式中,调用这个命令只需要两个参数,即Stream的名称和消费者组的名称。
1. `> XPENDING mystream mygroup`
2. `1) (integer) 2`
3. `2) 1526569498055-0`
4. `3) 1526569506935-0`
5. `4) 1) 1) "Bob"`
6. `2) "2"`
当以这种方式调用的时候,命令只会输出给定消费者组的待处理消息总数(在本例中是两条消息),所有待处理消息中的最小和最大的ID,最后是消费者列表和每个消费者的待处理消息数量。我们只有Bob有两条待处理消息,因为Alice请求的唯一一条消息已使用XACK确认了。
我们可以通过给XPENDING命令传递更多的参数来获取更多信息,完整的命令签名如下:
1. `XPENDING <key><groupname>[<start-id> <end-id> <count>[<conusmer-name>]]`
通过提供一个开始和结束ID(可以只是 -
和 +
,就像XRANGE一样),以及一个控制命令返回的信息量的数字,我们可以了解有关待处理消息的更多信息。如果我们想要将输出限制为仅针对给定使用者组的待处理消息,可以使用最后一个可选参数,即消费者组的名称,但我们不会在以下示例中使用此功能。
1. `> XPENDING mystream mygroup - + 10`
2. `1) 1) 1526569498055-0`
3. `2) "Bob"`
4. `3) (integer) 74170458`
5. `4) (integer) 1`
6. `2) 1) 1526569506935-0`
7. `2) "Bob"`
8. `3) (integer) 74170458`
9. `4) (integer) 1`
现在我们有了每一条消息的详细信息:消息ID,消费者名称,空闲时间(单位是毫秒,意思是:自上次将消息传递给某个消费者以来经过了多少毫秒),以及每一条给定的消息被传递了多少次。我们有来自Bob的两条消息,它们空闲了74170458毫秒,大概20个小时。
请注意,没有人阻止我们检查第一条消息内容是什么,使用XRANGE即可。
1. `> XRANGE mystream 1526569498055-01526569498055-0`
2. `1) 1) 1526569498055-0`
3. `2) 1) "message"`
4. `2) "orange"`
我们只需要在参数中重复两次相同的ID。现在我们有了一些想法,Alice可能会根据过了20个小时仍然没有处理这些消息,来判断Bob可能无法及时恢复,所以现在是时候认领这些消息,并继续代替Bob处理了。为了做到这一点,我们使用XCLAIM命令。
这个命令非常的复杂,并且在其完整形式中有很多选项,因为它用于复制消费者组的更改,但我们只使用我们通常需要的参数。在这种情况下,它就像调用它一样简单:
1. `XCLAIM <key><group><consumer><min-idle-time> <ID-1> <ID-2> ... <ID-N>`
基本上我们说,对于这个特定的Stream和消费者组,我希望指定的ID的这些消息可以改变他们的所有者,并将被分配到指定的消费者 <consumer>
。但是,我们还提供了最小空闲时间,因此只有在上述消息的空闲时间大于指定的空闲时间时,操作才会起作用。这很有用,因为有可能两个客户端会同时尝试认领一条消息:
1. `Client1: XCLAIM mystream mygroup Alice36000001526569498055-0`
2. `Clinet2: XCLAIM mystream mygroup Lora36000001526569498055-0`
然而认领一条消息的副作用是会重置它的闲置时间!并将增加其传递次数的计数器,所以上面第二个客户端的认领会失败。通过这种方式,我们可以避免对消息进行简单的重新处理(即使是在一般情况下,你仍然不能获得准确的一次处理)。
下面是命令执行的结果:
1. `> XCLAIM mystream mygroup Alice36000001526569498055-0`
2. `1) 1) 1526569498055-0`
3. `2) 1) "message"`
4. `2) "orange"`
Alice成功认领了该消息,现在可以处理并确认消息,尽管原来的消费者还没有恢复,也能往前推动。
从上面的例子很明显能看到,作为成功认领了指定消息的副作用,XCLAIM命令也返回了消息数据本身。但这不是强制性的。可以使用JUSTID选项,以便仅返回成功认领的消息的ID。如果你想减少客户端和服务器之间的带宽使用量的话,以及考虑命令的性能,这会很有用,并且你不会对消息感兴趣,因为稍后你的消费者的实现方式将不时地重新扫描历史待处理消息。
认领也可以通过一个独立的进程来实现:这个进程只负责检查待处理消息列表,并将空闲的消息分配给看似活跃的消费者。可以通过Redis Stream的可观察特性获得活跃的消费者。这是下一个章节的主题。
消息认领及交付计数器
在XPENDING的输出中,你所看到的计数器是每一条消息的交付次数。这样的计数器以两种方式递增:消息通过XCLAIM成功认领时,或者调用XREADGROUP访问历史待处理消息时。
当出现故障时,消息被多次传递是很正常的,但最终它们通常会得到处理。但有时候处理特定的消息会出现问题,因为消息会以触发处理代码中的bug的方式被损坏或修改。在这种情况下,消费者处理这条特殊的消息会一直失败。因为我们有传递尝试的计数器,所以我们可以使用这个计数器来检测由于某些原因根本无法处理的消息。所以一旦消息的传递计数器达到你给定的值,比较明智的做法是将这些消息放入另外一个Stream,并给系统管理员发送一条通知。这基本上是Redis Stream实现的dead letter概念的方式。
Streams 的可观察性
缺乏可观察性的消息系统很难处理。不知道谁在消费消息,哪些消息待处理,不知道给定Stream的活跃消费者组的集合,使得一切都不透明。因此,Redis Stream和消费者组都有不同的方式来观察正在发生的事情。我们已经介绍了XPENDING,它允许我们检查在给定时刻正在处理的消息列表,以及它们的空闲时间和传递次数。
但是,我们可能希望做更多的事情,XINFO命令是一个可观察性接口,可以与子命令一起使用,以获取有关Stream或消费者组的信息。
这个命令使用子命令来显示有关Stream和消费者组的状态的不同信息,比如使用*XINFO STREAM *可以报告关于Stream本身的信息。
1. `> XINFO STREAM mystream`
2. `1) length`
3. `2) (integer) 13`
4. `3) radix-tree-keys`
5. `4) (integer) 1`
6. `5) radix-tree-nodes`
7. `6) (integer) 2`
8. `7) groups`
9. `8) (integer) 2`
10. `9) first-entry`
11. `10) 1) 1524494395530-0`
12. `2) 1) "a"`
13. `2) "1"`
14. `3) "b"`
15. `4) "2"`
16. `11) last-entry`
17. `12) 1) 1526569544280-0`
18. `2) 1) "message"`
19. `2) "banana"`
输出显示了有关如何在内部编码Stream的信息,以及显示了Stream的第一条和最后一条消息。另一个可用的信息是与这个Stream相关联的消费者组的数量。我们可以进一步挖掘有关消费者组的更多信息。
1. `> XINFO GROUPS mystream`
2. `1) 1) name`
3. `2) "mygroup"`
4. `3) consumers`
5. `4) (integer) 2`
6. `5) pending`
7. `6) (integer) 2`
8. `2) 1) name`
9. `2) "some-other-group"`
10. `3) consumers`
11. `4) (integer) 1`
12. `5) pending`
13. `6) (integer) 0`
正如你在这里和前面的输出中看到的,XINFO命令输出一系列键值对。因为这是一个可观察性命令,允许人类用户立即了解报告的信息,并允许命令通过添加更多字段来报告更多信息,而不会破坏与旧客户端的兼容性。其他更高带宽效率的命令,比如XPENDING,只报告没有字段名称的信息。
上面例子中的输出(使用了子命令GROUPS)应该能清楚地观察字段名称。我们可以通过检查在此类消费者组中注册的消费者,来更详细地检查特定消费者组的状态。
1. `> XINFO CONSUMERS mystream mygroup`
2. `1) 1) name`
3. `2) "Alice"`
4. `3) pending`
5. `4) (integer) 1`
6. `5) idle`
7. `6) (integer) 9104628`
8. `2) 1) name`
9. `2) "Bob"`
10. `3) pending`
11. `4) (integer) 1`
12. `5) idle`
13. `6) (integer) 83841983`
如果你不记得命令的语法,只需要查看命令本身的帮助:
1. `> XINFO HELP`
2. `1) XINFO <subcommand> arg arg ... arg. Subcommands are:`
3. `2) CONSUMERS <key><groupname>-- Show consumer groups of group<groupname>.`
4. `3) GROUPS <key>-- Show the stream consumer groups.`
5. `4) STREAM <key>-- Show information about the stream.`
6. `5) HELP -- Printthis help.`
与Kafka(TM)分区的差异
Redis Stream的消费者组可能类似于基于Kafka(TM)分区的消费者组,但是要注意Redis Stream实际上非常不同。分区仅仅是逻辑的,并且消息只是放在一个Redis键中,因此不同客户端的服务方式取决于谁准备处理新消息,而不是从哪个分区客户端读取。例如,如果消费者C3在某一点永久故障,Redis会继续服务C1和C2,将新消息送达,就像现在只有两个逻辑分区一样。
类似地,如果一个给定的消费者在处理消息方面比其他消费者快很多,那么这个消费者在相同单位时间内按比例会接收更多的消息。这是有可能的,因为Redis显式地追踪所有未确认的消息,并且记住了谁接收了哪些消息,以及第一条消息的ID从未传递给任何消费者。
但是,这也意味着在Redis中,如果你真的想把同一个Stream的消息分区到不同的Redis实例中,你必须使用多个key和一些分区系统,比如Redis集群或者特定应用程序的分区系统。单个Redis Stream不会自动分区到多个实例上。
我们可以说,以下是正确的:
-
如果你使用一个Stream对应一个消费者,则消息是按顺序处理的。
-
如果你使用N个Stream对应N个消费者,那么只有给定的消费者hits N个Stream的子集,你可以扩展上面的模型来实现。
-
如果你使用一个Stream对应多个消费者,则对N个消费者进行负载平衡,但是在那种情况下,有关同一逻辑项的消息可能会无序消耗,因为给定的消费者处理消息3可能比另一个消费者处理消息4要快。
所以基本上Kafka分区更像是使用了N个不同的Redis键。而Redis消费者组是一个将给定Stream的消息负载均衡到N个不同消费者的服务端负载均衡系统。
设置Streams的上限
许多应用并不希望将数据永久收集到一个Stream。有时在Stream中指定一个最大项目数很有用,之后一旦达到给定的大小,将数据从Redis中移到不那么快的非内存存储是有用的,适合用来记录未来几十年的历史数据。Redis Stream对此有一定的支持。这就是XADD命令的MAXLEN选项,这个选项用起来很简单:
1. `> XADD mystream MAXLEN 2* value 1`
2. `1526654998691-0`
3. `> XADD mystream MAXLEN 2* value 2`
4. `1526654999635-0`
5. `> XADD mystream MAXLEN 2* value 3`
6. `1526655000369-0`
7. `> XLEN mystream`
8. `(integer) 2`
9. `> XRANGE mystream - +`
10. `1) 1) 1526654999635-0`
11. `2) 1) "value"`
12. `2) "2"`
13. `2) 1) 1526655000369-0`
14. `2) 1) "value"`
15. `2) "3"`
如果使用MAXLEN选项,当Stream的达到指定长度后,老的条目会自动被驱逐,因此Stream的大小是恒定的。目前还没有选项让Stream只保留给定数量的条目,因为为了一致地运行,这样的命令必须为了驱逐条目而潜在地阻塞很长时间。比如可以想象一下如果存在插入尖峰,然后是长暂停,以及另一次插入,全都具有相同的最大时间。Stream会阻塞来驱逐在暂停期间变得太旧的数据。因此,用户需要进行一些规划并了解Stream所需的最大长度。此外,虽然Stream的长度与内存使用是成正比的,但是按时间来缩减不太容易控制和预测:这取决于插入速率,该变量通常随时间变化(当它不变化时,那么按尺寸缩减是微不足道的)。
然而使用MAXLEN进行修整可能很昂贵:Stream由宏节点表示为基数树,以便非常节省内存。改变由几十个元素组成的单个宏节点不是最佳的。因此可以使用以下特殊形式提供命令:
1. `XADD mystream MAXLEN ~ 1000* ... entry fields here ...`
在选项MAXLEN和实际计数中间的参数 ~
的意思是,我不是真的需要精确的1000个项目。它可以是1000或者1010或者1030,只要保证至少保存1000个项目就行。通过使用这个参数,仅当我们移除整个节点的时候才执行修整。这使得命令更高效,而且这也是我们通常想要的。
还有XTRIM命令可用,它做的事情与上面讲到的MAXLEN选项非常相似,但是这个命令不需要添加任何其他参数,可以以独立的方式与Stream一起使用。
1. `> XTRIM mystream MAXLEN 10`
或者,对于XADD选项:
1. `> XTRIM mystream MAXLEN ~ 10`
但是,XTRIM旨在接受不同的修整策略,虽然现在只实现了MAXLEN。鉴于这是一个明确的命令,将来有可能允许按时间来进行修整,因为以独立的方式调用这个命令的用户应该知道她或者他正在做什么。
一个有用的驱逐策略是,XTRIM应该具有通过一系列ID删除的能力。目前这是不可能的,但在将来可能会实现,以便更方便地使用XRANGE 和 XTRIM来将Redis中的数据移到其他存储系统中(如果需要)。
持久化,复制和消息安全性
与任何其他Redis数据结构一样,Stream会异步复制到从节点,并持久化到AOF和RDB文件中。但可能不那么明显的是,消费者组的完整状态也会传输到AOF,RDB和从节点,因此如果消息在主节点是待处理的状态,在从节点也会是相同的信息。同样,节点重启后,AOF文件会恢复消费者组的状态。
但是请注意,Redis Stream和消费者组使用Redis默认复制来进行持久化和复制,所以:
-
如果消息的持久性在您的应用程序中很重要,则AOF必须与强大的fsync策略一起使用。
-
默认情况下,异步复制不能保证复制XADD命令或者消费者组的状态更改:在故障转移后,可能会丢失某些内容,具体取决于从节点从主节点接收数据的能力。
-
WAIT命令可以用于强制将更改传输到一组从节点上。但请注意,虽然这使得数据不太可能丢失,但由Sentinel或Redis群集运行的Redis故障转移过程仅执行尽力检查以故障转移到最新的从节点,并且在某些特定故障下可能会选举出缺少一些数据的从节点。因此,在使用Redis Stream和消费者组设计应用程序时,确保了解你的应用程序在故障期间应具有的语义属性,并进行相应地配置,评估它是否足够安全地用于您的用例。
从Stream中删除单个项目
Stream还有一个特殊的命令可以通过ID从中间移除项目。一般来讲,对于一个只附加的数据结构来说,这也许看起来是一个奇怪的特征,但实际上它对于涉及例如隐私法规的应用程序是有用的。这个命令称为XDEL,调用的时候只需要传递Stream的名称,在后面跟着需要删除的ID即可:
1. `> XRANGE mystream - + COUNT 2`
2. `1) 1) 1526654999635-0`
3. `2) 1) "value"`
4. `2) "2"`
5. `2) 1) 1526655000369-0`
6. `2) 1) "value"`
7. `2) "3"`
8. `> XDEL mystream 1526654999635-0`
9. `(integer) 1`
10. `> XRANGE mystream - + COUNT 2`
11. `1) 1) 1526655000369-0`
12. `2) 1) "value"`
13. `2) "3"`
但是在当前的实现中,在宏节点完全为空之前,内存并没有真正回收,所以你不应该滥用这个特性。
零长度Stream
Stream与其他Redis数据结构有一个不同的地方在于,当其他数据结构没有元素的时候,调用删除元素的命令会把key本身删掉。举例来说就是,当调用ZREM命令将有序集合中的最后一个元素删除时,这个有序集合会被彻底删除。但Stream允许在没有元素的时候仍然存在,不管是因为使用MAXLEN选项的时候指定了count为零(在XADD和XTRIM命令中),或者因为调用了XDEL命令。
存在这种不对称性的原因是因为,Stream可能具有相关联的消费者组,以及我们不希望因为Stream中没有项目而丢失消费者组定义的状态。当前,即使没有相关联的消费者组,Stream也不会被删除,但这在将来有可能会发生变化。