Redis主从切换(单点故障)解决源码

news/2024/7/8 0:36:17

1、使用过程:

发布创建channel1消息

redis-cli> PUBLISH channel1 "Hello, world!"

redis-cli> SUBSCRIBE channel1

        优点:

                1、采用Reactor事件单线程去驱动发布订阅事件的,实时性高

                2、从redis架构去思考,拓展哨兵、master、salve都相对简单容易, 扩展性高

        缺点:

                1、可靠性一般,redis只管发送消息,不会等待订阅该频道的实例响应。

                2、高频次访问发布消息,容易阻塞挤压,说白了还是Reactor单线程驱动缺点。

 2、实现过程:主从切换通过发布订阅模式实现的。

        2-1、实例化pubsub_channels(频道)哈希表,当前为空。

initServer函数
server.pubsub_channels = dictCreate(&keylistDictType,NULL);
server.pubsub_patterns = listCreate(); listSetFreeMethod(server.pubsub_patterns,freePubsubPattern); listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);

        2-2、发布命令

        初始化发布命令,往pubsub_channels添加channel和对应的订阅者列表。hash key存储了channel名称,value存储了订阅者列表。

initServerConfig函数
populateCommandTable函数加载redisCommandTable列表。
struct redisCommand redisCommandTable[] = {
    ...
   {"publish",publishCommand,3,"pltr",0,NULL,0,0,0,0,0},
   {"pubsub",pubsubCommand,-2,"pltrR",0,NULL,0,0,0,0,0},
    ...
}
publishCommand函数
void publishCommand(redisClient *c) {
    //发布命令,从pubsub_channels哈希表中查询到对应发布的消息。
    int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
    if (server.cluster_enabled)
        //如果是cluster节点,则使用集群发布模式
        clusterPropagatePublish(c->argv[1],c->argv[2]);
    else
        //
        forceCommandPropagation(c,REDIS_PROPAGATE_REPL);
    //返回接收消息的订阅者数量
    addReplyLongLong(c,receivers);
}

       2-3、订阅消息

        从pubsub_channels适配channel对应的订阅者列表。

initServerConfig函数
populateCommandTable函数加载redisCommandTable列表。
批量订阅和退订指令加载
struct redisCommand redisCommandTable[] = {
    ...
    {"subscribe",subscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},
    {"unsubscribe",unsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
    {"psubscribe",psubscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},
    {"punsubscribe",punsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
    ...
}
void subscribeCommand(redisClient *c) {
    int j;

    for (j = 1; j < c->argc; j++)
        pubsubSubscribeChannel(c,c->argv[j]);
}

void unsubscribeCommand(redisClient *c) {
    if (c->argc == 1) {
        pubsubUnsubscribeAllChannels(c,1);
    } else {
        int j;

        for (j = 1; j < c->argc; j++)
            pubsubUnsubscribeChannel(c,c->argv[j],1);
    }
}

void psubscribeCommand(redisClient *c) {
    int j;

    for (j = 1; j < c->argc; j++)
        pubsubSubscribePattern(c,c->argv[j]);
}

void punsubscribeCommand(redisClient *c) {
    if (c->argc == 1) {
        pubsubUnsubscribeAllPatterns(c,1);
    } else {
        int j;

        for (j = 1; j < c->argc; j++)
            pubsubUnsubscribePattern(c,c->argv[j],1);
    }
}
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
 * 0 if the client was already subscribed to that channel. 
 *
 * 设置客户端 c 订阅频道 channel 。
 *
 * 订阅成功返回 1 ,如果客户端已经订阅了该频道,那么返回 0 。
 */
int pubsubSubscribeChannel(redisClient *c, robj *channel) {
    dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    /* Add the channel to the client -> channels hash table */
    // 将 channels 填接到 c->pubsub_channels 的集合中(值为 NULL 的字典视为集合)
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);

        // 关联示意图
        // {
        //  频道名        订阅频道的客户端
        //  'channel-a' : [c1, c2, c3],
        //  'channel-b' : [c5, c2, c1],
        //  'channel-c' : [c10, c2, c1]
        // }
        /* Add the client to the channel -> list of clients hash table */
        // 从 pubsub_channels 字典中取出保存着所有订阅了 channel 的客户端的链表
        // 如果 channel 不存在于字典,那么添加进去
        de = dictFind(server.pubsub_channels,channel);
        if (de == NULL) {
            clients = listCreate();
            dictAdd(server.pubsub_channels,channel,clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }

        // before:
        // 'channel' : [c1, c2]
        // after:
        // 'channel' : [c1, c2, c3]
        // 将客户端添加到链表的末尾
        listAddNodeTail(clients,c);
    }

    /* Notify the client */
    // 回复客户端。
    // 示例:
    // redis 127.0.0.1:6379> SUBSCRIBE xxx
    // Reading messages... (press Ctrl-C to quit)
    // 1) "subscribe"
    // 2) "xxx"
    // 3) (integer) 1
    addReply(c,shared.mbulkhdr[3]);
    // "subscribe\n" 字符串
    addReply(c,shared.subscribebulk);
    // 被订阅的客户端
    addReplyBulk(c,channel);
    // 客户端订阅的频道和模式总数
    addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));

    return retval;
}

