Redis 发布订阅功能

2021-11-16  本文已影响0人  wayyyy
发布订阅

Redis 发布订阅 (pub/sub) 是一种消息通信模式:订阅者 (sub) 订阅频道,发送者 (pub) 向频道发送消息,订阅者通过该频道接收消息。

image.png

不过发布订阅模式有2点需要注意:

  1. 新开启的订阅客户端,无法接收到该频道之前的消息,因为redis不会对发布的消息进行持久化
  2. 客户端在执行订阅命令之后进入了订阅状态,只能接收订阅和退订的命令。
实例测试:

客户端1输入 SUBSCRIBE runoobChat 订阅频道 runoobChat


image.png

客户端2 在 频道 runoobChat 发布消息,订阅者就能接收到消息。


image.png

客户端1将会接收:


image.png
频道的订阅与退订

Redis 将所有频道的订阅关系保存在服务器状态的字典里面,这个字典的健是某个被订阅的频道,而健的值则是一个链表,链表里面记录了所有订阅这个频道的客户端。

struct redisServer
{
    dict *pubsub_channels;  // 保存所有频道的订阅关系
    ...
};
订阅频道

每当客户端执行``命令订阅某些频道的时候,服务器斗湖将客户端与被订阅频道在 字典中进行关联。
按照频道是否有订阅者,关联操作可以分为2种情况执行:

调用栈

(gdb) bt
#0  pubsubSubscribeChannel (c=0x5555558dd358, channel=0x5555558dc2c8) at pubsub.c:65
#1  0x00005555555b3670 in subscribeCommand (c=0x5555558dd358) at pubsub.c:480
#2  0x0000555555577528 in call (c=0x5555558dd358, flags=7) at redis.c:2441
#3  0x0000555555578044 in processCommand (c=0x5555558dd358) at redis.c:2766
#4  0x00005555555865c4 in processInputBuffer (c=0x5555558dd358) at networking.c:1539
#5  0x00005555555868b2 in readQueryFromClient (el=0x5555558666f8, fd=7, privdata=0x5555558dd358, mask=1)
    at networking.c:1631
#6  0x00005555555701bf in aeProcessEvents (eventLoop=0x5555558666f8, flags=3) at ae.c:576
#7  0x0000555555570380 in aeMain (eventLoop=0x5555558666f8) at ae.c:635
#8  0x000055555557b30a in main (argc=2, argv=0x7fffffffe4c8) at redis.c:4079
int pubsubSubscribeChannel(redisClient *c, robj *channel) {
    dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    /* Add the channel to the client -> channels hash table */
    // 将 channels 填接到 c->pubsub_channels 的集合中(值为 NULL 的字典视为集合)
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);

        // 关联示意图
        // {
        //  频道名        订阅频道的客户端
        //  'channel-a' : [c1, c2, c3],
        // }
        /* Add the client to the channel -> list of clients hash table */
        // 从 pubsub_channels 字典中取出保存着所有订阅了 channel 的客户端的链表
        // 如果 channel 不存在于字典,那么添加进去
        de = dictFind(server.pubsub_channels,channel);
        if (de == NULL) {
            clients = listCreate();
            dictAdd(server.pubsub_channels,channel,clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }

        // before:
        // 'channel' : [c1, c2]
        // after:
        // 'channel' : [c1, c2, c3]
        // 将客户端添加到链表的末尾
        listAddNodeTail(clients,c);
    }

    /* Notify the client */
    // 回复客户端。
    // 示例:
    // redis 127.0.0.1:6379> SUBSCRIBE xxx
    // Reading messages... (press Ctrl-C to quit)
    // 1) "subscribe"
    // 2) "xxx"
    // 3) (integer) 1
    addReply(c,shared.mbulkhdr[3]);
    // "subscribe\n" 字符串
    addReply(c,shared.subscribebulk);
    // 被订阅的客户端
    addReplyBulk(c,channel);
    // 客户端订阅的频道和模式总数
    addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));

    return retval;
频道退订
/*  
 * 客户端 c 退订频道 channel ,如果取消成功返回 1 ,如果因为客户端未订阅频道,而造成取消失败,返回 0 。
*/
int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
    dictEntry *de;
    list *clients;
    listNode *ln;
    int retval = 0;

    // 将频道 channel 从 client->channels 字典中移除
    incrRefCount(channel);

    if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
        // channel 移除成功,表示客户端订阅了这个频道,执行以下代码
        retval = 1;

        // 从 channel->clients 的 clients 链表中,移除 client
        de = dictFind(server.pubsub_channels,channel);
        redisAssertWithInfo(c,NULL,de != NULL);
        clients = dictGetVal(de);
        ln = listSearchKey(clients,c);
        redisAssertWithInfo(c,NULL,ln != NULL);
        listDelNode(clients,ln);

        // 如果移除 client 之后链表为空,那么删除这个 channel 键
        if (listLength(clients) == 0) {
            dictDelete(server.pubsub_channels,channel);
        }
    }

    /* Notify the client */
    // 回复客户端
    if (notify) {
        addReply(c,shared.mbulkhdr[3]);
        // "ubsubscribe" 字符串
        addReply(c,shared.unsubscribebulk);
        // 被退订的频道
        addReplyBulk(c,channel);
        // 退订频道之后客户端仍在订阅的频道和模式的总数
        addReplyLongLong(c,dictSize(c->pubsub_channels)+
                       listLength(c->pubsub_patterns));
    }

    decrRefCount(channel); /* it is finally safe to release it */

    return retval;
}
模式的订阅

服务器将所有频道的订阅关系保存再服务器状态的,与此类似,服务器也将所有模式的订阅关系都保存在服务器状态的属性里面

struct redisServer
{
    list *pubsub_patterns;
    ...
};

pubsub_patterns 属性是一个链表,链表中的每一个节点都包含着一个pubsubPattern结构,这个结构的属性记录了被订阅的模式,而client性则记录了订阅模式的客户端。

struct pubsubPattern {
    redisClient* client;
    robj *pattern;    // 被订阅的模式
}
image.png

客户端执行命令订阅某些模式的时候,服务器会对每个被订阅的模式执行下列2个操作:

模式的退订
发送消息
查看订阅信息
上一篇 下一篇

猜你喜欢

热点阅读