redis-发布订阅

Redis的发布订阅功能很简单:消息的生产方不直接和订阅方交互,而是通过channel(频道)中转,channel的角色与Kafka中的topic相同。

订阅分为两种类型:一种是具体的,指定了channel的名字,比如hello;另一种是模糊的,指定的channel名字含有通配符(glob风格的模式,并非严格意义上的正则),如hell*。每一个client有两个属性,分别存储自己订阅的具体channel和模糊channel。server也有两个字段,一个维护所有被订阅的channel及订阅它的client,另一个维护所有的模糊订阅。

当有客户端往某个channel中发送消息时,server会有两步操作:1.向这个channel对应的所有client发送消息;2.依次遍历所有的模糊channel,如果模糊channel和该channel匹配,则向对应的client发送消息。如果一个客户端订阅了多个模糊channel,且都命中,则每个命中的模式都会各发送一条消息,即该客户端会收到多条消息。

当客户端正常取消订阅时,就从client和server对应的列表中移除该订阅。如果客户端没有取消订阅就退出,定时任务检测到客户端超时并释放该客户端时,会主动清除它的所有订阅。

数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Pub/Sub-related fields of the per-client state.
// NOTE(review): this looks like an abbreviated excerpt of the real struct
// (no typedef name / terminating semicolon is shown) — confirm against the full source.
typedef struct client {
dict *pubsub_channels; // channels this client subscribed to by exact name
list *pubsub_patterns; // patterns this client subscribed to
}

// Pub/Sub-related fields of the global server state.
// NOTE(review): this looks like an abbreviated excerpt of the real struct
// (no typedef name / terminating semicolon is shown) — confirm against the full source.
typedef struct server {
dict *pubsub_channels; // maps every subscribed channel to the list of clients subscribed to it
list *pubsub_patterns; // all pattern subscriptions; each element is a pubsubPattern
}

//模糊匹配用到
// One pattern subscription: pairs a subscribing client with the pattern it registered.
// NOTE(review): shown without its typedef name / terminating semicolon — presumably abbreviated.
typedef struct pubsubPattern {
client *client; // the subscribing client
robj *pattern; // the pattern that channel names are matched against
}

源码分析

freePubsubPattern

释放pubsubPattern结构体

1
2
3
4
5
6
void freePubsubPattern(void *p) {
pubsubPattern *pat = p;

decrRefCount(pat->pattern);
zfree(pat);
}

listMatchPubsubPattern

比较两个pubsubPattern是否同一个

1
2
3
4
5
6
/* Equality predicate for two pubsubPattern values passed as untyped pointers.
 * Returns 1 when both refer to the same client and their pattern objects
 * compare equal as strings, 0 otherwise. */
int listMatchPubsubPattern(void *a, void *b) {
    pubsubPattern *lhs = a;
    pubsubPattern *rhs = b;

    /* Different owners can never be the same subscription. */
    if (lhs->client != rhs->client) return 0;
    return equalStringObjects(lhs->pattern,rhs->pattern) != 0;
}

clientSubscriptionsCount

获取客户端订阅的数量,具体+模糊

1
2
3
4
/* Total number of subscriptions held by client `c`: exact-name channels
 * plus patterns. */
int clientSubscriptionsCount(client *c) {
    int total = listLength(c->pubsub_patterns) + dictSize(c->pubsub_channels);

    return total;
}

pubsubSubscribeChannel

订阅具体channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/* Subscribe client `c` to the exact-name `channel`.
 *
 * Returns 1 if the subscription was newly added, 0 if the client was already
 * subscribed. A subscribe reply (tag, channel name, current subscription
 * count) is sent to the client in both cases. */
int pubsubSubscribeChannel(client *c, robj *channel) {
    int added = 0;

    /* Record the channel in the client's own table; dictAdd fails when the
     * client is already subscribed. */
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        dictEntry *entry;
        list *subscribers;

        added = 1;
        incrRefCount(channel); /* the client's dict now references the key */

        /* Record the client in the server-wide channel -> clients table,
         * creating the subscriber list on first use. */
        entry = dictFind(server.pubsub_channels,channel);
        if (entry != NULL) {
            subscribers = dictGetVal(entry);
        } else {
            subscribers = listCreate();
            dictAdd(server.pubsub_channels,channel,subscribers);
            incrRefCount(channel); /* the server's dict references it too */
        }
        listAddNodeTail(subscribers,c);
    }

    /* Reply to the client. */
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.subscribebulk);
    addReplyBulk(c,channel);
    addReplyLongLong(c,clientSubscriptionsCount(c));
    return added;
}

pubsubUnsubscribeChannel