http://lihuaxi.xjx100.cn/news/1923127.html

相关文章

Innosetup 调用c# dll 和 c# dll的函数导出

目标需求&#xff0c;基于现在安装包脚本。需要在用户安装和卸载成功时。进行数据记录,所以需要调用c#dll 主要涉及到的知识点 需要理解脚本的文件使用机制脚本的文件dll加载&#xff0c;和dll的调用c# dll的制作&#xff0c;和工具的使用 下面具体介绍 脚本的文件dll加载&…

【Python排序算法系列】—— 冒泡排序

&#x1f308;个人主页: Aileen_0v0 &#x1f525;热门专栏: 华为鸿蒙系统学习|计算机网络|数据结构与算法 &#x1f4ab;个人格言:"没有罗马,那就自己创造罗马~" 冒泡排序 过程演示&#xff1a; 做题实际遇到的图像是横向的&#xff0c;但是它位置变化和纵向是一样…

推荐Kali Linux 2023.4

今天滚动更新了Kali Linux 2023.4&#xff0c;推荐一下。 开发者们推出了该发行版在2023年的第四个也是最后一个新版本。Kali Linux 2023.4&#xff0c;同时包含15种新工具和GNOME 45。 新工具&#xff1a; cabby - TAXII 客户端实现&#xff1b; cti-taxi-client - TAXII2 …

nrm的保姆级使用教程

&#x1f4e2; 鸿蒙专栏&#xff1a;想学鸿蒙的&#xff0c;冲 &#x1f4e2; C语言专栏&#xff1a;想学C语言的&#xff0c;冲 &#x1f4e2; VUE专栏&#xff1a;想学VUE的&#xff0c;冲这里 &#x1f4e2; CSS专栏&#xff1a;想学CSS的&#xff0c;冲这里 &#x1f4…

2024年软考中级-网络工程师

网络工程师证书考到后&#xff0c;通过本级考试的合格人员能根据应用部门的要求进行网络系统的规划、设计和网络设备的软硬件安装调试工作&#xff0c;能进行网络系统的运行、维护和管理&#xff0c;能高效、可靠、安全地管理网络资源&#xff1b;作为网络专业人员对系统开发进…

循环冗余效验码的计算方法

循环冗余效验码的计算方法 G&#xff08;x&#xff09;&#xff1a; 在了解计算方法之前我们首先要明白G&#xff08;x&#xff09;表明的意思&#xff0c;这一步非常重要&#xff01; 例如&#xff0c;G&#xff08;x&#xff09; x^3 x^2 1 &#xff0c;该式子表明的编…

1.Linux快速入门

Linux快速入门 Linux操作系统简介Linux操作系统优点Linux操作系统发行版1. Red Hat Linux2. CentOS3. Ubuntu4. SUSE Linux5. Fedora Linux 32位与64位操作系统的区别Linux内核命名规则 Linux操作系统简介 Linux操作系统是基于UNIX以网络为核心的设计思想&#xff0c;是一个性…

解析Web自动化测试工具能做什么?

随着互联网的蓬勃发展&#xff0c;Web应用程序在我们生活和工作中扮演着愈发重要的角色。为确保这些Web应用程序的质量、稳定性和安全性&#xff0c;Web自动化测试工具应运而生。本文将介绍Web自动化测试工具的多重功能&#xff0c;以及它们如何在软件开发生命周期中发挥关键作…