Redis的Pub/Sub(发布/订阅)
Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。redis内置了发布/订阅功能,可以作为消息机制使用。在使用Jedis的Publish/Subscribe功能之前,我们先来看原生版的发布订阅命令。
原生版Redis 发布订阅命令
1.SUBSCRIBE channel [channel ...]
订阅给定的一个或多个频道的信息。
时间复杂度:
O(N),其中 N 是订阅的频道的数量。
返回值:
接收到的信息(请参见下面的代码说明)。
# 订阅 msg 和 chat_room 两个频道
# 1 - 6 行是执行 subscribe 之后的反馈信息
# 第 7 - 9 行才是接收到的第一条信息
# 第 10 - 12 行是第二条
redis> subscribe msg chat_room
Reading messages... (press Ctrl-C to quit)
1) "subscribe" # 返回值的类型:显示订阅成功
2) "msg" # 订阅的频道名字
3) (integer) 1 # 目前已订阅的频道数量
4) "subscribe"
5) "chat_room"
6) (integer) 2
7) "message" # 返回值的类型:信息
8) "msg" # 来源(从那个频道发送过来)
9) "hello moto" # 信息内容
10) "message"
11) "chat_room"
12) "testing...haha"
2.PSUBSCRIBE pattern [pattern ...]
订阅一个或多个符合给定模式的频道。每个模式以 * 作为匹配符,比如 it* 匹配所有以 it 开头的频道( it.news 、 it.blog 、 it.tweets 等等), news.* 匹配所有以 news. 开头的频道( news.it 、 news.global.today 等等),诸如此类。
时间复杂度:
O(N), N 是订阅的模式的数量。
返回值:
接收到的信息(请参见下面的代码说明)。
# 订阅 news.* 和 tweet.* 两个模式
# 第 1 - 6 行是执行 psubscribe 之后的反馈信息
# 第 7 - 10 才是接收到的第一条信息
# 第 11 - 14 是第二条
# 以此类推。。。
redis> psubscribe news.* tweet.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe" # 返回值的类型:显示订阅成功
2) "news.*" # 订阅的模式
3) (integer) 1 # 目前已订阅的模式的数量
4) "psubscribe"
5) "tweet.*"
6) (integer) 2
7) "pmessage" # 返回值的类型:信息
8) "news.*" # 信息匹配的模式
9) "news.it" # 信息本身的目标频道
10) "Google buy Motorola" # 信息的内容
11) "pmessage"
12) "tweet.*"
13) "tweet.huangz"
14) "hello"
15) "pmessage"
16) "tweet.*"
17) "tweet.joe"
18) "@huangz morning"
19) "pmessage"
20) "news.*"
21) "news.life"
22) "An apple a day, keep doctors away"
3.UNSUBSCRIBE [channel [channel ...]]
指示客户端退订给定的频道。如果没有频道被指定,即一个无参数的
UNSUBSCRIBE
调用被执行,那么客户端使用SUBSCRIBE
命令订阅的所有频道都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的频道。
时间复杂度:
O(N) , N 是客户端已订阅的频道的数量。
返回值:
这个命令在不同的客户端中有不同的表现
4.PUNSUBSCRIBE [pattern [pattern ...]]
指示客户端退订所有给定模式。如果没有模式被指定,也即是,一个无参数的
PUNSUBSCRIBE
调用被执行,那么客户端使用PSUBSCRIBE
命令订阅的所有模式都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的模式。
时间复杂度:
O(N+M) ,其中 N是客户端已订阅的模式的数量, M则是系统中所有客户端订阅的模式的数量。
返回值:
这个命令在不同的客户端中有不同的表现。
5.PUBSUB CHANNELS [pattern]
列出当前的活跃频道。活跃频道指的是那些至少有一个订阅者的频道, 订阅模式的客户端不计算在内。
pattern 参数是可选的:
如果不给出 pattern 参数,那么列出订阅与发布系统中的所有活跃频道。
如果给出 pattern 参数,那么只列出和给定模式 pattern 相匹配的那些活跃频道。
复杂度: O(N) ,N 为活跃频道的数量(对于长度较短的频道和模式来说,将进行模式匹配的复杂度视为常数)。
返回值: 一个由活跃频道组成的列表。
# client-1 订阅 news.it 和 news.sport 两个频道
client-1> SUBSCRIBE news.it news.sport
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "news.it"
3) (integer) 1
1) "subscribe"
2) "news.sport"
3) (integer) 2
# client-2 订阅 news.it 和 news.internet 两个频道
client-2> SUBSCRIBE news.it news.internet
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "news.it"
3) (integer) 1
1) "subscribe"
2) "news.internet"
3) (integer) 2
# 首先, client-3 打印所有活跃频道
# 注意,即使一个频道有多个订阅者,它也只输出一次,比如 news.it
client-3> PUBSUB CHANNELS
1) "news.sport"
2) "news.internet"
3) "news.it"
# 接下来, client-3 打印那些与模式 news.i* 相匹配的活跃频道
# 因为 news.sport 不匹配 news.i* ,所以它没有被打印
redis> PUBSUB CHANNELS news.i*
1) "news.internet"
2) "news.it"
6.PUBSUB NUMSUB [channel-1 ... channel-N]
返回给定频道的订阅者数量, 订阅模式的客户端不计算在内。
复杂度: O(N) , N 为给定频道的数量。
返回值:一个多条批量回复(Multi-bulk reply),回复中包含给定的频道,以及频道的订阅者数量。 格式为:频道 channel-1 , channel-1 的订阅者数量,频道 channel-2 , channel-2 的订阅者数量,诸如此类。 回复中频道的排列顺序和执行命令时给定频道的排列顺序一致。 不给定任何频道而直接调用这个命令也是可以的, 在这种情况下, 命令只返回一个空列表。
# client-1 订阅 news.it 和 news.sport 两个频道
client-1> SUBSCRIBE news.it news.sport
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "news.it"
3) (integer) 1
1) "subscribe"
2) "news.sport"
3) (integer) 2
# client-2 订阅 news.it 和 news.internet 两个频道
client-2> SUBSCRIBE news.it news.internet
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "news.it"
3) (integer) 1
1) "subscribe"
2) "news.internet"
3) (integer) 2
# client-3 打印各个频道的订阅者数量
client-3> PUBSUB NUMSUB news.it news.internet news.sport news.music
1) "news.it" # 频道
2) "2" # 订阅该频道的客户端数量
3) "news.internet"
4) "1"
5) "news.sport"
6) "1"
7) "news.music" # 没有任何订阅者
8) "0"
7.PUBSUB NUMPAT
返回订阅模式的数量。
注意,这个命令返回的不是订阅模式的客户端的数量, 而是客户端订阅的所有模式的数量总和。
复杂度: O(1) 。
返回值: 一个整数回复(Integer reply)。
# client-1 订阅 news.* 和 discount.* 两个模式
client-1> PSUBSCRIBE news.* discount.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "news.*"
3) (integer) 1
1) "psubscribe"
2) "discount.*"
3) (integer) 2
# client-2 订阅 tweet.* 一个模式
client-2> PSUBSCRIBE tweet.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "tweet.*"
3) (integer) 1
# client-3 返回当前订阅模式的数量为 3
client-3> PUBSUB NUMPAT
(integer) 3
# 注意,当有多个客户端订阅相同的模式时,相同的订阅也被计算在 PUBSUB NUMPAT 之内
# 比如说,再新建一个客户端 client-4 ,让它也订阅 news.* 频道
client-4> PSUBSCRIBE news.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "news.*"
3) (integer) 1
# 这时再计算被订阅模式的数量,就会得到数量为 4
client-3> PUBSUB NUMPAT
(integer) 4
Jedis发布订阅命令
要使用Jedis
的Publish/Subscribe
功能,必须编写对JedisPubSub的自己的实现,其中的函数的功能如下:
public class PubSubListener extends JedisPubSub{
// 取得订阅的消息后的处理
public void onMessage(String channel, String message) {
//TODO:接收订阅频道消息后,业务处理逻辑
System.out.println(channel + "=" + message);
}
// 初始化订阅时候的处理
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println(channel + "=" + subscribedChannels);
}
// 取消订阅时候的处理
public void onUnsubscribe(String channel, int subscribedChannels) {
System.out.println(channel + "=" + subscribedChannels);
}
// 初始化按表达式的方式订阅时候的处理
public void onPSubscribe(String pattern, int subscribedChannels) {
System.out.println(pattern + "=" + subscribedChannels);
}
// 取消按表达式的方式订阅时候的处理
public void onPUnsubscribe(String pattern, int subscribedChannels) {
System.out.println(pattern + "=" + subscribedChannels);
}
// 取得按表达式的方式订阅的消息后的处理
public void onPMessage(String pattern, String channel, String message) {
System.out.println(pattern + "=" + channel + "=" + message);
}
}
Jedis有两种订阅模式:subsribe(一般模式设置频道)和psubsribe(使用模式匹配来设置频道)。不管是那种模式都可以设置个数不定的频道。订阅得到信息在将会lister的onMessage(…)方法或者onPMessage(…)中进行进行处理,这里我们只是做了简单的输出。 具体代码见GitHup:https://github.com/granett/Redis/tree/master/src/main/java/com/redis/pubsub
public class Subscribe {
private Jedis jedis = new Jedis("192.168.1.207",6379);
/**
* SUBSCRIBE channel [channel ...]
* 订阅给定的一个或多个频道的信息
*/
@Test
public void subscribe(){
final PubSubListener listener = new PubSubListener();
jedis.subscribe(listener, "channel");
}
/**
* UNSUBSCRIBE [channel [channel ...]]
* 指示客户端退订给定的频道
* 如果没有频道被指定,即一个无参数的 UNSUBSCRIBE 调用被执行,
* 那么客户端使用 SUBSCRIBE 命令订阅的所有频道都会被退订。
* 在这种情况下,命令会返回一个信息,告知客户端所有被退订的频道。
*/
@Test
public void unsubscribe(){
final PubSubListener listener = new PubSubListener();
listener.unsubscribe("channel");
}
/**
* PSUBSCRIBE pattern [pattern ...]
* 订阅一个或多个符合给定模式的频道
* 每个模式以 * 作为匹配符,比如 it* 匹配所有以 it 开头的频道( it.news 、 it.blog 、 it.tweets 等等),
* news.* 匹配所有以 news. 开头的频道( news.it 、 news.global.today 等等),诸如此类。
*/
@Test
public void psubscribe(){
final PubSubListener listener = new PubSubListener();
jedis.psubscribe(listener, "ch*");
}
/**
* PUNSUBSCRIBE [pattern [pattern ...]]
* 指示客户端退订所有给定模式
* 如果没有模式被指定,即一个无参数的 PUNSUBSCRIBE 调用被执行,
* 那么客户端使用 PSUBSCRIBE 命令订阅的所有模式都会被退订。
* 在这种情况下,命令会返回一个信息,告知客户端所有被退订的模式。
*/
@Test
public void punsubscribe(){
final PubSubListener listener = new PubSubListener();
listener.punsubscribe("ch*");
}
/**
* PUBLISH channel message
* 将信息 message 发送到指定的频道 channel
* 返回值:接收到信息 message 的订阅者数量
*/
@Test
public void publish(){
jedis.publish("channel", "bar123");
System.out.println("发布消息");
}
/**
* PUBSUB CHANNELS [pattern]
* 列出当前的活跃频道。 活跃频道指的是那些至少有一个订阅者的频道, 订阅模式的客户端不计算在内。
* pattern 参数是可选的:
* 如果不给出 pattern 参数,那么列出订阅与发布系统中的所有活跃频道。
* 如果给出 pattern 参数,那么只列出和给定模式 pattern 相匹配的那些活跃频道。
*/
@Test
public void PUBSUB(){
List<String> list = jedis.pubsubChannels("*");
System.out.println(list);
}
/**
* PUBSUB NUMSUB [channel-1 ... channel-N]
* 返回给定频道的订阅者数量, 订阅模式的客户端不计算在内。
*/
@Test
public void pubsubNumSub(){
Map<String,String> map = jedis.pubsubNumSub();
System.out.println(map);
}
/**
* PUBSUB NUMPAT
* 返回订阅模式的数量。
* 注意, 这个命令返回的不是订阅模式的客户端的数量, 而是客户端订阅的所有模式的数量总和。
*/
@Test
public void pubsubNumPat(){
Long count = jedis.pubsubNumPat();
System.out.println(count);
}
}