取消订阅具体channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/* Unsubscribe client `c` from the exact-name `channel`.
 *
 * Returns 1 if the client was subscribed (and has now been removed), 0 if it
 * was not. When `notify` is non-zero an unsubscribe reply (tag, channel name,
 * remaining subscription count) is sent regardless of the outcome. */
int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
    int removed = 0;

    /* Hold an extra reference for the duration of the call.
     * pubsubUnsubscribeAllChannels passes the key taken from the client's
     * own dict, so `channel` is presumably at risk of being released by the
     * deletions below while it is still needed for the reply. */
    incrRefCount(channel);

    /* Remove the channel from the client's own table. */
    if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
        dictEntry *de;
        list *subscribers;
        listNode *ln;

        removed = 1;

        /* Remove the client from the server-wide subscriber list. Both
         * lookups are expected to succeed if the client table had the key. */
        de = dictFind(server.pubsub_channels,channel);
        serverAssertWithInfo(c,NULL,de != NULL);
        subscribers = dictGetVal(de);
        ln = listSearchKey(subscribers,c);
        serverAssertWithInfo(c,NULL,ln != NULL);
        listDelNode(subscribers,ln);

        /* Nobody is subscribed to this channel any more: drop its entry
         * from the server table. */
        if (listLength(subscribers) == 0) {
            dictDelete(server.pubsub_channels,channel);
        }
    }

    /* Reply to the client. */
    if (notify) {
        addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.unsubscribebulk);
        addReplyBulk(c,channel);
        addReplyLongLong(c,clientSubscriptionsCount(c));
    }

    decrRefCount(channel);
    return removed;
}

pubsubSubscribePattern

订阅模糊channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/* Subscribe client `c` to `pattern`.
 *
 * Returns 1 if the pattern was newly added, 0 if the client already had it.
 * A psubscribe reply (tag, pattern, current subscription count) is sent to
 * the client in both cases. */
int pubsubSubscribePattern(client *c, robj *pattern) {
    int added = 0;

    if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
        pubsubPattern *entry;

        added = 1;

        /* Record the pattern in the client's own list. */
        listAddNodeTail(c->pubsub_patterns,pattern);
        incrRefCount(pattern);

        /* Record the (client, pattern) pair in the server-wide list. */
        entry = zmalloc(sizeof(*entry));
        entry->client = c;
        entry->pattern = getDecodedObject(pattern);
        listAddNodeTail(server.pubsub_patterns,entry);
    }

    /* Reply to the client. */
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.psubscribebulk);
    addReplyBulk(c,pattern);
    addReplyLongLong(c,clientSubscriptionsCount(c));
    return added;
}

pubsubUnsubscribePattern

取消订阅模糊channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/* Unsubscribe client `c` from `pattern`.
 *
 * Returns 1 if the client had the pattern (and it has now been removed), 0
 * otherwise. When `notify` is non-zero a punsubscribe reply (tag, pattern,
 * remaining subscription count) is sent regardless of the outcome. */
int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
    listNode *node;
    int removed = 0;

    /* Hold an extra reference for the duration of the call.
     * pubsubUnsubscribeAllPatterns passes the value stored in the client's
     * own list, so `pattern` is presumably at risk of being released by the
     * deletions below while it is still needed for the reply. */
    incrRefCount(pattern);

    node = listSearchKey(c->pubsub_patterns,pattern);
    if (node != NULL) {
        pubsubPattern key;

        removed = 1;

        /* Remove from the client's own list. */
        listDelNode(c->pubsub_patterns,node);

        /* A pattern present in the client's list is expected to be present
         * in the server's list as well; remove that entry too. */
        key.client = c;
        key.pattern = pattern;
        node = listSearchKey(server.pubsub_patterns,&key);
        listDelNode(server.pubsub_patterns,node);
    }

    /* Reply to the client. */
    if (notify) {
        addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.punsubscribebulk);
        addReplyBulk(c,pattern);
        addReplyLongLong(c,clientSubscriptionsCount(c));
    }

    decrRefCount(pattern);
    return removed;
}

pubsubUnsubscribeAllChannels

取消订阅所有的具体channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/* Unsubscribe client `c` from every exact-name channel it is subscribed to.
 *
 * Returns the number of channels removed. When `notify` is non-zero, each
 * removal produces its own reply; if nothing was removed a single
 * unsubscribe reply with a null channel name is sent instead. */
int pubsubUnsubscribeAllChannels(client *c, int notify) {
    dictIterator *iter = dictGetSafeIterator(c->pubsub_channels);
    dictEntry *entry;
    int removed = 0;

    /* Entries are deleted while iterating, hence the safe iterator. */
    for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
        removed += pubsubUnsubscribeChannel(c,dictGetKey(entry),notify);
    }

    /* The client had no channel subscriptions at all: still answer. */
    if (removed == 0 && notify) {
        addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.unsubscribebulk);
        addReply(c,shared.nullbulk);
        addReplyLongLong(c,clientSubscriptionsCount(c));
    }

    dictReleaseIterator(iter);
    return removed;
}

