网站首页 > 技术文章 正文
说起发布订阅肯定会第一个想到的是观察者模式,该模式的典型特性就是,几个订阅者会同时订阅一个发布者,会观察发布者的变化从而执行相应动作。观察者模式有几个非常金典的实现,其中一个就是Redis的Pub/Sub,Redis的发布订阅功能由核心的三个命令组成,分别是PUBLISH、SUBSCRIBE、PSUBSCRIBE命令。
订阅结构组成
Redis的发布订阅模式由以下几个重要部分组成:Publisher(发布者)、Subscriber(订阅者)、Channel(频道)、Pattern(模式)。
- Publisher(发布者):消息发布者,通过PUBLISH命令向指定渠道发送Message。
- Subscriber(订阅者):订阅频道的客户端。
- Channel(频道):一个标识,发布者与订阅者通过频道关联。
- Pattern(模式):一个标识,关联了多个频道,当其关联的任意频道收到消息,都会通知其订阅者
频道的订阅
- pubsub_channels:用于频道订阅,一个字典结构,key是频道的标识,value的是一个订阅该频道的客户端列表。
- 订阅:客户端通过SUBSCRIBE <channel_id>命令订阅某个或者某些频道,发送命令的客户端将被加入到pubsub_channels字典所订阅频道对应的链表末尾。
- 退订:客户端通过UNSUBSCRIBE <channel_id>命令取消订阅关系,将会把频道客户端对应列表中相应客户端删除。
模式的订阅
- pubsub_patterns/pubsub_patterns_dict:用于模式订阅,也是字典结构,key为模式的标识,value为模式对象列表(pubsub_patterns为列表结构,存储了所有的模式对象)。
- 订阅:客户端通过PSUBSCRIBE <pattern_id>订阅指定模式,请求到服务器后首先会创建一个pubsubPattern对象并加入到pubsub_patterns中,同时也会加入到pubsub_patterns_dict字典中。
- 退订:客户端通过PUNSUBSCRIBE <pattern_id>取消订阅模式,客户端信息会从对应的结构中删除。
消息发布执行过程
- 服务端接收到客户端的PUBLISH命令,调用PUBLISH命令实现。
- 获取pubsub_channels字典中对应的channel客户端列表,循环发送频道消息至客户端socket。
- 获取模式字典中的所有模式信息,循环匹配每一个模式是否与请求中的模式信息匹配,如果匹配则获取字典中对应的客户端列表,循环发送模式消息至客户端socket。
- 后处理,回复客户端。
注:如果不了解Redis命令执行过程的,可阅读我的另外一篇文章《图解Redis-命令系统设计》
核心代码如下:
// PUBLISH命令的实现
void publishCommand(client *c) {
int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
if (server.cluster_enabled)
clusterPropagatePublish(c->argv[1],c->argv[2]);
else
forceCommandPropagation(c,PROPAGATE_REPL);
addReplyLongLong(c,receivers);
}
// 命令核心逻辑
int pubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0;
dictEntry *de;
dictIterator *di;
listNode *ln;
listIter li;
/* Send to clients listening for that channel */
// 获取频道对应的客户端列表
de = dictFind(server.pubsub_channels,channel);
if (de) {
list *list = dictGetVal(de);
listNode *ln;
listIter li;
listRewind(list,&li);
while ((ln = listNext(&li)) != NULL) {
client *c = ln->value;
// 发送频道消息至对应客户端
addReplyPubsubMessage(c,channel,message);
receivers++;
}
}
/* Send to clients listening to matching channels */
// 获取所有模式字典
di = dictGetIterator(server.pubsub_patterns_dict);
if (di) {
channel = getDecodedObject(channel);
// 循环每一个模式
while((de = dictNext(di)) != NULL) {
robj *pattern = dictGetKey(de);
list *clients = dictGetVal(de);
// 请求的模式是否与当前模式匹配
if (!stringmatchlen((char*)pattern->ptr,
sdslen(pattern->ptr),
(char*)channel->ptr,
sdslen(channel->ptr),0)) continue;
listRewind(clients,&li);
while ((ln = listNext(&li)) != NULL) {
client *c = listNodeValue(ln);
// 发送模式消息至对应客户端
addReplyPubsubPatMessage(c,pattern,channel,message);
receivers++;
}
}
decrRefCount(channel);
dictReleaseIterator(di);
}
return receivers;
}
以上就是Redis Pub/Subd的介绍,如果各位还想了解更多,欢迎转发+评论+关注,Redis图解系列专栏持续更新中,欢迎进入我的主页查看。
猜你喜欢
- 2024-09-20 Redis这些知识你了解吗?(redis知识点)
- 2024-09-20 本地启动redis控制台 & 安装redis服务(用于调试)
- 2024-09-20 redis安装4步轻松解决,你知道吗?
- 2024-09-20 redis快速入门(redis教程详解)
- 2024-09-20 Redis学习之路1-Redis的安装(redis 安装步骤)
- 2024-09-20 Redis系列-入门及安装(redis安装及使用)
- 2024-09-20 redis数据库使用教程(redis 数据库)
- 2024-09-20 一探Redis究竟:超火爆入门指南,你竟然还没看?
- 2024-09-20 redis 一主二从三哨兵的搭建测试(redis三主三从如何选举)
- 2024-09-20 费时3个月啃烂了这份Redis技术笔记,我成功上岸进了字节
- 1514℃桌面软件开发新体验!用 Blazor Hybrid 打造简洁高效的视频处理工具
- 563℃Dify工具使用全场景:dify-sandbox沙盒的原理(源码篇·第2期)
- 508℃MySQL service启动脚本浅析(r12笔记第59天)
- 486℃服务器异常重启,导致mysql启动失败,问题解决过程记录
- 485℃启用MySQL查询缓存(mysql8.0查询缓存)
- 465℃「赵强老师」MySQL的闪回(赵强iso是哪个大学毕业的)
- 445℃mysql服务怎么启动和关闭?(mysql服务怎么启动和关闭)
- 442℃MySQL server PID file could not be found!失败
- 最近发表
- 标签列表
-
- c++中::是什么意思 (83)
- 标签用于 (65)
- 主键只能有一个吗 (66)
- c#console.writeline不显示 (75)
- pythoncase语句 (81)
- es6includes (73)
- windowsscripthost (67)
- apt-getinstall-y (86)
- node_modules怎么生成 (76)
- c++int转char (75)
- static函数和普通函数 (76)
- el-date-picker开始日期早于结束日期 (70)
- js判断是否是json字符串 (67)
- checkout-b (67)
- c语言min函数头文件 (68)
- asynccallback (71)
- localstorage.removeitem (74)
- vector线程安全吗 (70)
- & (66)
- java (73)
- js数组插入 (83)
- mac安装java (72)
- eacces (67)
- 查看mysql是否启动 (70)
- 无效的列索引 (74)