Redis Cluster

A Redis cluster is a mesh topology made up of multiple node instances. Each node exposes two ports: one serves ordinary clients (default 6379), and the other is used for communication between cluster nodes (default 6379+10000, i.e. 16379). The cluster bus handles adding nodes, removing nodes, assigning hash slots, migrating slots, failure recovery, and so on. The cluster implementation overlaps with the Sentinel implementation in some ways and differs in others. Both support master/slave mode, and both can automatically detect and recover from a master failure. The difference lies in who drives the recovery: under Sentinel, the sentinels elect a leader among themselves (only sentinels vote), and that leader drives the whole recovery. In cluster mode there is no such dedicated leadership role; recovery is initiated by the slaves of the failed master, and the remaining masters vote to elect one slave to drive the recovery.

The Redis Cluster implementation introduces the concept of hash slots: the whole key space is divided into 16384 slots, and each node in the cluster is responsible for a subset of them. The key in a command is hashed, and the result modulo the total number of slots decides which slot the command belongs to. Once the cluster is stable, every node holds a complete slot routing table recording which node owns each slot.

Cluster composition

Nodes

Each node carries the following information, known to all other nodes:

  1. IP, port, and cluster bus port
  2. Flags, e.g. master, slave, objectively down (FAIL), subjectively down (PFAIL), handshaking
  3. The slots the node is responsible for
  4. The time the last PING was sent
  5. The time the last PONG was received
  6. The number of slave nodes
  7. Information about its master node

When creating a cluster with redis-cli, the first master is taken as the anchor, and the remaining masters send it a MEET message carrying their own information: IP, port, name, and so on. A node that receives a MEET message records the sender as part of the cluster, and then, in its periodic cron job, sends PING to the nodes it knows for heartbeat and gossip; a node that receives a PING replies with PONG. MEET, PING, and PONG messages all carry gossip about a subset of the sender's known nodes, and the receiver adds those nodes to its own known set. After enough rounds of exchange, every pair of nodes ends up knowing each other's full set of known nodes.

A cluster is made up of multiple masters. To improve availability and avoid part of the slot space becoming unavailable when a master dies, each master is usually given one or more slaves. When a master dies, one of its slaves is promoted to become the new master.
Nodes in a cluster are all directly connected; there is no management node like a sentinel. In a cluster of N nodes, each node has N-1 connections, so structurally the cluster looks like a complete graph. When nodes gossip, they carry their own information and their slot routing table. Therefore, when building a cluster, we only need to connect the nodes into a tree; after a period of gossip, it converges into a complete graph. For example, if A knows B and B knows C, then when B gossips with A it tells A that it knows C, so A learns about C; likewise when B gossips with C, C learns about A. The figure below shows the topology at the start of cluster construction and after the cluster has stabilized.

A cluster needs at least 3 masters, because for a slave to become the new master after its master dies, it must win votes from more than half of the masters. With one or two masters, "more than half" equals the total number of masters, a condition that can never be met once one master is down. With three masters, "more than half" is 2, which is achievable.

Creating a cluster

Creating a cluster takes two steps: first, the nodes are connected into a connected graph via MEET; second, the nodes gossip with each other and complete their view of the topology. The first step requires an external driver, which can be manual commands, redis-cli's cluster subcommands, or the redis-trib.rb script.

Handshake

  1. The script sends a MEET command to node A, asking it to meet node B. A creates a node, fills in ip, port, and cport, and marks it CLUSTER_NODE_HANDSHAKE & CLUSTER_NODE_MEET; node->link is empty and node->name is randomly generated
  2. In its cron job, node A finds that node->link is empty, creates a connection using node->ip and node->cport, sets the connection's read handler to clusterReadHandler, and binds them: link->node=node; node->link=link;
  3. Node B's clusterAcceptHandler accepts the connection and creates a link whose link->node is empty; the read handler is likewise clusterReadHandler
  4. Node A sends the MEET message, which carries node A's name
  5. On receiving the complete message, node B handles MEET in clusterProcessPacket: it creates a node, fills in ip, port, and cport, marks it CLUSTER_NODE_HANDSHAKE, processes the gossip section, and replies PONG
  6. Node A receives the PONG. Because the node is still handshaking, A updates node->name from the name in the PONG; the handshake ends and the node is tagged master or slave
  7. In its cron job, node B finds that node->link is empty, creates a link to node A (link->node=node; node->link=link;) with clusterReadHandler as the read handler, and sends PING to A
  8. Node A replies PONG
  9. Node B receives the complete message, handles PONG in clusterProcessPacket, and updates node->name; node B's handshake for this node ends and it is tagged master or slave

The whole process is shown in the figure below.

In the end, nodes A and B each have two links: one actively initiated, bound to a node; one passively accepted, for which the local node is looked up via the sender field of each message. These two connections also serve a special purpose: when a node's ip, port, or cport changes, the actively initiated link is how other nodes find out. For example, suppose A and B have established their connections and A's IP changes at some point. Subsequent PING messages from A to B carry A's latest IP. When B receives such a message and notices that the IP A reports differs from the IP recorded in its nodeA, it drops nodeA's link (the active one) and updates nodeA's IP. On the next cron run, it reconnects to A using the updated information.

So in clusterProcessPacket, if (link->node) means the message arrived on a connection we initiated, and if (sender) means the name in the message corresponds to a node we already know.

Slots (hash slots)

Redis Cluster does not use consistent hashing; instead it uses hash slots. The cluster has 16384 slots in total, each master is responsible for a subset of them, and every node holds a complete slot routing table mapping each slot to the node that owns it. A client can therefore connect to any node in the cluster, and that node can tell the client which node should actually serve the operation.

Slot hashing

When a command is executed against the cluster, the server computes CRC16 of the key and takes the result modulo the total number of slots to decide which slot the command falls into, then uses the slot routing table to find the node that should handle it.

The key is processed according to the following rules:

  1. If the key contains "{}" and the content between the first pair of "{}" is non-empty, only that content participates in the CRC16 computation. E.g. key = {user}100{qaq}: the data hashed is user
  2. If the key contains "{}" but the first pair is empty, the whole key is hashed. E.g. key = {}user100: the data hashed is {}user100
  3. If the key contains no "{}", the whole key is hashed. E.g. key = user100: the data hashed is user100

In short: if and only if the content between the first pair of "{}" in the key is non-empty, that content alone is hashed; otherwise the whole key is hashed.

If a command involves multiple keys, it can only be served when all keys map to the same slot. For example, when mget's keys land in different slots, it returns (error) CROSSSLOT Keys in request don't hash to the same slot. Using the "{}" rule above, we can force multiple keys onto the same slot, so that multi-key commands work normally.

Slot migration

For various reasons (manual rebalancing, defragmentation), we may need to migrate slots between nodes. A slot being migrated is in one of two states, MIGRATING or IMPORTING. Say we want to move slot 1 from node A to node B; we need the following two steps:

  1. Send to the target node B: CLUSTER SETSLOT 1 IMPORTING A, meaning "slot 1 is being imported from A"
  2. Send to the source node A: CLUSTER SETSLOT 1 MIGRATING B, meaning "slot 1 is being migrated to B"

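The two SETSLOT commands are only the bookkeeping part; a full migration (as driven by tools like redis-cli, sketched here with hypothetical ports, and node-ID placeholders you must substitute) also moves the keys and then finalizes ownership:

```shell
# B (target, :7001) marks slot 1 as importing from A; A (source, :7000) marks it migrating to B
redis-cli -p 7001 CLUSTER SETSLOT 1 IMPORTING <A-node-id>
redis-cli -p 7000 CLUSTER SETSLOT 1 MIGRATING <B-node-id>

# Move the keys of slot 1, batch by batch: list them, then MIGRATE them to B
redis-cli -p 7000 CLUSTER GETKEYSINSLOT 1 100
redis-cli -p 7000 MIGRATE 127.0.0.1 7001 "" 0 5000 KEYS <key1> <key2>

# Finalize: declare B the owner of slot 1 on both sides
redis-cli -p 7000 CLUSTER SETSLOT 1 NODE <B-node-id>
redis-cli -p 7001 CLUSTER SETSLOT 1 NODE <B-node-id>
```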
Failure recovery

A healthy Redis Cluster topology has at least 3 masters, each with at least one slave. When a master dies, the nodes that detect it as unreachable mark it PFAIL (subjectively down). Through subsequent gossip, each node collects reports from other nodes that also consider that master unreachable. When the number of masters reporting it down reaches a majority, the master is marked FAIL (objectively down). A slave of the failed master then starts an election, asking the other masters to vote for it to drive the failover. If, within a time window, it collects votes from more than half of the masters, it wins the election; otherwise it tries again in a later round.

After winning the election, the slave does the following:

  1. Turns itself into a master: clears the slave flag, sets the master flag, removes itself from its old master's slave list, and bumps its own epoch
  2. Clears its replication master setting, disconnecting from the old master and from all of its own slaves
  3. Takes over the slots the old master was responsible for
  4. Broadcasts to the other nodes so they update their topology
  5. When other nodes later receive gossip from the new master claiming slots previously owned by the old master, they update their own topology. If a slave of the old master receives the message, it switches masters and becomes a slave of the new master

This process can also be triggered manually, when we want to force-promote a slave to master.

Client queries

For a standalone instance we usually connect with redis-cli -h xxxx -p xxxx. For a cluster we need to add the -c flag: with it, when the server returns an ASK or MOVED redirection, the client automatically switches to the right node for the query; without it, we just see the error.
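An illustrative session (addresses and the slot number are made up; the message shapes follow the redis-cli output format):

```shell
$ redis-cli -p 7000 GET user100
(error) MOVED 1625 127.0.0.1:7002

$ redis-cli -c -p 7000 GET user100
-> Redirected to slot [1625] located at 127.0.0.1:7002
(nil)
```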

Source code analysis

Data structures

#define CLUSTER_SLOTS 16384 //number of hash slots
#define CLUSTER_OK 0 //cluster state: healthy
#define CLUSTER_FAIL 1 //cluster state: not healthy
#define CLUSTER_NAMELEN 40 //length of a node name
#define CLUSTER_PORT_INCR 10000 //cluster bus port offset; by default the normal port plus this offset

//time-related
#define CLUSTER_DEFAULT_NODE_TIMEOUT 15000 //node timeout used for failure detection and failover
#define CLUSTER_DEFAULT_SLAVE_VALIDITY 10 //data-age factor: a slave whose data lags its master too far cannot be promoted to the new master
#define CLUSTER_DEFAULT_REQUIRE_FULL_COVERAGE 1 //whether computing the cluster state requires every slot to be served
#define CLUSTER_DEFAULT_SLAVE_NO_FAILOVER 0 //default failover setting: slaves may fail over
#define CLUSTER_FAIL_REPORT_VALIDITY_MULT 2 //failure-report validity factor: a report older than CLUSTER_DEFAULT_NODE_TIMEOUT*CLUSTER_FAIL_REPORT_VALIDITY_MULT is considered stale
#define CLUSTER_FAIL_UNDO_TIME_MULT 2 //FAIL-undo factor: if a master has been flagged FAIL for longer than CLUSTER_FAIL_UNDO_TIME_MULT*CLUSTER_DEFAULT_NODE_TIMEOUT, the flag is cleared
#define CLUSTER_FAIL_UNDO_TIME_ADD 10 //unused
#define CLUSTER_FAILOVER_DELAY 5 //delay before starting a failover; currently unused
#define CLUSTER_DEFAULT_MIGRATION_BARRIER 1 //minimum number of healthy slaves a master must keep for one of them to migrate to an orphaned master
#define CLUSTER_MF_TIMEOUT 5000 //manual failover timeout
#define CLUSTER_MF_PAUSE_MULT 2 //unused
#define CLUSTER_SLAVE_MIGRATION_DELAY 5000 //delay before slave migration starts

//errors when mapping a command's keys to a slot
#define CLUSTER_REDIR_NONE 0 //the request can be served
#define CLUSTER_REDIR_CROSS_SLOT 1 //the command's keys span multiple slots
#define CLUSTER_REDIR_UNSTABLE 2 //the slot involved is being migrated in/out; retry later
#define CLUSTER_REDIR_ASK 3 //the slot is being migrated away
#define CLUSTER_REDIR_MOVED 4 //the slot does not belong to this node
#define CLUSTER_REDIR_DOWN_STATE 5 //the whole cluster is down
#define CLUSTER_REDIR_DOWN_UNBOUND 6 //this node is not healthy: it serves no slots

struct clusterNode;

//link used for inter-node communication
typedef struct clusterLink {
    mstime_t ctime; //link creation time
    int fd; //socket fd
    sds sndbuf; //pending output buffer
    sds rcvbuf; //accumulated input buffer
    struct clusterNode *node; //the node this link is associated with
} clusterLink;

//node flags
#define CLUSTER_NODE_MASTER 1 //master
#define CLUSTER_NODE_SLAVE 2 //slave
#define CLUSTER_NODE_PFAIL 4 //subjectively down: we alone consider the node down (possible fail)
#define CLUSTER_NODE_FAIL 8 //objectively down: a majority of the other masters also consider it down
#define CLUSTER_NODE_MYSELF 16 //this node itself
#define CLUSTER_NODE_HANDSHAKE 32 //handshaking: first contact with the node
#define CLUSTER_NODE_NOADDR 64 //address unknown
#define CLUSTER_NODE_MEET 128 //a MEET message must be sent to this node
#define CLUSTER_NODE_MIGRATE_TO 256 //master eligible to receive migrating slaves
#define CLUSTER_NODE_NOFAILOVER 512 //this slave will not initiate a failover
#define CLUSTER_NODE_NULL_NAME "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" //节点空名字填充

#define nodeIsMaster(n) ((n)->flags & CLUSTER_NODE_MASTER) //is the node a master?
#define nodeIsSlave(n) ((n)->flags & CLUSTER_NODE_SLAVE) //is the node a slave?
#define nodeInHandshake(n) ((n)->flags & CLUSTER_NODE_HANDSHAKE) //is the node still handshaking?
#define nodeHasAddr(n) (!((n)->flags & CLUSTER_NODE_NOADDR)) //does the node have an address?
#define nodeWithoutAddr(n) ((n)->flags & CLUSTER_NODE_NOADDR) //is the node without an address?
#define nodeTimedOut(n) ((n)->flags & CLUSTER_NODE_PFAIL) //has the node timed out?
#define nodeFailed(n) ((n)->flags & CLUSTER_NODE_FAIL) //is the node down?
#define nodeCantFailover(n) ((n)->flags & CLUSTER_NODE_NOFAILOVER) //is the node barred from failing over?

//reasons a slave cannot initiate a failover
#define CLUSTER_CANT_FAILOVER_NONE 0 //no reason
#define CLUSTER_CANT_FAILOVER_DATA_AGE 1 //its data lags the master by too much
#define CLUSTER_CANT_FAILOVER_WAITING_DELAY 2 //still waiting for the election delay
#define CLUSTER_CANT_FAILOVER_EXPIRED 3 //the election attempt expired
#define CLUSTER_CANT_FAILOVER_WAITING_VOTES 4 //waiting for votes from the other masters
#define CLUSTER_CANT_FAILOVER_RELOG_PERIOD (60*5) //interval between logging the same reason; only one entry per period

//flags for work to do before the next event loop iteration
#define CLUSTER_TODO_HANDLE_FAILOVER (1<<0) //run the failover handler
#define CLUSTER_TODO_UPDATE_STATE (1<<1) //recompute our overall state, ok/fail
#define CLUSTER_TODO_SAVE_CONFIG (1<<2) //save the cluster config
#define CLUSTER_TODO_FSYNC_CONFIG (1<<3) //fsync the config to disk

//inter-node message types
#define CLUSTERMSG_TYPE_PING 0 //ping
#define CLUSTERMSG_TYPE_PONG 1 //pong, the reply to a ping
#define CLUSTERMSG_TYPE_MEET 2 //meet: like ping, except that the receiver also records the sender as a cluster node
#define CLUSTERMSG_TYPE_FAIL 3 //mark some node as FAIL
#define CLUSTERMSG_TYPE_PUBLISH 4 //pub/sub publish propagation
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 5 //a slave asks whether it may drive the failover
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 6 //a master votes yes for the slave
#define CLUSTERMSG_TYPE_UPDATE 7 //a node's configuration has been updated
#define CLUSTERMSG_TYPE_MFSTART 8 //start of a manual failover: ask the master to pause clients
#define CLUSTERMSG_TYPE_MODULE 9 //module message
#define CLUSTERMSG_TYPE_COUNT 10 //total number of message types

//module flags
#define CLUSTER_MODULE_FLAG_NONE 0
#define CLUSTER_MODULE_FLAG_NO_FAILOVER (1<<1)
#define CLUSTER_MODULE_FLAG_NO_REDIRECTION (1<<2)

//report that a node appears to be down
typedef struct clusterNodeFailReport {
    struct clusterNode *node; //the node reported as failing
    mstime_t time; //time of the latest report from this sender
} clusterNodeFailReport;

//a cluster node
typedef struct clusterNode {
    mstime_t ctime; //creation time
    char name[CLUSTER_NAMELEN]; //node name
    int flags; //node flags, see CLUSTER_NODE_*
    uint64_t configEpoch; //config epoch
    unsigned char slots[CLUSTER_SLOTS/8]; //bitmap of the slots served by this node
    int numslots; //number of slots served
    int numslaves; //number of slaves
    struct clusterNode **slaves; //if this node is a master, its list of slaves
    struct clusterNode *slaveof; //if this node is a slave, its master
    mstime_t ping_sent; //time the latest ping was sent
    mstime_t pong_received; //time the latest pong was received
    mstime_t fail_time; //time the node was flagged FAIL
    mstime_t voted_time; //for a master: time of the last vote it granted to one of its slaves' elections
    mstime_t repl_offset_time; //time the latest replication offset was received
    mstime_t orphaned_time; //for a master: time at which all of its slaves were found down
    long long repl_offset; //latest known replication offset
    char ip[NET_IP_STR_LEN]; //ip
    int port; //normal client port
    int cport; //cluster bus port
    clusterLink *link; //link used to talk to the node
    list *fail_reports; //reports from other nodes saying this node is down
} clusterNode;

//cluster state
typedef struct clusterState {
    clusterNode *myself; //this node
    uint64_t currentEpoch; //current epoch
    int state; //state, ok/fail
    int size; //number of masters serving at least one slot
    dict *nodes; //all known nodes, name -> node
    dict *nodes_black_list; //nodes that must not be re-added to nodes
    clusterNode *migrating_slots_to[CLUSTER_SLOTS]; //targets of slots migrating out
    clusterNode *importing_slots_from[CLUSTER_SLOTS]; //sources of slots migrating in
    clusterNode *slots[CLUSTER_SLOTS]; //the full slot routing table
    uint64_t slots_keys_count[CLUSTER_SLOTS]; //number of keys in each slot
    rax *slots_to_keys;

    //used by slave elections
    mstime_t failover_auth_time; //time at which to start the election
    int failover_auth_count; //number of votes received so far
    int failover_auth_sent; //whether we already asked for votes
    int failover_auth_rank; //our rank in the election
    uint64_t failover_auth_epoch; //epoch of the election
    int cant_failover_reason; //why we cannot start a failover
    mstime_t mf_end; //manual failover deadline, 0 if none in progress

    //master side
    clusterNode *mf_slave; //the slave driving the manual failover
    //slave side
    long long mf_master_offset; //master replication offset to reach before starting the manual failover
    int mf_can_start; //whether the manual failover may start

    //a master's record of its own voting
    uint64_t lastVoteEpoch; //epoch of our latest vote
    int todo_before_sleep; //flags of work for clusterBeforeSleep

    //statistics
    long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT]; //messages sent, by type
    long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT]; //messages received, by type
    long long stats_pfail_nodes; //number of nodes flagged PFAIL, including those without an address
} clusterState;

//gossip section of inter-node messages; fields mirror clusterNode
typedef struct {
    char nodename[CLUSTER_NAMELEN];
    uint32_t ping_sent;
    uint32_t pong_received;
    char ip[NET_IP_STR_LEN];
    uint16_t port;
    uint16_t cport;
    uint16_t flags;
    uint32_t notused1;
} clusterMsgDataGossip;

//node-failed message
typedef struct {
    char nodename[CLUSTER_NAMELEN];
} clusterMsgDataFail;

//publish message
typedef struct {
    uint32_t channel_len;
    uint32_t message_len;
    unsigned char bulk_data[8];
} clusterMsgDataPublish;

//config update message
typedef struct {
    uint64_t configEpoch; //config epoch
    char nodename[CLUSTER_NAMELEN];
    unsigned char slots[CLUSTER_SLOTS/8];
} clusterMsgDataUpdate;

//module message
typedef struct {
    uint64_t module_id;
    uint32_t len;
    uint8_t type;
    unsigned char bulk_data[3];
} clusterMsgModule;

//message payload
union clusterMsgData {
    //ping, pong, and meet all share this layout
    struct {
        clusterMsgDataGossip gossip[1];
    } ping;

    //node failed
    struct {
        clusterMsgDataFail about;
    } fail;

    //publish
    struct {
        clusterMsgDataPublish msg;
    } publish;

    //config update
    struct {
        clusterMsgDataUpdate nodecfg;
    } update;

    //module
    struct {
        clusterMsgModule msg;
    } module;
};

#define CLUSTER_PROTO_VER 1 //message protocol version

//the full message header
typedef struct {
    char sig[4]; //fixed signature "RCmb" (Redis Cluster message bus)
    uint32_t totlen; //total message length
    uint16_t ver; //protocol version
    uint16_t port;
    uint16_t type; //message type
    uint16_t count; //number of payload entries; only meaningful for some types
    uint64_t currentEpoch; //sender's epoch
    uint64_t configEpoch; //sender's config epoch
    uint64_t offset; //replication offset
    char sender[CLUSTER_NAMELEN];
    unsigned char myslots[CLUSTER_SLOTS/8]; //slots served by the sender
    char slaveof[CLUSTER_NAMELEN];
    char myip[NET_IP_STR_LEN]; //sender's ip
    char notused1[34]; //unused
    uint16_t cport; //cluster bus port
    uint16_t flags; //sender's flags
    unsigned char state;
    unsigned char mflags[3];
    union clusterMsgData data; //the payload
} clusterMsg;

#define CLUSTERMSG_MIN_LEN (sizeof(clusterMsg)-sizeof(union clusterMsgData)) //minimum message length

#define CLUSTERMSG_FLAG0_PAUSED (1<<0) //the master is paused because of a manual failover
#define CLUSTERMSG_FLAG0_FORCEACK (1<<1) //ask the master to vote even though the slave's master is up

Initialization

clusterLoadConfig

Load the cluster configuration from a file

int clusterLoadConfig(char *filename) {
FILE *fp = fopen(filename,"r");
struct stat sb;
char *line;
int maxline, j;

if (fp == NULL) {
if (errno == ENOENT) {
return C_ERR;
} else {
serverLog(LL_WARNING,
"Loading the cluster node config from %s: %s",
filename, strerror(errno));
exit(1);
}
}

//a zero-length file is treated as missing
if (fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
fclose(fp);
return C_ERR;
}

maxline = 1024+CLUSTER_SLOTS*128;
line = zmalloc(maxline);
while(fgets(line,maxline,fp) != NULL) {
int argc;
sds *argv;
clusterNode *n, *master;
char *p, *s;

//skip blank lines
if (line[0] == '\n' || line[0] == '\0') continue;

argv = sdssplitargs(line,&argc);
if (argv == NULL) goto fmterr;

//handle the vars line
if (strcasecmp(argv[0],"vars") == 0) {
for (j = 1; j < argc; j += 2) {
if (strcasecmp(argv[j],"currentEpoch") == 0) {
server.cluster->currentEpoch =
strtoull(argv[j+1],NULL,10);
} else if (strcasecmp(argv[j],"lastVoteEpoch") == 0) {
server.cluster->lastVoteEpoch =
strtoull(argv[j+1],NULL,10);
} else {
serverLog(LL_WARNING,
"Skipping unknown cluster config variable '%s'",
argv[j]);
}
}
sdsfreesplitres(argv,argc);
continue;
}

//minimum field count
if (argc < 8) goto fmterr;

//create the node
n = clusterLookupNode(argv[0]);
if (!n) {
n = createClusterNode(argv[0],0);
clusterAddNode(n);
}

//ip&port
if ((p = strrchr(argv[1],':')) == NULL) goto fmterr;
*p = '\0';
memcpy(n->ip,argv[1],strlen(argv[1])+1);
char *port = p+1;
char *busp = strchr(port,'@');
if (busp) {
*busp = '\0';
busp++;
}
n->port = atoi(port);

//cluster bus port
n->cport = busp ? atoi(busp) : n->port + CLUSTER_PORT_INCR;

//flags
p = s = argv[2];
while(p) {
p = strchr(s,',');
if (p) *p = '\0';
if (!strcasecmp(s,"myself")) {
serverAssert(server.cluster->myself == NULL);
myself = server.cluster->myself = n;
n->flags |= CLUSTER_NODE_MYSELF;
} else if (!strcasecmp(s,"master")) {
n->flags |= CLUSTER_NODE_MASTER;
} else if (!strcasecmp(s,"slave")) {
n->flags |= CLUSTER_NODE_SLAVE;
} else if (!strcasecmp(s,"fail?")) {
n->flags |= CLUSTER_NODE_PFAIL;
} else if (!strcasecmp(s,"fail")) {
n->flags |= CLUSTER_NODE_FAIL;
n->fail_time = mstime();
} else if (!strcasecmp(s,"handshake")) {
n->flags |= CLUSTER_NODE_HANDSHAKE;
} else if (!strcasecmp(s,"noaddr")) {
n->flags |= CLUSTER_NODE_NOADDR;
} else if (!strcasecmp(s,"nofailover")) {
n->flags |= CLUSTER_NODE_NOFAILOVER;
} else if (!strcasecmp(s,"noflags")) {
/* nothing to do */
} else {
serverPanic("Unknown flag in redis cluster config file");
}
if (p) s = p+1;
}

//attach the slave to its master
if (argv[3][0] != '-') {
master = clusterLookupNode(argv[3]);
if (!master) {
master = createClusterNode(argv[3],0);
clusterAddNode(master);
}
n->slaveof = master;
clusterNodeAddSlave(master,n);
}

//ping/pong time
if (atoi(argv[4])) n->ping_sent = mstime();
if (atoi(argv[5])) n->pong_received = mstime();

n->configEpoch = strtoull(argv[6],NULL,10);

//hash slots served by this node
for (j = 8; j < argc; j++) {
int start, stop;

if (argv[j][0] == '[') {
/* Here we handle migrating / importing slots */
int slot;
char direction;
clusterNode *cn;

p = strchr(argv[j],'-');
serverAssert(p != NULL);
*p = '\0';
direction = p[1]; /* Either '>' or '<' */
slot = atoi(argv[j]+1);
if (slot < 0 || slot >= CLUSTER_SLOTS) goto fmterr;
p += 3;
cn = clusterLookupNode(p);
if (!cn) {
cn = createClusterNode(p,0);
clusterAddNode(cn);
}
if (direction == '>') {
server.cluster->migrating_slots_to[slot] = cn;
} else {
server.cluster->importing_slots_from[slot] = cn;
}
continue;
} else if ((p = strchr(argv[j],'-')) != NULL) {
*p = '\0';
start = atoi(argv[j]);
stop = atoi(p+1);
} else {
start = stop = atoi(argv[j]);
}
if (start < 0 || start >= CLUSTER_SLOTS) goto fmterr;
if (stop < 0 || stop >= CLUSTER_SLOTS) goto fmterr;
while(start <= stop) clusterAddSlot(n, start++);
}

sdsfreesplitres(argv,argc);
}

if (server.cluster->myself == NULL) goto fmterr;

zfree(line);
fclose(fp);

serverLog(LL_NOTICE,"Node configuration loaded, I'm %.40s", myself->name);

if (clusterGetMaxEpoch() > server.cluster->currentEpoch) {
server.cluster->currentEpoch = clusterGetMaxEpoch();
}
return C_OK;

fmterr:
serverLog(LL_WARNING,
"Unrecoverable error: corrupted cluster config file.");
zfree(line);
if (fp) fclose(fp);
exit(1);
}
clusterSaveConfig

Save the cluster configuration

int clusterSaveConfig(int do_fsync) {
sds ci;
size_t content_size;
struct stat sb;
int fd;

server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG;

ci = clusterGenNodesDescription(CLUSTER_NODE_HANDSHAKE);
ci = sdscatprintf(ci,"vars currentEpoch %llu lastVoteEpoch %llu\n",
(unsigned long long) server.cluster->currentEpoch,
(unsigned long long) server.cluster->lastVoteEpoch);
content_size = sdslen(ci);

if ((fd = open(server.cluster_configfile,O_WRONLY|O_CREAT,0644))
== -1) goto err;

if (fstat(fd,&sb) != -1) {
if (sb.st_size > (off_t)content_size) {
ci = sdsgrowzero(ci,sb.st_size);
memset(ci+content_size,'\n',sb.st_size-content_size);
}
}
if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
if (do_fsync) {
server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG;
fsync(fd);
}

if (content_size != sdslen(ci) && ftruncate(fd,content_size) == -1) {

}
close(fd);
sdsfree(ci);
return 0;

err:
if (fd != -1) close(fd);
sdsfree(ci);
return -1;
}
clusterSaveConfigOrDie

Save the config file; exit on failure

void clusterSaveConfigOrDie(int do_fsync) {
if (clusterSaveConfig(do_fsync) == -1) {
serverLog(LL_WARNING,"Fatal: can't update cluster config file.");
exit(1);
}
}
clusterLockConfig

Lock the cluster config file, using a file lock, to prevent multiple instances from using the same file

int clusterLockConfig(char *filename) {
#if !defined(__sun)
//use a file lock
int fd = open(filename,O_WRONLY|O_CREAT,0644);
if (fd == -1) {
serverLog(LL_WARNING,
"Can't open %s in order to acquire a lock: %s",
filename, strerror(errno));
return C_ERR;
}

if (flock(fd,LOCK_EX|LOCK_NB) == -1) {
if (errno == EWOULDBLOCK) {
serverLog(LL_WARNING,
"Sorry, the cluster configuration file %s is already used "
"by a different Redis Cluster node. Please make sure that "
"different nodes use different cluster configuration "
"files.", filename);
} else {
serverLog(LL_WARNING,
"Impossible to lock %s: %s", filename, strerror(errno));
}
close(fd);
return C_ERR;
}
#endif /* __sun */

return C_OK;
}
clusterUpdateMyselfFlags

Update our own flags

void clusterUpdateMyselfFlags(void) {
int oldflags = myself->flags;
int nofailover = server.cluster_slave_no_failover ?
CLUSTER_NODE_NOFAILOVER : 0;
myself->flags &= ~CLUSTER_NODE_NOFAILOVER;
myself->flags |= nofailover;
if (myself->flags != oldflags) {
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
}
}
clusterInit

Cluster initialization

void clusterInit(void) {
int saveconf = 0;

server.cluster = zmalloc(sizeof(clusterState));
server.cluster->myself = NULL;
server.cluster->currentEpoch = 0;
server.cluster->state = CLUSTER_FAIL;
server.cluster->size = 1;
server.cluster->todo_before_sleep = 0;
server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);
server.cluster->nodes_black_list =
dictCreate(&clusterNodesBlackListDictType,NULL);
server.cluster->failover_auth_time = 0;
server.cluster->failover_auth_count = 0;
server.cluster->failover_auth_rank = 0;
server.cluster->failover_auth_epoch = 0;
server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
server.cluster->lastVoteEpoch = 0;
for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
server.cluster->stats_bus_messages_sent[i] = 0;
server.cluster->stats_bus_messages_received[i] = 0;
}
server.cluster->stats_pfail_nodes = 0;
memset(server.cluster->slots,0, sizeof(server.cluster->slots));
clusterCloseAllSlots();

//make sure every instance uses its own config file
if (clusterLockConfig(server.cluster_configfile) == C_ERR)
exit(1);

//load the config
if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {
//create our own node
myself = server.cluster->myself =
createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);
serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",
myself->name);
clusterAddNode(myself);
saveconf = 1;
}
if (saveconf) clusterSaveConfigOrDie(1);

server.cfd_count = 0;

//check the cluster port upper bound
if (server.port > (65535-CLUSTER_PORT_INCR)) {
serverLog(LL_WARNING, "Redis port number too high. "
"Cluster communication port is 10,000 port "
"numbers higher than your Redis port. "
"Your Redis port number must be "
"lower than 55535.");
exit(1);
}

//listen on the cluster bus port
if (listenToPort(server.port+CLUSTER_PORT_INCR,
server.cfd,&server.cfd_count) == C_ERR)
{
exit(1);
} else {
int j;

//accept handler for incoming connections
for (j = 0; j < server.cfd_count; j++) {
if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
clusterAcceptHandler, NULL) == AE_ERR)
serverPanic("Unrecoverable error creating Redis Cluster "
"file event.");
}
}

//slot -> keys index
server.cluster->slots_to_keys = raxNew();
memset(server.cluster->slots_keys_count,0,
sizeof(server.cluster->slots_keys_count));

myself->port = server.port;
myself->cport = server.port+CLUSTER_PORT_INCR;
if (server.cluster_announce_port)
myself->port = server.cluster_announce_port;
if (server.cluster_announce_bus_port)
myself->cport = server.cluster_announce_bus_port;

server.cluster->mf_end = 0;
resetManualFailover();
clusterUpdateMyselfFlags();
}
clusterReset

Cluster reset

void clusterReset(int hard) {
dictIterator *di;
dictEntry *de;
int j;

//turn ourselves back into a master
if (nodeIsSlave(myself)) {
clusterSetNodeAsMaster(myself);
replicationUnsetMaster();
//flush the whole db
emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
}

//close all slots & reset any manual failover
clusterCloseAllSlots();
resetManualFailover();

for (j = 0; j < CLUSTER_SLOTS; j++) clusterDelSlot(j);

//remove every other node
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);

if (node == myself) continue;
clusterDelNode(node);
}
dictReleaseIterator(di);

//hard reset: zero the epochs and regenerate the node name
if (hard) {
sds oldname;

server.cluster->currentEpoch = 0;
server.cluster->lastVoteEpoch = 0;
myself->configEpoch = 0;
serverLog(LL_WARNING, "configEpoch set to 0 via CLUSTER RESET HARD");

oldname = sdsnewlen(myself->name, CLUSTER_NAMELEN);
dictDelete(server.cluster->nodes,oldname);
sdsfree(oldname);
getRandomHexChars(myself->name, CLUSTER_NAMELEN);
clusterAddNode(myself);
serverLog(LL_NOTICE,"Node hard reset, now I'm %.40s", myself->name);
}

clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE|
CLUSTER_TODO_FSYNC_CONFIG);
}

Inter-node links

Creating a link

clusterLink *createClusterLink(clusterNode *node) {
clusterLink *link = zmalloc(sizeof(*link));
link->ctime = mstime();
link->sndbuf = sdsempty();
link->rcvbuf = sdsempty();
link->node = node;
link->fd = -1;
return link;
}

Freeing a link

void freeClusterLink(clusterLink *link) {
if (link->fd != -1) {
aeDeleteFileEvent(server.el, link->fd, AE_READABLE|AE_WRITABLE);
}
sdsfree(link->sndbuf);
sdsfree(link->rcvbuf);
if (link->node)
link->node->link = NULL;
close(link->fd);
zfree(link);
}
clusterAcceptHandler

Accept handler for incoming connections from other cluster nodes

#define MAX_CLUSTER_ACCEPTS_PER_CALL 1000
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd;
int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
clusterLink *link;
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);

//refuse connections while loading data, if we are a master
if (server.masterhost == NULL && server.loading) return;

while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_VERBOSE,
"Error accepting cluster node: %s", server.neterr);
return;
}

//make the socket non-blocking & no-delay
anetNonBlock(NULL,cfd);
anetEnableTcpNoDelay(NULL,cfd);

serverLog(LL_VERBOSE,"Accepted cluster node %s:%d", cip, cport);

//create the link
link = createClusterLink(NULL);
link->fd = cfd;
aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
}
}

Key hashing

keyHashSlot

Map a key to a slot: if and only if the first {...} in the key is non-empty, only its content is hashed; otherwise the whole key is hashed

unsigned int keyHashSlot(char *key, int keylen) {
int s, e; /* start-end indexes of { and } */

for (s = 0; s < keylen; s++)
if (key[s] == '{') break;

/* No '{' ? Hash the whole key. This is the base case. */
if (s == keylen) return crc16(key,keylen) & 0x3FFF;

/* '{' found? Check if we have the corresponding '}'. */
for (e = s+1; e < keylen; e++)
if (key[e] == '}') break;

/* No '}' or nothing between {} ? Hash the whole key. */
if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;

/* If we are here there is both a { and a } on its right. Hash
* what is in the middle between { and }. */
return crc16(key+s+1,e-s-1) & 0x3FFF;
}

Node management

createClusterNode

Create a node

clusterNode *createClusterNode(char *nodename, int flags) {
clusterNode *node = zmalloc(sizeof(*node));

if (nodename)
memcpy(node->name, nodename, CLUSTER_NAMELEN);
else
getRandomHexChars(node->name, CLUSTER_NAMELEN);
node->ctime = mstime();
node->configEpoch = 0;
node->flags = flags;
memset(node->slots,0,sizeof(node->slots));
node->numslots = 0;
node->numslaves = 0;
node->slaves = NULL;
node->slaveof = NULL;
node->ping_sent = node->pong_received = 0;
node->fail_time = 0;
node->link = NULL;
memset(node->ip,0,sizeof(node->ip));
node->port = 0;
node->cport = 0;
node->fail_reports = listCreate();
node->voted_time = 0;
node->orphaned_time = 0;
node->repl_offset_time = 0;
node->repl_offset = 0;
listSetFreeMethod(node->fail_reports,zfree);
return node;
}
clusterNodeAddFailureReport

Record an incoming report that some node is down

int clusterNodeAddFailureReport(clusterNode *failing, clusterNode *sender) {
list *l = failing->fail_reports;
listNode *ln;
listIter li;
clusterNodeFailReport *fr;

//if this sender already reported, just refresh the report time
listRewind(l,&li);
while ((ln = listNext(&li)) != NULL) {
fr = ln->value;
if (fr->node == sender) {
fr->time = mstime();
return 0;
}
}

//otherwise add a new report
fr = zmalloc(sizeof(*fr));
fr->node = sender;
fr->time = mstime();
listAddNodeTail(l,fr);
return 1;
}
clusterNodeCleanupFailureReports

Purge failure reports that are too old

void clusterNodeCleanupFailureReports(clusterNode *node) {
list *l = node->fail_reports;
listNode *ln;
listIter li;
clusterNodeFailReport *fr;

//maximum age of a valid report
mstime_t maxtime = server.cluster_node_timeout *
CLUSTER_FAIL_REPORT_VALIDITY_MULT;
mstime_t now = mstime();

listRewind(l,&li);
while ((ln = listNext(&li)) != NULL) {
fr = ln->value;
if (now - fr->time > maxtime) listDelNode(l,ln);
}
}
clusterNodeDelFailureReport

Remove a given sender's failure report for a node

int clusterNodeDelFailureReport(clusterNode *node, clusterNode *sender) {
list *l = node->fail_reports;
listNode *ln;
listIter li;
clusterNodeFailReport *fr;

//find this sender's report, if any
listRewind(l,&li);
while ((ln = listNext(&li)) != NULL) {
fr = ln->value;
if (fr->node == sender) break;
}
if (!ln) return 0;

//delete it
listDelNode(l,ln);
clusterNodeCleanupFailureReports(node);
return 1;
}
clusterNodeFailureReportsCount

Count a node's valid failure reports

int clusterNodeFailureReportsCount(clusterNode *node) {
//purge stale reports first
clusterNodeCleanupFailureReports(node);
return listLength(node->fail_reports);
}
clusterNodeRemoveSlave

Remove a slave from a master

int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
int j;

for (j = 0; j < master->numslaves; j++) {
if (master->slaves[j] == slave) {
if ((j+1) < master->numslaves) {
int remaining_slaves = (master->numslaves - j) - 1;
memmove(master->slaves+j,master->slaves+(j+1),
(sizeof(*master->slaves) * remaining_slaves));
}
master->numslaves--;
if (master->numslaves == 0)
master->flags &= ~CLUSTER_NODE_MIGRATE_TO;
return C_OK;
}
}
return C_ERR;
}
clusterNodeAddSlave

Add a slave to a master

int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
int j;

//ignore duplicates
for (j = 0; j < master->numslaves; j++)
if (master->slaves[j] == slave) return C_ERR;

master->slaves = zrealloc(master->slaves,
sizeof(clusterNode*)*(master->numslaves+1));
master->slaves[master->numslaves] = slave;
master->numslaves++;
master->flags |= CLUSTER_NODE_MIGRATE_TO;
return C_OK;
}
clusterCountNonFailingSlaves

Count a master's non-failing slaves

int clusterCountNonFailingSlaves(clusterNode *n) {
int j, okslaves = 0;

for (j = 0; j < n->numslaves; j++)
if (!nodeFailed(n->slaves[j])) okslaves++;
return okslaves;
}
freeClusterNode

Free a node

void freeClusterNode(clusterNode *n) {
sds nodename;
int j;

//detach our slaves: set their master to NULL
for (j = 0; j < n->numslaves; j++)
n->slaves[j]->slaveof = NULL;

//if this node is a slave, remove it from its master's slave list
if (nodeIsSlave(n) && n->slaveof) clusterNodeRemoveSlave(n->slaveof,n);

//remove the node from the nodes table
nodename = sdsnewlen(n->name, CLUSTER_NAMELEN);
serverAssert(dictDelete(server.cluster->nodes,nodename) == DICT_OK);
sdsfree(nodename);

//free the link and the node itself
if (n->link) freeClusterLink(n->link);
listRelease(n->fail_reports);
zfree(n->slaves);
zfree(n);
}
clusterAddNode

Add a node

int clusterAddNode(clusterNode *node) {
int retval;

retval = dictAdd(server.cluster->nodes,
sdsnewlen(node->name,CLUSTER_NAMELEN), node);
return (retval == DICT_OK) ? C_OK : C_ERR;
}
clusterDelNode

Delete a node

void clusterDelNode(clusterNode *delnode) {
int j;
dictIterator *di;
dictEntry *de;

//clear importing/migrating state and slot ownership
for (j = 0; j < CLUSTER_SLOTS; j++) {
if (server.cluster->importing_slots_from[j] == delnode)
server.cluster->importing_slots_from[j] = NULL;
if (server.cluster->migrating_slots_to[j] == delnode)
server.cluster->migrating_slots_to[j] = NULL;
if (server.cluster->slots[j] == delnode)
clusterDelSlot(j);
}

//drop failure reports about this node
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);

if (node == delnode) continue;
clusterNodeDelFailureReport(node,delnode);
}
dictReleaseIterator(di);

//free the node and its link
freeClusterNode(delnode);
}
clusterLookupNode

Look up a node by name.

clusterNode *clusterLookupNode(const char *name) {
sds s = sdsnewlen(name, CLUSTER_NAMELEN);
dictEntry *de;

de = dictFind(server.cluster->nodes,s);
sdsfree(s);
if (de == NULL) return NULL;
return dictGetVal(de);
}
clusterRenameNode

Rename a node.

void clusterRenameNode(clusterNode *node, char *newname) {
int retval;
sds s = sdsnewlen(node->name, CLUSTER_NAMELEN);

serverLog(LL_DEBUG,"Renaming node %.40s into %.40s",
node->name, newname);
retval = dictDelete(server.cluster->nodes, s);
sdsfree(s);
serverAssert(retval == DICT_OK);
memcpy(node->name, newname, CLUSTER_NAMELEN);
clusterAddNode(node);
}

Config epochs

clusterGetMaxEpoch

Get the largest config epoch among all known nodes (and the current cluster epoch).

uint64_t clusterGetMaxEpoch(void) {
uint64_t max = 0;
dictIterator *di;
dictEntry *de;

di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
if (node->configEpoch > max) max = node->configEpoch;
}
dictReleaseIterator(di);
if (max < server.cluster->currentEpoch) max = server.cluster->currentEpoch;
return max;
}
clusterBumpConfigEpochWithoutConsensus

Jump straight past the largest epoch in the cluster to bump our own config epoch, without consensus.

int clusterBumpConfigEpochWithoutConsensus(void) {
uint64_t maxEpoch = clusterGetMaxEpoch();

if (myself->configEpoch == 0 ||
myself->configEpoch != maxEpoch)
{
server.cluster->currentEpoch++;
myself->configEpoch = server.cluster->currentEpoch;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_FSYNC_CONFIG);
serverLog(LL_WARNING,
"New configEpoch set to %llu",
(unsigned long long) myself->configEpoch);
return C_OK;
} else {
return C_ERR;
}
}
clusterHandleConfigEpochCollision

Resolve a config epoch collision with another node by incrementing our own epoch.

void clusterHandleConfigEpochCollision(clusterNode *sender) {

//only when the epochs are equal and both nodes are masters
if (sender->configEpoch != myself->configEpoch ||
!nodeIsMaster(sender) || !nodeIsMaster(myself)) return;

//only act if the sender's name is greater than ours (the node with the smaller name bumps)
if (memcmp(sender->name,myself->name,CLUSTER_NAMELEN) <= 0) return;

server.cluster->currentEpoch++;
myself->configEpoch = server.cluster->currentEpoch;
clusterSaveConfigOrDie(1);
serverLog(LL_VERBOSE,
"WARNING: configEpoch collision with node %.40s."
" configEpoch set to %llu",
sender->name,
(unsigned long long) myself->configEpoch);
}

Blacklist

When a node is removed, we want to ignore any attempt to re-add it for a while. To achieve this, the removed node is placed on a blacklist: when a new node is about to be added, we first check whether it is on the blacklist and, if so, ignore it. A node stays on the blacklist for one minute.

clusterBlacklistCleanup

Remove expired entries from the blacklist.

#define CLUSTER_BLACKLIST_TTL 60 //how long a node stays on the blacklist, in seconds

void clusterBlacklistCleanup(void) {
dictIterator *di;
dictEntry *de;

di = dictGetSafeIterator(server.cluster->nodes_black_list);
while((de = dictNext(di)) != NULL) {
int64_t expire = dictGetUnsignedIntegerVal(de);

if (expire < server.unixtime)
dictDelete(server.cluster->nodes_black_list,dictGetKey(de));
}
dictReleaseIterator(di);
}
clusterBlacklistAddNode

Add a node to the blacklist; if it is already present, refresh its expire time.

void clusterBlacklistAddNode(clusterNode *node) {
dictEntry *de;
sds id = sdsnewlen(node->name,CLUSTER_NAMELEN);

clusterBlacklistCleanup();
if (dictAdd(server.cluster->nodes_black_list,id,NULL) == DICT_OK) {
id = sdsdup(id);
}

de = dictFind(server.cluster->nodes_black_list,id);

//set the expire time
dictSetUnsignedIntegerVal(de,time(NULL)+CLUSTER_BLACKLIST_TTL);
sdsfree(id);
}
clusterBlacklistExists

Check whether a node is on the blacklist.

int clusterBlacklistExists(char *nodeid) {
sds id = sdsnewlen(nodeid,CLUSTER_NAMELEN);
int retval;

//purge expired entries first
clusterBlacklistCleanup();
retval = dictFind(server.cluster->nodes_black_list,id) != NULL;
sdsfree(id);
return retval;
}

Cluster messages

markNodeAsFailingIfNeeded

Mark a node as failing (FAIL) when the quorum of failure reports is reached.

void markNodeAsFailingIfNeeded(clusterNode *node) {
int failures;
//minimum quorum: more than half of the voting masters
int needed_quorum = (server.cluster->size / 2) + 1;

//heartbeat is fine (not even PFAIL)
if (!nodeTimedOut(node)) return;

//already marked as FAIL
if (nodeFailed(node)) return;

//number of other nodes that reported this node as failing
failures = clusterNodeFailureReportsCount(node);

//count our own vote if we are a master
if (nodeIsMaster(myself)) failures++;

//the quorum must be reached
if (failures < needed_quorum) return;

serverLog(LL_NOTICE,
"Marking node %.40s as failing (quorum reached).", node->name);

//promote from PFAIL (possibly failing) to FAIL
node->flags &= ~CLUSTER_NODE_PFAIL;
node->flags |= CLUSTER_NODE_FAIL;
node->fail_time = mstime();

//tell the other nodes
if (nodeIsMaster(myself)) clusterSendFail(node->name);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
}
clearNodeFailureIfNeeded

Clear a node's FAIL flag when appropriate.

void clearNodeFailureIfNeeded(clusterNode *node) {
mstime_t now = mstime();

serverAssert(nodeFailed(node));

//slaves and masters without slots are cleared right away
if (nodeIsSlave(node) || node->numslots == 0) {
serverLog(LL_NOTICE,
"Clear FAIL state for node %.40s: %s is reachable again.",
node->name,
nodeIsSlave(node) ? "replica" : "master without slots");
node->flags &= ~CLUSTER_NODE_FAIL;
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
}

//for a master serving at least one slot, only clear FAIL if enough time has passed without anyone taking over its slots
if (nodeIsMaster(node) && node->numslots > 0 &&
(now - node->fail_time) >
(server.cluster_node_timeout * CLUSTER_FAIL_UNDO_TIME_MULT))
{
serverLog(LL_NOTICE,
"Clear FAIL state for node %.40s: is reachable again and nobody is serving its slots after some time.",
node->name);
node->flags &= ~CLUSTER_NODE_FAIL;
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
}
}
clusterHandshakeInProgress

Check whether a handshake is already in progress for the given ip, port and cport.

int clusterHandshakeInProgress(char *ip, int port, int cport) {
dictIterator *di;
dictEntry *de;

di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);

if (!nodeInHandshake(node)) continue;
if (!strcasecmp(node->ip,ip) &&
node->port == port &&
node->cport == cport) break;
}
dictReleaseIterator(di);
return de != NULL;
}
clusterStartHandshake

Start a handshake.

int clusterStartHandshake(char *ip, int port, int cport) {
clusterNode *n;
char norm_ip[NET_IP_STR_LEN];
struct sockaddr_storage sa;

//validate the IP
if (inet_pton(AF_INET,ip,
&(((struct sockaddr_in *)&sa)->sin_addr)))
{
sa.ss_family = AF_INET;
} else if (inet_pton(AF_INET6,ip,
&(((struct sockaddr_in6 *)&sa)->sin6_addr)))
{
sa.ss_family = AF_INET6;
} else {
errno = EINVAL;
return 0;
}

//validate the ports
if (port <= 0 || port > 65535 || cport <= 0 || cport > 65535) {
errno = EINVAL;
return 0;
}

memset(norm_ip,0,NET_IP_STR_LEN);
if (sa.ss_family == AF_INET)
inet_ntop(AF_INET,
(void*)&(((struct sockaddr_in *)&sa)->sin_addr),
norm_ip,NET_IP_STR_LEN);
else
inet_ntop(AF_INET6,
(void*)&(((struct sockaddr_in6 *)&sa)->sin6_addr),
norm_ip,NET_IP_STR_LEN);

//avoid duplicate handshakes
if (clusterHandshakeInProgress(norm_ip,port,cport)) {
errno = EAGAIN;
return 0;
}

//create a new node with the HANDSHAKE|MEET flags; the next cron run will create the connection
n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
memcpy(n->ip,norm_ip,sizeof(n->ip));
n->port = port;
n->cport = cport;
clusterAddNode(n);
return 1;
}
clusterProcessGossipSection

Process the gossip section of PING/PONG/MEET messages.

void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
//number of gossip entries
uint16_t count = ntohs(hdr->count);
clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);

while(count--) {
uint16_t flags = ntohs(g->flags);
clusterNode *node;
sds ci;

if (server.verbosity == LL_DEBUG) {
ci = representClusterNodeFlags(sdsempty(), flags);
serverLog(LL_DEBUG,"GOSSIP %.40s %s:%d@%d %s",
g->nodename,
g->ip,
ntohs(g->port),
ntohs(g->cport),
ci);
sdsfree(ci);
}

//look up the gossiped node
node = clusterLookupNode(g->nodename);
if (node) {
//both sides know this node

//the sender is a master
if (sender && nodeIsMaster(sender) && node != myself) {

//the gossiped flags report a failure state
if (flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) {
//add a failure report
if (clusterNodeAddFailureReport(node,sender)) {
serverLog(LL_VERBOSE,
"Node %.40s reported node %.40s as not reachable.",
sender->name, node->name);
}
//mark the node as failing if the quorum is reached
markNodeAsFailingIfNeeded(node);
} else {
//remove the failure report, if any
if (clusterNodeDelFailureReport(node,sender)) {
serverLog(LL_VERBOSE,
"Node %.40s reported node %.40s is back online.",
sender->name, node->name);
}
}
}

//the sender sees the node as healthy, we have no failure reports about it, and no ping of ours is pending: update the last pong time
if (!(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
node->ping_sent == 0 &&
clusterNodeFailureReportsCount(node) == 0)
{
mstime_t pongtime = ntohl(g->pong_received);
pongtime *= 1000;

//the gossiped pong time must be newer than ours, but not in the future
if (pongtime <= (server.mstime+500) &&
pongtime > node->pong_received)
{
node->pong_received = pongtime;
}
}

//we see the node as failing, the sender sees it as healthy and reports a different address: update the address and drop the old link so we reconnect with the new information
if (node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL) &&
!(flags & CLUSTER_NODE_NOADDR) &&
!(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
(strcasecmp(node->ip,g->ip) ||
node->port != ntohs(g->port) ||
node->cport != ntohs(g->cport)))
{
if (node->link) freeClusterLink(node->link);
memcpy(node->ip,g->ip,NET_IP_STR_LEN);
node->port = ntohs(g->port);
node->cport = ntohs(g->cport);
node->flags &= ~CLUSTER_NODE_NOADDR;
}
} else {
//the sender knows a node we don't; if it is not on the blacklist, start a handshake

if (sender &&
!(flags & CLUSTER_NODE_NOADDR) &&
!clusterBlacklistExists(g->nodename))
{
clusterStartHandshake(g->ip,ntohs(g->port),ntohs(g->cport));
}
}

g++;
}
}
nodeIp2String

Extract a node's IP into a string buffer.

void nodeIp2String(char *buf, clusterLink *link, char *announced_ip) {
if (announced_ip[0] != '\0') {
memcpy(buf,announced_ip,NET_IP_STR_LEN);
buf[NET_IP_STR_LEN-1] = '\0'; /* We are not sure the input is sane. */
} else {
anetPeerToString(link->fd, buf, NET_IP_STR_LEN, NULL);
}
}
nodeUpdateAddressIfNeeded

Update a node's address. When we receive a message from another node, we compare the address it announces for itself (and the current connection's peer address) against what we have stored, and update if they differ; the announced address takes precedence over the connection's.

int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link,
clusterMsg *hdr)
{
char ip[NET_IP_STR_LEN] = {0};
int port = ntohs(hdr->port);
int cport = ntohs(hdr->cport);

if (link == node->link) return 0;

nodeIp2String(ip,link,hdr->myip);
if (node->port == port && node->cport == cport &&
strcmp(ip,node->ip) == 0) return 0;

memcpy(node->ip,ip,sizeof(ip));
node->port = port;
node->cport = cport;
if (node->link) freeClusterLink(node->link);
node->flags &= ~CLUSTER_NODE_NOADDR;
serverLog(LL_WARNING,"Address updated for node %.40s, now %s:%d",
node->name, node->ip, node->port);

//the node happens to be our master
if (nodeIsSlave(myself) && myself->slaveof == node)
replicationSetMaster(node->ip, node->port);
return 1;
}
clusterSetNodeAsMaster

Mark a node as a master.

void clusterSetNodeAsMaster(clusterNode *n) {
if (nodeIsMaster(n)) return;

if (n->slaveof) {
clusterNodeRemoveSlave(n->slaveof,n);
if (n != myself) n->flags |= CLUSTER_NODE_MIGRATE_TO;
}
n->flags &= ~CLUSTER_NODE_SLAVE;
n->flags |= CLUSTER_NODE_MASTER;
n->slaveof = NULL;

/* Update config and state. */
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
}
clusterUpdateSlotsConfigWith

Update the slot configuration. When nodes communicate they report the slots they manage; the receiver updates its routing table accordingly.

void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch, unsigned char *slots) {
int j;
clusterNode *curmaster, *newmaster = NULL;

uint16_t dirty_slots[CLUSTER_SLOTS];
int dirty_slots_count = 0;

curmaster = nodeIsMaster(myself) ? myself : myself->slaveof;

if (sender == myself) {
serverLog(LL_WARNING,"Discarding UPDATE message about myself.");
return;
}

for (j = 0; j < CLUSTER_SLOTS; j++) {
if (bitmapTestBit(slots,j)) {
if (server.cluster->slots[j] == sender) continue;

//slot being imported, leave it alone
if (server.cluster->importing_slots_from[j]) continue;

//update if the slot is unassigned or its owner's config epoch is older
if (server.cluster->slots[j] == NULL ||
server.cluster->slots[j]->configEpoch < senderConfigEpoch)
{
//dirty slot: we own it and it still contains keys
if (server.cluster->slots[j] == myself &&
countKeysInSlot(j) &&
sender != myself)
{
dirty_slots[dirty_slots_count] = j;
dirty_slots_count++;
}

//drop the old slot ownership and assign the slot to the sender
if (server.cluster->slots[j] == curmaster)
newmaster = sender;
clusterDelSlot(j);
clusterAddSlot(sender,j);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE|
CLUSTER_TODO_FSYNC_CONFIG);
}
}
}

if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
return;

if (newmaster && curmaster->numslots == 0) {
serverLog(LL_WARNING,
"Configuration change detected. Reconfiguring myself "
"as a replica of %.40s", sender->name);
clusterSetMaster(sender);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE|
CLUSTER_TODO_FSYNC_CONFIG);
} else if (dirty_slots_count) {
for (j = 0; j < dirty_slots_count; j++)
delKeysInSlot(dirty_slots[j]);
}
}
clusterProcessPacket

Process a packet: read the data sent by another cluster node, parse the message and act on it. Note the distinction between link->node->name and hdr->sender: a node fills in its own name, hdr->sender, in every message it sends.

int clusterProcessPacket(clusterLink *link) {
clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
uint32_t totlen = ntohl(hdr->totlen);
uint16_t type = ntohs(hdr->type);

//update stats for known message types
if (type < CLUSTERMSG_TYPE_COUNT)
server.cluster->stats_bus_messages_received[type]++;
serverLog(LL_DEBUG,"--- Processing packet of type %d, %lu bytes",
type, (unsigned long) totlen);

//minimum message length; we cannot process anything shorter than 16 bytes
if (totlen < 16) return 1;
//the full message has not been received yet
if (totlen > sdslen(link->rcvbuf)) return 1;

//check the protocol version
if (ntohs(hdr->ver) != CLUSTER_PROTO_VER) {
return 1;
}

uint16_t flags = ntohs(hdr->flags);
uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
clusterNode *sender;

//compute the expected length for each message type
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
type == CLUSTERMSG_TYPE_MEET)
{
uint16_t count = ntohs(hdr->count);
uint32_t explen;

explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen += (sizeof(clusterMsgDataGossip)*count);
if (totlen != explen) return 1;
} else if (type == CLUSTERMSG_TYPE_FAIL) {
uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

explen += sizeof(clusterMsgDataFail);
if (totlen != explen) return 1;
} else if (type == CLUSTERMSG_TYPE_PUBLISH) {
uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

explen += sizeof(clusterMsgDataPublish) -
8 +
ntohl(hdr->data.publish.msg.channel_len) +
ntohl(hdr->data.publish.msg.message_len);
if (totlen != explen) return 1;
} else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST ||
type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK ||
type == CLUSTERMSG_TYPE_MFSTART)
{
uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

if (totlen != explen) return 1;
} else if (type == CLUSTERMSG_TYPE_UPDATE) {
uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

explen += sizeof(clusterMsgDataUpdate);
if (totlen != explen) return 1;
} else if (type == CLUSTERMSG_TYPE_MODULE) {
uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);

explen += sizeof(clusterMsgDataPublish) -
3 + ntohl(hdr->data.module.msg.len);
if (totlen != explen) return 1;
}

//look up the sender
sender = clusterLookupNode(hdr->sender);

//regular message from a known, non-handshake node
if (sender && !nodeInHandshake(sender)) {
//update the epochs
senderCurrentEpoch = ntohu64(hdr->currentEpoch);
senderConfigEpoch = ntohu64(hdr->configEpoch);
if (senderCurrentEpoch > server.cluster->currentEpoch)
server.cluster->currentEpoch = senderCurrentEpoch;

if (senderConfigEpoch > sender->configEpoch) {
sender->configEpoch = senderConfigEpoch;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_FSYNC_CONFIG);
}

//update the replication offset
sender->repl_offset = ntohu64(hdr->offset);
sender->repl_offset_time = mstime();

//manual failover: we are a slave of the sender, the master is paused, record its replication offset

if (server.cluster->mf_end &&
nodeIsSlave(myself) &&
myself->slaveof == sender &&
hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
server.cluster->mf_master_offset == 0)
{
server.cluster->mf_master_offset = sender->repl_offset;
serverLog(LL_WARNING,
"Received replication offset for paused "
"master manual failover: %lld",
server.cluster->mf_master_offset);
}
}

//PING and MEET
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
serverLog(LL_DEBUG,"Ping packet received: %p", (void*)link->node);

//when another node MEETs us (or our IP is still unset), learn our own IP from the socket
if ((type == CLUSTERMSG_TYPE_MEET || myself->ip[0] == '\0') &&
server.cluster_announce_ip == NULL)
{
char ip[NET_IP_STR_LEN];

if (anetSockName(link->fd,ip,sizeof(ip),NULL) != -1 &&
strcmp(ip,myself->ip))
{
memcpy(myself->ip,ip,NET_IP_STR_LEN);
serverLog(LL_WARNING,"IP address for this node updated to %s",
myself->ip);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}
}

//MEET from an unknown sender: record it as a cluster node; only MEET messages can introduce new nodes
if (!sender && type == CLUSTERMSG_TYPE_MEET) {
clusterNode *node;

node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
nodeIp2String(node->ip,link,hdr->myip);
node->port = ntohs(hdr->port);
node->cport = ntohs(hdr->cport);
clusterAddNode(node);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}

//as long as the message is a MEET, trust its gossip section
if (!sender && type == CLUSTERMSG_TYPE_MEET)
clusterProcessGossipSection(hdr,link);

//reply with a PONG
clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
}

//process the configuration carried by PING/PONG/MEET
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
type == CLUSTERMSG_TYPE_MEET)
{
serverLog(LL_DEBUG,"%s packet received: %p",
type == CLUSTERMSG_TYPE_PING ? "ping" : "pong",
(void*)link->node);

//link->node set means this is a connection we initiated
if (link->node) {
//handshake handling
if (nodeInHandshake(link->node)) {
//during a handshake the node should be unknown to us; if we already know it, something is off: try to update its address and drop the handshake node
if (sender) {
serverLog(LL_VERBOSE,
"Handshake: we already know node %.40s, "
"updating the address if needed.", sender->name);
if (nodeUpdateAddressIfNeeded(sender,link,hdr))
{
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
}
clusterDelNode(link->node);
return 0;
}

//handshake completed normally: set the node's real name and clear the handshake flag
clusterRenameNode(link->node, hdr->sender);
serverLog(LL_DEBUG,"Handshake with node %.40s completed.",
link->node->name);
link->node->flags &= ~CLUSTER_NODE_HANDSHAKE;
link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
} else if (memcmp(link->node->name,hdr->sender,
CLUSTER_NAMELEN) != 0)
{
//the name we know differs from the one announced in the message; the node probably restarted and its ID changed, so reset the link
serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d",
link->node->name,
(int)(mstime()-(link->node->ctime)),
link->node->flags);
link->node->flags |= CLUSTER_NODE_NOADDR;
link->node->ip[0] = '\0';
link->node->port = 0;
link->node->cport = 0;
freeClusterLink(link);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
return 0;
}
}

//update the NOFAILOVER flag
if (sender) {
int nofailover = flags & CLUSTER_NODE_NOFAILOVER;
sender->flags &= ~CLUSTER_NODE_NOFAILOVER;
sender->flags |= nofailover;
}

//try to update the sender's address
if (sender && type == CLUSTERMSG_TYPE_PING &&
!nodeInHandshake(sender) &&
nodeUpdateAddressIfNeeded(sender,link,hdr))
{
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
}

//a reply to a heartbeat we initiated
if (link->node && type == CLUSTERMSG_TYPE_PONG) {

//record the latest pong time and clear the pending ping
link->node->pong_received = mstime();
link->node->ping_sent = 0;

//clear the PFAIL (possibly failing) flag
if (nodeTimedOut(link->node)) {
link->node->flags &= ~CLUSTER_NODE_PFAIL;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
} else if (nodeFailed(link->node)) {
//try to clear the FAIL flag
clearNodeFailureIfNeeded(link->node);
}
}

//check for master/slave role changes
if (sender) {

//the sender reports an empty slaveof: it is a master
if (!memcmp(hdr->slaveof,CLUSTER_NODE_NULL_NAME,
sizeof(hdr->slaveof)))
{
clusterSetNodeAsMaster(sender);
} else {
//the sender reports itself as a slave

//look up the reported master
clusterNode *master = clusterLookupNode(hdr->slaveof);

//we knew it as a master, now it reports being a slave
if (nodeIsMaster(sender)) {
//drop its slot ownership
clusterDelNodeSlots(sender);
sender->flags &= ~(CLUSTER_NODE_MASTER|
CLUSTER_NODE_MIGRATE_TO);
sender->flags |= CLUSTER_NODE_SLAVE;

clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
}

//its master changed
if (master && sender->slaveof != master) {
//detach the slave from the old master
if (sender->slaveof)
clusterNodeRemoveSlave(sender->slaveof,sender);
//attach it to the new master
clusterNodeAddSlave(master,sender);
sender->slaveof = master;

clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}
}
}

//update the slot routing table
clusterNode *sender_master = NULL; //the sender's master
int dirty_slots = 0; //whether the reported slot map differs from ours

if (sender) {
sender_master = nodeIsMaster(sender) ? sender : sender->slaveof;

//if the master is known, compare the slot bitmaps directly
if (sender_master) {
dirty_slots = memcmp(sender_master->slots,
hdr->myslots,sizeof(hdr->myslots)) != 0;
}
}

//the sender is a master (only master slot reports are applied) and its reported slots differ from ours: update the bitmap and routing table
if (sender && nodeIsMaster(sender) && dirty_slots)
clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);

//if the sender's view is stale, send it an UPDATE
if (sender && dirty_slots) {
int j;

for (j = 0; j < CLUSTER_SLOTS; j++) {
if (bitmapTestBit(hdr->myslots,j)) {
if (server.cluster->slots[j] == sender ||
server.cluster->slots[j] == NULL) continue;

//stale configuration
if (server.cluster->slots[j]->configEpoch >
senderConfigEpoch)
{
serverLog(LL_VERBOSE,
"Node %.40s has old slots configuration, sending "
"an UPDATE message about %.40s",
sender->name, server.cluster->slots[j]->name);

//send the UPDATE
clusterSendUpdate(sender->link,
server.cluster->slots[j]);
break;
}
}
}
}

//handle a config epoch collision
if (sender &&
nodeIsMaster(myself) && nodeIsMaster(sender) &&
senderConfigEpoch == myself->configEpoch)
{
clusterHandleConfigEpochCollision(sender);
}

//trust the gossip of known senders
if (sender) clusterProcessGossipSection(hdr,link);
} else if (type == CLUSTERMSG_TYPE_FAIL) {
//FAIL message: a node was marked as failing
clusterNode *failing;

if (sender) {
failing = clusterLookupNode(hdr->data.fail.about.nodename);

//as long as the node is known to us, mark it FAIL directly, skipping the PFAIL -> FAIL transition
if (failing &&
!(failing->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_MYSELF)))
{
serverLog(LL_NOTICE,
"FAIL message received from %.40s about %.40s",
hdr->sender, hdr->data.fail.about.nodename);
failing->flags |= CLUSTER_NODE_FAIL;
failing->fail_time = mstime();
failing->flags &= ~CLUSTER_NODE_PFAIL;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
}
} else {
serverLog(LL_NOTICE,
"Ignoring FAIL message from unknown node %.40s about %.40s",
hdr->sender, hdr->data.fail.about.nodename);
}
} else if (type == CLUSTERMSG_TYPE_PUBLISH) {
//PUBLISH message
robj *channel, *message;
uint32_t channel_len, message_len;

//only process it if we have channel or pattern subscribers
if (dictSize(server.pubsub_channels) ||
listLength(server.pubsub_patterns))
{
channel_len = ntohl(hdr->data.publish.msg.channel_len);
message_len = ntohl(hdr->data.publish.msg.message_len);
channel = createStringObject(
(char*)hdr->data.publish.msg.bulk_data,channel_len);
message = createStringObject(
(char*)hdr->data.publish.msg.bulk_data+channel_len,
message_len);
pubsubPublishMessage(channel,message);
decrRefCount(channel);
decrRefCount(message);
}
} else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
//failover vote request
if (!sender) return 1; //don't reply to unknown nodes
clusterSendFailoverAuthIfNeeded(sender,hdr);
} else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
//a vote in our favor
if (!sender) return 1;

//only masters serving slots, with a current enough epoch, count as votes
if (nodeIsMaster(sender) && sender->numslots > 0 &&
senderCurrentEpoch >= server.cluster->failover_auth_epoch)
{
server.cluster->failover_auth_count++;
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
}
} else if (type == CLUSTERMSG_TYPE_MFSTART) {
//manual failover start: only valid if we are a master and the sender is one of our slaves
if (!sender || sender->slaveof != myself) return 1;

resetManualFailover();
//manual failover deadline
server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;
//the slave driving the failover
server.cluster->mf_slave = sender;
//pause clients
pauseClients(mstime()+(CLUSTER_MF_TIMEOUT*2));
serverLog(LL_WARNING,"Manual failover requested by replica %.40s.",
sender->name);
} else if (type == CLUSTERMSG_TYPE_UPDATE) {
//UPDATE message: refresh the slot configuration
clusterNode *n;
uint64_t reportedConfigEpoch =
ntohu64(hdr->data.update.nodecfg.configEpoch);

if (!sender) return 1;
n = clusterLookupNode(hdr->data.update.nodecfg.nodename);
if (!n) return 1;
if (n->configEpoch >= reportedConfigEpoch) return 1;

if (nodeIsSlave(n)) clusterSetNodeAsMaster(n);

n->configEpoch = reportedConfigEpoch;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_FSYNC_CONFIG);

//update the slots
clusterUpdateSlotsConfigWith(n,reportedConfigEpoch,
hdr->data.update.nodecfg.slots);
} else if (type == CLUSTERMSG_TYPE_MODULE) {
//module message
if (!sender) return 1;

uint64_t module_id = hdr->data.module.msg.module_id; /* Endian-safe ID */
uint32_t len = ntohl(hdr->data.module.msg.len);
uint8_t type = hdr->data.module.msg.type;
unsigned char *payload = hdr->data.module.msg.bulk_data;
moduleCallClusterReceivers(sender->name,module_id,type,payload,len);
} else {
serverLog(LL_WARNING,"Received unknown packet type: %d", type);
}
return 1;
}
handleLinkIOError

Handle a link I/O error.

void handleLinkIOError(clusterLink *link) {
freeClusterLink(link);
}
clusterWriteHandler

Write handler for a link: flush the output buffer.

void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
clusterLink *link = (clusterLink*) privdata;
ssize_t nwritten;
UNUSED(el);
UNUSED(mask);

nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf));
if (nwritten <= 0) {
serverLog(LL_DEBUG,"I/O error writing to node link: %s",
(nwritten == -1) ? strerror(errno) : "short write");
handleLinkIOError(link);
return;
}
sdsrange(link->sndbuf,nwritten,-1);

//buffer fully sent: stop listening for writability
if (sdslen(link->sndbuf) == 0)
aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE);
}
clusterReadHandler

Read handler for a link: once the connection is established, all incoming messages are handled here.

void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
char buf[sizeof(clusterMsg)];
ssize_t nread;
clusterMsg *hdr;
clusterLink *link = (clusterLink*) privdata;
unsigned int readlen, rcvbuflen;
UNUSED(el);
UNUSED(mask);

while(1) {
//bytes received but not yet processed
rcvbuflen = sdslen(link->rcvbuf);
//the first 8 bytes carry the signature and total length, so make sure we read at least 8 bytes first
if (rcvbuflen < 8) {
readlen = 8 - rcvbuflen;
} else {
hdr = (clusterMsg*) link->rcvbuf;
if (rcvbuflen == 8) {
//with 8 bytes available, validate the signature and the minimum length
if (memcmp(hdr->sig,"RCmb",4) != 0 ||
ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN)
{
serverLog(LL_WARNING,
"Bad message length or signature received "
"from Cluster bus.");
handleLinkIOError(link);
return;
}
}
//how many more bytes are needed for a complete message
readlen = ntohl(hdr->totlen) - rcvbuflen;

//for long messages, read at most sizeof(buf) at a time
if (readlen > sizeof(buf)) readlen = sizeof(buf);
}

nread = read(fd,buf,readlen);
if (nread == -1 && errno == EAGAIN) return;

if (nread <= 0) {
/* I/O error... */
serverLog(LL_DEBUG,"I/O error reading from node link: %s",
(nread == 0) ? "connection closed" : strerror(errno));
handleLinkIOError(link);
return;
} else {
//append the data to the link's receive buffer
link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread);
hdr = (clusterMsg*) link->rcvbuf;
rcvbuflen += nread;
}

//a complete message has been received: process it
if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
if (clusterProcessPacket(link)) {
sdsfree(link->rcvbuf);
link->rcvbuf = sdsempty();
} else {
return;
}
}
}
}
clusterSendMessage

Queue a message on a link.

void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
//install the write handler when the buffer was empty
if (sdslen(link->sndbuf) == 0 && msglen != 0)
aeCreateFileEvent(server.el,link->fd,AE_WRITABLE|AE_BARRIER,
clusterWriteHandler,link);

//append the data to the send buffer
link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);

//message statistics
clusterMsg *hdr = (clusterMsg*) msg;
uint16_t type = ntohs(hdr->type);
if (type < CLUSTERMSG_TYPE_COUNT)
server.cluster->stats_bus_messages_sent[type]++;
}
clusterBroadcastMessage

Broadcast a message to all usable nodes.

void clusterBroadcastMessage(void *buf, size_t len) {
dictIterator *di;
dictEntry *de;

di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);

//skip nodes without a link, ourselves, and nodes still in handshake
if (!node->link) continue;
if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
continue;
clusterSendMessage(node->link,buf,len);
}
dictReleaseIterator(di);
}
clusterBuildMessageHdr

Build a message header.

void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
int totlen = 0;
uint64_t offset;
clusterNode *master;

master = (nodeIsSlave(myself) && myself->slaveof) ?
myself->slaveof : myself;

//zero the header
memset(hdr,0,sizeof(*hdr));

//protocol version
hdr->ver = htons(CLUSTER_PROTO_VER);

//4-byte signature
hdr->sig[0] = 'R';
hdr->sig[1] = 'C';
hdr->sig[2] = 'm';
hdr->sig[3] = 'b';
hdr->type = htons(type);

//sender name
memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN);

//announced IP
memset(hdr->myip,0,NET_IP_STR_LEN);
if (server.cluster_announce_ip) {
strncpy(hdr->myip,server.cluster_announce_ip,NET_IP_STR_LEN);
hdr->myip[NET_IP_STR_LEN-1] = '\0';
}

int announced_port = server.cluster_announce_port ?
server.cluster_announce_port : server.port;
int announced_cport = server.cluster_announce_bus_port ?
server.cluster_announce_bus_port :
(server.port + CLUSTER_PORT_INCR);

//slot bitmap of our master
memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));

//master name
memset(hdr->slaveof,0,CLUSTER_NAMELEN);
if (myself->slaveof != NULL)
memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN);

//client port, cluster bus port, flags and cluster state
hdr->port = htons(announced_port);
hdr->cport = htons(announced_cport);
hdr->flags = htons(myself->flags);
hdr->state = server.cluster->state;

//纪元和配置纪元
hdr->currentEpoch = htonu64(server.cluster->currentEpoch);
hdr->configEpoch = htonu64(master->configEpoch);

//主从同步偏移量
if (nodeIsSlave(myself))
offset = replicationGetSlaveOffset();
else
offset = server.master_repl_offset;
hdr->offset = htonu64(offset);

//消息标志位,客户端暂停标志,目前正进行故障修复
if (nodeIsMaster(myself) && server.cluster->mf_end)
hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;

//计算总的消息长度。这里只处理FAIL和UPDATE两种类型;其余类型的长度与携带的gossip条目数量有关,需由调用方自行计算
if (type == CLUSTERMSG_TYPE_FAIL) {
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += sizeof(clusterMsgDataFail);
} else if (type == CLUSTERMSG_TYPE_UPDATE) {
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += sizeof(clusterMsgDataUpdate);
}
hdr->totlen = htonl(totlen);
}
clusterNodeIsInGossipSection

判断某个节点是否已在消息的gossip段中(用于去重)

int clusterNodeIsInGossipSection(clusterMsg *hdr, int count, clusterNode *n) {
int j;
for (j = 0; j < count; j++) {
if (memcmp(hdr->data.ping.gossip[j].nodename,n->name,
CLUSTER_NAMELEN) == 0) break;
}
return j != count;
}
clusterSetGossipEntry

填充gossip段中第i条节点信息

void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
clusterMsgDataGossip *gossip;
gossip = &(hdr->data.ping.gossip[i]);
memcpy(gossip->nodename,n->name,CLUSTER_NAMELEN);
gossip->ping_sent = htonl(n->ping_sent/1000);
gossip->pong_received = htonl(n->pong_received/1000);
memcpy(gossip->ip,n->ip,sizeof(n->ip));
gossip->port = htons(n->port);
gossip->cport = htons(n->cport);
gossip->flags = htons(n->flags);
gossip->notused1 = 0;
}
clusterSendPing

发送心跳检测命令,根据type决定是ping/pong/meet,因为这三者只有类型不同,其余都一样

void clusterSendPing(clusterLink *link, int type) {
unsigned char *buf;
clusterMsg *hdr;
int gossipcount = 0;
int wanted; //期望携带的gossip条目数量
int totlen; //数据总长度

//有效节点数:总的节点数量-自己-发送目标节点
int freshnodes = dictSize(server.cluster->nodes)-2;

//期望携带的节点数量:总节点数的1/10,最少3个,且不超过有效节点数
wanted = floor(dictSize(server.cluster->nodes)/10);
if (wanted < 3) wanted = 3;
if (wanted > freshnodes) wanted = freshnodes;

//把主观下线的节点全部带上,加速从主观下线到客观下线的判定
int pfail_wanted = server.cluster->stats_pfail_nodes;

//计算总的消息长度
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += (sizeof(clusterMsgDataGossip)*(wanted+pfail_wanted));

if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg);
buf = zcalloc(totlen);
hdr = (clusterMsg*) buf;

//记录ping发送时间
if (link->node && type == CLUSTERMSG_TYPE_PING)
link->node->ping_sent = mstime();

//构建消息头
clusterBuildMessageHdr(hdr,type);

//生成gossip内容
int maxiterations = wanted*3; //因为是随机取的节点,所以会有重复的,我们提高随机获取的数量,尽可能的保证接近wanted个不同的节点
while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
//随机获取
dictEntry *de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);

//剔除自己,因为消息头里有自己
if (this == myself) continue;

//主观下线的节点在后面统一添加
if (this->flags & CLUSTER_NODE_PFAIL) continue;

//剔除握手&没有地址的节点,以及非有效节点
if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) ||
(this->link == NULL && this->numslots == 0))
{
freshnodes--;
continue;
}

//去重
if (clusterNodeIsInGossipSection(hdr,gossipcount,this)) continue;

//增加
clusterSetGossipEntry(hdr,gossipcount,this);
freshnodes--;
gossipcount++;
}

//添加全部的主观下线节点
if (pfail_wanted) {
dictIterator *di;
dictEntry *de;

di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL && pfail_wanted > 0) {
clusterNode *node = dictGetVal(de);
if (node->flags & CLUSTER_NODE_HANDSHAKE) continue;
if (node->flags & CLUSTER_NODE_NOADDR) continue;
if (!(node->flags & CLUSTER_NODE_PFAIL)) continue;
clusterSetGossipEntry(hdr,gossipcount,node);
freshnodes--;
gossipcount++;
pfail_wanted--;
}
dictReleaseIterator(di);
}

//计算总的消息长度
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
hdr->count = htons(gossipcount);
hdr->totlen = htonl(totlen);

//发送消息
clusterSendMessage(link,buf,totlen);
zfree(buf);
}
clusterBroadcastPong

根据target向所有节点或本master下的slave节点广播pong消息

#define CLUSTER_BROADCAST_ALL 0 //所有的节点
#define CLUSTER_BROADCAST_LOCAL_SLAVES 1 //本master下的slave节点
void clusterBroadcastPong(int target) {
dictIterator *di;
dictEntry *de;

di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);

if (!node->link) continue;
if (node == myself || nodeInHandshake(node)) continue;
if (target == CLUSTER_BROADCAST_LOCAL_SLAVES) {
int local_slave =
nodeIsSlave(node) && node->slaveof &&
(node->slaveof == myself || node->slaveof == myself->slaveof);
if (!local_slave) continue;
}
clusterSendPing(node->link,CLUSTERMSG_TYPE_PONG);
}
dictReleaseIterator(di);
}
clusterSendPublish

发布消息,link为空时,目标为所有节点

void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
unsigned char buf[sizeof(clusterMsg)], *payload;
clusterMsg *hdr = (clusterMsg*) buf;
uint32_t totlen;
uint32_t channel_len, message_len;

channel = getDecodedObject(channel);
message = getDecodedObject(message);
channel_len = sdslen(channel->ptr);
message_len = sdslen(message->ptr);

clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH);
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;

hdr->data.publish.msg.channel_len = htonl(channel_len);
hdr->data.publish.msg.message_len = htonl(message_len);
hdr->totlen = htonl(totlen);

if (totlen < sizeof(buf)) {
payload = buf;
} else {
payload = zmalloc(totlen);
memcpy(payload,hdr,sizeof(*hdr));
hdr = (clusterMsg*) payload;
}
memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
message->ptr,sdslen(message->ptr));

if (link)
clusterSendMessage(link,payload,totlen);
else
clusterBroadcastMessage(payload,totlen);

decrRefCount(channel);
decrRefCount(message);
if (payload != buf) zfree(payload);
}
clusterSendFail

广播某个节点客观下线

void clusterSendFail(char *nodename) {
unsigned char buf[sizeof(clusterMsg)];
clusterMsg *hdr = (clusterMsg*) buf;

clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
memcpy(hdr->data.fail.about.nodename,nodename,CLUSTER_NAMELEN);
clusterBroadcastMessage(buf,ntohl(hdr->totlen));
}

clusterSendUpdate

告诉某个节点其配置需要更新

void clusterSendUpdate(clusterLink *link, clusterNode *node) {
unsigned char buf[sizeof(clusterMsg)];
clusterMsg *hdr = (clusterMsg*) buf;

if (link == NULL) return;
clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_UPDATE);
memcpy(hdr->data.update.nodecfg.nodename,node->name,CLUSTER_NAMELEN);
hdr->data.update.nodecfg.configEpoch = htonu64(node->configEpoch);
memcpy(hdr->data.update.nodecfg.slots,node->slots,sizeof(node->slots));
clusterSendMessage(link,buf,ntohl(hdr->totlen));
}
clusterSendModule

发送模块消息,link为空时,目标为所有节点

void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type,
unsigned char *payload, uint32_t len) {
unsigned char buf[sizeof(clusterMsg)], *heapbuf;
clusterMsg *hdr = (clusterMsg*) buf;
uint32_t totlen;

clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MODULE);
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += sizeof(clusterMsgModule) - 3 + len;

hdr->data.module.msg.module_id = module_id;
hdr->data.module.msg.type = type;
hdr->data.module.msg.len = htonl(len);
hdr->totlen = htonl(totlen);

if (totlen < sizeof(buf)) {
heapbuf = buf;
} else {
heapbuf = zmalloc(totlen);
memcpy(heapbuf,hdr,sizeof(*hdr));
hdr = (clusterMsg*) heapbuf;
}
memcpy(hdr->data.module.msg.bulk_data,payload,len);

if (link)
clusterSendMessage(link,heapbuf,totlen);
else
clusterBroadcastMessage(heapbuf,totlen);

if (heapbuf != buf) zfree(heapbuf);
}
clusterSendModuleMessageToTarget

向目标发送模块消息

int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, unsigned char *payload, uint32_t len) {
clusterNode *node = NULL;

if (target != NULL) {
node = clusterLookupNode(target);
if (node == NULL || node->link == NULL) return C_ERR;
}

clusterSendModule(target ? node->link : NULL,
module_id, type, payload, len);
return C_OK;
}
clusterPropagatePublish

广播发布消息

void clusterPropagatePublish(robj *channel, robj *message) {
clusterSendPublish(NULL, channel, message);
}

slave节点相关

clusterRequestFailoverAuth

向所有节点发起投票请求

void clusterRequestFailoverAuth(void) {
unsigned char buf[sizeof(clusterMsg)];
clusterMsg *hdr = (clusterMsg*) buf;
uint32_t totlen;

clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST);

//手动开始修复的话,那么这个投票是必须要回应的,哪怕master是良好的
if (server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
hdr->totlen = htonl(totlen);

//广播消息
clusterBroadcastMessage(buf,totlen);
}
clusterSendFailoverAuth

回复投票确认;不投票的节点不会发送该消息

void clusterSendFailoverAuth(clusterNode *node) {
unsigned char buf[sizeof(clusterMsg)];
clusterMsg *hdr = (clusterMsg*) buf;
uint32_t totlen;

if (!node->link) return;
clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK);
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
hdr->totlen = htonl(totlen);
clusterSendMessage(node->link,buf,totlen);
}
clusterSendMFStart

向某个节点发送手动修复消息

void clusterSendMFStart(clusterNode *node) {
unsigned char buf[sizeof(clusterMsg)];
clusterMsg *hdr = (clusterMsg*) buf;
uint32_t totlen;

if (!node->link) return;
clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MFSTART);
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
hdr->totlen = htonl(totlen);
clusterSendMessage(node->link,buf,totlen);
}
clusterSendFailoverAuthIfNeeded

收到投票请求,判断是否为其投票

void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
clusterNode *master = node->slaveof;
uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch);
uint64_t requestConfigEpoch = ntohu64(request->configEpoch);
unsigned char *claimed_slots = request->myslots;
int force_ack = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK;
int j;

//我们不是有效的master
if (nodeIsSlave(myself) || myself->numslots == 0) return;

//拒绝为纪元低于我们的投票
if (requestCurrentEpoch < server.cluster->currentEpoch) {
serverLog(LL_WARNING,
"Failover auth denied to %.40s: reqEpoch (%llu) < curEpoch(%llu)",
node->name,
(unsigned long long) requestCurrentEpoch,
(unsigned long long) server.cluster->currentEpoch);
return;
}

//同一个纪元已经投过票了
if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
serverLog(LL_WARNING,
"Failover auth denied to %.40s: already voted for epoch %llu",
node->name,
(unsigned long long) server.cluster->currentEpoch);
return;
}

if (nodeIsMaster(node) || master == NULL ||
(!nodeFailed(master) && !force_ack))
{
//投票发起只能是slave
if (nodeIsMaster(node)) {
serverLog(LL_WARNING,
"Failover auth denied to %.40s: it is a master node",
node->name);
} else if (master == NULL) {
//我们不知道其master
serverLog(LL_WARNING,
"Failover auth denied to %.40s: I don't know its master",
node->name);
} else if (!nodeFailed(master)) {
//master是良好的,拒绝投
serverLog(LL_WARNING,
"Failover auth denied to %.40s: its master is up",
node->name);
}
return;
}

//距上次为该master投票未超过两倍节点超时时间,拒绝重复投票
if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2)
{
serverLog(LL_WARNING,
"Failover auth denied to %.40s: "
"can't vote about this master before %lld milliseconds",
node->name,
(long long) ((server.cluster_node_timeout*2)-
(mstime() - node->slaveof->voted_time)));
return;
}

//slave保存的master槽点映射是旧的
for (j = 0; j < CLUSTER_SLOTS; j++) {
if (bitmapTestBit(claimed_slots, j) == 0) continue;
if (server.cluster->slots[j] == NULL ||
server.cluster->slots[j]->configEpoch <= requestConfigEpoch)
{
continue;
}
/* If we reached this point we found a slot that in our current slots
* is served by a master with a greater configEpoch than the one claimed
* by the slave requesting our vote. Refuse to vote for this slave. */
serverLog(LL_WARNING,
"Failover auth denied to %.40s: "
"slot %d epoch (%llu) > reqEpoch (%llu)",
node->name, j,
(unsigned long long) server.cluster->slots[j]->configEpoch,
(unsigned long long) requestConfigEpoch);
return;
}

//记录投票纪元和时间
server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
node->slaveof->voted_time = mstime();
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG);
clusterSendFailoverAuth(node);
serverLog(LL_WARNING, "Failover auth granted to %.40s for epoch %llu",
node->name, (unsigned long long) server.cluster->currentEpoch);
}
clusterGetSlaveRank

当自己是slave时,统计同一master下其他slave中同步偏移量比自己大的数量,即自己的排名

int clusterGetSlaveRank(void) {
long long myoffset;
int j, rank = 0;
clusterNode *master;

serverAssert(nodeIsSlave(myself));
master = myself->slaveof;
if (master == NULL) return 0;

myoffset = replicationGetSlaveOffset();
for (j = 0; j < master->numslaves; j++)
if (master->slaves[j] != myself &&
!nodeCantFailover(master->slaves[j]) &&
master->slaves[j]->repl_offset > myoffset) rank++;
return rank;
}
clusterLogCantFailover

记录不能发起故障修复的原因日志

void clusterLogCantFailover(int reason) {
char *msg;
static time_t lastlog_time = 0;
mstime_t nolog_fail_time = server.cluster_node_timeout + 5000;

//同个原因一段时间内只记录一次
if (reason == server.cluster->cant_failover_reason &&
time(NULL)-lastlog_time < CLUSTER_CANT_FAILOVER_RELOG_PERIOD)
return;

server.cluster->cant_failover_reason = reason;

//master客观下线没多久,也不记录
if (myself->slaveof &&
nodeFailed(myself->slaveof) &&
(mstime() - myself->slaveof->fail_time) < nolog_fail_time) return;

switch(reason) {
case CLUSTER_CANT_FAILOVER_DATA_AGE:
msg = "Disconnected from master for longer than allowed. "
"Please check the 'cluster-replica-validity-factor' configuration "
"option.";
break;
case CLUSTER_CANT_FAILOVER_WAITING_DELAY:
msg = "Waiting the delay before I can start a new failover.";
break;
case CLUSTER_CANT_FAILOVER_EXPIRED:
msg = "Failover attempt expired.";
break;
case CLUSTER_CANT_FAILOVER_WAITING_VOTES:
msg = "Waiting for votes, but majority still not reached.";
break;
default:
msg = "Unknown reason code.";
break;
}
lastlog_time = time(NULL);
serverLog(LL_WARNING,"Currently unable to failover: %s", msg);
}
clusterFailoverReplaceYourMaster

从节点在选举成功后,用自己替换掉master

void clusterFailoverReplaceYourMaster(void) {
int j;
clusterNode *oldmaster = myself->slaveof;

if (nodeIsMaster(myself) || oldmaster == NULL) return;

//slave提升为master,解除与原master的复制关系
clusterSetNodeAsMaster(myself);
replicationUnsetMaster();

//槽点管理变更
for (j = 0; j < CLUSTER_SLOTS; j++) {
if (clusterNodeGetSlotBit(oldmaster,j)) {
clusterDelSlot(j);
clusterAddSlot(myself,j);
}
}

//更新状态&配置
clusterUpdateState();
clusterSaveConfigOrDie(1);

//通知其他节点
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);

//清空故障修复状态
resetManualFailover();
}
clusterHandleSlaveFailover

slave处理故障修复入口,包括发起、修正、重试

void clusterHandleSlaveFailover(void) {
mstime_t data_age;
mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
int needed_quorum = (server.cluster->size / 2) + 1; //需要获取的最低票数
int manual_failover = server.cluster->mf_end != 0 && //是否是手动修复
server.cluster->mf_can_start;
mstime_t auth_timeout, auth_retry_time;

server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;

//选举超时时间
auth_timeout = server.cluster_node_timeout*2;
if (auth_timeout < 2000) auth_timeout = 2000;
auth_retry_time = auth_timeout*2;

//发起故障转移的前提:1.自己是slave 2.master已挂掉(或为手动修复) 3.master是有效的槽点管理者
if (nodeIsMaster(myself) ||
myself->slaveof == NULL ||
(!nodeFailed(myself->slaveof) && !manual_failover) ||
(server.cluster_slave_no_failover && !manual_failover) ||
myself->slaveof->numslots == 0)
{
server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
return;
}

//记录最近一次更新数据的时间
if (server.repl_state == REPL_STATE_CONNECTED) {
data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
* 1000;
} else {
data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
}

if (data_age > server.cluster_node_timeout)
data_age -= server.cluster_node_timeout;

if (server.cluster_slave_validity_factor &&
data_age >
(((mstime_t)server.repl_ping_slave_period * 1000) +
(server.cluster_node_timeout * server.cluster_slave_validity_factor)))
{
if (!manual_failover) {
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_DATA_AGE);
return;
}
}

//发起重试
if (auth_age > auth_retry_time) {
//加上额外时间,是让其他节点能够收到fail消息
server.cluster->failover_auth_time = mstime() +
500 +
random() % 500;
server.cluster->failover_auth_count = 0;
server.cluster->failover_auth_sent = 0;
server.cluster->failover_auth_rank = clusterGetSlaveRank();

server.cluster->failover_auth_time +=
server.cluster->failover_auth_rank * 1000;

//手动修复,不要延迟
if (server.cluster->mf_end) {
server.cluster->failover_auth_time = mstime();
server.cluster->failover_auth_rank = 0;
}
serverLog(LL_WARNING,
"Start of election delayed for %lld milliseconds "
"(rank #%d, offset %lld).",
server.cluster->failover_auth_time - mstime(),
server.cluster->failover_auth_rank,
replicationGetSlaveOffset());

//广播信息,通知自己的offset
clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
return;
}

//设置自动情况下,发起选举的时间
if (server.cluster->failover_auth_sent == 0 &&
server.cluster->mf_end == 0)
{
int newrank = clusterGetSlaveRank();
if (newrank > server.cluster->failover_auth_rank) {
long long added_delay =
(newrank - server.cluster->failover_auth_rank) * 1000;
server.cluster->failover_auth_time += added_delay;
server.cluster->failover_auth_rank = newrank;
serverLog(LL_WARNING,
"Replica rank updated to #%d, added %lld milliseconds of delay.",
newrank, added_delay);
}
}

//发起时间校验
if (mstime() < server.cluster->failover_auth_time) {
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_DELAY);
return;
}

//选举过期
if (auth_age > auth_timeout) {
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_EXPIRED);
return;
}

//发起选举
if (server.cluster->failover_auth_sent == 0) {
server.cluster->currentEpoch++;
server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
serverLog(LL_WARNING,"Starting a failover election for epoch %llu.",
(unsigned long long) server.cluster->currentEpoch);

//广播消息,希望其他master为自己投票
clusterRequestFailoverAuth();
server.cluster->failover_auth_sent = 1;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE|
CLUSTER_TODO_FSYNC_CONFIG);
return;
}

//检查票数是否满足
if (server.cluster->failover_auth_count >= needed_quorum) {

serverLog(LL_WARNING,
"Failover election won: I'm the new master.");

//更新选举纪元到自己的配置纪元
if (myself->configEpoch < server.cluster->failover_auth_epoch) {
myself->configEpoch = server.cluster->failover_auth_epoch;
serverLog(LL_WARNING,
"configEpoch set to %llu after successful failover",
(unsigned long long) myself->configEpoch);
}

//替换master
clusterFailoverReplaceYourMaster();
} else {
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES);
}
}

slave迁移

clusterHandleSlaveMigration

把slave迁移为另一个master的slave,用于解决部分master拥有多个状态良好的slave,而另一些master一个状态良好的slave都没有的失衡情况

void clusterHandleSlaveMigration(int max_slaves) {
int j, okslaves = 0;
clusterNode *mymaster = myself->slaveof, *target = NULL, *candidate = NULL;
dictIterator *di;
dictEntry *de;

//检查状态
if (server.cluster->state != CLUSTER_OK) return;

if (mymaster == NULL) return;

//状态良好的从节点数量
for (j = 0; j < mymaster->numslaves; j++)
if (!nodeFailed(mymaster->slaves[j]) &&
!nodeTimedOut(mymaster->slaves[j])) okslaves++;

//数量是否满足要求
if (okslaves <= server.cluster_migration_barrier) return;

//选中的slave
candidate = myself;
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
int okslaves = 0, is_orphaned = 1;

//迁移的目标只能是未下线的master,且带有可迁入标识
if (nodeIsSlave(node) || nodeFailed(node)) is_orphaned = 0;
if (!(node->flags & CLUSTER_NODE_MIGRATE_TO)) is_orphaned = 0;

//迁移的对象必须是没有一个状态良好的slave
if (nodeIsMaster(node)) okslaves = clusterCountNonFailingSlaves(node);
if (okslaves > 0) is_orphaned = 0;


if (is_orphaned) {
//确定迁移的目标和时间
if (!target && node->numslots > 0) target = node;

if (!node->orphaned_time) node->orphaned_time = mstime();
} else {
node->orphaned_time = 0;
}

//在slave最多的master下,选出名字最小的slave作为迁移候选者
if (okslaves == max_slaves) {
for (j = 0; j < node->numslaves; j++) {
if (memcmp(node->slaves[j]->name,
candidate->name,
CLUSTER_NAMELEN) < 0)
{
candidate = node->slaves[j];
}
}
}
}
dictReleaseIterator(di);

//时间过了延迟等待后,开始迁移
if (target && candidate == myself &&
(mstime()-target->orphaned_time) > CLUSTER_SLAVE_MIGRATION_DELAY &&
!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
{
serverLog(LL_WARNING,"Migrating to orphaned master %.40s",
target->name);
clusterSetMaster(target);
}
}

手动修复

resetManualFailover

重置手动修复信息

void resetManualFailover(void) {
if (server.cluster->mf_end && clientsArePaused()) {
server.clients_pause_end_time = 0;
clientsArePaused();
}
server.cluster->mf_end = 0; //结束时间清零
server.cluster->mf_can_start = 0;
server.cluster->mf_slave = NULL; //选中主导修复的slave
server.cluster->mf_master_offset = 0;
}
manualFailoverCheckTimeout

手动修复过程检查超时

void manualFailoverCheckTimeout(void) {
if (server.cluster->mf_end && server.cluster->mf_end < mstime()) {
serverLog(LL_WARNING,"Manual failover timed out.");
resetManualFailover();
}
}
clusterHandleManualFailover

手动修复的推进入口,判断能否进入下一阶段

void clusterHandleManualFailover(void) {
//等于0标识没有手动修复流程在跑
if (server.cluster->mf_end == 0) return;

if (server.cluster->mf_can_start) return;

if (server.cluster->mf_master_offset == 0) return;

//同步偏移量和master一致,可以开始
if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) {
server.cluster->mf_can_start = 1;
serverLog(LL_WARNING,
"All master replication stream processed, "
"manual failover can start.");
}
}

集群定时任务

clusterCron

和大部分的定时任务一样,都是由上一层的serverCron调用

void clusterCron(void) {
dictIterator *di;
dictEntry *de;
int update_state = 0;
int orphaned_masters; //正常slave数量为0的master个数
int max_slaves; //最多的正常slave数量
int this_slaves; //自己对应master的正常slave数量
mstime_t min_pong = 0, now = mstime();
clusterNode *min_pong_node = NULL;
static unsigned long long iteration = 0;
mstime_t handshake_timeout;

iteration++; //此方法调用次数

//检查ip是否发生变化,从server.cluster_announce_ip到myself->ip
{
static char *prev_ip = NULL;
char *curr_ip = server.cluster_announce_ip;
int changed = 0;

if (prev_ip == NULL && curr_ip != NULL) changed = 1;
else if (prev_ip != NULL && curr_ip == NULL) changed = 1;
else if (prev_ip && curr_ip && strcmp(prev_ip,curr_ip)) changed = 1;

if (changed) {
if (prev_ip) zfree(prev_ip);
prev_ip = curr_ip;

if (curr_ip) {
prev_ip = zstrdup(prev_ip);
strncpy(myself->ip,server.cluster_announce_ip,NET_IP_STR_LEN);
myself->ip[NET_IP_STR_LEN-1] = '\0';
} else {
myself->ip[0] = '\0'; /* Force autodetection. */
}
}
}

//握手超时时间
handshake_timeout = server.cluster_node_timeout;
if (handshake_timeout < 1000) handshake_timeout = 1000;

//更新标记
clusterUpdateMyselfFlags();

//针对没有连接的node创建连接
di = dictGetSafeIterator(server.cluster->nodes);
server.cluster->stats_pfail_nodes = 0;
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);

//跳过自己和没有地址的节点
if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue;

if (node->flags & CLUSTER_NODE_PFAIL)
server.cluster->stats_pfail_nodes++;

//握手超时,删除节点
if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
clusterDelNode(node);
continue;
}

//没有建立连接,创建
if (node->link == NULL) {
int fd;
mstime_t old_ping_sent;
clusterLink *link;

fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
node->cport, NET_FIRST_BIND_ADDR);
if (fd == -1) {
//设置ping_sent,为了使超时检测能起作用
if (node->ping_sent == 0) node->ping_sent = mstime();
serverLog(LL_DEBUG, "Unable to connect to "
"Cluster Node [%s]:%d -> %s", node->ip,
node->cport, server.neterr);
continue;
}

//node&link互相关联,设置fd读时间处理
link = createClusterLink(node);
link->fd = fd;
node->link = link;
aeCreateFileEvent(server.el,link->fd,AE_READABLE,
clusterReadHandler,link);

//发送握手信息
old_ping_sent = node->ping_sent;
clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ?
CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
if (old_ping_sent) {
node->ping_sent = old_ping_sent;
}

//去掉需要发送meet信息标识
node->flags &= ~CLUSTER_NODE_MEET;

serverLog(LL_DEBUG,"Connecting with Node %.40s at %s:%d",
node->name, node->ip, node->cport);
}
}
dictReleaseIterator(di);

//每调用10次,随机抽取5个节点,向其中最久未收到pong的节点发送ping
if (!(iteration % 10)) {
int j;

//随机5个选一个
for (j = 0; j < 5; j++) {
de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);

/* Don't ping nodes disconnected or with a ping currently active. */
if (this->link == NULL || this->ping_sent != 0) continue;
if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
continue;
if (min_pong_node == NULL || min_pong > this->pong_received) {
min_pong_node = this;
min_pong = this->pong_received;
}
}
if (min_pong_node) {
serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
}
}

orphaned_masters = 0;
max_slaves = 0;
this_slaves = 0;
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
now = mstime();
mstime_t delay;

if (node->flags &
(CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
continue;

//统计孤儿master数量
if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
int okslaves = clusterCountNonFailingSlaves(node);

/* A master is orphaned if it is serving a non-zero number of
* slots, have no working slaves, but used to have at least one
* slave, or failed over a master that used to have slaves. */
if (okslaves == 0 && node->numslots > 0 &&
node->flags & CLUSTER_NODE_MIGRATE_TO)
{
orphaned_masters++;
}
if (okslaves > max_slaves) max_slaves = okslaves;
if (nodeIsSlave(myself) && myself->slaveof == node)
this_slaves = okslaves;
}

//心跳检测超时,释放连接,下一次会重新连接
if (node->link &&
now - node->link->ctime >
server.cluster_node_timeout &&
node->ping_sent && //已经发送ping
node->pong_received < node->ping_sent && //等待pong回复
now - node->ping_sent > server.cluster_node_timeout/2) //超时
{
freeClusterLink(node->link);
}

//由于上面的ping目标是随机挑选的,部分节点可能长期未被选中;这里保证超过一半超时时间未通信的节点也会收到ping
if (node->link &&
node->ping_sent == 0 &&
(now - node->pong_received) > server.cluster_node_timeout/2)
{
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}

//来自我们的一个slave手动修复,需要立马给其发送一个ping
if (server.cluster->mf_end &&
nodeIsMaster(myself) &&
server.cluster->mf_slave == node &&
node->link)
{
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}

/* Check only if we have an active ping for this instance. */
if (node->ping_sent == 0) continue;

//检测延迟
delay = now - node->ping_sent;

if (delay > server.cluster_node_timeout) {
//在一段时间内,没有有效回复,标记主观下线
if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
serverLog(LL_DEBUG,"*** NODE %.40s possibly failing",
node->name);
node->flags |= CLUSTER_NODE_PFAIL;
update_state = 1;
}
}
}
dictReleaseIterator(di);

//处理主从断掉的情况,重连
if (nodeIsSlave(myself) &&
server.masterhost == NULL &&
myself->slaveof &&
nodeHasAddr(myself->slaveof))
{
replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
}

//手动修复超时检测
manualFailoverCheckTimeout();

if (nodeIsSlave(myself)) {
//手动修复推进
clusterHandleManualFailover();

if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
clusterHandleSlaveFailover();

//处理slave迁移的流程
if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
clusterHandleSlaveMigration(max_slaves);
}

//更新状态
if (update_state || server.cluster->state == CLUSTER_FAIL)
clusterUpdateState();
}
clusterBeforeSleep

事件循环前要做的事情

void clusterBeforeSleep(void) {

//自动故障修复推进
if (server.cluster->todo_before_sleep & CLUSTER_TODO_HANDLE_FAILOVER)
clusterHandleSlaveFailover();

//更新状态
if (server.cluster->todo_before_sleep & CLUSTER_TODO_UPDATE_STATE)
clusterUpdateState();

//保存配置
if (server.cluster->todo_before_sleep & CLUSTER_TODO_SAVE_CONFIG) {
int fsync = server.cluster->todo_before_sleep &
CLUSTER_TODO_FSYNC_CONFIG;
clusterSaveConfigOrDie(fsync);
}

//重置标志
server.cluster->todo_before_sleep = 0;
}
clusterDoBeforeSleep

设置事件循环前要做的事

void clusterDoBeforeSleep(int flags) {
server.cluster->todo_before_sleep |= flags;
}

槽点

bitmapTestBit

判断位图中某个槽点是否命中

int bitmapTestBit(unsigned char *bitmap, int pos) {
off_t byte = pos/8;
int bit = pos&7;
return (bitmap[byte] & (1<<bit)) != 0;
}
bitmapSetBit

设置位图中的某个槽点

void bitmapSetBit(unsigned char *bitmap, int pos) {
off_t byte = pos/8;
int bit = pos&7;
bitmap[byte] |= 1<<bit;
}
bitmapClearBit

清空位图的某个槽点

void bitmapClearBit(unsigned char *bitmap, int pos) {
off_t byte = pos/8;
int bit = pos&7;
bitmap[byte] &= ~(1<<bit);
}

clusterMastersHaveSlaves

判断是否至少有一个master拥有slave

int clusterMastersHaveSlaves(void) {
dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
dictEntry *de;
int slaves = 0;
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);

if (nodeIsSlave(node)) continue;
slaves += node->numslaves;
}
dictReleaseIterator(di);
return slaves != 0;
}
clusterNodeSetSlotBit

节点位图增加某个槽位

int clusterNodeSetSlotBit(clusterNode *n, int slot) {
int old = bitmapTestBit(n->slots,slot);
bitmapSetBit(n->slots,slot);
if (!old) {
//有效槽位加1
n->numslots++;
if (n->numslots == 1 && clusterMastersHaveSlaves())
n->flags |= CLUSTER_NODE_MIGRATE_TO;
}
return old;
}
clusterNodeClearSlotBit

清空节点位图中的某个槽位

int clusterNodeClearSlotBit(clusterNode *n, int slot) {
int old = bitmapTestBit(n->slots,slot);
bitmapClearBit(n->slots,slot);
if (old) n->numslots--;
return old;
}
clusterNodeGetSlotBit

Check whether the node's slot bitmap contains a given slot

int clusterNodeGetSlotBit(clusterNode *n, int slot) {
return bitmapTestBit(n->slots,slot);
}
clusterAddSlot

Assign a slot to a node

int clusterAddSlot(clusterNode *n, int slot) {
if (server.cluster->slots[slot]) return C_ERR;
clusterNodeSetSlotBit(n,slot);
server.cluster->slots[slot] = n;
return C_OK;
}
clusterDelSlot

Delete a slot from whichever node currently owns it

int clusterDelSlot(int slot) {
clusterNode *n = server.cluster->slots[slot];

if (!n) return C_ERR;
serverAssert(clusterNodeClearSlotBit(n,slot) == 1);
server.cluster->slots[slot] = NULL;
return C_OK;
}
clusterDelNodeSlots

Delete all slots owned by a node

int clusterDelNodeSlots(clusterNode *node) {
int deleted = 0, j;

for (j = 0; j < CLUSTER_SLOTS; j++) {
if (clusterNodeGetSlotBit(node,j)) {
clusterDelSlot(j);
deleted++;
}
}
return deleted;
}
clusterCloseAllSlots

Clear all migrating/importing slot states

void clusterCloseAllSlots(void) {
memset(server.cluster->migrating_slots_to,0,
sizeof(server.cluster->migrating_slots_to));
memset(server.cluster->importing_slots_from,0,
sizeof(server.cluster->importing_slots_from));
}

Cluster state

clusterUpdateState
#define CLUSTER_MAX_REJOIN_DELAY 5000
#define CLUSTER_MIN_REJOIN_DELAY 500
#define CLUSTER_WRITABLE_DELAY 2000

void clusterUpdateState(void) {
int j, new_state;
int reachable_masters = 0;
static mstime_t among_minority_time;
static mstime_t first_call_time = 0;

server.cluster->todo_before_sleep &= ~CLUSTER_TODO_UPDATE_STATE;


if (first_call_time == 0) first_call_time = mstime();

//对于master,在从fail->ok需要等待一段时间
if (nodeIsMaster(myself) &&
server.cluster->state == CLUSTER_FAIL &&
mstime() - first_call_time < CLUSTER_WRITABLE_DELAY) return;

new_state = CLUSTER_OK;

//检查所有槽点是否都覆盖
if (server.cluster_require_full_coverage) {
for (j = 0; j < CLUSTER_SLOTS; j++) {
if (server.cluster->slots[j] == NULL ||
server.cluster->slots[j]->flags & (CLUSTER_NODE_FAIL))
{
new_state = CLUSTER_FAIL;
break;
}
}
}

//统计所有的有效master数量
{
dictIterator *di;
dictEntry *de;

server.cluster->size = 0;
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);

if (nodeIsMaster(node) && node->numslots) {
server.cluster->size++;
if ((node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) == 0)
reachable_masters++;
}
}
dictReleaseIterator(di);
}

//根据连接情况,判断状态ok/fail
{
int needed_quorum = (server.cluster->size / 2) + 1;

if (reachable_masters < needed_quorum) {
new_state = CLUSTER_FAIL;
among_minority_time = mstime();
}
}

//记录状态变化
if (new_state != server.cluster->state) {
mstime_t rejoin_delay = server.cluster_node_timeout;

if (rejoin_delay > CLUSTER_MAX_REJOIN_DELAY)
rejoin_delay = CLUSTER_MAX_REJOIN_DELAY;
if (rejoin_delay < CLUSTER_MIN_REJOIN_DELAY)
rejoin_delay = CLUSTER_MIN_REJOIN_DELAY;

if (new_state == CLUSTER_OK &&
nodeIsMaster(myself) &&
mstime() - among_minority_time < rejoin_delay)
{
return;
}

/* Change the state and log the event. */
serverLog(LL_WARNING,"Cluster state changed: %s",
new_state == CLUSTER_OK ? "ok" : "fail");
server.cluster->state = new_state;
}
}
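The quorum check in clusterUpdateState is what forces a cluster to have at least three slot-owning masters: the cluster only stays `OK` while a strict majority of them is reachable. A sketch of just that rule (the function name is mine, not a redis API):

```c
/* Majority rule used by clusterUpdateState (sketch):
 * the cluster stays OK only while at least (size/2)+1 of the
 * slot-owning masters are reachable (neither FAIL nor PFAIL). */
static int cluster_state_ok(int size, int reachable_masters) {
    int needed_quorum = (size / 2) + 1;
    return reachable_masters >= needed_quorum;
}
```

With size 1 or 2, losing a single master already drops reachable_masters below the quorum, which is why three masters is the practical minimum.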
verifyClusterConfigWithData

Verify the cluster configuration against the dataset

int verifyClusterConfigWithData(void) {
int j;
int update_config = 0;

//模块要求不要转发映射其他槽点
if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
return C_OK;

//只校验master
if (nodeIsSlave(myself)) return C_OK;

//确定只使用了db0
for (j = 1; j < server.dbnum; j++) {
if (dictSize(server.db[j].dict)) return C_ERR;
}

//查看槽点的情况
for (j = 0; j < CLUSTER_SLOTS; j++) {
//槽点没有任何key
if (!countKeysInSlot(j)) continue;

//我们负责槽点或者槽点在迁移到我们这
if (server.cluster->slots[j] == myself ||
server.cluster->importing_slots_from[j] != NULL) continue;


//到这里的话,说明槽点没有人管了
update_config++;

//槽点没有分配,我们直接管理
if (server.cluster->slots[j] == NULL) {
serverLog(LL_WARNING, "I have keys for unassigned slot %d. "
"Taking responsibility for it.",j);
clusterAddSlot(myself,j);
} else {
//槽点是别人负责的
serverLog(LL_WARNING, "I have keys for slot %d, but the slot is "
"assigned to another node. "
"Setting it to importing state.",j);
server.cluster->importing_slots_from[j] = server.cluster->slots[j];
}
}
if (update_config) clusterSaveConfigOrDie(1);
return C_OK;
}

Slave node operations

clusterSetMaster

Set node n as our master

void clusterSetMaster(clusterNode *n) {
serverAssert(n != myself);
serverAssert(myself->numslots == 0);

//自己是主节点,切换标识,清空槽点
if (nodeIsMaster(myself)) {
myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO);
myself->flags |= CLUSTER_NODE_SLAVE;
clusterCloseAllSlots();
} else {
//从之前的master的slave列表中,把自己删掉
if (myself->slaveof)
clusterNodeRemoveSlave(myself->slaveof,myself);
}
//指定新的master
myself->slaveof = n;
clusterNodeAddSlave(n,myself);
replicationSetMaster(n->ip, n->port);
resetManualFailover();
}

Node-to-string conversion

redisNodeFlags

Textual names for node flags

struct redisNodeFlags {
uint16_t flag;
char *name;
};

static struct redisNodeFlags redisNodeFlagsTable[] = {
{CLUSTER_NODE_MYSELF, "myself,"},
{CLUSTER_NODE_MASTER, "master,"},
{CLUSTER_NODE_SLAVE, "slave,"},
{CLUSTER_NODE_PFAIL, "fail?,"},
{CLUSTER_NODE_FAIL, "fail,"},
{CLUSTER_NODE_HANDSHAKE, "handshake,"},
{CLUSTER_NODE_NOADDR, "noaddr,"},
{CLUSTER_NODE_NOFAILOVER, "nofailover,"}
};
representClusterNodeFlags

Convert flag bits into their textual representation

sds representClusterNodeFlags(sds ci, uint16_t flags) {
size_t orig_len = sdslen(ci);
int i, size = sizeof(redisNodeFlagsTable)/sizeof(struct redisNodeFlags);
for (i = 0; i < size; i++) {
struct redisNodeFlags *nodeflag = redisNodeFlagsTable + i;
if (flags & nodeflag->flag) ci = sdscat(ci, nodeflag->name);
}

if (sdslen(ci) == orig_len) ci = sdscat(ci,"noflags,");
//移除掉最后的一个逗号
sdsIncrLen(ci,-1);
return ci;
}
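The table-driven rendering above concatenates a comma-suffixed name for every set flag and then trims the final comma with `sdsIncrLen(ci,-1)`. A self-contained sketch of the same pattern using plain C strings (flag values here are illustrative, not the real CLUSTER_NODE_* bits):

```c
#include <string.h>

/* Illustrative flag bits, NOT the real CLUSTER_NODE_* values. */
enum { F_MYSELF = 1, F_MASTER = 2, F_SLAVE = 4, F_FAIL = 8 };

static const struct { unsigned flag; const char *name; } flag_table[] = {
    { F_MYSELF, "myself," }, { F_MASTER, "master," },
    { F_SLAVE,  "slave,"  }, { F_FAIL,   "fail,"   },
};

/* Concatenate the name of every set flag (each name carries a trailing
 * comma), fall back to "noflags," when none are set, then drop the
 * final comma - the same trick as sdsIncrLen(ci,-1). */
static void render_flags(unsigned flags, char *out, size_t outlen) {
    size_t n = sizeof(flag_table)/sizeof(flag_table[0]);
    out[0] = '\0';
    for (size_t i = 0; i < n; i++)
        if (flags & flag_table[i].flag)
            strncat(out, flag_table[i].name, outlen - strlen(out) - 1);
    if (out[0] == '\0')
        strncat(out, "noflags,", outlen - strlen(out) - 1);
    out[strlen(out)-1] = '\0';   /* trim the trailing comma */
}
```

Storing the comma inside each table entry keeps the loop branch-free; only one byte has to be removed at the end.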
clusterGenNodeDescription

Generate the description line for a node

sds clusterGenNodeDescription(clusterNode *node) {
int j, start;
sds ci;

//名称 IP 端口
ci = sdscatprintf(sdsempty(),"%.40s %s:%d@%d ",
node->name,
node->ip,
node->port,
node->cport);

//标志位情况
ci = representClusterNodeFlags(ci, node->flags);

//主从情况
if (node->slaveof)
ci = sdscatprintf(ci," %.40s ",node->slaveof->name);
else
ci = sdscatlen(ci," - ",3);

//连接心跳情况
ci = sdscatprintf(ci,"%lld %lld %llu %s",
(long long) node->ping_sent,
(long long) node->pong_received,
(unsigned long long) node->configEpoch,
(node->link || node->flags & CLUSTER_NODE_MYSELF) ?
"connected" : "disconnected");

//槽点情况
start = -1;
for (j = 0; j < CLUSTER_SLOTS; j++) {
int bit;

if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
if (start == -1) start = j;
}
if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
if (bit && j == CLUSTER_SLOTS-1) j++;

if (start == j-1) {
ci = sdscatprintf(ci," %d",start);
} else {
ci = sdscatprintf(ci," %d-%d",start,j-1);
}
start = -1;
}
}

//迁入&迁出情况
if (node->flags & CLUSTER_NODE_MYSELF) {
for (j = 0; j < CLUSTER_SLOTS; j++) {
if (server.cluster->migrating_slots_to[j]) {
ci = sdscatprintf(ci," [%d->-%.40s]",j,
server.cluster->migrating_slots_to[j]->name);
} else if (server.cluster->importing_slots_from[j]) {
ci = sdscatprintf(ci," [%d-<-%.40s]",j,
server.cluster->importing_slots_from[j]->name);
}
}
}
return ci;
}
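The slot loop above compresses consecutive owned slots into `start-end` ranges, emitting a bare `start` for isolated slots. A self-contained sketch of that range-compression logic, operating on a simple ownership array instead of the real bitmap (function name and buffer handling are simplified stand-ins):

```c
#include <stdio.h>
#include <string.h>

/* Walk the ownership array and append " start-end" for runs of owned
 * slots, " start" for single slots - the same compression as the slot
 * loop in clusterGenNodeDescription. */
static void format_slot_ranges(const unsigned char *owned, int nslots,
                               char *out, size_t outlen) {
    int j, start = -1;
    out[0] = '\0';
    for (j = 0; j < nslots; j++) {
        int bit = owned[j];
        if (bit && start == -1) start = j;
        if (start != -1 && (!bit || j == nslots-1)) {
            int end = (bit && j == nslots-1) ? j : j-1;
            char buf[32];
            if (start == end) snprintf(buf, sizeof(buf), " %d", start);
            else snprintf(buf, sizeof(buf), " %d-%d", start, end);
            strncat(out, buf, outlen - strlen(out) - 1);
            start = -1;
        }
    }
}
```

This is why `CLUSTER NODES` output stays compact even when a node owns thousands of contiguous slots.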
clusterGenNodesDescription

Generate descriptions for all nodes except those matching the filter flags

sds clusterGenNodesDescription(int filter) {
sds ci = sdsempty(), ni;
dictIterator *di;
dictEntry *de;

di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);

if (node->flags & filter) continue;
ni = clusterGenNodeDescription(node);
ci = sdscatsds(ci,ni);
sdsfree(ni);
ci = sdscatlen(ci,"\n",1);
}
dictReleaseIterator(di);
return ci;
}

Cluster commands

clusterGetMessageTypeString

Map a message type to its textual name

const char *clusterGetMessageTypeString(int type) {
switch(type) {
case CLUSTERMSG_TYPE_PING: return "ping";
case CLUSTERMSG_TYPE_PONG: return "pong";
case CLUSTERMSG_TYPE_MEET: return "meet";
case CLUSTERMSG_TYPE_FAIL: return "fail";
case CLUSTERMSG_TYPE_PUBLISH: return "publish";
case CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST: return "auth-req";
case CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK: return "auth-ack";
case CLUSTERMSG_TYPE_UPDATE: return "update";
case CLUSTERMSG_TYPE_MFSTART: return "mfstart";
case CLUSTERMSG_TYPE_MODULE: return "module";
}
return "unknown";
}
getSlotOrReply

Validate a slot argument, replying with an error if it is invalid

int getSlotOrReply(client *c, robj *o) {
long long slot;

if (getLongLongFromObject(o,&slot) != C_OK ||
slot < 0 || slot >= CLUSTER_SLOTS)
{
addReplyError(c,"Invalid or out of range slot");
return -1;
}
return (int) slot;
}
clusterReplyMultiBulkSlots

Reply with all slot range mappings, grouped by node

void clusterReplyMultiBulkSlots(client *c) {
/* 格式: 1) 1) start slot
* 2) end slot
* 3) 1) master IP
* 2) master port
* 3) node ID
* 4) 1) replica IP
* 2) replica port
* 3) node ID
* ... continued until done
*/

int num_masters = 0;
void *slot_replylen = addDeferredMultiBulkLength(c);

dictEntry *de;
dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
int j = 0, start = -1;

//跳过非有效master
if (!nodeIsMaster(node) || node->numslots == 0) continue;

for (j = 0; j < CLUSTER_SLOTS; j++) {
int bit, i;

if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
if (start == -1) start = j;
}
if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
int nested_elements = 3; /* slots (2) + master addr (1). */
void *nested_replylen = addDeferredMultiBulkLength(c);

if (bit && j == CLUSTER_SLOTS-1) j++;

if (start == j-1) {
addReplyLongLong(c, start);
addReplyLongLong(c, start);
} else {
addReplyLongLong(c, start);
addReplyLongLong(c, j-1);
}
start = -1;

addReplyMultiBulkLen(c, 3);
addReplyBulkCString(c, node->ip);
addReplyLongLong(c, node->port);
addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);

//输出slave情况
for (i = 0; i < node->numslaves; i++) {
if (nodeFailed(node->slaves[i])) continue;
addReplyMultiBulkLen(c, 3);
addReplyBulkCString(c, node->slaves[i]->ip);
addReplyLongLong(c, node->slaves[i]->port);
addReplyBulkCBuffer(c, node->slaves[i]->name, CLUSTER_NAMELEN);
nested_elements++;
}
setDeferredMultiBulkLength(c, nested_replylen, nested_elements);
num_masters++;
}
}
}
dictReleaseIterator(di);
setDeferredMultiBulkLength(c, slot_replylen, num_masters);
}
clusterCommand

Entry point for the CLUSTER command

void clusterCommand(client *c) {
//是否开启集群校验
if (server.cluster_enabled == 0) {
addReplyError(c,"This instance has cluster support disabled");
return;
}

//help命令
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
const char *help[] = {
"ADDSLOTS <slot> [slot ...] -- Assign slots to current node.",
"BUMPEPOCH -- Advance the cluster config epoch.",
"COUNT-failure-reports <node-id> -- Return number of failure reports for <node-id>.",
"COUNTKEYSINSLOT <slot> - Return the number of keys in <slot>.",
"DELSLOTS <slot> [slot ...] -- Delete slots information from current node.",
"FAILOVER [force|takeover] -- Promote current replica node to being a master.",
"FORGET <node-id> -- Remove a node from the cluster.",
"GETKEYSINSLOT <slot> <count> -- Return key names stored by current node in a slot.",
"FLUSHSLOTS -- Delete current node own slots information.",
"INFO - Return information about the cluster.",
"KEYSLOT <key> -- Return the hash slot for <key>.",
"MEET <ip> <port> [bus-port] -- Connect nodes into a working cluster.",
"MYID -- Return the node id.",
"NODES -- Return cluster configuration seen by node. Output format:",
" <id> <ip:port> <flags> <master> <pings> <pongs> <epoch> <link> <slot> ... <slot>",
"REPLICATE <node-id> -- Configure current node as replica to <node-id>.",
"RESET [hard|soft] -- Reset current node (default: soft).",
"SET-config-epoch <epoch> - Set config epoch of current node.",
"SETSLOT <slot> (importing|migrating|stable|node <node-id>) -- Set slot state.",
"REPLICAS <node-id> -- Return <node-id> replicas.",
"SLOTS -- Return information about slots range mappings. Each range is made of:",
" start, end, master and replicas IP addresses, ports and ids",
NULL
};
addReplyHelp(c, help);
} else if (!strcasecmp(c->argv[1]->ptr,"meet") && (c->argc == 4 || c->argc == 5)) {
//MEET消息,创建集群 格式:CLUSTER MEET <ip> <port> [cport]
long long port, cport;

if (getLongLongFromObject(c->argv[3], &port) != C_OK) {
addReplyErrorFormat(c,"Invalid TCP base port specified: %s",
(char*)c->argv[3]->ptr);
return;
}

if (c->argc == 5) {
if (getLongLongFromObject(c->argv[4], &cport) != C_OK) {
addReplyErrorFormat(c,"Invalid TCP bus port specified: %s",
(char*)c->argv[4]->ptr);
return;
}
} else {
cport = port + CLUSTER_PORT_INCR;
}

//创建节点,下一次定时任务会建立连接
if (clusterStartHandshake(c->argv[2]->ptr,port,cport) == 0 &&
errno == EINVAL)
{
addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
(char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
} else {
addReply(c,shared.ok);
}
} else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
//查询节点情况
robj *o;
sds ci = clusterGenNodesDescription(0);

o = createObject(OBJ_STRING,ci);
addReplyBulk(c,o);
decrRefCount(o);
} else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) {
//获取运行id
addReplyBulkCBuffer(c,myself->name, CLUSTER_NAMELEN);
} else if (!strcasecmp(c->argv[1]->ptr,"slots") && c->argc == 2) {
//获取槽点情况
clusterReplyMultiBulkSlots(c);
} else if (!strcasecmp(c->argv[1]->ptr,"flushslots") && c->argc == 2) {
//清空自己管理的所有槽点
if (dictSize(server.db[0].dict) != 0) {
addReplyError(c,"DB must be empty to perform CLUSTER FLUSHSLOTS.");
return;
}
clusterDelNodeSlots(myself);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
addReply(c,shared.ok);
} else if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
!strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3)
{
//增加&删除槽点
/* CLUSTER ADDSLOTS <slot> [slot] ... */
/* CLUSTER DELSLOTS <slot> [slot] ... */
int j, slot;
unsigned char *slots = zmalloc(CLUSTER_SLOTS);
int del = !strcasecmp(c->argv[1]->ptr,"delslots");

memset(slots,0,CLUSTER_SLOTS);
//检查槽点是否满足增加或删除条件
for (j = 2; j < c->argc; j++) {
if ((slot = getSlotOrReply(c,c->argv[j])) == -1) {
zfree(slots);
return;
}
if (del && server.cluster->slots[slot] == NULL) {
addReplyErrorFormat(c,"Slot %d is already unassigned", slot);
zfree(slots);
return;
} else if (!del && server.cluster->slots[slot]) {
addReplyErrorFormat(c,"Slot %d is already busy", slot);
zfree(slots);
return;
}
if (slots[slot]++ == 1) {
addReplyErrorFormat(c,"Slot %d specified multiple times",
(int)slot);
zfree(slots);
return;
}
}
//具体操作
for (j = 0; j < CLUSTER_SLOTS; j++) {
if (slots[j]) {
int retval;

if (server.cluster->importing_slots_from[j])
server.cluster->importing_slots_from[j] = NULL;

retval = del ? clusterDelSlot(j) :
clusterAddSlot(myself,j);
serverAssertWithInfo(c,NULL,retval == C_OK);
}
}
zfree(slots);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
//设置槽点情况,包括迁出、迁入、稳定
/* SETSLOT 10 MIGRATING <node ID> */
/* SETSLOT 10 IMPORTING <node ID> */
/* SETSLOT 10 STABLE */
/* SETSLOT 10 NODE <node ID> */
int slot;
clusterNode *n;

if (nodeIsSlave(myself)) {
addReplyError(c,"Please use SETSLOT only with masters.");
return;
}

if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return;

//迁出
if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
if (server.cluster->slots[slot] != myself) {
addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
return;
}
if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
addReplyErrorFormat(c,"I don't know about node %s",
(char*)c->argv[4]->ptr);
return;
}
server.cluster->migrating_slots_to[slot] = n;
} else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
//迁入
if (server.cluster->slots[slot] == myself) {
addReplyErrorFormat(c,
"I'm already the owner of hash slot %u",slot);
return;
}
if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
addReplyErrorFormat(c,"I don't know about node %s",
(char*)c->argv[4]->ptr);
return;
}
server.cluster->importing_slots_from[slot] = n;
} else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
//迁移结束
/* CLUSTER SETSLOT <SLOT> STABLE */
server.cluster->importing_slots_from[slot] = NULL;
server.cluster->migrating_slots_to[slot] = NULL;
} else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
//分配槽点
/* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
clusterNode *n = clusterLookupNode(c->argv[4]->ptr);

if (!n) {
addReplyErrorFormat(c,"Unknown node %s",
(char*)c->argv[4]->ptr);
return;
}

if (server.cluster->slots[slot] == myself && n != myself) {
if (countKeysInSlot(slot) != 0) {
addReplyErrorFormat(c,
"Can't assign hashslot %d to a different node "
"while I still hold keys for this hash slot.", slot);
return;
}
}

if (countKeysInSlot(slot) == 0 &&
server.cluster->migrating_slots_to[slot])
server.cluster->migrating_slots_to[slot] = NULL;

if (n == myself &&
server.cluster->importing_slots_from[slot])
{
if (clusterBumpConfigEpochWithoutConsensus() == C_OK) {
serverLog(LL_WARNING,
"configEpoch updated after importing slot %d", slot);
}
server.cluster->importing_slots_from[slot] = NULL;
}
clusterDelSlot(slot);
clusterAddSlot(n,slot);
} else {
addReplyError(c,
"Invalid CLUSTER SETSLOT action or number of arguments. Try CLUSTER HELP");
return;
}
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"bumpepoch") && c->argc == 2) {
//更新纪元
int retval = clusterBumpConfigEpochWithoutConsensus();
sds reply = sdscatprintf(sdsempty(),"+%s %llu\r\n",
(retval == C_OK) ? "BUMPED" : "STILL",
(unsigned long long) myself->configEpoch);
addReplySds(c,reply);
} else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
//获取当前节点情况
char *statestr[] = {"ok","fail","needhelp"};
int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0;
uint64_t myepoch;
int j;

for (j = 0; j < CLUSTER_SLOTS; j++) {
clusterNode *n = server.cluster->slots[j];

if (n == NULL) continue;
slots_assigned++;
if (nodeFailed(n)) {
slots_fail++;
} else if (nodeTimedOut(n)) {
slots_pfail++;
} else {
slots_ok++;
}
}

myepoch = (nodeIsSlave(myself) && myself->slaveof) ?
myself->slaveof->configEpoch : myself->configEpoch;

sds info = sdscatprintf(sdsempty(),
"cluster_state:%s\r\n"
"cluster_slots_assigned:%d\r\n"
"cluster_slots_ok:%d\r\n"
"cluster_slots_pfail:%d\r\n"
"cluster_slots_fail:%d\r\n"
"cluster_known_nodes:%lu\r\n"
"cluster_size:%d\r\n"
"cluster_current_epoch:%llu\r\n"
"cluster_my_epoch:%llu\r\n"
, statestr[server.cluster->state],
slots_assigned,
slots_ok,
slots_pfail,
slots_fail,
dictSize(server.cluster->nodes),
server.cluster->size,
(unsigned long long) server.cluster->currentEpoch,
(unsigned long long) myepoch
);

//统计信息
long long tot_msg_sent = 0;
long long tot_msg_received = 0;

for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
if (server.cluster->stats_bus_messages_sent[i] == 0) continue;
tot_msg_sent += server.cluster->stats_bus_messages_sent[i];
info = sdscatprintf(info,
"cluster_stats_messages_%s_sent:%lld\r\n",
clusterGetMessageTypeString(i),
server.cluster->stats_bus_messages_sent[i]);
}
info = sdscatprintf(info,
"cluster_stats_messages_sent:%lld\r\n", tot_msg_sent);

for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
if (server.cluster->stats_bus_messages_received[i] == 0) continue;
tot_msg_received += server.cluster->stats_bus_messages_received[i];
info = sdscatprintf(info,
"cluster_stats_messages_%s_received:%lld\r\n",
clusterGetMessageTypeString(i),
server.cluster->stats_bus_messages_received[i]);
}
info = sdscatprintf(info,
"cluster_stats_messages_received:%lld\r\n", tot_msg_received);

addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
(unsigned long)sdslen(info)));
addReplySds(c,info);
addReply(c,shared.crlf);
} else if (!strcasecmp(c->argv[1]->ptr,"saveconfig") && c->argc == 2) {
//保存配置到磁盘
int retval = clusterSaveConfig(1);

if (retval == 0)
addReply(c,shared.ok);
else
addReplyErrorFormat(c,"error saving the cluster node config: %s",
strerror(errno));
} else if (!strcasecmp(c->argv[1]->ptr,"keyslot") && c->argc == 3) {
//获取key对应的槽点
/* CLUSTER KEYSLOT <key> */
sds key = c->argv[2]->ptr;

addReplyLongLong(c,keyHashSlot(key,sdslen(key)));
} else if (!strcasecmp(c->argv[1]->ptr,"countkeysinslot") && c->argc == 3) {
//获取槽点中key的数量
/* CLUSTER COUNTKEYSINSLOT <slot> */
long long slot;

if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
return;
if (slot < 0 || slot >= CLUSTER_SLOTS) {
addReplyError(c,"Invalid slot");
return;
}
addReplyLongLong(c,countKeysInSlot(slot));
} else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
//获取槽点中key的明细
/* CLUSTER GETKEYSINSLOT <slot> <count> */
long long maxkeys, slot;
unsigned int numkeys, j;
robj **keys;

if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
return;
if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL)
!= C_OK)
return;
if (slot < 0 || slot >= CLUSTER_SLOTS || maxkeys < 0) {
addReplyError(c,"Invalid slot or number of keys");
return;
}

/* Avoid allocating more than needed in case of large COUNT argument
* and smaller actual number of keys. */
unsigned int keys_in_slot = countKeysInSlot(slot);
if (maxkeys > keys_in_slot) maxkeys = keys_in_slot;

keys = zmalloc(sizeof(robj*)*maxkeys);
numkeys = getKeysInSlot(slot, keys, maxkeys);
addReplyMultiBulkLen(c,numkeys);
for (j = 0; j < numkeys; j++) {
addReplyBulk(c,keys[j]);
decrRefCount(keys[j]);
}
zfree(keys);
} else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) {
//删除某个节点
/* CLUSTER FORGET <NODE ID> */
clusterNode *n = clusterLookupNode(c->argv[2]->ptr);

if (!n) {
addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
return;
} else if (n == myself) {
addReplyError(c,"I tried hard but I can't forget myself...");
return;
} else if (nodeIsSlave(myself) && myself->slaveof == n) {
addReplyError(c,"Can't forget my master!");
return;
}
clusterBlacklistAddNode(n);
clusterDelNode(n);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
CLUSTER_TODO_SAVE_CONFIG);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc == 3) {
//设置master节点信息
/* CLUSTER REPLICATE <NODE ID> */
clusterNode *n = clusterLookupNode(c->argv[2]->ptr);

//检验节点有效
if (!n) {
addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
return;
}

//自己校验
if (n == myself) {
addReplyError(c,"Can't replicate myself");
return;
}

//目标master校验
if (nodeIsSlave(n)) {
addReplyError(c,"I can only replicate a master, not a replica.");
return;
}

//自己是有效master,不能扔下负责的槽点
if (nodeIsMaster(myself) &&
(myself->numslots != 0 || dictSize(server.db[0].dict) != 0)) {
addReplyError(c,
"To set a master the node must be empty and "
"without assigned slots.");
return;
}

clusterSetMaster(n);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
addReply(c,shared.ok);
} else if ((!strcasecmp(c->argv[1]->ptr,"slaves") ||
!strcasecmp(c->argv[1]->ptr,"replicas")) && c->argc == 3) {
//获取master下所有的slave描述
/* CLUSTER SLAVES <NODE ID> */
clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
int j;

/* Lookup the specified node in our table. */
if (!n) {
addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
return;
}

if (nodeIsSlave(n)) {
addReplyError(c,"The specified node is not a master");
return;
}

addReplyMultiBulkLen(c,n->numslaves);
for (j = 0; j < n->numslaves; j++) {
sds ni = clusterGenNodeDescription(n->slaves[j]);
addReplyBulkCString(c,ni);
sdsfree(ni);
}
} else if (!strcasecmp(c->argv[1]->ptr,"count-failure-reports") &&
c->argc == 3)
{
//获取某个节点的掉线汇报数量
/* CLUSTER COUNT-FAILURE-REPORTS <NODE ID> */
clusterNode *n = clusterLookupNode(c->argv[2]->ptr);

if (!n) {
addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
return;
} else {
addReplyLongLong(c,clusterNodeFailureReportsCount(n));
}
} else if (!strcasecmp(c->argv[1]->ptr,"failover") &&
(c->argc == 2 || c->argc == 3))
{
//手动开始故障修复
/* CLUSTER FAILOVER [FORCE|TAKEOVER] */
int force = 0, takeover = 0;

if (c->argc == 3) {
if (!strcasecmp(c->argv[2]->ptr,"force")) {
force = 1;
} else if (!strcasecmp(c->argv[2]->ptr,"takeover")) {
takeover = 1;
force = 1; /* Takeover also implies force. */
} else {
addReply(c,shared.syntaxerr);
return;
}
}

/* Check preconditions. */
if (nodeIsMaster(myself)) {
addReplyError(c,"You should send CLUSTER FAILOVER to a replica");
return;
} else if (myself->slaveof == NULL) {
addReplyError(c,"I'm a replica but my master is unknown to me");
return;
} else if (!force &&
(nodeFailed(myself->slaveof) ||
myself->slaveof->link == NULL))
{
addReplyError(c,"Master is down or failed, "
"please use CLUSTER FAILOVER FORCE");
return;
}
resetManualFailover();
server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;

if (takeover) {
serverLog(LL_WARNING,"Taking over the master (user request).");
clusterBumpConfigEpochWithoutConsensus();
clusterFailoverReplaceYourMaster();
} else if (force) {
//是否强制
serverLog(LL_WARNING,"Forced failover user request accepted.");
server.cluster->mf_can_start = 1;
} else {
serverLog(LL_WARNING,"Manual failover user request accepted.");
clusterSendMFStart(myself->slaveof);
}
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"set-config-epoch") && c->argc == 3)
{
//设置配置纪元
// CLUSTER SET-CONFIG-EPOCH <epoch>
long long epoch;

if (getLongLongFromObjectOrReply(c,c->argv[2],&epoch,NULL) != C_OK)
return;

if (epoch < 0) {
addReplyErrorFormat(c,"Invalid config epoch specified: %lld",epoch);
} else if (dictSize(server.cluster->nodes) > 1) {
addReplyError(c,"The user can assign a config epoch only when the "
"node does not know any other node.");
} else if (myself->configEpoch != 0) {
addReplyError(c,"Node config epoch is already non-zero");
} else {
myself->configEpoch = epoch;
serverLog(LL_WARNING,
"configEpoch set to %llu via CLUSTER SET-CONFIG-EPOCH",
(unsigned long long) myself->configEpoch);

if (server.cluster->currentEpoch < (uint64_t)epoch)
server.cluster->currentEpoch = epoch;
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
CLUSTER_TODO_SAVE_CONFIG);
addReply(c,shared.ok);
}
} else if (!strcasecmp(c->argv[1]->ptr,"reset") &&
(c->argc == 2 || c->argc == 3))
{
//节点重置
/* CLUSTER RESET [SOFT|HARD] */
int hard = 0;

if (c->argc == 3) {
if (!strcasecmp(c->argv[2]->ptr,"hard")) {
hard = 1;
} else if (!strcasecmp(c->argv[2]->ptr,"soft")) {
hard = 0;
} else {
addReply(c,shared.syntaxerr);
return;
}
}

//有数据的话,不能重置
if (nodeIsMaster(myself) && dictSize(c->db->dict) != 0) {
addReplyError(c,"CLUSTER RESET can't be called with "
"master nodes containing keys");
return;
}
clusterReset(hard);
addReply(c,shared.ok);
} else {
addReplySubcommandSyntaxError(c);
return;
}
}
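The KEYSLOT subcommand above maps a key onto one of the 16384 slots via keyHashSlot, which computes CRC16 of the key and masks it to 14 bits, hashing only the `{...}` hash-tag portion when one is present. A self-contained sketch of that mapping (function names here are mine; redis implements the CRC16-CCITT/XMODEM variant with a lookup table, while this version is bitwise):

```c
#include <string.h>

/* CRC16-CCITT, XMODEM variant: polynomial 0x1021, initial value 0. */
static unsigned int crc16_xmodem(const char *buf, size_t len) {
    unsigned int crc = 0;
    for (size_t i = 0; i < len; i++) {
        crc ^= (unsigned int)(unsigned char)buf[i] << 8;
        for (int b = 0; b < 8; b++)
            crc = (crc & 0x8000) ? ((crc << 1) ^ 0x1021) & 0xffff
                                 : (crc << 1) & 0xffff;
    }
    return crc;
}

/* Sketch of keyHashSlot: if the key contains a non-empty {...} section,
 * only that section is hashed, so related keys can be forced into the
 * same slot (e.g. {user1}.following and {user1}.followers). */
static unsigned int key_hash_slot(const char *key, size_t keylen) {
    size_t s, e;
    for (s = 0; s < keylen; s++) if (key[s] == '{') break;
    if (s < keylen) {
        for (e = s+1; e < keylen; e++) if (key[e] == '}') break;
        if (e < keylen && e != s+1)
            return crc16_xmodem(key+s+1, e-s-1) & 16383;
    }
    return crc16_xmodem(key, keylen) & 16383;
}
```

Masking with 16383 (0x3FFF) is equivalent to taking the CRC modulo 16384, the total slot count.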
createDumpPayload

Serialize a redis object into a rio buffer

void createDumpPayload(rio *payload, robj *o) {
unsigned char buf[2];
uint64_t crc;

/* Serialize the object in a RDB-like format. It consist of an object type
* byte followed by the serialized object. This is understood by RESTORE. */
rioInitWithBuffer(payload,sdsempty());
serverAssert(rdbSaveObjectType(payload,o));
serverAssert(rdbSaveObject(payload,o));

/* Write the footer, this is how it looks like:
* ----------------+---------------------+---------------+
* ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
* ----------------+---------------------+---------------+
* RDB version and CRC are both in little endian.
*/

/* RDB version */
buf[0] = RDB_VERSION & 0xff;
buf[1] = (RDB_VERSION >> 8) & 0xff;
payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2);

/* CRC64 */
crc = crc64(0,(unsigned char*)payload->io.buffer.ptr,
sdslen(payload->io.buffer.ptr));
memrev64ifbe(&crc);
payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8);
}
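Every DUMP payload thus ends with a fixed 10-byte footer: a 2-byte little-endian RDB version followed by an 8-byte little-endian CRC64. A small sketch that reads the RDB version back out of a finished payload, which is the first half of what verifyDumpPayload checks (helper name is mine; the CRC64 check is omitted to stay self-contained):

```c
#include <stddef.h>

/* DUMP footer layout produced by createDumpPayload:
 *   ... RDB payload | 2 bytes RDB version (LE) | 8 bytes CRC64 (LE)
 * Extract the RDB version; returns -1 if the buffer is too short to
 * even hold the footer. */
static int dump_payload_rdb_version(const unsigned char *p, size_t len) {
    if (len < 10) return -1;              /* footer alone is 10 bytes */
    const unsigned char *footer = p + len - 10;
    return footer[0] | (footer[1] << 8);  /* little endian */
}
```

RESTORE rejects payloads whose version is newer than the server's RDB_VERSION or whose CRC64 does not match.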
restoreCommand

Restore a key from its serialized form. Format: RESTORE key ttl serialized-value [REPLACE] [ABSTTL] [IDLETIME seconds] [FREQ frequency]

void restoreCommand(client *c) {
long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock = -1;
rio payload;
int j, type, replace = 0, absttl = 0;
robj *obj;

/* Parse additional options */
for (j = 4; j < c->argc; j++) {
int additional = c->argc-j-1;
if (!strcasecmp(c->argv[j]->ptr,"replace")) {
replace = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"absttl")) {
absttl = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"idletime") && additional >= 1 &&
lfu_freq == -1)
{
if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lru_idle,NULL)
!= C_OK) return;
if (lru_idle < 0) {
addReplyError(c,"Invalid IDLETIME value, must be >= 0");
return;
}
lru_clock = LRU_CLOCK();
j++; /* Consume additional arg. */
} else if (!strcasecmp(c->argv[j]->ptr,"freq") && additional >= 1 &&
lru_idle == -1)
{
if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lfu_freq,NULL)
!= C_OK) return;
if (lfu_freq < 0 || lfu_freq > 255) {
addReplyError(c,"Invalid FREQ value, must be >= 0 and <= 255");
return;
}
j++; /* Consume additional arg. */
} else {
addReply(c,shared.syntaxerr);
return;
}
}

/* Make sure this key does not already exist here... */
if (!replace && lookupKeyWrite(c->db,c->argv[1]) != NULL) {
addReply(c,shared.busykeyerr);
return;
}

/* Check if the TTL value makes sense */
if (getLongLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != C_OK) {
return;
} else if (ttl < 0) {
addReplyError(c,"Invalid TTL value, must be >= 0");
return;
}

/* Verify RDB version and data checksum. */
if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr)) == C_ERR)
{
addReplyError(c,"DUMP payload version or checksum are wrong");
return;
}

rioInitWithBuffer(&payload,c->argv[3]->ptr);
if (((type = rdbLoadObjectType(&payload)) == -1) ||
((obj = rdbLoadObject(type,&payload)) == NULL))
{
addReplyError(c,"Bad data format");
return;
}

/* Remove the old key if needed. */
if (replace) dbDelete(c->db,c->argv[1]);

/* Create the key and set the TTL if any */
dbAdd(c->db,c->argv[1],obj);
if (ttl) {
if (!absttl) ttl+=mstime();
setExpire(c,c->db,c->argv[1],ttl);
}
objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock);
signalModifiedKey(c->db,c->argv[1]);
addReply(c,shared.ok);
server.dirty++;
}

Key migration

Redis provides an atomic key-migration command. It keeps a cache of connections to target nodes in server.migrate_cached_sockets, so that several migrations within a short window do not each have to establish a new connection.

migrateGetSocket

Gets a connection to the given host:port, preferring the cache.

#define MIGRATE_SOCKET_CACHE_ITEMS 64 // Max number of cached sockets
#define MIGRATE_SOCKET_CACHE_TTL 10 // Cache TTL in seconds

typedef struct migrateCachedSocket {
int fd;
long last_dbid;
time_t last_use_time;
} migrateCachedSocket;

migrateCachedSocket* migrateGetSocket(client *c, robj *host, robj *port, long timeout) {
int fd;
sds name = sdsempty();
migrateCachedSocket *cs;

// Check the cache first
name = sdscatlen(name,host->ptr,sdslen(host->ptr));
name = sdscatlen(name,":",1);
name = sdscatlen(name,port->ptr,sdslen(port->ptr));
cs = dictFetchValue(server.migrate_cached_sockets,name);
if (cs) {
sdsfree(name);
cs->last_use_time = server.unixtime;
return cs;
}

// Cache is full: evict a random entry
if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) {
dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
cs = dictGetVal(de);
close(cs->fd);
zfree(cs);
dictDelete(server.migrate_cached_sockets,dictGetKey(de));
}

// Create a new non-blocking connection
fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
atoi(c->argv[2]->ptr));
if (fd == -1) {
sdsfree(name);
addReplyErrorFormat(c,"Can't connect to target node: %s",
server.neterr);
return NULL;
}
anetEnableTcpNoDelay(server.neterr,fd);

// Wait until writable, honoring the timeout
if ((aeWait(fd,AE_WRITABLE,timeout) & AE_WRITABLE) == 0) {
sdsfree(name);
addReplySds(c,
sdsnew("-IOERR error or timeout connecting to the client\r\n"));
close(fd);
return NULL;
}

// Add the new connection to the cache
cs = zmalloc(sizeof(*cs));
cs->fd = fd;
cs->last_dbid = -1;
cs->last_use_time = server.unixtime;
dictAdd(server.migrate_cached_sockets,name,cs);
return cs;
}
migrateCloseSocket

Closes and removes the cached connection for host:port.

void migrateCloseSocket(robj *host, robj *port) {
sds name = sdsempty();
migrateCachedSocket *cs;

name = sdscatlen(name,host->ptr,sdslen(host->ptr));
name = sdscatlen(name,":",1);
name = sdscatlen(name,port->ptr,sdslen(port->ptr));
cs = dictFetchValue(server.migrate_cached_sockets,name);
if (!cs) {
sdsfree(name);
return;
}

close(cs->fd);
zfree(cs);
dictDelete(server.migrate_cached_sockets,name);
sdsfree(name);
}
migrateCloseTimedoutSockets

Closes cached connections that have been idle longer than the TTL; called periodically from the server cron.

void migrateCloseTimedoutSockets(void) {
dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets);
dictEntry *de;

while((de = dictNext(di)) != NULL) {
migrateCachedSocket *cs = dictGetVal(de);

if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) {
close(cs->fd);
zfree(cs);
dictDelete(server.migrate_cached_sockets,dictGetKey(de));
}
}
dictReleaseIterator(di);
}
migrateCommand

Atomically migrates one or more keys to another node. Two forms:

  1. Single key: MIGRATE host port key dbid timeout [COPY | REPLACE | AUTH password]
  2. Batch: MIGRATE host port "" dbid timeout [COPY | REPLACE | AUTH password] KEYS key1 key2 ... keyN
void migrateCommand(client *c) {
migrateCachedSocket *cs;
int copy = 0, replace = 0, j;
char *password = NULL;
long timeout;
long dbid;
robj **ov = NULL;
robj **kv = NULL;
robj **newargv = NULL;
rio cmd, payload;
int may_retry = 1;
int write_error = 0;
int argv_rewritten = 0;

int first_key = 3; // Position of the first key to migrate
int num_keys = 1; // Number of keys to migrate

// Parse options: COPY/REPLACE/AUTH/KEYS
for (j = 6; j < c->argc; j++) {
int moreargs = j < c->argc-1;
if (!strcasecmp(c->argv[j]->ptr,"copy")) {
copy = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
replace = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"auth")) {
if (!moreargs) {
addReply(c,shared.syntaxerr);
return;
}
j++;
password = c->argv[j]->ptr;
} else if (!strcasecmp(c->argv[j]->ptr,"keys")) {
if (sdslen(c->argv[3]->ptr) != 0) {
addReplyError(c,
"When using MIGRATE KEYS option, the key argument"
" must be set to the empty string");
return;
}
first_key = j+1;
num_keys = c->argc - j - 1;
break;
} else {
addReply(c,shared.syntaxerr);
return;
}
}

// Sanity-check dbid and timeout
if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK ||
getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK)
{
return;
}
if (timeout <= 0) timeout = 1000;

ov = zrealloc(ov,sizeof(robj*)*num_keys); // Values of the objects to migrate
kv = zrealloc(kv,sizeof(robj*)*num_keys); // Keys being migrated
int oi = 0;

// Keep only keys that actually exist
for (j = 0; j < num_keys; j++) {
if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) {
kv[oi] = c->argv[first_key+j];
oi++;
}
}
num_keys = oi;

// No existing keys to migrate at all
if (num_keys == 0) {
zfree(ov); zfree(kv);
addReplySds(c,sdsnew("+NOKEY\r\n"));
return;
}

try_again:
write_error = 0;

// Get a (possibly cached) connection to the target
cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
if (cs == NULL) {
zfree(ov); zfree(kv);
return;
}

rioInitWithBuffer(&cmd,sdsempty());

// Authenticate if a password was given
if (password) {
serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4));
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password,
sdslen(password)));
}

// Issue SELECT only if the target db changed since last use
int select = cs->last_dbid != dbid;
if (select) {
serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
}

int non_expired = 0;

// Serialize one RESTORE (or RESTORE-ASKING) per key
for (j = 0; j < num_keys; j++) {
long long ttl = 0;
long long expireat = getExpire(c->db,kv[j]);

if (expireat != -1) {
ttl = expireat-mstime();
// Skip keys that have already expired
if (ttl < 0) {
continue;
}
if (ttl < 1) ttl = 1;
}

kv[non_expired++] = kv[j];

serverAssertWithInfo(c,NULL,
rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));

// Command name: RESTORE-ASKING in cluster mode
if (server.cluster_enabled)
serverAssertWithInfo(c,NULL,
rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
else
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));

// Key and TTL
serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j]));
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
sdslen(kv[j]->ptr)));
serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));

// Serialized value (DUMP payload)
createDumpPayload(&payload,ov[j]);
serverAssertWithInfo(c,NULL,
rioWriteBulkString(&cmd,payload.io.buffer.ptr,
sdslen(payload.io.buffer.ptr)));
sdsfree(payload.io.buffer.ptr);

// Optional REPLACE flag
if (replace)
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
}

num_keys = non_expired;

errno = 0;
{
sds buf = cmd.io.buffer.ptr;
size_t pos = 0, towrite;
int nwritten = 0;

// Send the buffer in chunks of at most 64KB
while ((towrite = sdslen(buf)-pos) > 0) {
towrite = (towrite > (64*1024) ? (64*1024) : towrite);
nwritten = syncWrite(cs->fd,buf+pos,towrite,timeout);
if (nwritten != (signed)towrite) {
write_error = 1;
goto socket_err;
}
pos += nwritten;
}
}

char buf0[1024]; /* Auth reply. */
char buf1[1024]; /* Select reply. */
char buf2[1024]; /* Restore reply. */

// Read the AUTH reply
if (password && syncReadLine(cs->fd, buf0, sizeof(buf0), timeout) <= 0)
goto socket_err;

// Read the SELECT reply
if (select && syncReadLine(cs->fd, buf1, sizeof(buf1), timeout) <= 0)
goto socket_err;

// Read one reply per RESTORE
int error_from_target = 0;
int socket_error = 0;
int del_idx = 1;

if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1));

for (j = 0; j < num_keys; j++) {
if (syncReadLine(cs->fd, buf2, sizeof(buf2), timeout) <= 0) {
socket_error = 1;
break;
}
if ((password && buf0[0] == '-') ||
(select && buf1[0] == '-') ||
buf2[0] == '-')
{
// Target replied with an error: report the first one
if (!error_from_target) {
cs->last_dbid = -1;
char *errbuf;
if (password && buf0[0] == '-') errbuf = buf0;
else if (select && buf1[0] == '-') errbuf = buf1;
else errbuf = buf2;

error_from_target = 1;
addReplyErrorFormat(c,"Target instance replied with error: %s",
errbuf+1);
}
} else {
// Success for this key
if (!copy) {
// Delete the local copy of the key
dbDelete(c->db,kv[j]);
signalModifiedKey(c->db,kv[j]);
server.dirty++;

newargv[del_idx++] = kv[j];
incrRefCount(kv[j]);
}
}
}

if (!error_from_target && socket_error && j == 0 && may_retry &&
errno != ETIMEDOUT)
{
goto socket_err;
}

// On socket errors, drop the cached connection
if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]);

if (!copy) {
// Rewrite the command as DEL for replication and AOF
if (del_idx > 1) {
newargv[0] = createStringObject("DEL",3);
replaceClientCommandVector(c,del_idx,newargv);
argv_rewritten = 1;
} else {
zfree(newargv);
}
newargv = NULL; /* Make it safe to call zfree() on it in the future. */
}

if (!error_from_target && socket_error) {
may_retry = 0;
goto socket_err;
}

// On success, remember the last selected db
if (!error_from_target) {
cs->last_dbid = dbid;
addReply(c,shared.ok);
} else {
// The error reply was already sent inside the loop above.
}

sdsfree(cmd.io.buffer.ptr);
zfree(ov); zfree(kv); zfree(newargv);
return;

// Error path: socket-level failure
socket_err:

sdsfree(cmd.io.buffer.ptr);

if (!argv_rewritten) migrateCloseSocket(c->argv[1],c->argv[2]);
zfree(newargv);
newargv = NULL;

if (errno != ETIMEDOUT && may_retry) {
may_retry = 0;
goto try_again;
}

zfree(ov); zfree(kv);
addReplySds(c,
sdscatprintf(sdsempty(),
"-IOERR error or timeout %s to target instance\r\n",
write_error ? "writing" : "reading"));
return;
}
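The command buffer migrateCommand assembles with rioWriteBulkCount/rioWriteBulkString/rioWriteBulkLongLong is ordinary RESP. As a sketch of what the target node actually receives, here is the SELECT prefix serialized by hand; snprintf stands in for the rio helpers, purely for illustration:

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Serialize "SELECT <dbid>" as a RESP array of two bulk strings --
 * the same bytes the rio helpers append to the MIGRATE command
 * buffer. Returns the number of bytes written. */
int resp_select(char *out, size_t outlen, long dbid) {
    char num[32];
    int numlen = snprintf(num, sizeof(num), "%ld", dbid);
    /* "*2" = array of 2 elements; each "$<len>" bulk string follows. */
    return snprintf(out, outlen,
                    "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", numlen, num);
}
```

The AUTH and RESTORE parts are framed the same way, which is why the reply-reading phase above can simply consume one reply line per pipelined command.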
askingCommand

Handles the ASKING command: flags the client so that its next command may access a slot this node is importing.

void askingCommand(client *c) {
if (server.cluster_enabled == 0) {
addReplyError(c,"This instance has cluster support disabled");
return;
}
c->flags |= CLIENT_ASKING;
addReply(c,shared.ok);
}
readonlyCommand

Puts the client connection into read-only mode, allowing reads from replica nodes.

void readonlyCommand(client *c) {
if (server.cluster_enabled == 0) {
addReplyError(c,"This instance has cluster support disabled");
return;
}
c->flags |= CLIENT_READONLY;
addReply(c,shared.ok);
}
readwriteCommand

Returns the client connection to read-write mode.

void readwriteCommand(client *c) {
c->flags &= ~CLIENT_READONLY;
addReply(c,shared.ok);
}
getNodeByQuery

Determines which cluster node can serve the given command, based on the hash slot of its keys.

clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
clusterNode *n = NULL;
robj *firstkey = NULL;
int multiple_keys = 0;
multiState *ms, _ms;
multiCmd mc;
int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;

// Modules may disable redirection entirely
if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
return myself;

if (error_code) *error_code = CLUSTER_REDIR_NONE;

if (cmd->proc == execCommand) {
if (!(c->flags & CLIENT_MULTI)) return myself;
ms = &c->mstate;
} else {
ms = &_ms;
_ms.commands = &mc;
_ms.count = 1;
mc.argv = argv;
mc.argc = argc;
mc.cmd = cmd;
}

// Check that all keys of the command hash to the same slot
for (i = 0; i < ms->count; i++) {
struct redisCommand *mcmd;
robj **margv;
int margc, *keyindex, numkeys, j;

mcmd = ms->commands[i].cmd;
margc = ms->commands[i].argc;
margv = ms->commands[i].argv;

keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
for (j = 0; j < numkeys; j++) {
robj *thiskey = margv[keyindex[j]];
int thisslot = keyHashSlot((char*)thiskey->ptr,
sdslen(thiskey->ptr));

if (firstkey == NULL) {
firstkey = thiskey;
slot = thisslot;
n = server.cluster->slots[slot];

// Slot not assigned to any node
if (n == NULL) {
getKeysFreeResult(keyindex);
if (error_code)
*error_code = CLUSTER_REDIR_DOWN_UNBOUND;
return NULL;
}

// Is the slot migrating out of, or importing into, this node?
if (n == myself &&
server.cluster->migrating_slots_to[slot] != NULL)
{
migrating_slot = 1;
} else if (server.cluster->importing_slots_from[slot] != NULL) {
importing_slot = 1;
}
} else {
if (!equalStringObjects(firstkey,thiskey)) {
// Keys hash to different slots: cross-slot error
if (slot != thisslot) {
getKeysFreeResult(keyindex);
if (error_code)
*error_code = CLUSTER_REDIR_CROSS_SLOT;
return NULL;
} else {
// Same slot, multiple distinct keys
multiple_keys = 1;
}
}
}

// During migration/import, count keys we no longer hold
if ((migrating_slot || importing_slot) &&
lookupKeyRead(&server.db[0],thiskey) == NULL)
{
missing_keys++;
}
}
getKeysFreeResult(keyindex);
}

// No keys at all: we can serve it ourselves
if (n == NULL) return myself;

// The cluster must be in the OK state
if (server.cluster->state != CLUSTER_OK) {
if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
return NULL;
}

// Report the computed slot back to the caller
if (hashslot) *hashslot = slot;

if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
return myself;

// Slot migrating out and some keys missing: ASK redirection
if (migrating_slot && missing_keys) {
if (error_code) *error_code = CLUSTER_REDIR_ASK;
return server.cluster->migrating_slots_to[slot];
}

if (importing_slot &&
(c->flags & CLIENT_ASKING || cmd->flags & CMD_ASKING))
{
if (multiple_keys && missing_keys) {
if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
return NULL;
} else {
return myself;
}
}

// Read-only client on a replica of the slot owner: serve locally
if (c->flags & CLIENT_READONLY &&
(cmd->flags & CMD_READONLY || cmd->proc == evalCommand ||
cmd->proc == evalShaCommand) &&
nodeIsSlave(myself) &&
myself->slaveof == n)
{
return myself;
}

// Otherwise: MOVED redirection to the slot owner
if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
return n;
}
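getNodeByQuery leans on keyHashSlot to map each key to one of the 16384 slots. A minimal sketch of that mapping, including the {...} hash-tag rule that lets related keys share a slot; the bitwise CRC16-CCITT below (XModem variant, check value 0x31C3 for "123456789") stands in for the table-driven crc16() in the source:

```c
#include <assert.h>
#include <stdint.h>

/* Bitwise CRC16-CCITT (poly 0x1021, init 0, not reflected) -- the
 * same function the table-driven crc16() in the Redis source
 * computes. */
uint16_t crc16(const char *buf, int len) {
    uint16_t crc = 0;
    for (int i = 0; i < len; i++) {
        crc ^= (uint16_t)((unsigned char)buf[i]) << 8;
        for (int k = 0; k < 8; k++)
            crc = (crc & 0x8000) ? (uint16_t)((crc << 1) ^ 0x1021)
                                 : (uint16_t)(crc << 1);
    }
    return crc;
}

/* Sketch of keyHashSlot(): hash only the content of the first
 * non-empty {...} hash tag if one exists, otherwise the whole key,
 * then take the CRC16 modulo 16384. */
int key_hash_slot(const char *key, int keylen) {
    int s, e;
    for (s = 0; s < keylen; s++)
        if (key[s] == '{') break;
    if (s == keylen) return crc16(key, keylen) & 16383;
    for (e = s + 1; e < keylen; e++)
        if (key[e] == '}') break;
    /* No closing brace, or an empty "{}" tag: hash the whole key. */
    if (e == keylen || e == s + 1) return crc16(key, keylen) & 16383;
    return crc16(key + s + 1, e - s - 1) & 16383;
}
```

Keys like {user1000}.following and {user1000}.followers hash only the tag content, so multi-key commands on them pass the same-slot check performed above.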
clusterRedirectBlockedClientIfNeeded

For blocked clients (blocking list/zset/stream operations), checks whether the cluster state or slot ownership requires redirecting them, and replies with the matching error.

int clusterRedirectBlockedClientIfNeeded(client *c) {
if (c->flags & CLIENT_BLOCKED &&
(c->btype == BLOCKED_LIST ||
c->btype == BLOCKED_ZSET ||
c->btype == BLOCKED_STREAM))
{
dictEntry *de;
dictIterator *di;

// The cluster is in the FAIL state
if (server.cluster->state == CLUSTER_FAIL) {
clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE);
return 1;
}

// All blocked keys belong to the same slot; check the first
di = dictGetIterator(c->bpop.keys);
if ((de = dictNext(di)) != NULL) {
robj *key = dictGetKey(de);
int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr));
clusterNode *node = server.cluster->slots[slot];

// The slot is not served by us and is not being imported
if (node != myself &&
server.cluster->importing_slots_from[slot] == NULL)
{
// Slot not assigned to any node
if (node == NULL) {
clusterRedirectClient(c,NULL,0,
CLUSTER_REDIR_DOWN_UNBOUND);
} else {
// Redirect to the node owning the slot
clusterRedirectClient(c,node,slot,
CLUSTER_REDIR_MOVED);
}
dictReleaseIterator(di);
return 1;
}
}
dictReleaseIterator(di);
}
return 0;
}