pubsubUnsubscribeAllPatterns

取消订阅所有的模糊channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/* Unsubscribe client `c` from every pattern it is subscribed to.
 *
 * Returns the number of patterns removed. When `notify` is non-zero, each
 * removal produces its own reply; if nothing was removed a single
 * punsubscribe reply with a null pattern is sent instead. */
int pubsubUnsubscribeAllPatterns(client *c, int notify) {
    listIter iter;
    listNode *node;
    int removed = 0;

    /* Walk the client's own pattern list, dropping each entry in turn. */
    listRewind(c->pubsub_patterns,&iter);
    for (node = listNext(&iter); node != NULL; node = listNext(&iter)) {
        removed += pubsubUnsubscribePattern(c,node->value,notify);
    }

    /* The client had no pattern subscriptions at all: still answer. */
    if (removed == 0 && notify) {
        addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.punsubscribebulk);
        addReply(c,shared.nullbulk);
        addReplyLongLong(c,clientSubscriptionsCount(c));
    }
    return removed;
}

pubsubPublishMessage

针对某个channel发布消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/* Deliver `message` to every subscriber of `channel`.
 *
 * Exact-name subscribers are served first, then every pattern subscription
 * whose pattern matches the channel name. Returns the number of deliveries
 * made; a client is counted once per matching subscription. */
int pubsubPublishMessage(robj *channel, robj *message) {
    dictEntry *entry = dictFind(server.pubsub_channels,channel);
    listIter iter;
    listNode *node;
    int delivered = 0;

    /* Clients subscribed to this exact channel name. */
    if (entry != NULL) {
        list *subscribers = dictGetVal(entry);

        listRewind(subscribers,&iter);
        while ((node = listNext(&iter)) != NULL) {
            client *c = node->value;

            addReply(c,shared.mbulkhdr[3]);
            addReply(c,shared.messagebulk);
            addReplyBulk(c,channel);
            addReplyBulk(c,message);
            delivered++;
        }
    }

    /* No pattern subscriptions: nothing more to do. */
    if (listLength(server.pubsub_patterns) == 0) return delivered;

    /* Clients subscribed through a pattern. The channel object is swapped
     * for its decoded form for matching; the reference obtained here is
     * released once the loop is done. */
    listRewind(server.pubsub_patterns,&iter);
    channel = getDecodedObject(channel);
    while ((node = listNext(&iter)) != NULL) {
        pubsubPattern *pat = node->value;

        /* Skip subscriptions whose pattern does not match the channel. */
        if (!stringmatchlen((char*)pat->pattern->ptr,
                            sdslen(pat->pattern->ptr),
                            (char*)channel->ptr,
                            sdslen(channel->ptr),0)) continue;

        addReply(pat->client,shared.mbulkhdr[4]);
        addReply(pat->client,shared.pmessagebulk);
        addReplyBulk(pat->client,pat->pattern);
        addReplyBulk(pat->client,channel);
        addReplyBulk(pat->client,message);
        delivered++;
    }
    decrRefCount(channel);
    return delivered;
}

subscribeCommand

响应subscribe订阅命令

1
2
3
4
5
6
7
/* SUBSCRIBE handler: subscribes the client to every channel named in the
 * arguments, then flags the client as being in Pub/Sub mode. */
void subscribeCommand(client *c) {
    int idx = 1; /* argv[0] is the command name itself */

    while (idx < c->argc) {
        pubsubSubscribeChannel(c,c->argv[idx]);
        idx++;
    }
    c->flags |= CLIENT_PUBSUB;
}

unsubscribeCommand

响应unsubscribe取消订阅命令

1
2
3
4
5
6
7
8
9
10
11
12
13
/* UNSUBSCRIBE handler: with arguments, unsubscribes from each named channel;
 * without arguments, unsubscribes from all channels. The Pub/Sub flag is
 * cleared once the client has no subscriptions of any kind left. */
void unsubscribeCommand(client *c) {
    if (c->argc != 1) {
        int idx;

        for (idx = 1; idx < c->argc; idx++) {
            pubsubUnsubscribeChannel(c,c->argv[idx],1);
        }
    } else {
        /* No channel given: drop every channel subscription. */
        pubsubUnsubscribeAllChannels(c,1);
    }

    if (clientSubscriptionsCount(c) != 0) return;
    c->flags &= ~CLIENT_PUBSUB;
}

psubscribeCommand

响应psubscribe模糊订阅命令

1
2
3
4
5
6
7
/* PSUBSCRIBE handler: subscribes the client to every pattern named in the
 * arguments, then flags the client as being in Pub/Sub mode. */
