redis-发布订阅
redis的发布订阅功能很简单,消息的生产方不直接和订阅方交互,而是通过channel渠道,和KAFKA中的topic是同样的角色。
订阅分为两种类型,一种是具体的,指定了channel的名字,比如hello;另一种是模糊的,指定的channel名字是含有正则的,如hell*。每一个client有两个属性,分别存储自己订阅的具体channel,和模糊的channel。server也有两个字段,维护了所有的channel及被订阅的client,和所有的模糊channel。
当有客户端往某个channel中发送消息时,server会有两步操作。1.向这个channel对应的所有的client发送消息;2.依次遍历所有的模糊channel,如果模糊的channel和改channel匹配的话,则向对应的client发送消息。如果一个客户端订阅了多个模糊channel,且都命中,则会收到两个消息。
当客户端正常取消订阅的时候,就从client和server对应的列表中移除。当客户端没有取消订阅就退出时,定时任务检测到客户端超时后,释放客户端时,会主动释放客户端所有的订阅。
数据结构
1 | typedef struct client { |
源码分析
freePubsubPattern
释放pubsubPattern结构体
1 | void freePubsubPattern(void *p) { |
listMatchPubsubPattern
比较两个pubsubPattern是否同一个
1 | int listMatchPubsubPattern(void *a, void *b) { |
clientSubscriptionsCount
获取客户端订阅的数量,具体+模糊
1 | int clientSubscriptionsCount(client *c) { |
pubsubSubscribeChannel
订阅具体channel
1 | int pubsubSubscribeChannel(client *c, robj *channel) { |
pubsubUnsubscribeChannel
取消订阅具体channel
1 | int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) { |
pubsubSubscribePattern
订阅模糊channel
1 | int pubsubSubscribePattern(client *c, robj *pattern) { |
pubsubUnsubscribePattern
取消订阅模糊channel
1 | int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) { |
pubsubUnsubscribeAllChannels
取消订阅所有的具体channel
1 | int pubsubUnsubscribeAllChannels(client *c, int notify) { |
pubsubUnsubscribeAllPatterns
取消订阅所有的模糊channel
1 | int pubsubUnsubscribeAllPatterns(client *c, int notify) { |
pubsubPublishMessage
针对某个channel发布消息
1 | int pubsubPublishMessage(robj *channel, robj *message) { |
subscribeCommand
响应subscribe订阅命令
1 | void subscribeCommand(client *c) { |
unsubscribeCommand
响应unsubscribe取消订阅命令
1 | void unsubscribeCommand(client *c) { |
psubscribeCommand
响应psubscribe模糊订阅命令
1 | void psubscribeCommand(client *c) { |
punsubscribeCommand
响应punsubscribe取消模糊订阅命令
1 | void punsubscribeCommand(client *c) { |
publishCommand
响应publish发布命令
1 | void publishCommand(client *c) { |
pubsubCommand
响应pubsub命令
1 | void pubsubCommand(client *c) { |