Redis 系列(9) Pub/Sub & Keyspace N
2020-02-10 本文已影响0人
suxin1932
https://gitlab.com/zhangxin1932/java-tools.git ((java-tools for redis5.0))
1.Pub/Sub
Pub/Sub模式的缺点:
消息的发布是无状态的,无法保证可达。对于发布者来说,消息是“即发即失”的。
此时如果某个消费者在生产者发布消息时下线,重新上线之后,
是无法接收该消息的,要解决该问题需要使用专业的消息队列,如 Kafka … 此处不再赘述。
#在redis-cli下, 开两个窗口C1, C2:
#C1执行
subscribe channel1
#C2执行
publish channel1 hello
#C1会受到消息
hello
2.Keyspace Notifications (键空间通知)
https://redis.io/topics/notifications (官网)
http://www.redis.cn/topics/notifications.html (中文网)
2.1 概述
>> 该功能需要 redis 2.8及以上版本的支持.
>> 默认情况下,键空间事件通知是不启用的,功能会消耗一些CPU。
>> 可以使用redis.conf中的notify-keyspace-events或者使用CONFIG SET命令来开启通知。
>> 如果希望监听redis中的key创建, 删除, 过期等等事件, 则可才用 keyspace 或者 keyevent.
>> Keyspace Notifications 功能基于pub/sub, 因此暂时是不可靠的, 如果断线重连, 断线期间的消息是会丢失的.
>> 各种支持的参数及命令细节详见官网.
2.2 相关配置 (详见官网)
2.2.1 基本配置
K 键空间事件,以__keyspace@<db>__前缀发布。
E 键事件事件,以__keyevent@<db>__前缀发布。
g 通用命令(非类型特定),如DEL,EXPIRE,RENAME等等
$ 字符串命令
l 列表命令
s 集合命令
h 哈希命令
z 有序集合命令
x 过期事件(每次键到期时生成的事件)
e 被驱逐的事件(当一个键由于达到最大内存而被驱逐时产生的事件)
A g$lshzxe的别名,因此字符串AKE表示所有的事件。
2.2.2 常见组合
基本组合模式是:
>> 配置对应的notify-keyspace-events (config set命令 或者 redis.conf)
>> 订阅key*或keyspace或keyevent消息+对应事件(del, set, expired等)
#1.Redis发布所有通知,客户端订阅所有消息
127.0.0.1:6379> config set notify-keyspace-events KEA
127.0.0.1:6379> psubscribe __key*@0__:*
#2.Redis仅发布keyspace通知,而客户端订阅所有key删除消息类型
127.0.0.1:6379> config set notify-keyspace-events KA
127.0.0.1:6379> psubscribe __key*@0__:del
#3.Redis发布所有通知,客户端仅订阅keyspace消息
127.0.0.1:6379> config set notify-keyspace-events KEA
127.0.0.1:6379> psubscribe __keyspace@0__:*
#4.Redis仅发布字符串特定类型的通知,客户端仅订阅keyevent消息
127.0.0.1:6379> config set notify-keyspace-events KE$
127.0.0.1:6379> psubscribe __keyevent@0__:*
2.3 示例
step1: 开启客户端1, 设置支持该功能, 并订阅key的expired事件
[root@bogon ~]# docker exec -it 2c65051356b0 redis-cli
# 这里E表示keyevent事件, x表示key过期并删除该Key的事件
# Ex 表示 redis 发布keyevent及expired事件
127.0.0.1:6379> config set notify-keyspace-events Ex
OK
#这里表示订阅keyevent的expired事件
127.0.0.1:6379> psubscribe __keyevent@0__:expired
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "__keyevent@0__:expired"
3) (integer) 1
step2: 开启客户端2, 设置一个含过期时间的key, 等待其过期
[root@bogon ~]# docker exec -it 2c65051356b0 redis-cli
127.0.0.1:6379>
127.0.0.1:6379> setex name 30 tom
OK
127.0.0.1:6379> get name
"tom"
step3: 等key过期后, 观察客户端1订阅到的消息为
1) "pmessage"
2) "__keyevent@0__:expired"
3) "__keyevent@0__:expired"
4) "name"
3.Redis中的批处理命令 (mget, mset --> 具备原子性)
package com.zy.redis5.single;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.util.List;
import java.util.Objects;
@SpringBootTest
@RunWith(SpringRunner.class)
@Slf4j
public class RedisSinglePipelineTest {
@Autowired
private JedisPool jedisPool;
@Test
public void fn03() {
Jedis jedis = jedisPool.getResource();
final String prefix = "prefix_";
if (Objects.nonNull(jedis)) {
try {
jedis.mset(prefix + "name", "tom1",
prefix + "age", "20",
prefix + "gender", "male"
);
jedis.msetnx(prefix + "name", "tom2",
prefix + "age", "22");
List<String> mget = jedis.mget(prefix + "name",
prefix + "age",
prefix + "gender"
);
System.out.println("---------------------------------------");
System.out.println(mget);
System.out.println("---------------------------------------");
} finally {
jedis.close();
}
}
}
}
4.Pipeline (不具备原子性的批处理)
Pipeline 和 Linux 的管道类似,它可以让 Redis 批量执行指令。
4.1 pipeline 效率高的原理
#普通 Redis 非批量命令的过程
Redis使用的是客户端-服务器(CS)模型和请求/响应协议的TCP服务器。
这意味着通常情况下一个请求会遵循以下步骤:
>> 客户端向服务端发送一个查询请求,并监听Socket返回,通常是以阻塞模式,等待服务端响应。
>> 服务端处理命令,并将结果返回给客户端。
Redis客户端与Redis服务器之间使用TCP协议进行连接,
一个客户端可以通过一个socket连接发起多个请求命令。
每个请求命令发出后client通常会阻塞并等待redis服务器处理,
redis处理完请求命令后会将结果通过响应报文返回给client,
因此当执行多条命令的时候都需要等待上一条命令执行完毕才能执行。
[
简言之, redis客户端执行一条命令分4个过程:
发送命令-〉命令排队-〉命令执行-〉返回结果
]
这中间不仅仅多了Round trip time(简称RTT, 往返时间),还多次使用了系统 IO。
由于通信会有网络延迟,假如client和server之间的包传输时间需要0.125秒(单程)。
那么3个命令6个报文至少需要0.75秒才能完成。
这样即使redis每秒能处理100个命令,而我们的client也只能一秒钟发出四个命令。
这显然没有充分利用 redis的处理能力。
#Redis的 pipeline 命令原理 ------- 重点!重点!重点!
管道(pipeline)可以一次性发送多条命令并在执行完后一次性将结果返回,
pipeline通过减少客户端与redis的通信次数来实现降低往返延时时间,
而且Pipeline 实现的原理是"队列",而队列的原理是时先进先出,这样就保证数据的顺序性。
Pipeline 的默认的同步的个数为53个,也就是说arges中累加到53条数据时会把数据提交。
>> client可以将多个命令放到一个tcp报文一起发送,
>> server则可以将多条命令的处理结果放到一个tcp报文返回。
4.2 适用场景&不适用场景
#适用场景
>> 命令之间的结果没有互相依赖,对结果响应也无需立即获得的系统
>> 允许一定比例的写入失败的系统.
#不适用场景
>> 对可靠性要求高,每次操作都需要立马知道这次操作是否成功的系统不适用
>> 命令之间有先后依赖性的系统不适用
4.3 注意点
4.3.1 pipeline 占用连接
#编码时请注意,pipeline期间将“独占”链接
此期间将不能进行非“管道”类型的其他操作,直到pipeline关闭;
如果你的pipeline的指令集很庞大,为了不干扰链接中的其他操作,
你可以为pipeline操作新建Client链接,让pipeline和其他正常操作分离在2个client中。
不过pipeline事实上所能容忍的操作个数,
和socket-output缓冲区大小/返回结果的数据尺寸都有很大的关系;
同时也意味着每个redis-server同时所能支撑的pipeline链接的个数也是有限的,
这将受限于server的物理内存或网络接口的缓冲能力。
5.redis中的事务问题与原子性
5.1 Redis 中命令的原子性
原子性是数据库的事务中的特性。在数据库事务的情景下,原子性指的是:
一个事务(transaction)中的所有操作,要么全部完成,要么全部不完成,不会结束在中间某个环节。
对于Redis而言,命令的原子性指的是:一个操作的不可以再分,操作要么执行,要么不执行。
>> Redis的单条命令操作是原子性的,是因为Redis是单线程的。
>> Redis的原生批命令操作(mset, mget)也是原子性的。
5.2 pipeline 批处理命令不能完全保证原子性
场景: 一次执行一堆命令(即批处理)
redis对事务的支持是部分支持
如下述:
冤头债主: 并未体现出事务的原子性
全体连坐: 则体现出了事务的原子性
5.2.1 redis命令行实现
redis-cli下:
MULTI
set k1 v1
set k2 v2
# 此时二者都已加入队列 QUEUE
#正常执行: 执行EXEC命令, 即提交事务;
EXEC
# 放弃事务: 若执行DISCARD命令, 则放弃事务, 即不会提交本次修改
DISCARD
#全体连坐: 若执行中有一个错, 则都不执行, 事务无法提交
sets k1 11
set k2 22
EXEC
#此时, 执行get k1, 仍然是 v1; get k2, 仍然是v2
#冤头债主: 若执行中某一个错, 其他对, 只有错的不执行, 其他正常执行
incr k1 2
set k3 v3
EXEC
# 此时, 执行get k3 ,是 v3, 执行get k1, 仍是v1
watch 与 unwatch
使用watch后, multi失效,事务失效
#WATCH的机制是:
在事务EXEC命令执行时,Redis会检查被WATCH的key,
只有被WATCH的key从WATCH起始时至今没有发生过变更,EXEC才会被执行。
如果WATCH的key在WATCH命令到EXEC命令之间发生过变化,则EXEC命令会返回失败。
#无加塞篡改: 同一个事务内, 执行, 无错误
# 同一个redis-cli下, 执行下述命令
set a 100
set b 0
watch a
multi
decr a 20
incr b 20
exec
# get a, 是80; get b是20, 无误
#有加塞篡改:
# cli1: 执行
set a 100
set b 0
watch a
# 此时cli2: 执行
set a 900
# 然后cli1才执行:
multi
decr a 20
incr b 20
exec # 不会提交本次事务
# 此时, get a, 是900; get b, 是0
#若是异常, 此时需要unwatch, 然后再watch 执行无加塞篡改的流程
一旦执行了exec或者unwatch命令, 之前所有的监控锁都会被取消了
5.2.2 代码实现
package com.zy.redis5.single;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Pipeline;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
@SpringBootTest
@RunWith(SpringRunner.class)
@Slf4j
public class RedisSinglePipelineTest {
@Autowired
private JedisPool jedisPool;
@Autowired
private StringRedisTemplate redisTemplate;
@Test
public void fn01() {
Jedis jedis = jedisPool.getResource();
if (Objects.nonNull(jedis)) {
Pipeline pipeline = jedis.pipelined();
try {
// 1.开启事务
pipeline.multi();
// 2.编辑命令
pipeline.set("k1", "1");
pipeline.set("k2", "v2");
// 这里模拟 命令错误, 语法正确, 但是执行错误, 发现所有仅有错误的无法执行, 其他命令正常提交
pipeline.incr("k1"); // k1 由于是数字, 可以 incr, 执行成功;
pipeline.incr("k2"); // k2 由于是字符串, 不可以 incr, 执行失败;
// 3.提交事务
// pipeline.exec(); 这里如果用 exec(), 表示提交事务, 但这里的事务不完全遵循原子性:
// 原子性: 如果是命令错误, 假设有一条执行了 不存在的 abc 命令, 则上述几个正确的命令也都将执行失败.
// 非原子性: 如果是运行时错误, 如上例中对 k2 进行 incr, 则仅有该命令失败, 其他正确命令都将执行成功
pipeline.exec();
// pipeline.sync(); // 这里如果用 sync(), 将不会提交事务.
// pipeline.discard();// 这里如果用 discard(), 表示取消事务, 上述语句不会执行
} catch (Exception e) {
log.error("failed to execute transaction.", e);
} finally {
// 4.关闭 pipeline
pipeline.close();
// 5.释放 jedis 连接
jedis.close();
}
}
}
@Test
public void fn02() {
redisTemplate.executePipelined((RedisCallback<Void>) connection -> {
connection.openPipeline();
for (int i = 1; i < 4; i++) {
byte[] key = ("k" + i).getBytes(StandardCharsets.UTF_8);
byte[] value = ("v" + i).getBytes(StandardCharsets.UTF_8);
connection.setEx(key, i * 100, value);
}
connection.closePipeline();
return null;
});
}
}
5.3 原生批命令(mset, mget)与Pipeline对比
1、原生批命令是原子性,pipeline是非原子性
2、原生批命令一命令多个key, 但pipeline支持多命令(存在事务),非原子性
3、原生批命令是服务端实现,而pipeline需要服务端与客户端共同完成
参考资料
https://mp.weixin.qq.com/s/iAFtB9NJ5f2Pm0NfJq52pg (redis综合)
https://blog.csdn.net/u011489043/article/details/78769428 (pipeline)