void psubscribeCommand(client *c) {
    int idx = 1; /* argv[0] is the command name itself */

    while (idx < c->argc) {
        pubsubSubscribePattern(c,c->argv[idx]);
        idx++;
    }
    c->flags |= CLIENT_PUBSUB;
}

punsubscribeCommand

响应punsubscribe取消模糊订阅命令

1
2
3
4
5
6
7
8
9
10
11
12
/* PUNSUBSCRIBE handler: with arguments, unsubscribes from each named
 * pattern; without arguments, unsubscribes from all patterns. The Pub/Sub
 * flag is cleared once the client has no subscriptions of any kind left. */
void punsubscribeCommand(client *c) {
    if (c->argc != 1) {
        int idx;

        for (idx = 1; idx < c->argc; idx++) {
            pubsubUnsubscribePattern(c,c->argv[idx],1);
        }
    } else {
        /* No pattern given: drop every pattern subscription. */
        pubsubUnsubscribeAllPatterns(c,1);
    }

    if (clientSubscriptionsCount(c) != 0) return;
    c->flags &= ~CLIENT_PUBSUB;
}

publishCommand

响应publish发布命令

1
2
3
4
5
6
7
8
9
10
11
/* PUBLISH handler: delivers argv[2] to the subscribers of channel argv[1],
 * propagates the publication, and replies with the number of receivers. */
void publishCommand(client *c) {
    robj *channel = c->argv[1];
    robj *message = c->argv[2];
    int receivers = pubsubPublishMessage(channel,message);

    if (server.cluster_enabled) {
        /* Cluster mode: propagate the publication through the cluster. */
        clusterPropagatePublish(channel,message);
    } else {
        /* Otherwise force the command to be propagated to replicas. */
        forceCommandPropagation(c,PROPAGATE_REPL);
    }
    addReplyLongLong(c,receivers);
}

pubsubCommand

响应pubsub命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/* PUBSUB handler. Dispatches on the subcommand in argv[1]:
 *   HELP                  -- reply with the usage text
 *   CHANNELS [<pattern>]  -- list subscribed channels, optionally filtered
 *   NUMSUB [channel ...]  -- per-channel subscriber counts
 *   NUMPAT                -- number of pattern subscriptions
 * Anything else gets a subcommand syntax error reply. */
void pubsubCommand(client *c) {
    const char *sub = c->argv[1]->ptr;

    if (c->argc == 2 && !strcasecmp(sub,"help")) {
        /* Usage text for the subcommands. */
        const char *help[] = {
"CHANNELS [<pattern>] -- Return the currently active channels matching a pattern (default: all).",
"NUMPAT -- Return number of subscriptions to patterns.",
"NUMSUB [channel-1 .. channel-N] -- Returns the number of subscribers for the specified channels (excluding patterns, default: none).",
NULL
        };
        addReplyHelp(c, help);
    } else if (!strcasecmp(sub,"channels") &&
               (c->argc == 2 || c->argc == 3))
    {
        /* PUBSUB CHANNELS [<pattern>] */
        sds filter = (c->argc == 2) ? NULL : c->argv[2]->ptr;
        dictIterator *iter = dictGetIterator(server.pubsub_channels);
        dictEntry *entry;
        long matched = 0;
        void *deferred;

        /* The number of matches is only known after the walk, so the reply
         * length is filled in afterwards. */
        deferred = addDeferredMultiBulkLength(c);
        while ((entry = dictNext(iter)) != NULL) {
            robj *nameobj = dictGetKey(entry);
            sds name = nameobj->ptr;

            /* No filter means every channel is reported. */
            if (!filter || stringmatchlen(filter, sdslen(filter),
                                          name, sdslen(name),0))
            {
                addReplyBulk(c,nameobj);
                matched++;
            }
        }
        dictReleaseIterator(iter);
        setDeferredMultiBulkLength(c,deferred,matched);
    } else if (!strcasecmp(sub,"numsub") && c->argc >= 2) {
        /* PUBSUB NUMSUB [Channel_1 ... Channel_N] */
        int idx;

        /* Two reply elements (name, count) per requested channel. */
        addReplyMultiBulkLen(c,(c->argc-2)*2);
        for (idx = 2; idx < c->argc; idx++) {
            list *subscribers = dictFetchValue(server.pubsub_channels,c->argv[idx]);

            addReplyBulk(c,c->argv[idx]);
            addReplyLongLong(c,subscribers ? listLength(subscribers) : 0);
        }
    } else if (!strcasecmp(sub,"numpat") && c->argc == 2) {
        /* PUBSUB NUMPAT */
        addReplyLongLong(c,listLength(server.pubsub_patterns));
    } else {
        addReplySubcommandSyntaxError(c);
    }
}