Redis Sentinel

To make Redis highly available, Redis provides Sentinel mode. In this mode, Sentinel does three things:

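  1. Monitors whether the master and slave nodes are running properly;
  2. When a node misbehaves, sends notifications and alerts about it;
  3. When the master fails, picks one of its slaves to become the new master and tells the other slaves to replicate from it. Clients that ask Sentinel for the master are then given the new master's address.

Sentinel is normally deployed as a group of several Sentinels. The quorum and the leader election described below depend on several Sentinels agreeing, and a single Sentinel would itself be a single point of failure.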

Startup

Sentinel is a special mode of redis-server. You can start it with redis-server /path/to/sentinel.conf --sentinel, or directly with redis-sentinel /path/to/sentinel.conf. Either way, you must specify a Sentinel configuration file.

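How It Works

Discovering the topology

When a Sentinel starts, its configuration specifies the masters to monitor. By sending INFO to a master, the Sentinel learns about all of that master's slaves. It also subscribes to the master's __sentinel__:hello channel to discover the other Sentinels monitoring the same master, because every Sentinel monitoring a master periodically publishes its own information to that channel. In the figure below, the left side shows how slaves are discovered and the right side shows how other Sentinels are discovered.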
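As a rough illustration of what a Sentinel reads from __sentinel__:hello: in the Redis source, each hello payload carries eight comma-separated fields. They are the sender's IP, port, run ID and current epoch, followed by the master's name, IP, port and configuration epoch. The parser below is a minimal sketch under that assumption. hello_msg and parse_hello are hypothetical names, not the Redis implementation.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical struct holding the eight fields of a hello payload. */
typedef struct hello_msg {
    char sentinel_ip[64];
    int sentinel_port;
    char runid[64];
    unsigned long long current_epoch;
    char master_name[64];
    char master_ip[64];
    int master_port;
    unsigned long long master_config_epoch;
} hello_msg;

/* Parse "ip,port,runid,epoch,master_name,master_ip,master_port,config_epoch".
 * Returns 0 on success, -1 if the payload does not have exactly 8 non-empty
 * fields or a numeric field is malformed. */
int parse_hello(const char *payload, hello_msg *out) {
    char f[8][64];
    const char *p = payload;
    char *end;
    int i;

    for (i = 0; i < 8; i++) {
        const char *comma = strchr(p, ',');
        size_t len;
        if (i < 7 && comma == NULL) return -1;   /* too few fields */
        if (i == 7 && comma != NULL) return -1;  /* too many fields */
        len = (i < 7) ? (size_t)(comma - p) : strlen(p);
        if (len == 0 || len >= sizeof(f[i])) return -1;
        memcpy(f[i], p, len);
        f[i][len] = '\0';
        if (comma) p = comma + 1;                /* move to the next field */
    }

    strcpy(out->sentinel_ip, f[0]);
    out->sentinel_port = (int)strtol(f[1], &end, 10);
    if (*end != '\0') return -1;
    strcpy(out->runid, f[2]);
    out->current_epoch = strtoull(f[3], &end, 10);
    if (*end != '\0') return -1;
    strcpy(out->master_name, f[4]);
    strcpy(out->master_ip, f[5]);
    out->master_port = (int)strtol(f[6], &end, 10);
    if (*end != '\0') return -1;
    out->master_config_epoch = strtoull(f[7], &end, 10);
    if (*end != '\0') return -1;
    return 0;
}
```

Each string field is capped at 63 characters here, an arbitrary limit chosen for the sketch.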

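Monitoring

The Sentinel checks liveness by periodically sending INFO to the masters and slaves it has discovered, and PING to every discovered node, including other Sentinels. For PING, three replies count as valid: PONG, LOADING (the node is still loading its data) and MASTERDOWN (the slave's link to its master is down).

The INFO reply is used not only to discover the topology but also to detect role changes. Suppose a node that was a slave is manually turned into a master while the original master is still healthy. The Sentinel then sends it SLAVEOF ip port to restore the original topology. Normal operation is shown below: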
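The valid-reply rule for PING can be sketched as follows. This is a simplified model that assumes a hypothetical link_times struct keeping only the three timestamps involved. In Redis, this logic lives in the PING reply callback (sentinelPingReplyCallback).

```c
#include <string.h>

typedef long long mstime_t;

/* Hypothetical subset of instanceLink. */
typedef struct link_times {
    mstime_t last_avail_time; /* last valid reply */
    mstime_t act_ping_time;   /* send time of the PING still awaiting a reply, 0 if none */
    mstime_t last_pong_time;  /* last reply of any kind */
} link_times;

/* PONG, LOADING and MASTERDOWN all count as valid PING replies. */
int ping_reply_is_valid(const char *reply) {
    return strncmp(reply, "PONG", 4) == 0 ||
           strncmp(reply, "LOADING", 7) == 0 ||
           strncmp(reply, "MASTERDOWN", 10) == 0;
}

/* Update the timestamps when a reply to PING arrives at time `now`. */
void on_ping_reply(link_times *lt, const char *reply, mstime_t now) {
    lt->last_pong_time = now;          /* any reply shows the link is alive */
    if (ping_reply_is_valid(reply)) {
        lt->last_avail_time = now;     /* the instance is usable */
        lt->act_ping_time = 0;         /* no PING outstanding any more */
    }
}
```

A BUSY reply therefore refreshes last_pong_time but not last_avail_time, so the instance can still drift towards SDOWN.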

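If a node has not replied validly for longer than the configured down-after-milliseconds, the current Sentinel marks it as subjectively down (SDOWN).

If at least quorum Sentinels consider a master subjectively down, the current Sentinel marks it as objectively down (ODOWN). Sentinels learn each other's verdicts by asking one another (SENTINEL is-master-down-by-addr). Objective down applies only to masters; slaves and Sentinels never enter that state, as shown in the figure below: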
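The two checks can be sketched as follows, using plain millisecond timestamps:

- SDOWN compares down_after_period with the time since the outstanding PING, or with the time since the last valid reply if no PING is outstanding.
- ODOWN counts this Sentinel plus the other Sentinels that reported the master down, and compares that count with the quorum.

The function names are hypothetical. The real checks in the Redis source (sentinelCheckSubjectivelyDown and sentinelCheckObjectivelyDown) handle additional cases.

```c
typedef long long mstime_t;

/* Subjectively down: no valid reply for longer than down_after_period.
 * act_ping_time != 0 means a PING is still waiting for its reply. */
int is_sdown(mstime_t now, mstime_t act_ping_time, mstime_t last_avail_time,
             mstime_t down_after_period) {
    mstime_t elapsed = act_ping_time ? now - act_ping_time
                                     : now - last_avail_time;
    return elapsed > down_after_period;
}

/* Objectively down (masters only): this Sentinel sees SDOWN and, counting
 * itself plus every other Sentinel that reported the master down, at least
 * `quorum` Sentinels agree. */
int is_odown(int self_sdown, const int *others_report_down, int n_others,
             unsigned int quorum) {
    unsigned int votes;
    int i;

    if (!self_sdown) return 0;
    votes = 1; /* this Sentinel */
    for (i = 0; i < n_others; i++)
        if (others_report_down[i]) votes++;
    return votes >= quorum;
}
```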

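Alerts

When a Sentinel detects an event, it publishes a message on the channel named after that event. For warning-level events, it also runs the configured notification script. Some of the channels:

  • +slave: the Sentinel discovered a new slave
  • +sentinel: the Sentinel discovered a new Sentinel
  • +sdown: a node was marked subjectively down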

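Failover

Once a master is marked objectively down, the Sentinels elect a leader using a Raft-like algorithm. The leader then:

  • picks one of the master's slaves and sends it SLAVEOF NO ONE, making it the new master;
  • lets the other Sentinels know, so they update their configuration;
  • sends SLAVEOF ip port to the remaining slaves so they replicate from the new master;
  • finishes the failover and returns to normal operation.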
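These steps map onto the SENTINEL_FAILOVER_STATE_* constants listed in the data-structure section. The sketch below models only the successful path; abort and timeout handling are omitted. The enum names are shortened, hypothetical versions of those macros with the same values.

```c
/* Shortened, hypothetical names; the values match SENTINEL_FAILOVER_STATE_*. */
enum failover_state {
    FO_NONE = 0,            /* no failover in progress */
    FO_WAIT_START,          /* failover started, leader election running */
    FO_SELECT_SLAVE,        /* this Sentinel is leader: pick a slave */
    FO_SEND_SLAVEOF_NOONE,  /* send SLAVEOF NO ONE to the chosen slave */
    FO_WAIT_PROMOTION,      /* wait until it reports role:master */
    FO_RECONF_SLAVES,       /* point the other slaves at the new master */
    FO_UPDATE_CONFIG        /* switch the configuration to the new master */
};

/* Next state on the successful path (no aborts, no timeouts). */
enum failover_state failover_next(enum failover_state s) {
    switch (s) {
    case FO_NONE:               return FO_WAIT_START;
    case FO_WAIT_START:         return FO_SELECT_SLAVE;
    case FO_SELECT_SLAVE:       return FO_SEND_SLAVEOF_NOONE;
    case FO_SEND_SLAVEOF_NOONE: return FO_WAIT_PROMOTION;
    case FO_WAIT_PROMOTION:     return FO_RECONF_SLAVES;
    case FO_RECONF_SLAVES:      return FO_UPDATE_CONFIG;
    case FO_UPDATE_CONFIG:      return FO_NONE;  /* failover finished */
    }
    return FO_NONE;
}
```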

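The leader election works as follows:

  • A Sentinel that sees the master become objectively down increments its epoch and tries to get itself elected leader for that epoch.
  • When it asks the other Sentinels about the master, it includes its run ID and asks for their vote.
  • A Sentinel that receives the request does three things. If the request's epoch is greater than its own, it adopts that epoch. If it has not already voted in that epoch, it votes for the requesting Sentinel. Finally, it replies with the Sentinel it voted for.
  • Periodically, each Sentinel tallies the replies to find the Sentinel with the most votes. That Sentinel becomes leader if its vote count is at least both (1) the total number of Sentinels / 2 + 1 and (2) the configured quorum.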
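The two election rules can be sketched as follows, assuming a hypothetical voter struct:

- A Sentinel grants at most one vote per epoch: the first candidate whose epoch is newer than its last vote gets it.
- A candidate wins only if it reaches both the majority voters/2+1 and the quorum.

This follows the rules described above but is not the Redis code.

```c
#include <string.h>

/* Hypothetical per-Sentinel voting state. */
typedef struct voter {
    unsigned long long current_epoch;
    unsigned long long leader_epoch; /* epoch in which we last voted */
    char leader[41];                 /* run ID we voted for ("" = none) */
} voter;

/* Handle a vote request; returns the run ID this Sentinel has voted for. */
const char *vote_leader(voter *v, unsigned long long req_epoch, const char *req_runid) {
    if (req_epoch > v->current_epoch) v->current_epoch = req_epoch; /* adopt newer epoch */
    if (v->leader_epoch < req_epoch && v->current_epoch <= req_epoch) {
        strncpy(v->leader, req_runid, sizeof(v->leader) - 1);       /* first vote in this epoch */
        v->leader[sizeof(v->leader) - 1] = '\0';
        v->leader_epoch = v->current_epoch;
    }
    return v->leader;
}

/* A candidate wins only with a majority of all Sentinels AND at least quorum votes. */
int wins_election(unsigned int votes, unsigned int voters, unsigned int quorum) {
    return votes >= voters / 2 + 1 && votes >= quorum;
}
```

With five Sentinels and quorum 2, for example, three votes are needed: the majority rule is stricter than the quorum.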

The process is shown in the figure below:

Configuration file

Example:

sentinel monitor mymaster 127.0.0.1 6379 2
sentinel down-after-milliseconds mymaster 60000
sentinel failover-timeout mymaster 180000
sentinel parallel-syncs mymaster 1

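All Sentinel settings start with sentinel. The first line is the most important one. It tells Sentinel to monitor a master named mymaster at IP 127.0.0.1 and port 6379, and requires at least 2 Sentinels to agree that the master has failed. The remaining lines tune further parameters for mymaster:

  • down-after-milliseconds: how many milliseconds a node may go without a valid reply before the Sentinel considers it down
  • failover-timeout: the timeout of the failover process, in milliseconds
  • parallel-syncs: how many slaves may be reconfigured to sync with the new master at the same time during a failover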
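To show how the monitor line breaks down into its parts, here is a minimal parser sketch using sscanf. The real configuration loader tokenizes lines differently. monitor_conf and parse_monitor_line are hypothetical, and the 63-character field limits are arbitrary.

```c
#include <stdio.h>

/* Hypothetical holder for one "sentinel monitor" line. */
typedef struct monitor_conf {
    char name[64];
    char ip[64];
    int port;
    int quorum;
} monitor_conf;

/* Parse "sentinel monitor <name> <ip> <port> <quorum>".
 * Returns 0 on success, -1 on malformed input. */
int parse_monitor_line(const char *line, monitor_conf *mc) {
    char extra;
    /* The trailing " %c" only matches if there is an unexpected extra token. */
    int n = sscanf(line, "sentinel monitor %63s %63s %d %d %c",
                   mc->name, mc->ip, &mc->port, &mc->quorum, &extra);
    if (n != 4) return -1;                            /* missing or extra tokens */
    if (mc->port < 0 || mc->port > 65535) return -1;  /* same range as createSentinelAddr */
    if (mc->quorum <= 0) return -1;                   /* a quorum must be positive */
    return 0;
}
```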

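Source code analysis

The main entry point of Sentinel mode is the timer function sentinelTimer. It is called from serverCron, so it runs as often as serverCron does.

Data structures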

// IP & port
typedef struct sentinelAddr {
    char *ip;
    int port;
} sentinelAddr;

#define SRI_MASTER (1<<0) // the instance is a master
#define SRI_SLAVE (1<<1) // the instance is a slave
#define SRI_SENTINEL (1<<2) // the instance is a Sentinel
#define SRI_S_DOWN (1<<3) // subjectively down
#define SRI_O_DOWN (1<<4) // objectively down
#define SRI_MASTER_DOWN (1<<5) // set on a Sentinel instance: that Sentinel thinks the master is down
#define SRI_FAILOVER_IN_PROGRESS (1<<6) // failover in progress for this master
#define SRI_PROMOTED (1<<7) // slave selected to become the new master
#define SRI_RECONF_SENT (1<<8) // SLAVEOF <newmaster> sent to this slave
#define SRI_RECONF_INPROG (1<<9) // slave is switching to the new master
#define SRI_RECONF_DONE (1<<10) // slave finished switching to the new master
#define SRI_FORCE_FAILOVER (1<<11) // force a failover even though the master is healthy
#define SRI_SCRIPT_KILL_SENT (1<<12) // SCRIPT KILL already sent after a -BUSY reply

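// Periods and timeouts, in milliseconds
#define SENTINEL_INFO_PERIOD 10000 // interval between INFO commands
#define SENTINEL_PING_PERIOD 1000 // interval between PING commands
#define SENTINEL_ASK_PERIOD 1000 // interval between asking other Sentinels whether the master is down
#define SENTINEL_PUBLISH_PERIOD 2000 // interval between hello messages published to the channel
#define SENTINEL_DEFAULT_DOWN_AFTER 30000 // default down-after period
#define SENTINEL_HELLO_CHANNEL "__sentinel__:hello" // channel Sentinels use to talk to each other
#define SENTINEL_TILT_TRIGGER 2000 // a gap between timer runs larger than this (or negative) triggers TILT
#define SENTINEL_TILT_PERIOD (SENTINEL_PING_PERIOD*30) // how long TILT mode lasts
#define SENTINEL_DEFAULT_SLAVE_PRIORITY 100 // default slave priority
#define SENTINEL_SLAVE_RECONF_TIMEOUT 10000 // default timeout for a slave to switch masters
#define SENTINEL_DEFAULT_PARALLEL_SYNCS 1 // default number of slaves resyncing in parallel during failover
#define SENTINEL_MIN_LINK_RECONNECT_PERIOD 15000 // minimum link age before a stale-looking link is closed and reconnected
#define SENTINEL_DEFAULT_FAILOVER_TIMEOUT (60*3*1000) // default failover timeout
#define SENTINEL_MAX_PENDING_COMMANDS 100 // max commands awaiting replies on a link (PING, INFO, PUBLISH)
#define SENTINEL_ELECTION_TIMEOUT 10000 // leader election timeout
#define SENTINEL_MAX_DESYNC 1000 // max random delay before starting a failover, to desynchronize Sentinels
#define SENTINEL_DEFAULT_DENY_SCRIPTS_RECONFIG 1 // by default, script paths cannot be changed at runtime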

// Failover states
#define SENTINEL_FAILOVER_STATE_NONE 0 // no failover in progress
#define SENTINEL_FAILOVER_STATE_WAIT_START 1 // waiting for the failover to start (leader election)
#define SENTINEL_FAILOVER_STATE_SELECT_SLAVE 2 // select a slave to become the new master
#define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 3 // send SLAVEOF NO ONE to the selected slave
#define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4 // wait for the selected slave to switch its role from slave to master
#define SENTINEL_FAILOVER_STATE_RECONF_SLAVES 5 // point the remaining slaves at the new master
#define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 6 // update the Sentinel's configuration

#define SENTINEL_MASTER_LINK_STATUS_UP 0 // the slave's replication link to its master is up
#define SENTINEL_MASTER_LINK_STATUS_DOWN 1 // the slave's replication link to its master is down

#define SENTINEL_NO_FLAGS 0 // no flags
#define SENTINEL_GENERATE_EVENT (1<<16) // also generate an event
#define SENTINEL_LEADER (1<<17) // this Sentinel is the failover leader
#define SENTINEL_OBSERVER (1<<18) // this Sentinel is only an observer

#define SENTINEL_SCRIPT_NONE 0
#define SENTINEL_SCRIPT_RUNNING 1 // script is running
#define SENTINEL_SCRIPT_MAX_QUEUE 256 // maximum length of the script queue
#define SENTINEL_SCRIPT_MAX_RUNNING 16 // maximum number of scripts running at once
#define SENTINEL_SCRIPT_MAX_RUNTIME 60000 // maximum script run time
#define SENTINEL_SCRIPT_MAX_RETRY 10 // maximum number of attempts for a failing script
#define SENTINEL_SCRIPT_RETRY_DELAY 30000 // base delay between retries

#define SENTINEL_SIMFAILURE_NONE 0
#define SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION (1<<0) // exit right after winning the election (testing)
#define SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION (1<<1) // exit right after promoting a slave (testing)

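// Connection state between this Sentinel and a master, slave or other Sentinel.
// There are two connections: 1. a command connection 2. a pub/sub connection
typedef struct instanceLink {
    int refcount; // reference count (links to the same Sentinel can be shared)
    int disconnected; // non-zero if cc or pc needs to be (re)connected
    int pending_commands; // number of commands still waiting for a reply
    redisAsyncContext *cc; // command connection context
    redisAsyncContext *pc; // pub/sub connection context
    mstime_t cc_conn_time; // when the command connection was established
    mstime_t pc_conn_time; // when the pub/sub connection was established
    mstime_t pc_last_activity; // last time any message arrived on the pub/sub connection
    mstime_t last_avail_time; // last time the instance replied validly to PING
    mstime_t act_ping_time; // send time of the PING still awaiting a reply; reset to 0 when PONG arrives
    mstime_t last_ping_time; // last time a PING was sent
    mstime_t last_pong_time; // last time any reply to PING was received
    mstime_t last_reconn_time; // last reconnection attempt
} instanceLink;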

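// Represents a master, slave or other Sentinel managed by this Sentinel
typedef struct sentinelRedisInstance {
    int flags; // instance type and state, see the SRI_* definitions
    char *name; // master: configured name; slave: ip:port; Sentinel: its run ID
    char *runid; // run ID of the instance
    uint64_t config_epoch; // configuration epoch
    sentinelAddr *addr; // instance address
    instanceLink *link; // connection state
    mstime_t last_pub_time; // last time a hello message was published
    mstime_t last_hello_time; // last time a hello message was received
    mstime_t last_master_down_reply_time; // time of the last reply to SENTINEL is-master-down-by-addr
    mstime_t s_down_since_time; // when the instance was marked SDOWN
    mstime_t o_down_since_time; // when the instance was marked ODOWN
    mstime_t down_after_period; // no valid reply for this long means the instance is considered down
    mstime_t info_refresh; // when the last INFO reply was received
    dict *renamed_commands; // renamed commands

    // role reported by the instance itself
    int role_reported;
    mstime_t role_reported_time; // when the reported role last changed
    mstime_t slave_conf_change_time; // last time the slave's master address changed

    // master-only fields
    dict *sentinels; // other Sentinels monitoring this master
    dict *slaves; // slaves of this master
    unsigned int quorum; // number of Sentinels that must agree the master is down
    int parallel_syncs; // how many slaves may be reconfigured at the same time
    char *auth_pass; // password used to connect

    // slave-only fields
    mstime_t master_link_down_time; // how long the slave's replication link has been down (from INFO)
    int slave_priority; // slave priority
    mstime_t slave_reconf_sent_time; // when SLAVEOF was sent
    struct sentinelRedisInstance *master; // the master this instance belongs to
    char *slave_master_host; // master host as reported by INFO
    int slave_master_port; // master port as reported by INFO
    int slave_master_link_status; // master link status as reported by INFO
    unsigned long long slave_repl_offset; // slave replication offset

    // failover fields
    char *leader; // master: run ID of the Sentinel leading the failover; Sentinel: the Sentinel it voted for
    uint64_t leader_epoch; // epoch of the leader field
    uint64_t failover_epoch; // epoch of the current failover
    int failover_state; // failover progress, see SENTINEL_FAILOVER_*
    mstime_t failover_state_change_time; // when failover_state last changed
    mstime_t failover_start_time; // when the last failover attempt started
    mstime_t failover_timeout; // failover timeout
    mstime_t failover_delay_logged; // the failover_start_time for which the delay was already logged
    struct sentinelRedisInstance *promoted_slave; // the slave promoted to new master

    // notification scripts
    char *notification_script;
    char *client_reconfig_script;
    sds info; // last INFO output of the instance
} sentinelRedisInstance;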

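// Global Sentinel state
struct sentinelState {
    char myid[CONFIG_RUN_ID_SIZE+1]; // this Sentinel's ID
    uint64_t current_epoch; // current epoch
    dict *masters; // all monitored masters; key is the configured name, value is a sentinelRedisInstance
    int tilt; // whether TILT mode is active
    int running_scripts; // number of scripts currently running
    mstime_t tilt_start_time; // when TILT mode started
    mstime_t previous_time; // last time the timer function ran
    list *scripts_queue; // queue of scripts to run
    char *announce_ip; // IP announced to other Sentinels
    int announce_port; // port announced to other Sentinels
    unsigned long simfailure_flags; // failure simulation flags (testing)
    int deny_scripts_reconfig; // whether changing script paths at runtime is denied
} sentinel;

// A script job
typedef struct sentinelScriptJob {
    int flags; // flags, see SENTINEL_SCRIPT_*
    int retry_num; // number of attempts so far
    char **argv; // arguments
    mstime_t start_time; // start time if running; otherwise 0, or the earliest time of the next retry
    pid_t pid; // PID of the script process
} sentinelScriptJob;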


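// Adapter between hiredis async contexts and the ae event loop
typedef struct redisAeEvents {
    redisAsyncContext *context; // async connection context
    aeEventLoop *loop; // event loop
    int fd; // connection fd
    int reading, writing; // whether the read/write handlers are installed
} redisAeEvents;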
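The SENTINEL_TILT_TRIGGER and SENTINEL_TILT_PERIOD constants drive TILT mode. Sentinel enters TILT if the time between two runs of its timer is negative (the clock moved backwards) or longer than the trigger (the process was blocked). While in TILT, Sentinel keeps monitoring but takes no action until the period has passed. The sketch below models that logic with hypothetical names and an injected clock so it can be tested. It is an illustration, not the Redis code.

```c
typedef long long mstime_t;

#define SENTINEL_PING_PERIOD 1000
#define SENTINEL_TILT_TRIGGER 2000
#define SENTINEL_TILT_PERIOD (SENTINEL_PING_PERIOD*30)

/* Hypothetical subset of struct sentinelState. */
typedef struct tilt_state {
    int tilt;
    mstime_t tilt_start_time;
    mstime_t previous_time;
} tilt_state;

/* Run on every timer tick with the current time. */
void check_tilt(tilt_state *s, mstime_t now) {
    mstime_t delta = now - s->previous_time;
    if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) {
        s->tilt = 1;               /* clock jumped or the process stalled */
        s->tilt_start_time = now;
    }
    s->previous_time = now;
}

/* Returns 1 while Sentinel should not act on instances. */
int tilt_blocks_actions(tilt_state *s, mstime_t now) {
    if (!s->tilt) return 0;
    if (now - s->tilt_start_time < SENTINEL_TILT_PERIOD) return 1;
    s->tilt = 0;                   /* TILT period elapsed: resume normal operation */
    return 0;
}
```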

redisAeReadEvent

Handles a read event.

static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
    ((void)el); ((void)fd); ((void)mask);

    redisAeEvents *e = (redisAeEvents*)privdata;
    redisAsyncHandleRead(e->context);
}

redisAeWriteEvent

Handles a write event.

static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
    ((void)el); ((void)fd); ((void)mask);

    redisAeEvents *e = (redisAeEvents*)privdata;
    redisAsyncHandleWrite(e->context);
}

redisAeAddRead

Installs the read handler, if it is not installed yet.

static void redisAeAddRead(void *privdata) {
    redisAeEvents *e = (redisAeEvents*)privdata;
    aeEventLoop *loop = e->loop;
    if (!e->reading) {
        e->reading = 1;
        aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e);
    }
}

redisAeDelRead

Removes the read handler.

static void redisAeDelRead(void *privdata) {
    redisAeEvents *e = (redisAeEvents*)privdata;
    aeEventLoop *loop = e->loop;
    if (e->reading) {
        e->reading = 0;
        aeDeleteFileEvent(loop,e->fd,AE_READABLE);
    }
}

redisAeAddWrite

Installs the write handler, if it is not installed yet.

static void redisAeAddWrite(void *privdata) {
    redisAeEvents *e = (redisAeEvents*)privdata;
    aeEventLoop *loop = e->loop;
    if (!e->writing) {
        e->writing = 1;
        aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e);
    }
}

redisAeDelWrite

Removes the write handler.

static void redisAeDelWrite(void *privdata) {
    redisAeEvents *e = (redisAeEvents*)privdata;
    aeEventLoop *loop = e->loop;
    if (e->writing) {
        e->writing = 0;
        aeDeleteFileEvent(loop,e->fd,AE_WRITABLE);
    }
}

redisAeCleanup

Removes both the read and the write handler and frees the adapter.

static void redisAeCleanup(void *privdata) {
    redisAeEvents *e = (redisAeEvents*)privdata;
    redisAeDelRead(privdata);
    redisAeDelWrite(privdata);
    zfree(e);
}

redisAeAttach

Attaches a hiredis async context to the main event loop.

static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
    redisContext *c = &(ac->c);
    redisAeEvents *e;

    // already attached
    if (ac->ev.data != NULL)
        return C_ERR;

    e = (redisAeEvents*)zmalloc(sizeof(*e));
    e->context = ac;
    e->loop = loop;
    e->fd = c->fd;
    e->reading = e->writing = 0;

    // register the hooks hiredis calls to install/remove handlers
    ac->ev.addRead = redisAeAddRead;
    ac->ev.delRead = redisAeDelRead;
    ac->ev.addWrite = redisAeAddWrite;
    ac->ev.delWrite = redisAeDelWrite;
    ac->ev.cleanup = redisAeCleanup;
    ac->ev.data = e;

    return C_OK;
}

Commands accepted in Sentinel mode

Sentinel mode is a special Redis mode that responds only to a subset of commands:

struct redisCommand sentinelcmds[] = {
    {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
    {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
    {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
    {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
    {"publish",sentinelPublishCommand,3,"",0,NULL,0,0,0,0,0},
    {"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0},
    {"role",sentinelRoleCommand,1,"l",0,NULL,0,0,0,0,0},
    {"client",clientCommand,-2,"rs",0,NULL,0,0,0,0,0},
    {"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0},
    {"auth",authCommand,2,"sltF",0,NULL,0,0,0,0,0}
};

initSentinelConfig

Initializes the Sentinel configuration.

void initSentinelConfig(void) {
    server.port = REDIS_SENTINEL_PORT; // default Sentinel port
    server.protected_mode = 0; // Sentinel must be reachable from other hosts
}

initSentinel

Initializes Sentinel.

void initSentinel(void) {
    unsigned int j;

    // drop the command table of normal mode
    dictEmpty(server.commands,NULL);

    // register the Sentinel-mode commands
    for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
        int retval;
        struct redisCommand *cmd = sentinelcmds+j;

        retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
        serverAssert(retval == DICT_OK);
    }

    // initialize the global state
    sentinel.current_epoch = 0;
    sentinel.masters = dictCreate(&instancesDictType,NULL);
    sentinel.tilt = 0;
    sentinel.tilt_start_time = 0;
    sentinel.previous_time = mstime();
    sentinel.running_scripts = 0;
    sentinel.scripts_queue = listCreate();
    sentinel.announce_ip = NULL;
    sentinel.announce_port = 0;
    sentinel.simfailure_flags = SENTINEL_SIMFAILURE_NONE;
    sentinel.deny_scripts_reconfig = SENTINEL_DEFAULT_DENY_SCRIPTS_RECONFIG;
    memset(sentinel.myid,0,sizeof(sentinel.myid));
}

sentinelIsRunning

Checks Sentinel's preconditions, generates a run ID if there is none yet, and saves the configuration to the file.

void sentinelIsRunning(void) {
    int j;

    // preconditions: a config file must be given and be writable
    if (server.configfile == NULL) {
        serverLog(LL_WARNING,
            "Sentinel started without a config file. Exiting...");
        exit(1);
    } else if (access(server.configfile,W_OK) == -1) {
        serverLog(LL_WARNING,
            "Sentinel config file %s is not writable: %s. Exiting...",
            server.configfile,strerror(errno));
        exit(1);
    }

    // check whether a run ID was loaded from the config
    for (j = 0; j < CONFIG_RUN_ID_SIZE; j++)
        if (sentinel.myid[j] != 0) break;

    // none found: generate one and persist it
    if (j == CONFIG_RUN_ID_SIZE) {
        getRandomHexChars(sentinel.myid,CONFIG_RUN_ID_SIZE);
        sentinelFlushConfig();
    }

    serverLog(LL_WARNING,"Sentinel ID is %s", sentinel.myid);

    // generate a +monitor event for every monitored master
    sentinelGenerateInitialMonitorEvents();
}

createSentinelAddr

Creates an address struct from a hostname and a port.

sentinelAddr *createSentinelAddr(char *hostname, int port) {
    char ip[NET_IP_STR_LEN];
    sentinelAddr *sa;

    // validate the port
    if (port < 0 || port > 65535) {
        errno = EINVAL;
        return NULL;
    }

    // resolve the hostname to an IP address
    if (anetResolve(NULL,hostname,ip,sizeof(ip)) == ANET_ERR) {
        errno = ENOENT;
        return NULL;
    }
    sa = zmalloc(sizeof(*sa));
    sa->ip = sdsnew(ip);
    sa->port = port;
    return sa;
}

dupSentinelAddr

Duplicates an address struct.

sentinelAddr *dupSentinelAddr(sentinelAddr *src) {
    sentinelAddr *sa;

    sa = zmalloc(sizeof(*sa));
    sa->ip = sdsnew(src->ip);
    sa->port = src->port;
    return sa;
}

releaseSentinelAddr

Frees an address struct.

void releaseSentinelAddr(sentinelAddr *sa) {
    sdsfree(sa->ip);
    zfree(sa);
}

sentinelAddrIsEqual

Checks whether two address structs are equal.

int sentinelAddrIsEqual(sentinelAddr *a, sentinelAddr *b) {
    return a->port == b->port && !strcasecmp(a->ip,b->ip);
}

sentinelEvent

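Generates a Sentinel event.

level is the log level; only LL_WARNING events trigger the notification script.
If fmt starts with "%@", the message begins with a description of ri in the form <instance-type> <name> <ip> <port>. If ri is not a master, " @ <master-name> <master-ip> <master-port>" is appended.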

void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
                   const char *fmt, ...) {
    va_list ap;
    char msg[LOG_MAX_LEN];
    robj *channel, *payload;

    // "%@" prefix: describe the instance (plus its master, for non-masters)
    if (fmt[0] == '%' && fmt[1] == '@') {
        sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
                                         NULL : ri->master;

        if (master) {
            snprintf(msg, sizeof(msg), "%s %s %s %d @ %s %s %d",
                sentinelRedisInstanceTypeStr(ri),
                ri->name, ri->addr->ip, ri->addr->port,
                master->name, master->addr->ip, master->addr->port);
        } else {
            snprintf(msg, sizeof(msg), "%s %s %s %d",
                sentinelRedisInstanceTypeStr(ri),
                ri->name, ri->addr->ip, ri->addr->port);
        }
        fmt += 2;
    } else {
        msg[0] = '\0';
    }

    // format the rest of the message
    if (fmt[0] != '\0') {
        va_start(ap, fmt);
        vsnprintf(msg+strlen(msg), sizeof(msg)-strlen(msg), fmt, ap);
        va_end(ap);
    }

    // log it
    if (level >= server.verbosity)
        serverLog(level,"%s %s",type,msg);

    // unless it is a debug event, publish it on the channel named after the event type
    if (level != LL_DEBUG) {
        channel = createStringObject(type,strlen(type));
        payload = createStringObject(msg,strlen(msg));
        pubsubPublishMessage(channel,payload);
        decrRefCount(channel);
        decrRefCount(payload);
    }

    // warning-level events also run the master's notification script
    if (level == LL_WARNING && ri != NULL) {
        sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
                                         ri : ri->master;
        if (master && master->notification_script) {
            sentinelScheduleScriptExecution(master->notification_script,
                type,msg,NULL);
        }
    }
}
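To make the %@ prefix concrete, the sketch below rebuilds the same message layout. It uses a hypothetical inst struct in place of sentinelRedisInstance. For example, with the sample configuration, the +monitor event generated at startup would carry master mymaster 127.0.0.1 6379 quorum 2.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical minimal description of an instance. */
typedef struct inst {
    const char *type;          /* "master", "slave" or "sentinel" */
    const char *name;
    const char *ip;
    int port;
    const struct inst *master; /* NULL for masters */
} inst;

/* Write "<type> <name> <ip> <port>", plus " @ <master-name> <master-ip>
 * <master-port>" for non-masters, followed by the already formatted suffix. */
void format_event(char *buf, size_t len, const inst *ri, const char *suffix) {
    int n;
    if (ri->master)
        n = snprintf(buf, len, "%s %s %s %d @ %s %s %d",
                     ri->type, ri->name, ri->ip, ri->port,
                     ri->master->name, ri->master->ip, ri->master->port);
    else
        n = snprintf(buf, len, "%s %s %s %d", ri->type, ri->name, ri->ip, ri->port);
    if (n >= 0 && (size_t)n < len && suffix)
        snprintf(buf + n, len - (size_t)n, "%s", suffix);
}
```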

sentinelGenerateInitialMonitorEvents

Generates a +monitor event for every monitored master during initialization.

void sentinelGenerateInitialMonitorEvents(void) {
    dictIterator *di;
    dictEntry *de;

    // iterate over all masters monitored by this Sentinel
    di = dictGetIterator(sentinel.masters);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        sentinelEvent(LL_WARNING,"+monitor",ri,"%@ quorum %d",ri->quorum);
    }
    dictReleaseIterator(di);
}

Scripts

sentinelReleaseScriptJob

Frees a script job.

void sentinelReleaseScriptJob(sentinelScriptJob *sj) {
    int j = 0;

    while(sj->argv[j]) sdsfree(sj->argv[j++]);
    zfree(sj->argv);
    zfree(sj);
}

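sentinelScheduleScriptExecution

Queues a script job. The variadic argument list ends with NULL. If the queue grows beyond SENTINEL_SCRIPT_MAX_QUEUE, the oldest job that is not running is dropped.
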
#define SENTINEL_SCRIPT_MAX_ARGS 16 // maximum number of script arguments
void sentinelScheduleScriptExecution(char *path, ...) {
    va_list ap;
    char *argv[SENTINEL_SCRIPT_MAX_ARGS+1];
    int argc = 1;
    sentinelScriptJob *sj;

    va_start(ap, path);
    while(argc < SENTINEL_SCRIPT_MAX_ARGS) {
        argv[argc] = va_arg(ap,char*);
        if (!argv[argc]) break;
        argv[argc] = sdsnew(argv[argc]); /* Copy the string. */
        argc++;
    }
    va_end(ap);
    argv[0] = sdsnew(path);
    argv[argc] = NULL; // keep argv NULL-terminated even if the argument limit was reached

    // fill in the job
    sj = zmalloc(sizeof(*sj));
    sj->flags = SENTINEL_SCRIPT_NONE;
    sj->retry_num = 0;
    sj->argv = zmalloc(sizeof(char*)*(argc+1));
    sj->start_time = 0;
    sj->pid = 0;
    memcpy(sj->argv,argv,sizeof(char*)*(argc+1));

    // append it to the queue
    listAddNodeTail(sentinel.scripts_queue,sj);

    // queue over capacity: drop the oldest job that is not running
    if (listLength(sentinel.scripts_queue) > SENTINEL_SCRIPT_MAX_QUEUE) {
        listNode *ln;
        listIter li;

        listRewind(sentinel.scripts_queue,&li);
        while ((ln = listNext(&li)) != NULL) {
            sj = ln->value;

            if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;

            // remove it
            listDelNode(sentinel.scripts_queue,ln);
            sentinelReleaseScriptJob(sj);
            break;
        }
        serverAssert(listLength(sentinel.scripts_queue) <=
                    SENTINEL_SCRIPT_MAX_QUEUE);
    }
}

sentinelGetScriptListNodeByPid

Finds the queue node of the running job with the given process ID.

listNode *sentinelGetScriptListNodeByPid(pid_t pid) {
    listNode *ln;
    listIter li;

    listRewind(sentinel.scripts_queue,&li);
    while ((ln = listNext(&li)) != NULL) {
        sentinelScriptJob *sj = ln->value;

        // only running jobs have a pid to match
        if ((sj->flags & SENTINEL_SCRIPT_RUNNING) && sj->pid == pid)
            return ln;
    }
    return NULL;
}

sentinelRunPendingScripts

Starts pending scripts.

void sentinelRunPendingScripts(void) {
    listNode *ln;
    listIter li;
    mstime_t now = mstime();

    // walk the queue and start jobs while the running limit is not reached
    listRewind(sentinel.scripts_queue,&li);
    while (sentinel.running_scripts < SENTINEL_SCRIPT_MAX_RUNNING &&
           (ln = listNext(&li)) != NULL)
    {
        sentinelScriptJob *sj = ln->value;
        pid_t pid;

        // skip jobs that are already running
        if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;

        // skip retries whose scheduled time has not come yet
        if (sj->start_time && sj->start_time > now) continue;

        // mark as running, record the start time, count the attempt
        sj->flags |= SENTINEL_SCRIPT_RUNNING;
        sj->start_time = mstime();
        sj->retry_num++;
        pid = fork();

        if (pid == -1) {
            // fork failed
            sentinelEvent(LL_WARNING,"-script-error",NULL,
                          "%s %d %d", sj->argv[0], 99, 0);
            sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
            sj->pid = 0;
        } else if (pid == 0) {
            // child: run the script; exit with code 2 if execve fails
            execve(sj->argv[0],sj->argv,environ);
            _exit(2);
        } else {
            // parent: track the number of running scripts and the job's pid
            sentinel.running_scripts++;
            sj->pid = pid;
            sentinelEvent(LL_DEBUG,"+script-child",NULL,"%ld",(long)pid);
        }
    }
}

sentinelScriptRetryDelay

Returns the delay before the next attempt, based on the number of attempts so far. The delay doubles with every failure.

mstime_t sentinelScriptRetryDelay(int retry_num) {
    mstime_t delay = SENTINEL_SCRIPT_RETRY_DELAY;

    while (retry_num-- > 1) delay *= 2;
    return delay;
}

sentinelCollectTerminatedScripts

Handles script processes that have exited, depending on how they exited.

void sentinelCollectTerminatedScripts(void) {
    int statloc;
    pid_t pid;

    // reap every script process that has exited
    while ((pid = wait3(&statloc,WNOHANG,NULL)) > 0) {
        int exitcode = WEXITSTATUS(statloc);
        int bysignal = 0;
        listNode *ln;
        sentinelScriptJob *sj;

        if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
        sentinelEvent(LL_DEBUG,"-script-child",NULL,"%ld %d %d",
            (long)pid, exitcode, bysignal);

        // find the corresponding job in the queue
        ln = sentinelGetScriptListNodeByPid(pid);
        if (ln == NULL) {
            serverLog(LL_WARNING,"wait3() returned a pid (%ld) we can't find in our scripts execution queue!", (long)pid);
            continue;
        }
        sj = ln->value;

        // killed by a signal, or exit code 1: retry unless the retry limit is reached
        if ((bysignal || exitcode == 1) &&
            sj->retry_num != SENTINEL_SCRIPT_MAX_RETRY)
        {
            sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
            sj->pid = 0;

            // schedule the next attempt; the delay doubles with every retry
            sj->start_time = mstime() +
                             sentinelScriptRetryDelay(sj->retry_num);
        } else {
            // otherwise remove the job, reporting an error if it did not exit cleanly
            if (bysignal || exitcode != 0) {
                sentinelEvent(LL_WARNING,"-script-error",NULL,
                              "%s %d %d", sj->argv[0], bysignal, exitcode);
            }
            listDelNode(sentinel.scripts_queue,ln);
            sentinelReleaseScriptJob(sj);
        }
        // the process is gone in both cases, so it no longer counts as running
        sentinel.running_scripts--;
    }
}

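The retry policy of sentinelScriptRetryDelay and sentinelCollectTerminatedScripts can be condensed into two small functions. This standalone sketch uses hypothetical names and copies the constants from the source. It shows two things: delays grow 30 s, 60 s, 120 s and so on, and exit codes other than 1 are not retried. An example of such a code is 2, which the child uses when execve fails.

```c
typedef long long mstime_t;

#define SENTINEL_SCRIPT_RETRY_DELAY 30000
#define SENTINEL_SCRIPT_MAX_RETRY 10

/* Same doubling rule as sentinelScriptRetryDelay. */
mstime_t retry_delay(int retry_num) {
    mstime_t delay = SENTINEL_SCRIPT_RETRY_DELAY;
    while (retry_num-- > 1) delay *= 2;
    return delay;
}

/* Retry only if the script was killed by a signal or exited with code 1,
 * and the maximum number of attempts has not been reached yet. */
int should_retry(int bysignal, int exitcode, int retry_num) {
    return (bysignal || exitcode == 1) && retry_num != SENTINEL_SCRIPT_MAX_RETRY;
}
```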
sentinelKillTimedoutScripts

Kills scripts that have run longer than the maximum run time.

void sentinelKillTimedoutScripts(void) {
    listNode *ln;
    listIter li;
    mstime_t now = mstime();

    listRewind(sentinel.scripts_queue,&li);
    while ((ln = listNext(&li)) != NULL) {
        sentinelScriptJob *sj = ln->value;

        // running and past the maximum run time: kill it
        if (sj->flags & SENTINEL_SCRIPT_RUNNING &&
            (now - sj->start_time) > SENTINEL_SCRIPT_MAX_RUNTIME)
        {
            sentinelEvent(LL_WARNING,"-script-timeout",NULL,"%s %ld",
                sj->argv[0], (long)sj->pid);
            kill(sj->pid,SIGKILL);
        }
    }
}

sentinelPendingScriptsCommand

Implements the SENTINEL PENDING-SCRIPTS command, which returns information about the jobs in the script queue.

void sentinelPendingScriptsCommand(client *c) {
    listNode *ln;
    listIter li;

    addReplyMultiBulkLen(c,listLength(sentinel.scripts_queue));
    listRewind(sentinel.scripts_queue,&li);
    while ((ln = listNext(&li)) != NULL) {
        sentinelScriptJob *sj = ln->value;
        int j = 0;

        addReplyMultiBulkLen(c,10);

        addReplyBulkCString(c,"argv");
        while (sj->argv[j]) j++;
        addReplyMultiBulkLen(c,j);
        j = 0;
        while (sj->argv[j]) addReplyBulkCString(c,sj->argv[j++]);

        addReplyBulkCString(c,"flags");
        addReplyBulkCString(c,
            (sj->flags & SENTINEL_SCRIPT_RUNNING) ? "running" : "scheduled");

        addReplyBulkCString(c,"pid");
        addReplyBulkLongLong(c,sj->pid);

        if (sj->flags & SENTINEL_SCRIPT_RUNNING) {
            addReplyBulkCString(c,"run-time");
            addReplyBulkLongLong(c,mstime() - sj->start_time);
        } else {
            mstime_t delay = sj->start_time ? (sj->start_time-mstime()) : 0;
            if (delay < 0) delay = 0;
            addReplyBulkCString(c,"run-delay");
            addReplyBulkLongLong(c,delay);
        }

        addReplyBulkCString(c,"retry-num");
        addReplyBulkLongLong(c,sj->retry_num);
    }
}

sentinelCallClientReconfScript

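Schedules the client reconfiguration script when a failover changes the master's address.

The script is called with the arguments <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>, where role is leader or observer.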

void sentinelCallClientReconfScript(sentinelRedisInstance *master, int role, char *state, sentinelAddr *from, sentinelAddr *to) {
    char fromport[32], toport[32];

    if (master->client_reconfig_script == NULL) return;
    ll2string(fromport,sizeof(fromport),from->port);
    ll2string(toport,sizeof(toport),to->port);
    sentinelScheduleScriptExecution(master->client_reconfig_script,
        master->name,
        (role == SENTINEL_LEADER) ? "leader" : "observer",
        state, from->ip, fromport, to->ip, toport, NULL);
}

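Connections

Sentinel's connections to masters, slaves and other Sentinels are all stored in instanceLink structs. There are two kinds, a command connection and a pub/sub connection, and both are handled asynchronously.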

createInstanceLink

Creates a link. It starts out disconnected, with a reference count of 1.

instanceLink *createInstanceLink(void) {
    instanceLink *link = zmalloc(sizeof(*link));

    link->refcount = 1;
    link->disconnected = 1;
    link->pending_commands = 0;
    link->cc = NULL;
    link->pc = NULL;
    link->cc_conn_time = 0;
    link->pc_conn_time = 0;
    link->last_reconn_time = 0;
    link->pc_last_activity = 0;
    link->act_ping_time = mstime();
    link->last_ping_time = 0;
    link->last_avail_time = mstime();
    link->last_pong_time = mstime();
    return link;
}

instanceLinkCloseConnection

Closes one connection of a link: either the command connection or the pub/sub connection.

void instanceLinkCloseConnection(instanceLink *link, redisAsyncContext *c) {
    if (c == NULL) return;

    // command connection
    if (link->cc == c) {
        link->cc = NULL;
        link->pending_commands = 0;
    }

    // pub/sub connection
    if (link->pc == c) link->pc = NULL;
    c->data = NULL;
    link->disconnected = 1;
    redisAsyncFree(c);
}

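releaseInstanceLink

Releases a link. Links are reference counted: the connections are closed and the link is freed only when the last user releases it.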

instanceLink *releaseInstanceLink(instanceLink *link, sentinelRedisInstance *ri)
{
    serverAssert(link->refcount > 0);
    link->refcount--;

    // other instances still use the link: make the pending hiredis callbacks
    // that belong to ri discard their replies instead of touching ri
    if (link->refcount != 0) {
        if (ri && ri->link->cc) {
            redisCallback *cb;
            redisCallbackList *callbacks = &link->cc->replies;

            cb = callbacks->head;
            while(cb) {
                // replace the callback
                if (cb->privdata == ri) {
                    cb->fn = sentinelDiscardReplyCallback;
                    cb->privdata = NULL; /* Not strictly needed. */
                }
                cb = cb->next;
            }
        }
        return link;
    }

    // nobody uses the link any more: close both connections
    instanceLinkCloseConnection(link,link->cc);
    instanceLinkCloseConnection(link,link->pc);
    zfree(link);
    return NULL;
}

sentinelTryConnectionSharing

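Tries to share a link. When this Sentinel monitors several masters, each master's sentinels dictionary holds its own sentinelRedisInstance for the same remote Sentinel. Those instances can share a single link.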

int sentinelTryConnectionSharing(sentinelRedisInstance *ri) {
    serverAssert(ri->flags & SRI_SENTINEL);
    dictIterator *di;
    dictEntry *de;

    if (ri->runid == NULL) return C_ERR; // cannot identify the Sentinel yet
    if (ri->link->refcount > 1) return C_ERR; // link already shared, nothing to do

    // iterate over all masters
    di = dictGetIterator(sentinel.masters);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *master = dictGetVal(de), *match;

        // skip the master ri belongs to
        if (master == ri->master) continue;

        // look for a Sentinel with the same run ID under this master
        match = getSentinelRedisInstanceByAddrAndRunID(master->sentinels,
                                                       NULL,0,ri->runid);
        if (match == NULL) continue; // not found
        if (match == ri) continue; // found ourselves (should never happen)

        // found one: release our own link and share the matching one
        releaseInstanceLink(ri->link,NULL);
        ri->link = match->link;
        match->link->refcount++;
        dictReleaseIterator(di); // release the iterator before the early return
        return C_OK;
    }
    dictReleaseIterator(di);
    return C_ERR;
}

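The sharing trick relies on instanceLink's reference count. The sketch below isolates that bookkeeping with a hypothetical shared_link type. A link is freed only once its last reference is released, and sharing drops our own link before taking a reference to the other one. Connection handling and the callback rewriting done by releaseInstanceLink are left out.

```c
#include <stdlib.h>

/* Hypothetical stand-in for instanceLink: only the reference count is modelled. */
typedef struct shared_link {
    int refcount;
} shared_link;

/* A new link starts with one reference, like createInstanceLink. */
shared_link *link_create(void) {
    shared_link *l = malloc(sizeof(*l));
    if (l != NULL) l->refcount = 1;
    return l;
}

/* Drop one reference; free the link only when the last one goes away
 * (in Redis, this is where both connections are closed).
 * Returns the link if it is still alive, NULL if it was freed. */
shared_link *link_release(shared_link *l) {
    if (--l->refcount > 0) return l;
    free(l);
    return NULL;
}

/* Make *mine use other's link, as sentinelTryConnectionSharing does when
 * it finds the same Sentinel under another master. */
void link_share(shared_link **mine, shared_link *other) {
    if (*mine == other) return;   /* already sharing */
    link_release(*mine);
    *mine = other;
    other->refcount++;
}
```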
sentinelUpdateSentinelAddressInAllMasters

Updates the address of one Sentinel under every master, replacing it with the address stored in ri.

int sentinelUpdateSentinelAddressInAllMasters(sentinelRedisInstance *ri) {
    serverAssert(ri->flags & SRI_SENTINEL);
    dictIterator *di;
    dictEntry *de;
    int reconfigured = 0;

    // iterate over all masters
    di = dictGetIterator(sentinel.masters);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *master = dictGetVal(de), *match;
        match = getSentinelRedisInstanceByAddrAndRunID(master->sentinels,
                                                       NULL,0,ri->runid);
        // not found
        if (match == NULL) continue;

        // drop the existing connections so they are re-established
        if (match->link->cc != NULL)
            instanceLinkCloseConnection(match->link,match->link->cc);
        if (match->link->pc != NULL)
            instanceLinkCloseConnection(match->link,match->link->pc);

        if (match == ri) continue; // ri itself already has the new address

        // replace the old address with a copy of the new one
        releaseSentinelAddr(match->addr);
        match->addr = dupSentinelAddr(ri->addr);
        reconfigured++;
    }
    dictReleaseIterator(di);

    // emit an event
    if (reconfigured)
        sentinelEvent(LL_NOTICE,"+sentinel-address-update", ri,
                      "%@ %d additional matching instances", reconfigured);
    return reconfigured;
}

instanceLinkConnectionError

Handles a connection error.

void instanceLinkConnectionError(const redisAsyncContext *c) {
    instanceLink *link = c->data;
    int pubsub;

    if (!link) return;

    pubsub = (link->pc == c);
    if (pubsub)
        link->pc = NULL;
    else
        link->cc = NULL;
    link->disconnected = 1;
}

sentinelLinkEstablishedCallback

Callback for the connection result; only failures are handled.

void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
    if (status != C_OK) instanceLinkConnectionError(c);
}

sentinelDisconnectCallback

Callback invoked when a connection is closed.

void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
    UNUSED(status);
    instanceLinkConnectionError(c);
}

Instances managed by Sentinel

Every master, slave and other Sentinel is managed through this object, a sentinelRedisInstance.

createSentinelRedisInstance

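Creates an instance.

name: the instance name. For a master, it is the name given in the configuration file; for a slave, ip:port; for another Sentinel, its run ID.
flags: the instance type (master, slave or Sentinel)
hostname: the instance's hostname
port: the instance's port
quorum: for a master, how many Sentinels must agree that it has failed
master: the master that owns the slave or Sentinel being created; NULL when creating a master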

sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
    sentinelRedisInstance *ri;
    sentinelAddr *addr;
    dict *table = NULL;
    char slavename[NET_PEER_ID_LEN], *sdsname;

    serverAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
    serverAssert((flags & SRI_MASTER) || master != NULL);

    // resolve and validate host & port
    addr = createSentinelAddr(hostname,port);
    if (addr == NULL) return NULL;

    // a slave is named after its address, ip:port
    if (flags & SRI_SLAVE) {
        anetFormatAddr(slavename, sizeof(slavename), hostname, port);
        name = slavename;
    }

    // pick the dictionary the new instance belongs to
    if (flags & SRI_MASTER) table = sentinel.masters;
    else if (flags & SRI_SLAVE) table = master->slaves;
    else if (flags & SRI_SENTINEL) table = master->sentinels;

    // refuse duplicates
    sdsname = sdsnew(name);
    if (dictFind(table,sdsname)) {
        releaseSentinelAddr(addr);
        sdsfree(sdsname);
        errno = EBUSY;
        return NULL;
    }

    // create the instance; the link starts disconnected and is connected on a later timer run
    ri = zmalloc(sizeof(*ri));

    ri->flags = flags;
    ri->name = sdsname;
    ri->runid = NULL;
    ri->config_epoch = 0;
    ri->addr = addr;
    ri->link = createInstanceLink();
    ri->last_pub_time = mstime();
    ri->last_hello_time = mstime();
    ri->last_master_down_reply_time = mstime();
    ri->s_down_since_time = 0;
    ri->o_down_since_time = 0;
    ri->down_after_period = master ? master->down_after_period :
                            SENTINEL_DEFAULT_DOWN_AFTER;
    ri->master_link_down_time = 0;
    ri->auth_pass = NULL;
    ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY;
    ri->slave_reconf_sent_time = 0;
    ri->slave_master_host = NULL;
    ri->slave_master_port = 0;
    ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN;
    ri->slave_repl_offset = 0;
    ri->sentinels = dictCreate(&instancesDictType,NULL);
    ri->quorum = quorum;
    ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS;
    ri->master = master;
    ri->slaves = dictCreate(&instancesDictType,NULL);
    ri->info_refresh = 0;
    ri->renamed_commands = dictCreate(&renamedCommandsDictType,NULL);

    /* Failover state. */
    ri->leader = NULL;
    ri->leader_epoch = 0;
    ri->failover_epoch = 0;
    ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
    ri->failover_state_change_time = 0;
    ri->failover_start_time = 0;
    ri->failover_timeout = SENTINEL_DEFAULT_FAILOVER_TIMEOUT;
    ri->failover_delay_logged = 0;
    ri->promoted_slave = NULL;
    ri->notification_script = NULL;
    ri->client_reconfig_script = NULL;
    ri->info = NULL;

    // role
    ri->role_reported = ri->flags & (SRI_MASTER|SRI_SLAVE);
    ri->role_reported_time = mstime();
    ri->slave_conf_change_time = mstime();

    // add it to the chosen dictionary
    dictAdd(table, ri->name, ri);
    return ri;
}

releaseSentinelRedisInstance

Frees an instance.

void releaseSentinelRedisInstance(sentinelRedisInstance *ri) {
    // release the Sentinels and slaves owned by a master
    dictRelease(ri->sentinels);
    dictRelease(ri->slaves);

    // release the link (closes the connections if nobody else uses it)
    releaseInstanceLink(ri->link,ri);

    sdsfree(ri->name);
    sdsfree(ri->runid);
    sdsfree(ri->notification_script);
    sdsfree(ri->client_reconfig_script);
    sdsfree(ri->slave_master_host);
    sdsfree(ri->leader);
    sdsfree(ri->auth_pass);
    sdsfree(ri->info);
    releaseSentinelAddr(ri->addr);
    dictRelease(ri->renamed_commands);

    // if this slave was chosen as the new master, clear the master's reference to it
    if ((ri->flags & SRI_SLAVE) && (ri->flags & SRI_PROMOTED) && ri->master)
        ri->master->promoted_slave = NULL;

    zfree(ri);
}

sentinelRedisInstanceLookupSlave

Looks up one of a master's slave instances by IP and port.

sentinelRedisInstance *sentinelRedisInstanceLookupSlave(
sentinelRedisInstance *ri, char *ip, int port)
{
sds key;
sentinelRedisInstance *slave;
char buf[NET_PEER_ID_LEN];

serverAssert(ri->flags & SRI_MASTER);

// build the lookup key "ip:port"
anetFormatAddr(buf,sizeof(buf),ip,port);
key = sdsnew(buf);

// look it up in the master's slaves dictionary
slave = dictFetchValue(ri->slaves,key);
sdsfree(key);
return slave;
}
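The lookup key is the slave's address rendered as "ip:port" by anetFormatAddr. As far as I know, that helper wraps IPv6 addresses in brackets so the port separator stays unambiguous. The sketch below reproduces this behaviour under that assumption. format_addr_key is a hypothetical name, not a Redis function.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical helper mirroring how the "ip:port" lookup key is built.
 * IPv6 literals contain ':' so they are wrapped in brackets, keeping the
 * final ':' unambiguous as the port separator (assumed anetFormatAddr behaviour).
 * Returns the snprintf result (number of characters that would be written). */
static int format_addr_key(char *buf, size_t len, const char *ip, int port) {
    return snprintf(buf, len, strchr(ip, ':') ? "[%s]:%d" : "%s:%d", ip, port);
}
```

For example, 127.0.0.1 and port 6380 give "127.0.0.1:6380", while ::1 and port 6379 give "[::1]:6379".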
sentinelRedisInstanceTypeStr

Return the instance type as a string.

const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri) {
if (ri->flags & SRI_MASTER) return "master";
else if (ri->flags & SRI_SLAVE) return "slave";
else if (ri->flags & SRI_SENTINEL) return "sentinel";
else return "unknown";
}
removeMatchingSentinelFromMaster

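Remove the sentinel(s) with the given runid from the list of sentinels monitoring a master. Returns the number of sentinels removed.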

int removeMatchingSentinelFromMaster(sentinelRedisInstance *master, char *runid) {
dictIterator *di;
dictEntry *de;
int removed = 0;

if (runid == NULL) return 0;

di = dictGetSafeIterator(master->sentinels);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);

// runid matches
if (ri->runid && strcmp(ri->runid,runid) == 0) {
dictDelete(master->sentinels,ri->name);
removed++;
}
}
dictReleaseIterator(di);
return removed;
}
getSentinelRedisInstanceByAddrAndRunID

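Look up an instance in a dictionary by ip, port and runid. At least one of ip and runid must be non-NULL; a NULL argument is simply not used as a filter.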

sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid) {
dictIterator *di;
dictEntry *de;
sentinelRedisInstance *instance = NULL;

// at least one of ip and runid must be non-NULL
serverAssert(ip || runid);
di = dictGetIterator(instances);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);

if (runid && !ri->runid) continue;
if ((runid == NULL || strcmp(ri->runid, runid) == 0) &&
(ip == NULL || (strcmp(ri->addr->ip, ip) == 0 &&
ri->addr->port == port)))
{
instance = ri;
break;
}
}
dictReleaseIterator(di);
return instance;
}
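The matching rule above treats a NULL ip or NULL runid as "don't filter on this field". An instance whose runid is still unknown never matches a runid query. The following is a minimal standalone sketch of just that predicate. struct inst and instance_matches are hypothetical names introduced for illustration.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical, simplified instance record used only for this sketch. */
struct inst { const char *ip; int port; const char *runid; };

/* Same filter as getSentinelRedisInstanceByAddrAndRunID:
 * runid == NULL -> don't compare runids; ip == NULL -> don't compare addresses. */
static bool instance_matches(const struct inst *ri, const char *ip, int port,
                             const char *runid) {
    if (runid && !ri->runid) return false;                 /* runid unknown yet */
    if (runid && strcmp(ri->runid, runid) != 0) return false;
    if (ip && (strcmp(ri->ip, ip) != 0 || ri->port != port)) return false;
    return true;
}
```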
sentinelGetMasterByName

Get a master instance by name.

sentinelRedisInstance *sentinelGetMasterByName(char *name) {
sentinelRedisInstance *ri;
sds sdsname = sdsnew(name);

ri = dictFetchValue(sentinel.masters,sdsname);
sdsfree(sdsname);
return ri;
}
sentinelAddFlagsToDictOfRedisInstances

Set the given flags on every instance in a dictionary.

void sentinelAddFlagsToDictOfRedisInstances(dict *instances, int flags) {
dictIterator *di;
dictEntry *de;

di = dictGetIterator(instances);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
ri->flags |= flags;
}
dictReleaseIterator(di);
}
sentinelDelFlagsToDictOfRedisInstances

Clear the given flags on every instance in a dictionary.

void sentinelDelFlagsToDictOfRedisInstances(dict *instances, int flags) {
dictIterator *di;
dictEntry *de;

di = dictGetIterator(instances);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
ri->flags &= ~flags;
}
dictReleaseIterator(di);
}
sentinelResetMaster

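Reset a master. This drops its known slaves (and, unless SENTINEL_RESET_NO_SENTINELS is passed, its known sentinels), closes its connections and clears its failover state.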

#define SENTINEL_RESET_NO_SENTINELS (1<<0)
void sentinelResetMaster(sentinelRedisInstance *ri, int flags) {
serverAssert(ri->flags & SRI_MASTER);
dictRelease(ri->slaves);

// start again with an empty slaves dictionary
ri->slaves = dictCreate(&instancesDictType,NULL);

// drop the known sentinels too, unless told not to
if (!(flags & SENTINEL_RESET_NO_SENTINELS)) {
dictRelease(ri->sentinels);
ri->sentinels = dictCreate(&instancesDictType,NULL);
}

// close the command and pub/sub connections, then clear every flag except SRI_MASTER
instanceLinkCloseConnection(ri->link,ri->link->cc);
instanceLinkCloseConnection(ri->link,ri->link->pc);
ri->flags &= SRI_MASTER;

// forget the elected leader
if (ri->leader) {
sdsfree(ri->leader);
ri->leader = NULL;
}

// reset failover state and cached runid/role information
ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
ri->failover_state_change_time = 0;
ri->failover_start_time = 0;
ri->promoted_slave = NULL;
sdsfree(ri->runid);
sdsfree(ri->slave_master_host);
ri->runid = NULL;
ri->slave_master_host = NULL;
ri->link->act_ping_time = mstime();
ri->link->last_ping_time = 0;
ri->link->last_avail_time = mstime();
ri->link->last_pong_time = mstime();
ri->role_reported_time = mstime();
ri->role_reported = SRI_MASTER;
if (flags & SENTINEL_GENERATE_EVENT)
sentinelEvent(LL_WARNING,"+reset-master",ri,"%@");
}
sentinelResetMastersByPattern

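Reset every master whose name matches a glob-style pattern (not a regular expression), as used by SENTINEL RESET <pattern>. Returns the number of masters reset.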

int sentinelResetMastersByPattern(char *pattern, int flags) {
dictIterator *di;
dictEntry *de;
int reset = 0;

di = dictGetIterator(sentinel.masters);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);

if (ri->name) {
if (stringmatch(pattern,ri->name,0)) {
sentinelResetMaster(ri,flags);
reset++;
}
}
}
dictReleaseIterator(di);
return reset;
}
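stringmatch does glob-style matching. So SENTINEL RESET * resets every master, while SENTINEL RESET cache-* resets only masters whose name starts with "cache-". The sketch below is a deliberately simplified recursive glob matcher that supports only * and ?. It illustrates the semantics and is not Redis's stringmatchlen, which also handles [...] character classes and backslash escapes. glob_match is a hypothetical name.

```c
#include <stdbool.h>

/* Simplified glob matcher: '*' matches any run of characters (including none),
 * '?' matches exactly one character, anything else matches itself. */
static bool glob_match(const char *pat, const char *str) {
    if (*pat == '\0') return *str == '\0';
    if (*pat == '*') {
        /* either '*' matches nothing, or it swallows one char and we retry */
        return glob_match(pat + 1, str) || (*str && glob_match(pat, str + 1));
    }
    if (*str && (*pat == '?' || *pat == *str))
        return glob_match(pat + 1, str + 1);
    return false;
}
```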
sentinelResetMasterAndChangeAddress

Reset a master and point it at a new ip and port.

int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, int port) {
sentinelAddr *oldaddr, *newaddr;
sentinelAddr **slaves = NULL;
int numslaves = 0, j;
dictIterator *di;
dictEntry *de;

// validate/resolve the new ip and port
newaddr = createSentinelAddr(ip,port);
if (newaddr == NULL) return C_ERR;

// collect all current slaves except the one at the new address (the slave being promoted to master)
di = dictGetIterator(master->slaves);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *slave = dictGetVal(de);

if (sentinelAddrIsEqual(slave->addr,newaddr)) continue;
slaves = zrealloc(slaves,sizeof(sentinelAddr*)*(numslaves+1));
slaves[numslaves++] = createSentinelAddr(slave->addr->ip,
slave->addr->port);
}
dictReleaseIterator(di);

// add the old master as a slave, so that when it comes back it is reconfigured as a replica of the new master
if (!sentinelAddrIsEqual(newaddr,master->addr)) {
slaves = zrealloc(slaves,sizeof(sentinelAddr*)*(numslaves+1));
slaves[numslaves++] = createSentinelAddr(master->addr->ip,
master->addr->port);
}

// reset the master (keeping known sentinels) and install the new address
sentinelResetMaster(master,SENTINEL_RESET_NO_SENTINELS);
oldaddr = master->addr;
master->addr = newaddr;
master->o_down_since_time = 0;
master->s_down_since_time = 0;

// re-attach the collected slaves to the new master
for (j = 0; j < numslaves; j++) {
sentinelRedisInstance *slave;

slave = createSentinelRedisInstance(NULL,SRI_SLAVE,slaves[j]->ip,
slaves[j]->port, master->quorum, master);
releaseSentinelAddr(slaves[j]);
if (slave) sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
}
zfree(slaves);

// free the old address and persist the new configuration
releaseSentinelAddr(oldaddr);
sentinelFlushConfig();
return C_OK;
}
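The address bookkeeping in sentinelResetMasterAndChangeAddress can be read as a small list transformation. It keeps every known slave except the one at the new master address, then appends the old master so it is re-attached as a replica. The sketch below covers only that step. It uses a hypothetical fixed-size struct addr instead of sentinelAddr, and rebuild_slaves is likewise a hypothetical name.

```c
#include <string.h>

/* Hypothetical address type for this sketch (46 bytes fits an IPv6 string). */
struct addr { char ip[46]; int port; };

static int addr_eq(const struct addr *a, const struct addr *b) {
    return a->port == b->port && strcmp(a->ip, b->ip) == 0;
}

/* Writes the replica list for the new master into out, which must have room
 * for nslaves + 1 entries. Returns the number of entries written. */
static int rebuild_slaves(const struct addr *slaves, int nslaves,
                          const struct addr *old_master,
                          const struct addr *new_master,
                          struct addr *out) {
    int n = 0;
    for (int i = 0; i < nslaves; i++) {
        if (addr_eq(&slaves[i], new_master)) continue; /* the promoted slave */
        out[n++] = slaves[i];
    }
    /* the old master becomes a replica, unless the address did not change */
    if (!addr_eq(new_master, old_master)) out[n++] = *old_master;
    return n;
}
```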
sentinelRedisInstanceNoDownFor

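Return non-zero if the instance has not been in a down state (S_DOWN or O_DOWN) within the last ms milliseconds. That is, either it was never marked down, or its most recent down mark is older than ms.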

int sentinelRedisInstanceNoDownFor(sentinelRedisInstance *ri, mstime_t ms) {
mstime_t most_recent;

most_recent = ri->s_down_since_time;
if (ri->o_down_since_time > most_recent)
most_recent = ri->o_down_since_time;
return most_recent == 0 || (mstime() - most_recent) > ms;
}
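Here is the same check with the clock passed in explicitly, which makes the "never down, or down long enough ago" rule easy to exercise. no_down_for is a hypothetical name. Timestamps are in milliseconds, and 0 means "never", as in the original.

```c
#include <stdint.h>

typedef int64_t mstime_t;

/* Non-zero if neither down timestamp is set, or the most recent one
 * is more than ms milliseconds before now. */
static int no_down_for(mstime_t s_down_since, mstime_t o_down_since,
                       mstime_t now, mstime_t ms) {
    mstime_t most_recent = s_down_since;
    if (o_down_since > most_recent) most_recent = o_down_since;
    return most_recent == 0 || (now - most_recent) > ms;
}
```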
sentinelGetCurrentMasterAddress

Get the current address of a master.

sentinelAddr *sentinelGetCurrentMasterAddress(sentinelRedisInstance *master) {

// failover in progress and the selected slave has already been promoted: return the new master's address
if ((master->flags & SRI_FAILOVER_IN_PROGRESS) &&
master->promoted_slave &&
master->failover_state >= SENTINEL_FAILOVER_STATE_RECONF_SLAVES)
{
return master->promoted_slave->addr;
} else {
// otherwise return the configured (old) master address
return master->addr;
}
}
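The address handed out only switches to the promoted slave once the failover has reached the RECONF_SLAVES step. At that point the slave has confirmed via INFO that it is now a master. The sketch below models that decision. The enum mirrors the order of Redis's SENTINEL_FAILOVER_STATE_* constants as I recall it; the >= comparison depends on that order. All names here are hypothetical.

```c
#include <stddef.h>

/* Assumed to follow the same order as SENTINEL_FAILOVER_STATE_*. */
enum failover_state {
    FO_NONE, FO_WAIT_START, FO_SELECT_SLAVE, FO_SEND_SLAVEOF_NOONE,
    FO_WAIT_PROMOTION, FO_RECONF_SLAVES, FO_UPDATE_CONFIG
};

struct addr { const char *ip; int port; };

/* Returns the promoted slave's address once promotion is confirmed
 * (state >= RECONF_SLAVES), otherwise the configured master address. */
static const struct addr *current_master_addr(int failover_in_progress,
                                              enum failover_state st,
                                              const struct addr *master,
                                              const struct addr *promoted) {
    if (failover_in_progress && promoted && st >= FO_RECONF_SLAVES)
        return promoted;
    return master;
}
```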
sentinelPropagateDownAfterPeriod

Propagate the master's down-after-period (the time after which an instance is considered down) to all of its slaves and sentinels.

void sentinelPropagateDownAfterPeriod(sentinelRedisInstance *master) {
dictIterator *di;
dictEntry *de;
int j;
dict *d[] = {master->slaves, master->sentinels, NULL};

for (j = 0; d[j]; j++) {
di = dictGetIterator(d[j]);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
ri->down_after_period = master->down_after_period;
}
dictReleaseIterator(di);
}
}
sentinelGetInstanceTypeString

Get the instance type as a string.

char *sentinelGetInstanceTypeString(sentinelRedisInstance *ri) {
if (ri->flags & SRI_MASTER) return "master";
else if (ri->flags & SRI_SLAVE) return "slave";
else if (ri->flags & SRI_SENTINEL) return "sentinel";
else return "unknown";
}
sentinelInstanceMapCommand

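Map a command to its renamed form. When Sentinel sends a command to an instance, it may need to use an alias, because for security reasons the master may have renamed commands such as CONFIG and SLAVEOF. The lookup always uses the master's table, so the slaves and sentinels attached to a master share its renames.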

char *sentinelInstanceMapCommand(sentinelRedisInstance *ri, char *command) {
sds sc = sdsnew(command);
if (ri->master) ri = ri->master;
char *retval = dictFetchValue(ri->renamed_commands, sc);
sdsfree(sc);
return retval ? retval : command;
}

Configuration

sentinelHandleConfiguration

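Handle one sentinel directive from the configuration file. argv holds the arguments that follow the leading "sentinel" keyword. Returns NULL on success or an error message.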

char *sentinelHandleConfiguration(char **argv, int argc) {
sentinelRedisInstance *ri;

if (!strcasecmp(argv[0],"monitor") && argc == 5) {
// monitor a master: monitor <name> <host> <port> <quorum>
int quorum = atoi(argv[4]);

if (quorum <= 0) return "Quorum must be 1 or greater.";
if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
atoi(argv[3]),quorum,NULL) == NULL)
{
switch(errno) {
case EBUSY: return "Duplicated master name.";
case ENOENT: return "Can't resolve master instance hostname.";
case EINVAL: return "Invalid port number";
}
}
} else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) {
// time without a valid reply before an instance is considered down: down-after-milliseconds <name> <milliseconds>
ri = sentinelGetMasterByName(argv[1]);
if (!ri) return "No such master with specified name.";
ri->down_after_period = atoi(argv[2]);
if (ri->down_after_period <= 0)
return "negative or zero time parameter.";
sentinelPropagateDownAfterPeriod(ri);
} else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) {
// failover timeout: failover-timeout <name> <milliseconds>
ri = sentinelGetMasterByName(argv[1]);
if (!ri) return "No such master with specified name.";
ri->failover_timeout = atoi(argv[2]);
if (ri->failover_timeout <= 0)
return "negative or zero time parameter.";
} else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) {
// max number of slaves reconfigured at the same time during failover: parallel-syncs <name> <numslaves>
ri = sentinelGetMasterByName(argv[1]);
if (!ri) return "No such master with specified name.";
ri->parallel_syncs = atoi(argv[2]);
} else if (!strcasecmp(argv[0],"notification-script") && argc == 3) {
// notification script: notification-script <name> <path>
ri = sentinelGetMasterByName(argv[1]);
if (!ri) return "No such master with specified name.";
if (access(argv[2],X_OK) == -1)
return "Notification script seems non existing or non executable.";
ri->notification_script = sdsnew(argv[2]);
} else if (!strcasecmp(argv[0],"client-reconfig-script") && argc == 3) {
// script run when clients must be reconfigured: client-reconfig-script <name> <path>
ri = sentinelGetMasterByName(argv[1]);
if (!ri) return "No such master with specified name.";
if (access(argv[2],X_OK) == -1)
return "Client reconfiguration script seems non existing or "
"non executable.";
ri->client_reconfig_script = sdsnew(argv[2]);
} else if (!strcasecmp(argv[0],"auth-pass") && argc == 3) {
// password used to connect: auth-pass <name> <password>
ri = sentinelGetMasterByName(argv[1]);
if (!ri) return "No such master with specified name.";
ri->auth_pass = sdsnew(argv[2]);
} else if (!strcasecmp(argv[0],"current-epoch") && argc == 2) {
// current epoch: current-epoch <epoch>
unsigned long long current_epoch = strtoull(argv[1],NULL,10);
if (current_epoch > sentinel.current_epoch)
sentinel.current_epoch = current_epoch;
} else if (!strcasecmp(argv[0],"myid") && argc == 2) {
// this sentinel's run id: myid <id>
if (strlen(argv[1]) != CONFIG_RUN_ID_SIZE)
return "Malformed Sentinel id in myid option.";
memcpy(sentinel.myid,argv[1],CONFIG_RUN_ID_SIZE);
} else if (!strcasecmp(argv[0],"config-epoch") && argc == 3) {
// configuration epoch: config-epoch <name> <epoch>
ri = sentinelGetMasterByName(argv[1]);
if (!ri) return "No such master with specified name.";
ri->config_epoch = strtoull(argv[2],NULL,10);

if (ri->config_epoch > sentinel.current_epoch)
sentinel.current_epoch = ri->config_epoch;
} else if (!strcasecmp(argv[0],"leader-epoch") && argc == 3) {
// leader epoch: leader-epoch <name> <epoch>
ri = sentinelGetMasterByName(argv[1]);
if (!ri) return "No such master with specified name.";
ri->leader_epoch = strtoull(argv[2],NULL,10);
} else if ((!strcasecmp(argv[0],"known-slave") ||
!strcasecmp(argv[0],"known-replica")) && argc == 4)
{
sentinelRedisInstance *slave;

// known slave: known-replica <name> <ip> <port>
ri = sentinelGetMasterByName(argv[1]);
if (!ri) return "No such master with specified name.";
if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,argv[2],
atoi(argv[3]), ri->quorum, ri)) == NULL)
{
return "Wrong hostname or port for replica.";
}
} else if (!strcasecmp(argv[0],"known-sentinel") &&
(argc == 4 || argc == 5)) {
sentinelRedisInstance *si;

if (argc == 5) {
// known sentinel: known-sentinel <name> <ip> <port> <runid> (the form without runid is accepted but ignored)
ri = sentinelGetMasterByName(argv[1]);
if (!ri) return "No such master with specified name.";
if ((si = createSentinelRedisInstance(argv[4],SRI_SENTINEL,argv[2],
atoi(argv[3]), ri->quorum, ri)) == NULL)
{
return "Wrong hostname or port for sentinel.";
}
si->runid = sdsnew(argv[4]);
sentinelTryConnectionSharing(si);
}
} else if (!strcasecmp(argv[0],"rename-command") && argc == 4) {
// renamed command: rename-command <name> <command> <renamed-command>
ri = sentinelGetMasterByName(argv[1]);
if (!ri) return "No such master with specified name.";
sds oldcmd = sdsnew(argv[2]);
sds newcmd = sdsnew(argv[3]);
if (dictAdd(ri->renamed_commands,oldcmd,newcmd) != DICT_OK) {
sdsfree(oldcmd);
sdsfree(newcmd);
return "Same command renamed multiple times with rename-command.";
}
} else if (!strcasecmp(argv[0],"announce-ip") && argc == 2) {
// ip announced to other sentinels: announce-ip <ip-address>
if (strlen(argv[1]))
sentinel.announce_ip = sdsnew(argv[1]);
} else if (!strcasecmp(argv[0],"announce-port") && argc == 2) {
// port announced to other sentinels: announce-port <port>
sentinel.announce_port = atoi(argv[1]);
} else if (!strcasecmp(argv[0],"deny-scripts-reconfig") && argc == 2) {
// refuse runtime changes of notification/reconfig scripts (via SENTINEL SET): deny-scripts-reconfig <yes|no>
if ((sentinel.deny_scripts_reconfig = yesnotoi(argv[1])) == -1) {
return "Please specify yes or no for the "
"deny-scripts-reconfig options.";
}
} else {
return "Unrecognized sentinel configuration statement.";
}
return NULL;
}
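sentinelHandleConfiguration receives the line already split into arguments, with the leading "sentinel" keyword removed by the generic config loader. The sketch below parses one complete "sentinel monitor" line and applies the same quorum check and port range check. It uses sscanf for brevity, whereas real Redis uses its own argument splitter, which also handles quoting. parse_monitor and struct monitor_cfg are hypothetical names.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical parsed form of "sentinel monitor <name> <host> <port> <quorum>". */
struct monitor_cfg { char name[64]; char host[64]; int port; int quorum; };

/* Returns NULL on success or an error string, the same convention
 * sentinelHandleConfiguration uses. */
static const char *parse_monitor(const char *line, struct monitor_cfg *cfg) {
    char keyword[16], sub[16];
    int n = sscanf(line, "%15s %15s %63s %63s %d %d", keyword, sub,
                   cfg->name, cfg->host, &cfg->port, &cfg->quorum);
    if (n != 6 || strcmp(keyword, "sentinel") != 0 || strcmp(sub, "monitor") != 0)
        return "Unrecognized sentinel configuration statement.";
    if (cfg->quorum <= 0) return "Quorum must be 1 or greater.";
    if (cfg->port < 0 || cfg->port > 65535) return "Invalid port number";
    return NULL;
}
```

With the configuration example from earlier in this article, "sentinel monitor mymaster 127.0.0.1 6379 2" parses to name mymaster, port 6379 and quorum 2.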
rewriteConfigSentinelOption

Emit the sentinel directives when the configuration file is rewritten.

void rewriteConfigSentinelOption(struct rewriteConfigState *state) {
dictIterator *di, *di2;
dictEntry *de;
sds line;

// run id
line = sdscatprintf(sdsempty(), "sentinel myid %s", sentinel.myid);
rewriteConfigRewriteLine(state,"sentinel",line,1);

// deny-scripts-reconfig (forced into the file only when it differs from the default)
line = sdscatprintf(sdsempty(), "sentinel deny-scripts-reconfig %s",
sentinel.deny_scripts_reconfig ? "yes" : "no");
rewriteConfigRewriteLine(state,"sentinel",line,
sentinel.deny_scripts_reconfig != SENTINEL_DEFAULT_DENY_SCRIPTS_RECONFIG);

// one "sentinel monitor" block per monitored master
di = dictGetIterator(sentinel.masters);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *master, *ri;
sentinelAddr *master_addr;

// master name, current address and quorum
master = dictGetVal(de);
master_addr = sentinelGetCurrentMasterAddress(master);
line = sdscatprintf(sdsempty(),"sentinel monitor %s %s %d %d",
master->name, master_addr->ip, master_addr->port,
master->quorum);
rewriteConfigRewriteLine(state,"sentinel",line,1);

// down-after-milliseconds, if not the default
if (master->down_after_period != SENTINEL_DEFAULT_DOWN_AFTER) {
line = sdscatprintf(sdsempty(),
"sentinel down-after-milliseconds %s %ld",
master->name, (long) master->down_after_period);
rewriteConfigRewriteLine(state,"sentinel",line,1);
}

// failover timeout, if not the default
if (master->failover_timeout != SENTINEL_DEFAULT_FAILOVER_TIMEOUT) {
line = sdscatprintf(sdsempty(),
"sentinel failover-timeout %s %ld",
master->name, (long) master->failover_timeout);
rewriteConfigRewriteLine(state,"sentinel",line,1);
}

// parallel-syncs, if not the default
if (master->parallel_syncs != SENTINEL_DEFAULT_PARALLEL_SYNCS) {
line = sdscatprintf(sdsempty(),
"sentinel parallel-syncs %s %d",
master->name, master->parallel_syncs);
rewriteConfigRewriteLine(state,"sentinel",line,1);
}

// notification script
if (master->notification_script) {
line = sdscatprintf(sdsempty(),
"sentinel notification-script %s %s",
master->name, master->notification_script);
rewriteConfigRewriteLine(state,"sentinel",line,1);
}

// client reconfig script
if (master->client_reconfig_script) {
line = sdscatprintf(sdsempty(),
"sentinel client-reconfig-script %s %s",
master->name, master->client_reconfig_script);
rewriteConfigRewriteLine(state,"sentinel",line,1);
}

// password
if (master->auth_pass) {
line = sdscatprintf(sdsempty(),
"sentinel auth-pass %s %s",
master->name, master->auth_pass);
rewriteConfigRewriteLine(state,"sentinel",line,1);
}

// config epoch
line = sdscatprintf(sdsempty(),
"sentinel config-epoch %s %llu",
master->name, (unsigned long long) master->config_epoch);
rewriteConfigRewriteLine(state,"sentinel",line,1);

// leader epoch
line = sdscatprintf(sdsempty(),
"sentinel leader-epoch %s %llu",
master->name, (unsigned long long) master->leader_epoch);
rewriteConfigRewriteLine(state,"sentinel",line,1);

// known slaves
di2 = dictGetIterator(master->slaves);
while((de = dictNext(di2)) != NULL) {
sentinelAddr *slave_addr;

ri = dictGetVal(de);
slave_addr = ri->addr;

// if this slave has just been promoted, write the old master's address in its place
if (sentinelAddrIsEqual(slave_addr,master_addr))
slave_addr = master->addr;
line = sdscatprintf(sdsempty(),
"sentinel known-replica %s %s %d",
master->name, slave_addr->ip, slave_addr->port);
rewriteConfigRewriteLine(state,"sentinel",line,1);
}
dictReleaseIterator(di2);

// other known sentinels
di2 = dictGetIterator(master->sentinels);
while((de = dictNext(di2)) != NULL) {
ri = dictGetVal(de);
if (ri->runid == NULL) continue;
line = sdscatprintf(sdsempty(),
"sentinel known-sentinel %s %s %d %s",
master->name, ri->addr->ip, ri->addr->port, ri->runid);
rewriteConfigRewriteLine(state,"sentinel",line,1);
}
dictReleaseIterator(di2);

// renamed commands
di2 = dictGetIterator(master->renamed_commands);
while((de = dictNext(di2)) != NULL) {
sds oldname = dictGetKey(de);
sds newname = dictGetVal(de);
line = sdscatprintf(sdsempty(),
"sentinel rename-command %s %s %s",
master->name, oldname, newname);
rewriteConfigRewriteLine(state,"sentinel",line,1);
}
dictReleaseIterator(di2);
}

// this sentinel's current epoch
line = sdscatprintf(sdsempty(),
"sentinel current-epoch %llu", (unsigned long long) sentinel.current_epoch);
rewriteConfigRewriteLine(state,"sentinel",line,1);

// announce-ip
if (sentinel.announce_ip) {
line = sdsnew("sentinel announce-ip ");
line = sdscatrepr(line, sentinel.announce_ip, sdslen(sentinel.announce_ip));
rewriteConfigRewriteLine(state,"sentinel",line,1);
}

// announce-port
if (sentinel.announce_port) {
line = sdscatprintf(sdsempty(),"sentinel announce-port %d",
sentinel.announce_port);
rewriteConfigRewriteLine(state,"sentinel",line,1);
}

dictReleaseIterator(di);
}
sentinelFlushConfig

Write the current Sentinel state to the configuration file (via rewriteConfig) and fsync it to disk.

void sentinelFlushConfig(void) {
int fd = -1;
int saved_hz = server.hz;
int rewrite_status;

server.hz = CONFIG_DEFAULT_HZ;
rewrite_status = rewriteConfig(server.configfile);
server.hz = saved_hz;

if (rewrite_status == -1) goto werr;
if ((fd = open(server.configfile,O_RDONLY)) == -1) goto werr;
if (fsync(fd) == -1) goto werr;
if (close(fd) == EOF) goto werr;
return;

werr:
if (fd != -1) close(fd);
serverLog(LL_WARNING,"WARNING: Sentinel was not able to save the new configuration on disk!!!: %s", strerror(errno));
}
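The durability step at the end of sentinelFlushConfig (reopen the rewritten file, fsync it, close it) is a common POSIX pattern. A minimal standalone sketch follows; fsync_path is a hypothetical name. The original compares the result of close with EOF. That works on common platforms where EOF is -1, but comparing with -1 is the usual idiom, and the sketch does that.

```c
#include <fcntl.h>
#include <unistd.h>

/* Reopen path, flush it to stable storage and close it.
 * Returns 0 on success, -1 on error (errno is set by the failing call). */
static int fsync_path(const char *path) {
    int fd = open(path, O_RDONLY);
    if (fd == -1) return -1;
    if (fsync(fd) == -1) {
        close(fd);
        return -1;
    }
    return close(fd); /* 0 on success, -1 on error */
}
```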

hiredis

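hiredis is the official C client library for Redis. It wraps connection handling and commands that follow the Redis protocol. Sentinel uses its asynchronous API (redisAsyncContext) to talk to the instances it monitors.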

sentinelSendAuthIfNeeded

Send AUTH with the configured password, if there is one.

void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) {
char *auth_pass = NULL;

if (ri->flags & SRI_MASTER) {
auth_pass = ri->auth_pass;
} else if (ri->flags & SRI_SLAVE) {
auth_pass = ri->master->auth_pass;
} else if (ri->flags & SRI_SENTINEL) {
if (server.requirepass) auth_pass = server.requirepass;
}

if (auth_pass) {
if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, "%s %s",
sentinelInstanceMapCommand(ri,"AUTH"),
auth_pass) == C_OK) ri->link->pending_commands++;
}
}
sentinelSetClientName

Set the name of the current connection (CLIENT SETNAME).

void sentinelSetClientName(sentinelRedisInstance *ri, redisAsyncContext *c, char *type) {
char name[64];

snprintf(name,sizeof(name),"sentinel-%.8s-%s",sentinel.myid,type);
if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri,
"%s SETNAME %s",
sentinelInstanceMapCommand(ri,"CLIENT"),
name) == C_OK)
{
ri->link->pending_commands++;
}
}
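The connection name is "sentinel-", followed by the first 8 characters of the Sentinel's run id and the link type ("cmd" or "pubsub"). This makes Sentinel connections easy to spot in CLIENT LIST. Below is a small sketch of that formatting; sentinel_client_name is a hypothetical helper.

```c
#include <stdio.h>

/* Same format string as sentinelSetClientName: %.8s keeps only the first
 * 8 characters of the run id. Returns the snprintf result. */
static int sentinel_client_name(char *out, size_t len, const char *myid,
                                const char *type) {
    return snprintf(out, len, "sentinel-%.8s-%s", myid, type);
}
```

For a run id starting with "01234567", the command link is named "sentinel-01234567-cmd".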
sentinelReconnectInstance

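Reconnect to an instance. This creates the command connection, and for masters and slaves also the pub/sub connection used for the hello channel.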

void sentinelReconnectInstance(sentinelRedisInstance *ri) {
if (ri->link->disconnected == 0) return;
if (ri->addr->port == 0) return;
instanceLink *link = ri->link;
mstime_t now = mstime();

// throttle: attempt a reconnection at most once per SENTINEL_PING_PERIOD
if (now - ri->link->last_reconn_time < SENTINEL_PING_PERIOD) return;
ri->link->last_reconn_time = now;

// command connection
if (link->cc == NULL) {

// connect
link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
if (link->cc->err) {
sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
link->cc->errstr);
instanceLinkCloseConnection(link,link->cc);
} else {
link->pending_commands = 0;
link->cc_conn_time = mstime();
link->cc->data = link;
redisAeAttach(server.el,link->cc);

// set the connect callback
redisAsyncSetConnectCallback(link->cc,
sentinelLinkEstablishedCallback);

// set the disconnect callback
redisAsyncSetDisconnectCallback(link->cc,
sentinelDisconnectCallback);

// authenticate if needed
sentinelSendAuthIfNeeded(ri,link->cc);

// set the client name
sentinelSetClientName(ri,link->cc,"cmd");

// send PING
sentinelSendPing(ri);
}
}

// pub/sub connection (masters and slaves only)
if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
link->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
if (link->pc->err) {
sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
link->pc->errstr);
instanceLinkCloseConnection(link,link->pc);
} else {
int retval;

link->pc_conn_time = mstime();
link->pc->data = link;
redisAeAttach(server.el,link->pc);

// set the connect callback
redisAsyncSetConnectCallback(link->pc,
sentinelLinkEstablishedCallback);

// set the disconnect callback
redisAsyncSetDisconnectCallback(link->pc,
sentinelDisconnectCallback);

// authenticate if needed
sentinelSendAuthIfNeeded(ri,link->pc);

// set the client name of this connection
sentinelSetClientName(ri,link->pc,"pubsub");

// subscribe to the hello channel
retval = redisAsyncCommand(link->pc,
sentinelReceiveHelloMessages, ri, "%s %s",
sentinelInstanceMapCommand(ri,"SUBSCRIBE"),
SENTINEL_HELLO_CHANNEL);
if (retval != C_OK) {
// subscription failed: close the pub/sub connection
instanceLinkCloseConnection(link,link->pc);
return;
}
}
}

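// the link counts as connected once cc exists and, except for sentinels (which need no pub/sub link), pc exists too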
if (link->cc && (ri->flags & SRI_SENTINEL || link->pc))
link->disconnected = 0;
}
sentinelMasterLooksSane

Check whether a master looks healthy.

int sentinelMasterLooksSane(sentinelRedisInstance *master) {
return
master->flags & SRI_MASTER && // configured as master
master->role_reported == SRI_MASTER && // INFO reports role:master
(master->flags & (SRI_S_DOWN|SRI_O_DOWN)) == 0 && // neither subjectively nor objectively down
(mstime() - master->info_refresh) < SENTINEL_INFO_PERIOD*2; // an INFO reply arrived within twice the INFO period
}
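Spelled out with explicit inputs, the sanity check requires all four conditions at once. struct master_view and master_looks_sane are hypothetical names. info_period stands in for SENTINEL_INFO_PERIOD; no particular value is assumed for it here.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical flattened view of the fields sentinelMasterLooksSane reads. */
struct master_view {
    bool configured_as_master;  /* SRI_MASTER set in flags */
    bool reports_master;        /* role_reported == SRI_MASTER (from INFO) */
    bool s_down, o_down;        /* SRI_S_DOWN / SRI_O_DOWN */
    int64_t info_refresh;       /* ms timestamp of the last INFO reply */
};

static bool master_looks_sane(const struct master_view *m, int64_t now,
                              int64_t info_period) {
    return m->configured_as_master &&
           m->reports_master &&
           !m->s_down && !m->o_down &&
           (now - m->info_refresh) < info_period * 2;
}
```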
sentinelRefreshInstanceInfo

Update an instance's state from the output of the INFO command.

void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
sds *lines;
int numlines, j;
int role = 0;

// cache the raw INFO output
sdsfree(ri->info);
ri->info = sdsnew(info);

// reset the master link down time (set again below if reported)
ri->master_link_down_time = 0;

// split the output into lines
lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);
for (j = 0; j < numlines; j++) {
sentinelRedisInstance *slave;
sds l = lines[j];

// run id: run_id:<40 hex chars>
if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) {
if (ri->runid == NULL) {
ri->runid = sdsnewlen(l+7,40);
} else {
// run_id differs from the one stored before: the instance was restarted
if (strncmp(ri->runid,l+7,40) != 0) {
sentinelEvent(LL_NOTICE,"+reboot",ri,"%@");
sdsfree(ri->runid);
ri->runid = sdsnewlen(l+7,40);
}
}
}

// slaves of this master
// old format: slave0:<ip>,<port>,<state>
// new format: slave0:ip=127.0.0.1,port=9999,...
if ((ri->flags & SRI_MASTER) &&
sdslen(l) >= 7 &&
!memcmp(l,"slave",5) && isdigit(l[5]))
{
char *ip, *port, *end;

// old format
if (strstr(l,"ip=") == NULL) {
ip = strchr(l,':'); if (!ip) continue;
ip++; // start of ip
port = strchr(ip,','); if (!port) continue;
*port = '\0';
port++; // start of port
end = strchr(port,','); if (!end) continue;
*end = '\0';
} else {
// new format
ip = strstr(l,"ip="); if (!ip) continue;
ip += 3; // start of ip
port = strstr(l,"port="); if (!port) continue;
port += 5; // start of port
// terminate ip and port with '\0'
end = strchr(ip,','); if (end) *end = '\0';
end = strchr(port,','); if (end) *end = '\0';
}

// add slaves we did not know about yet to this master
if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
atoi(port), ri->quorum, ri)) != NULL)
{
sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
sentinelFlushConfig();
}
}
}

// how long the link to the master has been down: master_link_down_since_seconds:<seconds>
if (sdslen(l) >= 32 &&
!memcmp(l,"master_link_down_since_seconds",30))
{
ri->master_link_down_time = strtoll(l+31,NULL,10)*1000;
}

// role
if (!memcmp(l,"role:master",11)) role = SRI_MASTER;
else if (!memcmp(l,"role:slave",10)) role = SRI_SLAVE;

if (role == SRI_SLAVE) {
//master_host:<host>
if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) {
if (ri->slave_master_host == NULL ||
strcasecmp(l+12,ri->slave_master_host))
{
sdsfree(ri->slave_master_host);
ri->slave_master_host = sdsnew(l+12);
ri->slave_conf_change_time = mstime();
}
}

//master_port:<port>
if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12)) {
int slave_master_port = atoi(l+12);

if (ri->slave_master_port != slave_master_port) {
ri->slave_master_port = slave_master_port;
ri->slave_conf_change_time = mstime();
}
}

// status of the link to the master: master_link_status:<status>
if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) {
ri->slave_master_link_status =
(strcasecmp(l+19,"up") == 0) ?
SENTINEL_MASTER_LINK_STATUS_UP :
SENTINEL_MASTER_LINK_STATUS_DOWN;
}

// priority: slave_priority:<priority>
if (sdslen(l) >= 15 && !memcmp(l,"slave_priority:",15))
ri->slave_priority = atoi(l+15);

// replication offset: slave_repl_offset:<offset>
if (sdslen(l) >= 18 && !memcmp(l,"slave_repl_offset:",18))
ri->slave_repl_offset = strtoull(l+18,NULL,10);
}
}
ri->info_refresh = mstime();
sdsfreesplitres(lines,numlines);

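// the role reported by INFO changed since last time (+role-change if it now matches the configured role, -role-change otherwise)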
if (role != ri->role_reported) {
ri->role_reported_time = mstime();
ri->role_reported = role;
if (role == SRI_SLAVE) ri->slave_conf_change_time = mstime();

sentinelEvent(LL_VERBOSE,
((ri->flags & (SRI_MASTER|SRI_SLAVE)) == role) ?
"+role-change" : "-role-change",
ri, "%@ new reported role is %s",
role == SRI_MASTER ? "master" : "slave",
ri->flags & SRI_MASTER ? "master" : "slave");
}

// in TILT mode, do not act on what INFO reported
if (sentinel.tilt) return;

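// configured as master but reports slave: nothing to do here; a master claiming to be a slave is treated as unreachable, so a failover will eventually be triggered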
if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) {

}

// configured as slave but reports master
if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
// the slave we selected for promotion has become master
if ((ri->flags & SRI_PROMOTED) &&
(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
(ri->master->failover_state ==
SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
{
// failover succeeded in this epoch: adopt it as the config epoch (later propagated to the other sentinels)
ri->master->config_epoch = ri->master->failover_epoch;
// next failover step: reconfigure the other slaves
ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
ri->master->failover_state_change_time = mstime();

// persist the sentinel configuration
sentinelFlushConfig();
sentinelEvent(LL_WARNING,"+promoted-slave",ri,"%@");
if (sentinel.simfailure_flags &
SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION)
sentinelSimFailureCrash();
sentinelEvent(LL_WARNING,"+failover-state-reconf-slaves",
ri->master,"%@");
sentinelCallClientReconfScript(ri->master,SENTINEL_LEADER,
"start",ri->master->addr,ri->addr);
sentinelForceHelloUpdateForMaster(ri->master);
} else {
// any other case is unexpected: turn this instance back into a slave
mstime_t wait_time = SENTINEL_PUBLISH_PERIOD*4;

// only if we did not promote it, the master looks sane, and the situation has lasted longer than wait_time
if (!(ri->flags & SRI_PROMOTED) &&
sentinelMasterLooksSane(ri->master) &&
sentinelRedisInstanceNoDownFor(ri,wait_time) &&
mstime() - ri->role_reported_time > wait_time)
{
int retval = sentinelSendSlaveOf(ri,
ri->master->addr->ip,
ri->master->addr->port);
if (retval == C_OK)
sentinelEvent(LL_NOTICE,"+convert-to-slave",ri,"%@");
}
}
}

// the slave reports a different master than the current one: point it at the current master
if ((ri->flags & SRI_SLAVE) &&
role == SRI_SLAVE &&
(ri->slave_master_port != ri->master->addr->port ||
strcasecmp(ri->slave_master_host,ri->master->addr->ip)))
{
mstime_t wait_time = ri->master->failover_timeout;

// only if the current master looks sane and the mismatch has lasted longer than wait_time
if (sentinelMasterLooksSane(ri->master) &&
sentinelRedisInstanceNoDownFor(ri,wait_time) &&
mstime() - ri->slave_conf_change_time > wait_time)
{
int retval = sentinelSendSlaveOf(ri,
ri->master->addr->ip,
ri->master->addr->port);
if (retval == C_OK)
sentinelEvent(LL_NOTICE,"+fix-slave-config",ri,"%@");
}
}

// track the other slaves being reconfigured to replicate from the promoted slave
if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
(ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)))
{
// SLAVEOF sent -> reconfiguration in progress
if ((ri->flags & SRI_RECONF_SENT) &&
ri->slave_master_host &&
strcmp(ri->slave_master_host,
ri->master->promoted_slave->addr->ip) == 0 &&
ri->slave_master_port == ri->master->promoted_slave->addr->port)
{
ri->flags &= ~SRI_RECONF_SENT;
ri->flags |= SRI_RECONF_INPROG;
sentinelEvent(LL_NOTICE,"+slave-reconf-inprog",ri,"%@");
}

// in progress -> done (the link to the new master is up)
if ((ri->flags & SRI_RECONF_INPROG) &&
ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP)
{
ri->flags &= ~SRI_RECONF_INPROG;
ri->flags |= SRI_RECONF_DONE;
sentinelEvent(LL_NOTICE,"+slave-reconf-done",ri,"%@");
}
}
}
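The slave-line parsing is the trickiest part of sentinelRefreshInstanceInfo, because it accepts both INFO formats. The sketch below extracts it into a hypothetical parse_slave_line. Like the original, it writes '\0' into the line. In the new format it locates "port=" before terminating the ip, since terminating first would hide the rest of the line from strstr.

```c
#include <stdlib.h>
#include <string.h>

/* Parses one "slaveN:..." line from INFO replication into ip and port,
 * modifying the line in place. Returns 0 on success, -1 if unparsable. */
static int parse_slave_line(char *l, char **ip_out, int *port_out) {
    char *ip, *port, *end;
    if (strncmp(l, "slave", 5) != 0 || l[5] < '0' || l[5] > '9') return -1;
    if (strstr(l, "ip=") == NULL) {
        /* old format: slave0:<ip>,<port>,<state> */
        ip = strchr(l, ':'); if (!ip) return -1;
        ip++;
        port = strchr(ip, ','); if (!port) return -1;
        *port++ = '\0';
        end = strchr(port, ','); if (!end) return -1;
        *end = '\0';
    } else {
        /* new format: slave0:ip=...,port=...,... (find both before cutting) */
        ip = strstr(l, "ip=") + 3;
        port = strstr(l, "port="); if (!port) return -1;
        port += 5;
        if ((end = strchr(ip, ',')) != NULL) *end = '\0';
        if ((end = strchr(port, ',')) != NULL) *end = '\0';
    }
    *ip_out = ip;
    *port_out = atoi(port);
    return 0;
}
```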
sentinelInfoReplyCallback

Callback for INFO replies.

void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
sentinelRedisInstance *ri = privdata;
instanceLink *link = c->data;
redisReply *r;

if (!reply || !link) return;
link->pending_commands--;
r = reply;

if (r->type == REDIS_REPLY_STRING)
sentinelRefreshInstanceInfo(ri,r->str);
}
sentinelDiscardReplyCallback

Callback that ignores the reply and only decrements pending_commands.

void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
instanceLink *link = c->data;
UNUSED(reply);
UNUSED(privdata);

if (link) link->pending_commands--;
}
sentinelPingReplyCallback

Callback for PING replies.

void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
sentinelRedisInstance *ri = privdata;
instanceLink *link = c->data;
redisReply *r;

if (!reply || !link) return;
link->pending_commands--;
r = reply;

if (r->type == REDIS_REPLY_STATUS ||
r->type == REDIS_REPLY_ERROR) {

// valid replies: PONG (normal), LOADING (still loading the dataset), MASTERDOWN (a replica that lost its master)
if (strncmp(r->str,"PONG",4) == 0 ||
strncmp(r->str,"LOADING",7) == 0 ||
strncmp(r->str,"MASTERDOWN",10) == 0)
{
link->last_avail_time = mstime();
link->act_ping_time = 0;
} else {
// BUSY: a script is blocking the instance; if it is S_DOWN, send SCRIPT KILL (only once)
if (strncmp(r->str,"BUSY",4) == 0 &&
(ri->flags & SRI_S_DOWN) &&
!(ri->flags & SRI_SCRIPT_KILL_SENT))
{
if (redisAsyncCommand(ri->link->cc,
sentinelDiscardReplyCallback, ri,
"%s KILL",
sentinelInstanceMapCommand(ri,"SCRIPT")) == C_OK)
{
ri->link->pending_commands++;
}
ri->flags |= SRI_SCRIPT_KILL_SENT;
}
}
}
link->last_pong_time = mstime();
}
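The callback above sorts PING replies into three groups. PONG, LOADING and MASTERDOWN all count as "available". BUSY may trigger SCRIPT KILL. Anything else changes nothing except `last_pong_time`. Here is a minimal sketch of that classification on plain strings. The enum and function names are hypothetical, and error strings are assumed to arrive without the leading `-`, as hiredis delivers them in `r->str`.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical classification used only in this sketch. */
typedef enum { PING_VALID, PING_BUSY, PING_OTHER } ping_reply_kind;

/* Classify a status/error reply string by prefix, as the callback does. */
ping_reply_kind classify_ping_reply(const char *reply) {
    assert(reply != NULL);
    if (strncmp(reply, "PONG", 4) == 0 ||
        strncmp(reply, "LOADING", 7) == 0 ||
        strncmp(reply, "MASTERDOWN", 10) == 0)
        return PING_VALID; /* counts as "instance available" */
    if (strncmp(reply, "BUSY", 4) == 0)
        return PING_BUSY;  /* a running script blocks the instance */
    return PING_OTHER;
}

/* SCRIPT KILL is sent only for a BUSY reply from an instance that is
 * subjectively down and has not been sent SCRIPT KILL yet. */
int should_send_script_kill(const char *reply, int s_down, int kill_sent) {
    return classify_ping_reply(reply) == PING_BUSY && s_down && !kill_sent;
}
```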
sentinelPublishReplyCallback

Callback for the reply to the PUBLISH command.

void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
sentinelRedisInstance *ri = privdata;
instanceLink *link = c->data;
redisReply *r;

if (!reply || !link) return;
link->pending_commands--;
r = reply;

// On success, update the last publish time
if (r->type != REDIS_REPLY_ERROR)
ri->last_pub_time = mstime();
}
sentinelProcessHelloMessage

Processes a message received on the hello channel.

void sentinelProcessHelloMessage(char *hello, int hello_len) {
// Message format:
// 0=ip,1=port,2=runid,3=current_epoch,4=master_name,
// 5=master_ip,6=master_port,7=master_config_epoch.

int numtokens, port, removed, master_port;
uint64_t current_epoch, master_config_epoch;
char **token = sdssplitlen(hello, hello_len, ",", 1, &numtokens);
sentinelRedisInstance *si, *master;

// Only accept messages with exactly 8 tokens
if (numtokens == 8) {
// Ignore masters that this sentinel does not monitor
master = sentinelGetMasterByName(token[4]);
if (!master) goto cleanup;

// Check whether this sentinel is already known for this master
port = atoi(token[1]);
master_port = atoi(token[6]);
si = getSentinelRedisInstanceByAddrAndRunID(
master->sentinels,token[0],port,token[2]);
current_epoch = strtoull(token[3],NULL,10);
master_config_epoch = strtoull(token[7],NULL,10);

// Unknown sentinel
if (!si) {
// Remove any sentinel with the same runid (handles a sentinel whose address changed)
removed = removeMatchingSentinelFromMaster(master,token[2]);
if (removed) {
sentinelEvent(LL_NOTICE,"+sentinel-address-switch",master,
"%@ ip %s port %d for %s", token[0],port,token[2]);
} else {
// No sentinel with this runid; look for one with the same ip:port
sentinelRedisInstance *other =
getSentinelRedisInstanceByAddrAndRunID(
master->sentinels, token[0],port,NULL);

// If found, mark its address as invalid (port 0)
if (other) {
sentinelEvent(LL_NOTICE,"+sentinel-invalid-addr",other,"%@");
other->addr->port = 0; /* It means: invalid address. */
sentinelUpdateSentinelAddressInAllMasters(other);
}
}

// Add the new sentinel
si = createSentinelRedisInstance(token[2],SRI_SENTINEL,
token[0],port,master->quorum,master);

if (si) {
if (!removed) sentinelEvent(LL_NOTICE,"+sentinel",si,"%@");
// Set the runid here: sentinels are not sent INFO, so this is the only place to learn it
si->runid = sdsnew(token[2]);

// Share the connection to this sentinel among all masters it monitors
sentinelTryConnectionSharing(si);

// Update this sentinel's address in all masters
if (removed) sentinelUpdateSentinelAddressInAllMasters(si);

// Rewrite the sentinel config file
sentinelFlushConfig();
}
}

// The sender's epoch is higher: adopt it
if (current_epoch > sentinel.current_epoch) {
sentinel.current_epoch = current_epoch;
sentinelFlushConfig();
sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
(unsigned long long) sentinel.current_epoch);
}

// Newer config epoch for the master: update its epoch, ip and port
if (si && master->config_epoch < master_config_epoch) {
master->config_epoch = master_config_epoch;

// The master's address changed
if (master_port != master->addr->port ||
strcmp(master->addr->ip, token[5]))
{
sentinelAddr *old_addr;

sentinelEvent(LL_WARNING,"+config-update-from",si,"%@");
sentinelEvent(LL_WARNING,"+switch-master",
master,"%s %s %d %s %d",
master->name,
master->addr->ip, master->addr->port,
token[5], master_port);

old_addr = dupSentinelAddr(master->addr);

// Reset the master and switch to the new address
sentinelResetMasterAndChangeAddress(master, token[5], master_port);
sentinelCallClientReconfScript(master,
SENTINEL_OBSERVER,"start",
old_addr,master->addr);
releaseSentinelAddr(old_addr);
}
}

if (si) si->last_hello_time = mstime();
}

cleanup:
sdsfreesplitres(token,numtokens);
}
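A hello message is a single line of 8 comma-separated fields, and anything with a different token count is dropped. Below is a minimal standalone parser for that format, written as a sketch. The `hello_msg` struct, the fixed field sizes and `parse_hello` are hypothetical; Redis itself uses `sdssplitlen` as shown above.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical holder for the 8 hello fields:
 * ip,port,runid,current_epoch,master_name,master_ip,master_port,master_config_epoch */
typedef struct {
    char ip[64];
    int port;
    char runid[64];
    unsigned long long current_epoch;
    char master_name[64];
    char master_ip[64];
    int master_port;
    unsigned long long master_config_epoch;
} hello_msg;

/* Copy [src, src+len) into dst, truncating to fit and NUL-terminating. */
static void copy_field(char *dst, size_t dstlen, const char *src, size_t len) {
    if (len >= dstlen) len = dstlen - 1;
    memcpy(dst, src, len);
    dst[len] = '\0';
}

/* Returns 0 on success, -1 unless there are exactly 8 tokens
 * (the same check as numtokens == 8 above). */
int parse_hello(const char *payload, hello_msg *out) {
    const char *tok[8];
    size_t len[8];
    int n = 0;
    const char *start = payload;

    assert(payload != NULL && out != NULL);
    for (const char *p = payload; ; p++) {
        if (*p == ',' || *p == '\0') {
            if (n == 8) return -1; /* a 9th token: reject */
            tok[n] = start;
            len[n] = (size_t)(p - start);
            n++;
            if (*p == '\0') break;
            start = p + 1;
        }
    }
    if (n != 8) return -1;

    char num[32]; /* scratch buffer for numeric fields */
    copy_field(out->ip, sizeof(out->ip), tok[0], len[0]);
    copy_field(num, sizeof(num), tok[1], len[1]); out->port = atoi(num);
    copy_field(out->runid, sizeof(out->runid), tok[2], len[2]);
    copy_field(num, sizeof(num), tok[3], len[3]); out->current_epoch = strtoull(num, NULL, 10);
    copy_field(out->master_name, sizeof(out->master_name), tok[4], len[4]);
    copy_field(out->master_ip, sizeof(out->master_ip), tok[5], len[5]);
    copy_field(num, sizeof(num), tok[6], len[6]); out->master_port = atoi(num);
    copy_field(num, sizeof(num), tok[7], len[7]); out->master_config_epoch = strtoull(num, NULL, 10);
    return 0;
}
```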
sentinelReceiveHelloMessages

Callback invoked for messages received on the hello channel.

void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
sentinelRedisInstance *ri = privdata;
redisReply *r;
UNUSED(c);

if (!reply || !ri) return;
r = reply;

// Update the last activity time of the pub/sub link
ri->link->pc_last_activity = mstime();

// Validate the message format
if (r->type != REDIS_REPLY_ARRAY ||
r->elements != 3 ||
r->element[0]->type != REDIS_REPLY_STRING ||
r->element[1]->type != REDIS_REPLY_STRING ||
r->element[2]->type != REDIS_REPLY_STRING ||
strcmp(r->element[0]->str,"message") != 0) return;

// Ignore messages sent by this sentinel itself
if (strstr(r->element[2]->str,sentinel.myid) != NULL) return;

// Process the hello message
sentinelProcessHelloMessage(r->element[2]->str, r->element[2]->len);
}
sentinelSendHello

Publishes this sentinel's state to the hello channel.

int sentinelSendHello(sentinelRedisInstance *ri) {
char ip[NET_IP_STR_LEN];
char payload[NET_IP_STR_LEN+1024];
int retval;
char *announce_ip;
int announce_port;
sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? ri : ri->master;
sentinelAddr *master_addr = sentinelGetCurrentMasterAddress(master);

if (ri->link->disconnected) return C_ERR;

// Use the announced ip/port if configured, otherwise the local ones
if (sentinel.announce_ip) {
announce_ip = sentinel.announce_ip;
} else {
if (anetSockName(ri->link->cc->c.fd,ip,sizeof(ip),NULL) == -1)
return C_ERR;
announce_ip = ip;
}
announce_port = sentinel.announce_port ?
sentinel.announce_port : server.port;

// Build the hello payload
snprintf(payload,sizeof(payload),
"%s,%d,%s,%llu," // this sentinel's info
"%s,%s,%d,%llu", // the master's info
announce_ip, announce_port, sentinel.myid,
(unsigned long long) sentinel.current_epoch,
/* --- */
master->name,master_addr->ip,master_addr->port,
(unsigned long long) master->config_epoch);

// Publish the message
retval = redisAsyncCommand(ri->link->cc,
sentinelPublishReplyCallback, ri, "%s %s %s",
sentinelInstanceMapCommand(ri,"PUBLISH"),
SENTINEL_HELLO_CHANNEL,payload);
if (retval != C_OK) return C_ERR;
ri->link->pending_commands++;
return C_OK;
}
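The payload built above uses the same 8-field order that `sentinelProcessHelloMessage` expects. An announced ip/port takes precedence over the local address. The sketch below reproduces just the formatting and the fallback. The function names are hypothetical, and the local ip is passed in as a parameter rather than obtained with `anetSockName`.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h> /* used by callers comparing the result */

/* Format "<ip>,<port>,<runid>,<current_epoch>,<master_name>,<master_ip>,
 * <master_port>,<master_config_epoch>", the same order as sentinelSendHello.
 * Returns snprintf's result. */
int build_hello(char *buf, size_t buflen,
                const char *announce_ip, int announce_port,
                const char *myid, unsigned long long current_epoch,
                const char *master_name, const char *master_ip,
                int master_port, unsigned long long master_config_epoch) {
    assert(buf != NULL && buflen > 0);
    return snprintf(buf, buflen, "%s,%d,%s,%llu,%s,%s,%d,%llu",
                    announce_ip, announce_port, myid, current_epoch,
                    master_name, master_ip, master_port, master_config_epoch);
}

/* Announced values win when set (non-NULL / non-zero). */
const char *pick_announce_ip(const char *announce_ip, const char *local_ip) {
    return announce_ip ? announce_ip : local_ip;
}

int pick_announce_port(int announce_port, int server_port) {
    return announce_port ? announce_port : server_port;
}
```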
sentinelForceHelloUpdateDictOfRedisInstances

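Rewinds the last publish time of every instance in the dict by SENTINEL_PUBLISH_PERIOD+1 ms, so that the next periodic scan is guaranteed to send a hello message.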

void sentinelForceHelloUpdateDictOfRedisInstances(dict *instances) {
dictIterator *di;
dictEntry *de;

di = dictGetSafeIterator(instances);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
if (ri->last_pub_time >= (SENTINEL_PUBLISH_PERIOD+1))
ri->last_pub_time -= (SENTINEL_PUBLISH_PERIOD+1);
}
dictReleaseIterator(di);
}
sentinelForceHelloUpdateForMaster

Forces a hello update for a master and for all of its sentinels and slaves by rewinding their last publish times.

int sentinelForceHelloUpdateForMaster(sentinelRedisInstance *master) {
if (!(master->flags & SRI_MASTER)) return C_ERR;
if (master->last_pub_time >= (SENTINEL_PUBLISH_PERIOD+1))
master->last_pub_time -= (SENTINEL_PUBLISH_PERIOD+1);
sentinelForceHelloUpdateDictOfRedisInstances(master->sentinels);
sentinelForceHelloUpdateDictOfRedisInstances(master->slaves);
return C_OK;
}
sentinelSendPing

Sends a PING command.

int sentinelSendPing(sentinelRedisInstance *ri) {
int retval = redisAsyncCommand(ri->link->cc,
sentinelPingReplyCallback, ri, "%s",
sentinelInstanceMapCommand(ri,"PING"));
if (retval == C_OK) {
ri->link->pending_commands++;
ri->link->last_ping_time = mstime();

// Record when the oldest unanswered PING was sent; reset to 0 when a valid reply arrives
if (ri->link->act_ping_time == 0)
ri->link->act_ping_time = ri->link->last_ping_time;
return 1;
} else {
return 0;
}
}
sentinelSendPeriodicCommands

Periodic commands the sentinel sends to each instance.

void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
mstime_t now = mstime();
mstime_t info_period, ping_period;
int retval;

// The link is disconnected
if (ri->link->disconnected) return;

// Too many commands are still waiting for a reply on this link
if (ri->link->pending_commands >=
SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return;

// For a slave whose master is objectively down or failing over, or whose own link to its master is down, send INFO every 1 s
if ((ri->flags & SRI_SLAVE) &&
((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) ||
(ri->master_link_down_time != 0)))
{
info_period = 1000;
} else {
info_period = SENTINEL_INFO_PERIOD;
}

// PING period: down-after-milliseconds, capped at SENTINEL_PING_PERIOD (1 s)
ping_period = ri->down_after_period;
if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;

// Send INFO to masters and slaves only (not to sentinels)
if ((ri->flags & SRI_SENTINEL) == 0 &&
(ri->info_refresh == 0 ||
(now - ri->info_refresh) > info_period))
{
retval = redisAsyncCommand(ri->link->cc,
sentinelInfoReplyCallback, ri, "%s",
sentinelInstanceMapCommand(ri,"INFO"));
if (retval == C_OK) ri->link->pending_commands++;
}

// Send PING if the last reply is older than ping_period and no PING was sent in the last ping_period/2
if ((now - ri->link->last_pong_time) > ping_period &&
(now - ri->link->last_ping_time) > ping_period/2) {
sentinelSendPing(ri);
}

// Publish a hello message
if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
sentinelSendHello(ri);
}
}
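The scheduling above comes down to three small decisions: the INFO interval, the PING interval, and whether a PING is due now. The sketch below pulls them out as pure functions. It assumes SENTINEL_INFO_PERIOD is 10000 ms and SENTINEL_PING_PERIOD is 1000 ms; the function names and the type alias are hypothetical.

```c
#include <assert.h>

typedef long long mstime_ms; /* hypothetical millisecond timestamp type */

/* Assumed values of SENTINEL_INFO_PERIOD and SENTINEL_PING_PERIOD (ms). */
#define INFO_PERIOD 10000
#define PING_PERIOD 1000

/* INFO every 1 s for a slave whose master is O_DOWN or failing over, or
 * whose own master link is down; otherwise every INFO_PERIOD. */
mstime_ms info_period_for(int is_slave, int master_odown_or_failover,
                          mstime_ms master_link_down_time) {
    if (is_slave && (master_odown_or_failover || master_link_down_time != 0))
        return 1000;
    return INFO_PERIOD;
}

/* PING interval: down-after-milliseconds, capped at PING_PERIOD. */
mstime_ms ping_period_for(mstime_ms down_after_period) {
    assert(down_after_period > 0); /* SENTINEL SET rejects values <= 0 */
    return down_after_period > PING_PERIOD ? PING_PERIOD : down_after_period;
}

/* PING is due when the last reply is older than ping_period and the last
 * PING went out more than ping_period/2 ago. */
int ping_due(mstime_ms now, mstime_ms last_pong, mstime_ms last_ping,
             mstime_ms ping_period) {
    return (now - last_pong) > ping_period &&
           (now - last_ping) > ping_period / 2;
}
```

The `ping_period/2` guard keeps a slow instance from being flooded with PINGs while earlier ones are still unanswered.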

Command handlers

sentinelFailoverStateStr

Returns a string describing a failover state.

const char *sentinelFailoverStateStr(int state) {
switch(state) {
case SENTINEL_FAILOVER_STATE_NONE: return "none";
case SENTINEL_FAILOVER_STATE_WAIT_START: return "wait_start";
case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: return "select_slave";
case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: return "send_slaveof_noone";
case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: return "wait_promotion";
case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: return "reconf_slaves";
case SENTINEL_FAILOVER_STATE_UPDATE_CONFIG: return "update_config";
default: return "unknown";
}
}
addReplySentinelRedisInstance

Appends the fields of a single instance to the reply.

void addReplySentinelRedisInstance(client *c, sentinelRedisInstance *ri) {
char *flags = sdsempty();
void *mbl;
int fields = 0;

mbl = addDeferredMultiBulkLength(c);

addReplyBulkCString(c,"name");
addReplyBulkCString(c,ri->name);
fields++;

addReplyBulkCString(c,"ip");
addReplyBulkCString(c,ri->addr->ip);
fields++;

addReplyBulkCString(c,"port");
addReplyBulkLongLong(c,ri->addr->port);
fields++;

addReplyBulkCString(c,"runid");
addReplyBulkCString(c,ri->runid ? ri->runid : "");
fields++;

// Flags
addReplyBulkCString(c,"flags");
if (ri->flags & SRI_S_DOWN) flags = sdscat(flags,"s_down,");
if (ri->flags & SRI_O_DOWN) flags = sdscat(flags,"o_down,");
if (ri->flags & SRI_MASTER) flags = sdscat(flags,"master,");
if (ri->flags & SRI_SLAVE) flags = sdscat(flags,"slave,");
if (ri->flags & SRI_SENTINEL) flags = sdscat(flags,"sentinel,");
if (ri->link->disconnected) flags = sdscat(flags,"disconnected,");
if (ri->flags & SRI_MASTER_DOWN) flags = sdscat(flags,"master_down,");
if (ri->flags & SRI_FAILOVER_IN_PROGRESS)
flags = sdscat(flags,"failover_in_progress,");
if (ri->flags & SRI_PROMOTED) flags = sdscat(flags,"promoted,");
if (ri->flags & SRI_RECONF_SENT) flags = sdscat(flags,"reconf_sent,");
if (ri->flags & SRI_RECONF_INPROG) flags = sdscat(flags,"reconf_inprog,");
if (ri->flags & SRI_RECONF_DONE) flags = sdscat(flags,"reconf_done,");

// Remove the trailing ","
if (sdslen(flags) != 0) sdsrange(flags,0,-2);
addReplyBulkCString(c,flags);
sdsfree(flags);
fields++;

addReplyBulkCString(c,"link-pending-commands");
addReplyBulkLongLong(c,ri->link->pending_commands);
fields++;

addReplyBulkCString(c,"link-refcount");
addReplyBulkLongLong(c,ri->link->refcount);
fields++;

if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
addReplyBulkCString(c,"failover-state");
addReplyBulkCString(c,(char*)sentinelFailoverStateStr(ri->failover_state));
fields++;
}

addReplyBulkCString(c,"last-ping-sent");
addReplyBulkLongLong(c,
ri->link->act_ping_time ? (mstime() - ri->link->act_ping_time) : 0);
fields++;

addReplyBulkCString(c,"last-ok-ping-reply");
addReplyBulkLongLong(c,mstime() - ri->link->last_avail_time);
fields++;

addReplyBulkCString(c,"last-ping-reply");
addReplyBulkLongLong(c,mstime() - ri->link->last_pong_time);
fields++;

if (ri->flags & SRI_S_DOWN) {
addReplyBulkCString(c,"s-down-time");
addReplyBulkLongLong(c,mstime()-ri->s_down_since_time);
fields++;
}

if (ri->flags & SRI_O_DOWN) {
addReplyBulkCString(c,"o-down-time");
addReplyBulkLongLong(c,mstime()-ri->o_down_since_time);
fields++;
}

addReplyBulkCString(c,"down-after-milliseconds");
addReplyBulkLongLong(c,ri->down_after_period);
fields++;

// Fields for masters and slaves only
if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
addReplyBulkCString(c,"info-refresh");
addReplyBulkLongLong(c,mstime() - ri->info_refresh);
fields++;

addReplyBulkCString(c,"role-reported");
addReplyBulkCString(c, (ri->role_reported == SRI_MASTER) ? "master" :
"slave");
fields++;

addReplyBulkCString(c,"role-reported-time");
addReplyBulkLongLong(c,mstime() - ri->role_reported_time);
fields++;
}

// Master-only fields
if (ri->flags & SRI_MASTER) {
addReplyBulkCString(c,"config-epoch");
addReplyBulkLongLong(c,ri->config_epoch);
fields++;

addReplyBulkCString(c,"num-slaves");
addReplyBulkLongLong(c,dictSize(ri->slaves));
fields++;

addReplyBulkCString(c,"num-other-sentinels");
addReplyBulkLongLong(c,dictSize(ri->sentinels));
fields++;

addReplyBulkCString(c,"quorum");
addReplyBulkLongLong(c,ri->quorum);
fields++;

addReplyBulkCString(c,"failover-timeout");
addReplyBulkLongLong(c,ri->failover_timeout);
fields++;

addReplyBulkCString(c,"parallel-syncs");
addReplyBulkLongLong(c,ri->parallel_syncs);
fields++;

if (ri->notification_script) {
addReplyBulkCString(c,"notification-script");
addReplyBulkCString(c,ri->notification_script);
fields++;
}

if (ri->client_reconfig_script) {
addReplyBulkCString(c,"client-reconfig-script");
addReplyBulkCString(c,ri->client_reconfig_script);
fields++;
}
}

// Slave-only fields
if (ri->flags & SRI_SLAVE) {
addReplyBulkCString(c,"master-link-down-time");
addReplyBulkLongLong(c,ri->master_link_down_time);
fields++;

addReplyBulkCString(c,"master-link-status");
addReplyBulkCString(c,
(ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP) ?
"ok" : "err");
fields++;

addReplyBulkCString(c,"master-host");
addReplyBulkCString(c,
ri->slave_master_host ? ri->slave_master_host : "?");
fields++;

addReplyBulkCString(c,"master-port");
addReplyBulkLongLong(c,ri->slave_master_port);
fields++;

addReplyBulkCString(c,"slave-priority");
addReplyBulkLongLong(c,ri->slave_priority);
fields++;

addReplyBulkCString(c,"slave-repl-offset");
addReplyBulkLongLong(c,ri->slave_repl_offset);
fields++;
}

// Sentinel-only fields
if (ri->flags & SRI_SENTINEL) {
addReplyBulkCString(c,"last-hello-message");
addReplyBulkLongLong(c,mstime() - ri->last_hello_time);
fields++;

addReplyBulkCString(c,"voted-leader");
addReplyBulkCString(c,ri->leader ? ri->leader : "?");
fields++;

addReplyBulkCString(c,"voted-leader-epoch");
addReplyBulkLongLong(c,ri->leader_epoch);
fields++;
}

setDeferredMultiBulkLength(c,mbl,fields*2);
}
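The `flags` field above is built by appending `name,` for every set bit and then trimming the final comma with `sdsrange(flags,0,-2)`. Below is a minimal sketch of the same idea with a plain `char` buffer and a lookup table. Only four flags are included, and the bit values and function name are hypothetical.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical subset of flag bits for this sketch. */
#define F_S_DOWN (1 << 0)
#define F_O_DOWN (1 << 1)
#define F_MASTER (1 << 2)
#define F_SLAVE  (1 << 3)

/* Render flags as e.g. "s_down,master": append "name," per set bit, then
 * drop the trailing comma. Assumes buf is large enough for all names. */
void flags_to_string(int flags, char *buf, size_t buflen) {
    static const struct { int bit; const char *name; } table[] = {
        {F_S_DOWN, "s_down"}, {F_O_DOWN, "o_down"},
        {F_MASTER, "master"}, {F_SLAVE, "slave"},
    };
    assert(buf != NULL && buflen > 0);
    buf[0] = '\0';
    for (size_t i = 0; i < sizeof(table) / sizeof(table[0]); i++) {
        if (flags & table[i].bit) {
            strncat(buf, table[i].name, buflen - strlen(buf) - 1);
            strncat(buf, ",", buflen - strlen(buf) - 1);
        }
    }
    size_t n = strlen(buf);
    if (n != 0) buf[n - 1] = '\0'; /* same effect as sdsrange(flags,0,-2) */
}
```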
addReplyDictOfRedisInstances

Appends the information of every instance in a dict.

void addReplyDictOfRedisInstances(client *c, dict *instances) {
dictIterator *di;
dictEntry *de;

di = dictGetIterator(instances);
addReplyMultiBulkLen(c,dictSize(instances));
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);

addReplySentinelRedisInstance(c,ri);
}
dictReleaseIterator(di);
}
sentinelGetMasterByNameOrReplyError

Looks up a master by name, replying with an error if it does not exist.

sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(client *c,
robj *name)
{
sentinelRedisInstance *ri;

ri = dictFetchValue(sentinel.masters,name->ptr);
if (!ri) {
addReplyError(c,"No such master with that name");
return NULL;
}
return ri;
}
sentinelIsQuorumReachable

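Checks whether enough sentinels are usable to reach the quorum and to authorize a failover. The number of healthy sentinels (including this one) must 1. be at least the configured quorum, and 2. be a majority (more than 50%) of all known sentinels.
`usableptr` receives the number of healthy sentinels.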

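#define SENTINEL_ISQR_OK 0 // quorum and majority are both reachable
#define SENTINEL_ISQR_NOQUORUM (1<<0) // not enough usable sentinels to reach the quorum
#define SENTINEL_ISQR_NOAUTH (1<<1) // not enough usable sentinels for a majority, so a failover cannot be authorized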

int sentinelIsQuorumReachable(sentinelRedisInstance *master, int *usableptr) {
dictIterator *di;
dictEntry *de;
int usable = 1; // healthy sentinels, counting this one
int result = SENTINEL_ISQR_OK;
int voters = dictSize(master->sentinels)+1; // all voters: known sentinels + this one

di = dictGetIterator(master->sentinels);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);

// Skip sentinels that are down
if (ri->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue;
usable++;
}
dictReleaseIterator(di);

// Fewer healthy sentinels than the configured quorum
if (usable < (int)master->quorum) result |= SENTINEL_ISQR_NOQUORUM;

// Fewer healthy sentinels than a majority (voters/2+1)
if (usable < voters/2+1) result |= SENTINEL_ISQR_NOAUTH;
if (usableptr) *usableptr = usable;
return result;
}
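The function applies two independent thresholds. The configured quorum governs agreement on objective down, while a strict majority of all known sentinels is needed to authorize a failover. Below, the same arithmetic is sketched with plain counts in place of the sentinel dict. The signature is hypothetical. Take 5 sentinels (4 others) with quorum 2: with 2 of them down, 3 remain usable and both checks pass. With 3 down, only 2 remain usable, which still meets the quorum but falls short of the majority of 3.

```c
#include <assert.h>

#define ISQR_OK       0
#define ISQR_NOQUORUM (1 << 0)
#define ISQR_NOAUTH   (1 << 1)

/* Same checks as sentinelIsQuorumReachable, with counts instead of a dict
 * (hypothetical signature):
 *   known_others - other sentinels known for this master
 *   down_others  - how many of them are S_DOWN or O_DOWN
 *   quorum       - the configured quorum */
int quorum_reachable(int known_others, int down_others, int quorum,
                     int *usable_out) {
    assert(known_others >= 0 && down_others >= 0 && down_others <= known_others);
    int usable = 1 + (known_others - down_others); /* this sentinel + healthy others */
    int voters = known_others + 1;                  /* every sentinel, including this one */
    int result = ISQR_OK;

    if (usable < quorum) result |= ISQR_NOQUORUM;
    if (usable < voters / 2 + 1) result |= ISQR_NOAUTH;
    if (usable_out) *usable_out = usable;
    return result;
}
```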
sentinelCommand

Handles the SENTINEL command, used to query state and to change the sentinel configuration at runtime.

void sentinelCommand(client *c) {
if (!strcasecmp(c->argv[1]->ptr,"masters")) {
// SENTINEL MASTERS: info on all monitored masters
if (c->argc != 2) goto numargserr;
addReplyDictOfRedisInstances(c,sentinel.masters);
} else if (!strcasecmp(c->argv[1]->ptr,"master")) {
// SENTINEL MASTER <name>: info on one master
sentinelRedisInstance *ri;

if (c->argc != 3) goto numargserr;
if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2]))
== NULL) return;
addReplySentinelRedisInstance(c,ri);
} else if (!strcasecmp(c->argv[1]->ptr,"slaves") ||
!strcasecmp(c->argv[1]->ptr,"replicas"))
{
// SENTINEL REPLICAS <master-name>: info on all slaves of a master
sentinelRedisInstance *ri;

if (c->argc != 3) goto numargserr;
if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
return;
addReplyDictOfRedisInstances(c,ri->slaves);
} else if (!strcasecmp(c->argv[1]->ptr,"sentinels")) {
// SENTINEL SENTINELS <master-name>: info on the other sentinels monitoring a master
sentinelRedisInstance *ri;

if (c->argc != 3) goto numargserr;
if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
return;
addReplyDictOfRedisInstances(c,ri->sentinels);
} else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
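// Reports whether the master at ip:port is subjectively down.
// current-epoch is the requesting sentinel's epoch.
// If runid is not "*", the requester asks us to vote for that runid; "*" means no vote is requested.
// SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> <current-epoch> <runid>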
sentinelRedisInstance *ri;
long long req_epoch;
uint64_t leader_epoch = 0;
char *leader = NULL;
long port;
int isdown = 0;

// Check the argument count
if (c->argc != 6) goto numargserr;

// Parse the port and the epoch
if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != C_OK ||
getLongLongFromObjectOrReply(c,c->argv[4],&req_epoch,NULL)
!= C_OK)
return;
ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
c->argv[2]->ptr,port,NULL);

// Subjectively down? Always reply 0 in TILT mode
if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
(ri->flags & SRI_MASTER))
isdown = 1;

// The requester wants our vote for runid
if (ri && ri->flags & SRI_MASTER && strcasecmp(c->argv[5]->ptr,"*")) {
leader = sentinelVoteLeader(ri,(uint64_t)req_epoch,
c->argv[5]->ptr,
&leader_epoch);
}

addReplyMultiBulkLen(c,3);
addReply(c, isdown ? shared.cone : shared.czero);
addReplyBulkCString(c, leader ? leader : "*");
addReplyLongLong(c, (long long)leader_epoch);
if (leader) sdsfree(leader);
} else if (!strcasecmp(c->argv[1]->ptr,"reset")) {
// SENTINEL RESET <pattern>: reset all masters whose name matches a glob-style pattern
if (c->argc != 3) goto numargserr;
addReplyLongLong(c,sentinelResetMastersByPattern(c->argv[2]->ptr,SENTINEL_GENERATE_EVENT));
} else if (!strcasecmp(c->argv[1]->ptr,"get-master-addr-by-name")) {
// SENTINEL GET-MASTER-ADDR-BY-NAME <master-name>: ip and port of a master
sentinelRedisInstance *ri;

if (c->argc != 3) goto numargserr;
ri = sentinelGetMasterByName(c->argv[2]->ptr);
if (ri == NULL) {
addReply(c,shared.nullmultibulk);
} else {
sentinelAddr *addr = sentinelGetCurrentMasterAddress(ri);

addReplyMultiBulkLen(c,2);
addReplyBulkCString(c,addr->ip);
addReplyBulkLongLong(c,addr->port);
}
} else if (!strcasecmp(c->argv[1]->ptr,"failover")) {
// SENTINEL FAILOVER <master-name>: force a failover
sentinelRedisInstance *ri;

if (c->argc != 3) goto numargserr;
if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
return;
if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
addReplySds(c,sdsnew("-INPROG Failover already in progress\r\n"));
return;
}
if (sentinelSelectSlave(ri) == NULL) {
addReplySds(c,sdsnew("-NOGOODSLAVE No suitable replica to promote\r\n"));
return;
}
serverLog(LL_WARNING,"Executing user requested FAILOVER of '%s'",
ri->name);
sentinelStartFailover(ri);
ri->flags |= SRI_FORCE_FAILOVER;
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"pending-scripts")) {
// SENTINEL PENDING-SCRIPTS: tasks in the script queue

if (c->argc != 2) goto numargserr;
sentinelPendingScriptsCommand(c);
} else if (!strcasecmp(c->argv[1]->ptr,"monitor")) {
// SENTINEL MONITOR <name> <ip> <port> <quorum>: start monitoring a new master
sentinelRedisInstance *ri;
long quorum, port;
char ip[NET_IP_STR_LEN];

if (c->argc != 6) goto numargserr;
if (getLongFromObjectOrReply(c,c->argv[5],&quorum,"Invalid quorum")
!= C_OK) return;
if (getLongFromObjectOrReply(c,c->argv[4],&port,"Invalid port")
!= C_OK) return;

if (quorum <= 0) {
addReplyError(c, "Quorum must be 1 or greater.");
return;
}

// Make sure the IP address is valid
if (anetResolveIP(NULL,c->argv[3]->ptr,ip,sizeof(ip)) == ANET_ERR) {
addReplyError(c,"Invalid IP address specified");
return;
}

// Create the master instance
ri = createSentinelRedisInstance(c->argv[2]->ptr,SRI_MASTER,
c->argv[3]->ptr,port,quorum,NULL);
if (ri == NULL) {
switch(errno) {
case EBUSY:
addReplyError(c,"Duplicated master name");
break;
case EINVAL:
addReplyError(c,"Invalid port number");
break;
default:
addReplyError(c,"Unspecified error adding the instance");
break;
}
} else {
sentinelFlushConfig();
sentinelEvent(LL_WARNING,"+monitor",ri,"%@ quorum %d",ri->quorum);
addReply(c,shared.ok);
}
} else if (!strcasecmp(c->argv[1]->ptr,"flushconfig")) {
// SENTINEL FLUSHCONFIG: rewrite the sentinel config file
if (c->argc != 2) goto numargserr;
sentinelFlushConfig();
addReply(c,shared.ok);
return;
} else if (!strcasecmp(c->argv[1]->ptr,"remove")) {
// SENTINEL REMOVE <name>: stop monitoring a master
sentinelRedisInstance *ri;

if (c->argc != 3) goto numargserr;
if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2]))
== NULL) return;
sentinelEvent(LL_WARNING,"-monitor",ri,"%@");
dictDelete(sentinel.masters,c->argv[2]->ptr);
sentinelFlushConfig();
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"ckquorum")) {
// SENTINEL CKQUORUM <name>: check whether the quorum and the majority are reachable
sentinelRedisInstance *ri;
int usable;

if (c->argc != 3) goto numargserr;
if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2]))
== NULL) return;
int result = sentinelIsQuorumReachable(ri,&usable);
if (result == SENTINEL_ISQR_OK) {
addReplySds(c, sdscatfmt(sdsempty(),
"+OK %i usable Sentinels. Quorum and failover authorization "
"can be reached\r\n",usable));
} else {
sds e = sdscatfmt(sdsempty(),
"-NOQUORUM %i usable Sentinels. ",usable);
if (result & SENTINEL_ISQR_NOQUORUM)
e = sdscat(e,"Not enough available Sentinels to reach the"
" specified quorum for this master");
if (result & SENTINEL_ISQR_NOAUTH) {
if (result & SENTINEL_ISQR_NOQUORUM) e = sdscat(e,". ");
e = sdscat(e, "Not enough available Sentinels to reach the"
" majority and authorize a failover");
}
e = sdscat(e,"\r\n");
addReplySds(c,e);
}
} else if (!strcasecmp(c->argv[1]->ptr,"set")) {
// SENTINEL SET: change the parameters of a monitored master
if (c->argc < 3) goto numargserr;
sentinelSetCommand(c);
} else if (!strcasecmp(c->argv[1]->ptr,"info-cache")) {
// SENTINEL INFO-CACHE <name> ...: cached INFO output of masters and their slaves
if (c->argc < 2) goto numargserr;
mstime_t now = mstime();

// Look up the requested masters by name
dictType copy_keeper = instancesDictType;
copy_keeper.valDestructor = NULL;
dict *masters_local = sentinel.masters;
if (c->argc > 2) {
masters_local = dictCreate(&copy_keeper, NULL);

for (int i = 2; i < c->argc; i++) {
sentinelRedisInstance *ri;
ri = sentinelGetMasterByName(c->argv[i]->ptr);
if (!ri) continue;
dictAdd(masters_local, ri->name, ri);
}
}

/* Reply format:
* 1.) master name
* 2.) 1.) info from master
* 2.) info from replica
* ...
* 3.) other master name
* ...
*/
addReplyMultiBulkLen(c,dictSize(masters_local) * 2);

dictIterator *di;
dictEntry *de;
di = dictGetIterator(masters_local);
// INFO output of the master
while ((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
addReplyBulkCBuffer(c,ri->name,strlen(ri->name));
addReplyMultiBulkLen(c,dictSize(ri->slaves) + 1); /* +1 for self */
addReplyMultiBulkLen(c,2);
addReplyLongLong(c, now - ri->info_refresh);
if (ri->info)
addReplyBulkCBuffer(c,ri->info,sdslen(ri->info));
else
addReply(c,shared.nullbulk);

dictIterator *sdi;
dictEntry *sde;
sdi = dictGetIterator(ri->slaves);

// INFO output of each slave
while ((sde = dictNext(sdi)) != NULL) {
sentinelRedisInstance *sri = dictGetVal(sde);
addReplyMultiBulkLen(c,2);
addReplyLongLong(c, now - sri->info_refresh);
if (sri->info)
addReplyBulkCBuffer(c,sri->info,sdslen(sri->info));
else
addReply(c,shared.nullbulk);
}
dictReleaseIterator(sdi);
}
dictReleaseIterator(di);
if (masters_local != sentinel.masters) dictRelease(masters_local);
} else if (!strcasecmp(c->argv[1]->ptr,"simulate-failure")) {
// SENTINEL SIMULATE-FAILURE <flag> <flag> ... <flag>: set failure simulation flags
int j;

sentinel.simfailure_flags = SENTINEL_SIMFAILURE_NONE;
for (j = 2; j < c->argc; j++) {
if (!strcasecmp(c->argv[j]->ptr,"crash-after-election")) {
sentinel.simfailure_flags |=
SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION;
serverLog(LL_WARNING,"Failure simulation: this Sentinel "
"will crash after being successfully elected as failover "
"leader");
} else if (!strcasecmp(c->argv[j]->ptr,"crash-after-promotion")) {
sentinel.simfailure_flags |=
SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION;
serverLog(LL_WARNING,"Failure simulation: this Sentinel "
"will crash after promoting the selected replica to master");
} else if (!strcasecmp(c->argv[j]->ptr,"help")) {
addReplyMultiBulkLen(c,2);
addReplyBulkCString(c,"crash-after-election");
addReplyBulkCString(c,"crash-after-promotion");
} else {
addReplyError(c,"Unknown failure simulation specified");
return;
}
}
addReply(c,shared.ok);
} else {
addReplyErrorFormat(c,"Unknown sentinel subcommand '%s'",
(char*)c->argv[1]->ptr);
}
return;

numargserr:
addReplyErrorFormat(c,"Wrong number of arguments for 'sentinel %s'",
(char*)c->argv[1]->ptr);
}
sentinelInfoCommand

Handles INFO in sentinel mode, returning the server, clients, cpu, stats and sentinel sections.

#define info_section_from_redis(section_name) do { \
if (defsections || allsections || !strcasecmp(section,section_name)) { \
sds redissection; \
if (sections++) info = sdscat(info,"\r\n"); \
redissection = genRedisInfoString(section_name); \
info = sdscatlen(info,redissection,sdslen(redissection)); \
sdsfree(redissection); \
} \
} while(0)

// SENTINEL INFO [section]; section is server, clients, cpu, stats or sentinel (also all/default)
void sentinelInfoCommand(client *c) {
if (c->argc > 2) {
addReply(c,shared.syntaxerr);
return;
}

int defsections = 0, allsections = 0;
char *section = c->argc == 2 ? c->argv[1]->ptr : NULL;
if (section) {
allsections = !strcasecmp(section,"all");
defsections = !strcasecmp(section,"default");
} else {
defsections = 1;
}

int sections = 0;
sds info = sdsempty();

info_section_from_redis("server");
info_section_from_redis("clients");
info_section_from_redis("cpu");
info_section_from_redis("stats");

if (defsections || allsections || !strcasecmp(section,"sentinel")) {
dictIterator *di;
dictEntry *de;
int master_id = 0;

if (sections++) info = sdscat(info,"\r\n");
info = sdscatprintf(info,
"# Sentinel\r\n"
"sentinel_masters:%lu\r\n"
"sentinel_tilt:%d\r\n"
"sentinel_running_scripts:%d\r\n"
"sentinel_scripts_queue_length:%ld\r\n"
"sentinel_simulate_failure_flags:%lu\r\n",
dictSize(sentinel.masters),
sentinel.tilt,
sentinel.running_scripts,
listLength(sentinel.scripts_queue),
sentinel.simfailure_flags);

di = dictGetIterator(sentinel.masters);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
char *status = "ok";

if (ri->flags & SRI_O_DOWN) status = "odown";
else if (ri->flags & SRI_S_DOWN) status = "sdown";
info = sdscatprintf(info,
"master%d:name=%s,status=%s,address=%s:%d,"
"slaves=%lu,sentinels=%lu\r\n",
master_id++, ri->name, status,
ri->addr->ip, ri->addr->port,
dictSize(ri->slaves),
dictSize(ri->sentinels)+1);
}
dictReleaseIterator(di);
}

addReplyBulkSds(c, info);
}
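Section selection in the function above follows three rules. No argument or `default` selects the default sections, `all` selects everything, and any other value selects only the section with that name, compared case-insensitively. The sketch below isolates that rule. The function name is hypothetical, and it uses POSIX `strcasecmp` from `<strings.h>`.

```c
#include <assert.h>
#include <stddef.h>
#include <strings.h> /* strcasecmp (POSIX) */

/* Returns 1 if `section` should be emitted for the requested argument
 * (NULL when INFO is called without one). */
int info_section_included(const char *requested, const char *section) {
    assert(section != NULL);
    if (requested == NULL) return 1;                 /* defsections = 1 */
    if (!strcasecmp(requested, "all")) return 1;     /* allsections */
    if (!strcasecmp(requested, "default")) return 1; /* defsections */
    return !strcasecmp(requested, section);          /* single named section */
}
```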
sentinelRoleCommand

Handles the ROLE command: returns "sentinel" followed by the names of all monitored masters.

void sentinelRoleCommand(client *c) {
dictIterator *di;
dictEntry *de;

addReplyMultiBulkLen(c,2);
addReplyBulkCBuffer(c,"sentinel",8);
addReplyMultiBulkLen(c,dictSize(sentinel.masters));

// Names of all masters
di = dictGetIterator(sentinel.masters);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);

addReplyBulkCString(c,ri->name);
}
dictReleaseIterator(di);
}
sentinelSetCommand

Handles SENTINEL SET, which updates the configuration of a monitored master.

void sentinelSetCommand(client *c) {
sentinelRedisInstance *ri;
int j, changes = 0;
int badarg = 0; // index of the offending argument
char *option;

// Look up the master
if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2]))
== NULL) return;

// Process the options
for (j = 3; j < c->argc; j++) {
int moreargs = (c->argc-1) - j;
option = c->argv[j]->ptr;
long long ll;
int old_j = j; /* Used to know what to log as an event. */

// Time without valid replies after which an instance is considered down
if (!strcasecmp(option,"down-after-milliseconds") && moreargs > 0) {
/* down-after-milliseconds <milliseconds> */
robj *o = c->argv[++j];
if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) {
badarg = j;
goto badfmt;
}
ri->down_after_period = ll;
sentinelPropagateDownAfterPeriod(ri);
changes++;
} else if (!strcasecmp(option,"failover-timeout") && moreargs > 0) {
// failover-timeout <milliseconds>: failover timeout
robj *o = c->argv[++j];
if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) {
badarg = j;
goto badfmt;
}
ri->failover_timeout = ll;
changes++;
} else if (!strcasecmp(option,"parallel-syncs") && moreargs > 0) {
// parallel-syncs <count>: maximum number of slaves reconfigured to sync with the new master at the same time
robj *o = c->argv[++j];
if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) {
badarg = j;
goto badfmt;
}
ri->parallel_syncs = ll;
changes++;
} else if (!strcasecmp(option,"notification-script") && moreargs > 0) {
// notification-script <path>: notification script
char *value = c->argv[++j]->ptr;
if (sentinel.deny_scripts_reconfig) {
addReplyError(c,
"Reconfiguration of scripts path is denied for "
"security reasons. Check the deny-scripts-reconfig "
"configuration directive in your Sentinel configuration");
return;
}

if (strlen(value) && access(value,X_OK) == -1) {
addReplyError(c,
"Notification script seems non existing or non executable");
if (changes) sentinelFlushConfig();
return;
}
sdsfree(ri->notification_script);
ri->notification_script = strlen(value) ? sdsnew(value) : NULL;
changes++;
} else if (!strcasecmp(option,"client-reconfig-script") && moreargs > 0) {
//更改配置脚本 client-reconfig-script <path>
char *value = c->argv[++j]->ptr;
if (sentinel.deny_scripts_reconfig) {
addReplyError(c,
"Reconfiguration of scripts path is denied for "
"security reasons. Check the deny-scripts-reconfig "
"configuration directive in your Sentinel configuration");
return;
}

if (strlen(value) && access(value,X_OK) == -1) {
addReplyError(c,
"Client reconfiguration script seems non existing or "
"non executable");
if (changes) sentinelFlushConfig();
return;
}
sdsfree(ri->client_reconfig_script);
ri->client_reconfig_script = strlen(value) ? sdsnew(value) : NULL;
changes++;
} else if (!strcasecmp(option,"auth-pass") && moreargs > 0) {
//密码 auth-pass <password>
char *value = c->argv[++j]->ptr;
sdsfree(ri->auth_pass);
ri->auth_pass = strlen(value) ? sdsnew(value) : NULL;
changes++;
} else if (!strcasecmp(option,"quorum") && moreargs > 0) {
//最小票数 quorum <count>
robj *o = c->argv[++j];
if (getLongLongFromObject(o,&ll) == C_ERR || ll <= 0) {
badarg = j;
goto badfmt;
}
ri->quorum = ll;
changes++;
} else if (!strcasecmp(option,"rename-command") && moreargs > 1) {
//重命名命令 rename-command <oldname> <newname>
sds oldname = c->argv[++j]->ptr;
sds newname = c->argv[++j]->ptr;

if ((sdslen(oldname) == 0) || (sdslen(newname) == 0)) {
badarg = sdslen(newname) ? j-1 : j;
goto badfmt;
}

//删除旧的
dictDelete(ri->renamed_commands,oldname);

//添加新的
if (!dictSdsKeyCaseCompare(NULL,oldname,newname)) {
oldname = sdsdup(oldname);
newname = sdsdup(newname);
dictAdd(ri->renamed_commands,oldname,newname);
}
changes++;
} else {
addReplyErrorFormat(c,"Unknown option or number of arguments for "
"SENTINEL SET '%s'", option);
if (changes) sentinelFlushConfig();
return;
}

//记录日志
int numargs = j-old_j+1;
switch(numargs) {
case 2:
sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s",c->argv[old_j]->ptr,
c->argv[old_j+1]->ptr);
break;
case 3:
sentinelEvent(LL_WARNING,"+set",ri,"%@ %s %s %s",c->argv[old_j]->ptr,
c->argv[old_j+1]->ptr,
c->argv[old_j+2]->ptr);
break;
default:
sentinelEvent(LL_WARNING,"+set",ri,"%@ %s",c->argv[old_j]->ptr);
break;
}
}

if (changes) sentinelFlushConfig();
addReply(c,shared.ok);
return;

//格式错误
badfmt:
if (changes) sentinelFlushConfig();
addReplyErrorFormat(c,"Invalid argument '%s' for SENTINEL SET '%s'",
(char*)c->argv[badarg]->ptr,option);
}
sentinelPublishCommand

Handles the PUBLISH command sent to a sentinel. Only messages on the hello channel are accepted.

void sentinelPublishCommand(client *c) {
    if (strcmp(c->argv[1]->ptr,SENTINEL_HELLO_CHANNEL)) {
        addReplyError(c, "Only HELLO messages are accepted by Sentinel instances.");
        return;
    }
    sentinelProcessHelloMessage(c->argv[2]->ptr,sdslen(c->argv[2]->ptr));
    addReplyLongLong(c,1);
}

Down-state checks

sentinelCheckSubjectivelyDown

Determines whether an instance is subjectively down (SDOWN).

void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
    mstime_t elapsed = 0;

    if (ri->link->act_ping_time)
        elapsed = mstime() - ri->link->act_ping_time;
    else if (ri->link->disconnected)
        elapsed = mstime() - ri->link->last_avail_time;

    // Command link: it has been connected long enough and a PING is pending,
    // but neither that PING nor the last PONG is within down_after_period/2.
    // Close the link so that it gets reconnected.
    if (ri->link->cc &&
        (mstime() - ri->link->cc_conn_time) >
        SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
        ri->link->act_ping_time != 0 &&
        (mstime() - ri->link->act_ping_time) > (ri->down_after_period/2) &&
        (mstime() - ri->link->last_pong_time) > (ri->down_after_period/2))
    {
        instanceLinkCloseConnection(ri->link,ri->link->cc);
    }

    // Pub/sub link: it has been connected long enough but nothing arrived
    // for 3 * SENTINEL_PUBLISH_PERIOD. Close it so that it gets reconnected.
    if (ri->link->pc &&
        (mstime() - ri->link->pc_conn_time) >
         SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
        (mstime() - ri->link->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
    {
        instanceLinkCloseConnection(ri->link,ri->link->pc);
    }

    // The instance is SDOWN if either condition holds:
    // 1. no valid reply for more than down_after_period;
    // 2. we believe it is a master, but it has been reporting the slave role
    //    for more than down_after_period + 2 * SENTINEL_INFO_PERIOD.
    if (elapsed > ri->down_after_period ||
        (ri->flags & SRI_MASTER &&
         ri->role_reported == SRI_SLAVE &&
         mstime() - ri->role_reported_time >
          (ri->down_after_period+SENTINEL_INFO_PERIOD*2)))
    {
        // not flagged yet: mark it SDOWN
        if ((ri->flags & SRI_S_DOWN) == 0) {
            sentinelEvent(LL_WARNING,"+sdown",ri,"%@");
            ri->s_down_since_time = mstime();
            ri->flags |= SRI_S_DOWN;
        }
    } else {
        // back to normal: clear SDOWN
        if (ri->flags & SRI_S_DOWN) {
            sentinelEvent(LL_WARNING,"-sdown",ri,"%@");
            ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
        }
    }
}

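The two SDOWN conditions can be isolated in a small standalone function. This is a minimal sketch, not Redis code. `instance_view` and `is_subjectively_down` are hypothetical names, only the fields the check reads are kept, and `info_period` stands in for `SENTINEL_INFO_PERIOD`. The link-closing logic is left out.

```c
#include <assert.h>
#include <stdbool.h>

typedef long long mstime_t;

/* Hypothetical snapshot of the fields sentinelCheckSubjectivelyDown() reads. */
typedef struct {
    mstime_t act_ping_time;      /* send time of the unanswered PING, 0 if none */
    mstime_t last_avail_time;    /* time of the last valid PING reply */
    bool disconnected;           /* command link is down */
    bool flagged_master;         /* we believe the instance is a master */
    bool reports_slave;          /* ...but its INFO reports role:slave */
    mstime_t role_reported_time; /* when that role was first reported */
} instance_view;

/* Returns true if the instance should be flagged SDOWN at time `now`. */
bool is_subjectively_down(const instance_view *ri, mstime_t now,
                          mstime_t down_after_period, mstime_t info_period) {
    mstime_t elapsed = 0;

    assert(down_after_period > 0);
    /* Silence is measured from the pending PING, or from the last valid
     * reply if the link is down and no PING is pending. */
    if (ri->act_ping_time)
        elapsed = now - ri->act_ping_time;
    else if (ri->disconnected)
        elapsed = now - ri->last_avail_time;

    /* Condition 1: no valid reply for more than down_after_period. */
    if (elapsed > down_after_period) return true;

    /* Condition 2: a "master" that keeps reporting role:slave for too long. */
    return ri->flagged_master && ri->reports_slave &&
           now - ri->role_reported_time > down_after_period + info_period * 2;
}
```

For example, with `down-after-milliseconds` set to 30000, an instance whose PING has been pending for 31 s is SDOWN, and one whose PING has been pending for 19 s is not.
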
sentinelCheckObjectivelyDown

Checks whether a master is objectively down (ODOWN).

void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
    dictIterator *di;
    dictEntry *de;
    unsigned int quorum = 0, odown = 0;

    // only a master we consider SDOWN can become ODOWN
    if (master->flags & SRI_S_DOWN) {
        quorum = 1; // our own vote
        di = dictGetIterator(master->sentinels);

        // count the other sentinels that also report the master as down
        while((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *ri = dictGetVal(de);

            if (ri->flags & SRI_MASTER_DOWN) quorum++;
        }
        dictReleaseIterator(di);

        // enough votes: the master is objectively down
        if (quorum >= master->quorum) odown = 1;
    }

    if (odown) {
        // it has just become ODOWN
        if ((master->flags & SRI_O_DOWN) == 0) {
            sentinelEvent(LL_WARNING,"+odown",master,"%@ #quorum %d/%d",
                quorum, master->quorum);
            master->flags |= SRI_O_DOWN;
            master->o_down_since_time = mstime();
        }
    } else {
        // no longer ODOWN: clear the flag
        if (master->flags & SRI_O_DOWN) {
            sentinelEvent(LL_WARNING,"-odown",master,"%@");
            master->flags &= ~SRI_O_DOWN;
        }
    }
}

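The ODOWN rule comes down to a vote count. This sentinel's own SDOWN opinion counts as one vote, each peer flagged `SRI_MASTER_DOWN` adds one more, and the total is compared with the configured quorum. Below is a hypothetical standalone sketch: `is_objectively_down` and its plain-array parameters stand in for the `master->sentinels` dict.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* peer_says_down[i] is true if peer i's last is-master-down-by-addr reply
 * said the master is down (SRI_MASTER_DOWN in the source). */
bool is_objectively_down(bool self_sdown, const bool *peer_says_down,
                         size_t npeers, unsigned int quorum) {
    unsigned int votes;
    size_t i;

    assert(quorum > 0);
    if (!self_sdown) return false; /* ODOWN requires our own SDOWN first */

    votes = 1; /* our own vote */
    for (i = 0; i < npeers; i++)
        if (peer_says_down[i]) votes++;
    return votes >= quorum;
}
```

With the sample configuration's quorum of 2, a single agreeing peer is enough to reach ODOWN.
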
sentinelReceiveIsMasterDownReply

Reply callback for SENTINEL is-master-down-by-addr, the command used to ask other sentinels whether the master is down.

void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = privdata;
    instanceLink *link = c->data;
    redisReply *r;

    if (!reply || !link) return;
    link->pending_commands--;
    r = reply;

    // validate the reply format: [down_state, leader_runid, leader_epoch]
    if (r->type == REDIS_REPLY_ARRAY && r->elements == 3 &&
        r->element[0]->type == REDIS_REPLY_INTEGER &&
        r->element[1]->type == REDIS_REPLY_STRING &&
        r->element[2]->type == REDIS_REPLY_INTEGER)
    {
        // remember when this sentinel last replied
        ri->last_master_down_reply_time = mstime();

        if (r->element[0]->integer == 1) {
            // that sentinel also considers the master down
            ri->flags |= SRI_MASTER_DOWN;
        } else {
            // that sentinel considers the master up
            ri->flags &= ~SRI_MASTER_DOWN;
        }
        if (strcmp(r->element[1]->str,"*")) {
            // A runid other than "*" means that sentinel has voted for a
            // leader: store who it voted for and in which epoch.
            sdsfree(ri->leader);
            if ((long long)ri->leader_epoch != r->element[2]->integer)
                serverLog(LL_WARNING,
                    "%s voted for %s %llu", ri->name,
                    r->element[1]->str,
                    (unsigned long long) r->element[2]->integer);

            // the sentinel it voted for
            ri->leader = sdsnew(r->element[1]->str);
            // the epoch of that vote
            ri->leader_epoch = r->element[2]->integer;
        }
    }
}

sentinelAskMasterStateToOtherSentinels

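Asks every other sentinel monitoring the master whether it considers the master down. The replies are used to reach ODOWN and, once a failover has started, to collect votes quickly enough to elect a leader and proceed with the failover.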

#define SENTINEL_ASK_FORCED (1<<0) // ask even if we asked recently

void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
    dictIterator *di;
    dictEntry *de;

    // iterate over the other sentinels monitoring this master
    di = dictGetIterator(master->sentinels);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        mstime_t elapsed = mstime() - ri->last_master_down_reply_time; // time since its last reply
        char port[32];
        int retval;

        // The last reply is too old to trust: clear its down flag and the
        // leader it voted for.
        if (elapsed > SENTINEL_ASK_PERIOD*5) {
            ri->flags &= ~SRI_MASTER_DOWN;
            sdsfree(ri->leader);
            ri->leader = NULL;
        }

        // Ask only when all three conditions hold:
        // 1. we consider the master SDOWN;
        // 2. the link to that sentinel is connected;
        // 3. unless forced, its last reply is at least SENTINEL_ASK_PERIOD old.
        if ((master->flags & SRI_S_DOWN) == 0) continue;
        if (ri->link->disconnected) continue;
        if (!(flags & SENTINEL_ASK_FORCED) &&
            mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
            continue;

        // Send SENTINEL is-master-down-by-addr <ip> <port> <epoch> <runid>.
        // The runid is our own ID (a request for a vote) once a failover has
        // started, and "*" (a plain state query) otherwise.
        ll2string(port,sizeof(port),master->addr->port);
        retval = redisAsyncCommand(ri->link->cc,
                    sentinelReceiveIsMasterDownReply, ri,
                    "%s is-master-down-by-addr %s %s %llu %s",
                    sentinelInstanceMapCommand(ri,"SENTINEL"),
                    master->addr->ip, port,
                    sentinel.current_epoch,
                    (master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
                    sentinel.myid : "*");
        if (retval == C_OK) ri->link->pending_commands++;
    }
    dictReleaseIterator(di);
}
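The per-peer logic above reduces to two small predicates and one choice of argument. This is a hypothetical standalone sketch. `ASK_PERIOD_MS` is assumed to equal `SENTINEL_ASK_PERIOD`, taken here as 1000 ms, so check the value in your Redis version. `should_ask_peer`, `peer_reply_is_stale` and `vote_request_runid` are illustrative names only.

```c
#include <assert.h>
#include <stdbool.h>

typedef long long mstime_t;

/* Assumption: mirrors SENTINEL_ASK_PERIOD in sentinel.c. */
#define ASK_PERIOD_MS 1000

/* A cached peer answer older than 5 * ASK_PERIOD is dropped
 * (its SRI_MASTER_DOWN flag and recorded leader vote are cleared). */
bool peer_reply_is_stale(mstime_t now, mstime_t last_reply_time) {
    return now - last_reply_time > ASK_PERIOD_MS * 5;
}

/* Whether to send SENTINEL is-master-down-by-addr to one peer now. */
bool should_ask_peer(bool master_sdown, bool peer_disconnected, bool forced,
                     mstime_t now, mstime_t last_reply_time) {
    if (!master_sdown) return false;     /* we don't think it's down */
    if (peer_disconnected) return false; /* no link to that peer */
    if (!forced && now - last_reply_time < ASK_PERIOD_MS)
        return false;                    /* asked recently enough */
    return true;
}

/* Last argument of the command: our ID asks for a vote, "*" does not. */
const char *vote_request_runid(bool failover_in_progress, const char *myid) {
    return failover_in_progress ? myid : "*";
}
```
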

Failover

sentinelSimFailureCrash

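Deliberately crashes the process. This is used by SENTINEL simulate-failure to test how a failover copes with a crashing sentinel.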

void sentinelSimFailureCrash(void) {
    serverLog(LL_WARNING,
        "Sentinel CRASH because of SENTINEL simulate-failure");
    exit(99);
}

sentinelVoteLeader

Votes for the given leader in the given epoch, and returns the current leader together with the epoch it was voted for in.

char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch) {
    // the request carries a newer epoch than ours: adopt it
    if (req_epoch > sentinel.current_epoch) {
        sentinel.current_epoch = req_epoch;
        sentinelFlushConfig();
        sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
            (unsigned long long) sentinel.current_epoch);
    }

    // Our recorded vote for this master is from an older epoch, so we have
    // not voted in this epoch yet: vote for the requester.
    if (master->leader_epoch < req_epoch && sentinel.current_epoch <= req_epoch)
    {
        sdsfree(master->leader);

        // record the new leader and the epoch of the vote
        master->leader = sdsnew(req_runid);
        master->leader_epoch = sentinel.current_epoch;

        // persist the sentinel configuration
        sentinelFlushConfig();
        sentinelEvent(LL_WARNING,"+vote-for-leader",master,"%s %llu",
            master->leader, (unsigned long long) master->leader_epoch);

        // If we did not vote for ourselves, set our failover start time to
        // now plus a random delay, so we cannot start a failover for the
        // same master right away.
        if (strcasecmp(master->leader,sentinel.myid))
            master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
    }

    // return the current leader and the epoch of that vote
    *leader_epoch = master->leader_epoch;
    return master->leader ? sdsnew(master->leader) : NULL;
}

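The voting rule amounts to "adopt newer epochs, vote at most once per epoch, first request wins". Below is a hypothetical standalone sketch of those rules. `vote_state` and `vote_leader` are illustrative names, and a fixed 41-byte buffer (a 40-character run ID plus terminator) replaces the sds strings, configuration flush and events of the source.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical minimal state: the sentinel's current epoch plus the vote
 * it cast for one master (master->leader / master->leader_epoch). */
typedef struct {
    uint64_t current_epoch;
    char leader[41];       /* run IDs are 40 characters */
    uint64_t leader_epoch;
} vote_state;

/* Returns the candidate we voted for (NULL if none yet);
 * *leader_epoch receives the epoch of that vote. */
const char *vote_leader(vote_state *s, uint64_t req_epoch,
                        const char *req_runid, uint64_t *leader_epoch) {
    assert(req_runid != NULL);
    if (req_epoch > s->current_epoch)      /* adopt a newer epoch */
        s->current_epoch = req_epoch;

    /* No vote cast in this epoch yet: the first requester gets it. */
    if (s->leader_epoch < req_epoch && s->current_epoch <= req_epoch) {
        snprintf(s->leader, sizeof(s->leader), "%s", req_runid);
        s->leader_epoch = s->current_epoch;
    }
    *leader_epoch = s->leader_epoch;
    return s->leader[0] ? s->leader : NULL;
}
```

Here, a second request in epoch 5 does not change a vote already cast in epoch 5, but a request carrying epoch 6 does.
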
sentinelLeaderIncr

Adds one vote to a leader candidate's count and returns the new total.

struct sentinelLeader {
    char *runid;
    unsigned long votes;
};

int sentinelLeaderIncr(dict *counters, char *runid) {
    dictEntry *existing, *de;
    uint64_t oldval;

    // add the candidate, or find its existing entry
    de = dictAddRaw(counters,runid,&existing);
    if (existing) {
        // already present: increment
        oldval = dictGetUnsignedIntegerVal(existing);
        dictSetUnsignedIntegerVal(existing,oldval+1);
        return oldval+1;
    } else {
        // new entry: start at 1
        serverAssert(de != NULL);
        dictSetUnsignedIntegerVal(de,1);
        return 1;
    }
}

sentinelGetLeader

Returns the leader elected in the given epoch, or NULL if no candidate has enough votes.

char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) {
    dict *counters;
    dictIterator *di;
    dictEntry *de;
    unsigned int voters = 0, voters_quorum;
    char *myvote;
    char *winner = NULL;
    uint64_t leader_epoch;
    uint64_t max_votes = 0;

    serverAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
    counters = dictCreate(&leaderVotesDictType,NULL);

    // total number of voters: the other sentinels plus ourselves
    voters = dictSize(master->sentinels)+1;

    // count the votes reported by the other sentinels
    di = dictGetIterator(master->sentinels);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);

        // only votes cast in the current epoch count
        if (ri->leader != NULL && ri->leader_epoch == sentinel.current_epoch)
            sentinelLeaderIncr(counters,ri->leader);
    }
    dictReleaseIterator(di);

    // tally: find the candidate with the most votes
    di = dictGetIterator(counters);
    while((de = dictNext(di)) != NULL) {
        uint64_t votes = dictGetUnsignedIntegerVal(de);

        if (votes > max_votes) {
            max_votes = votes;
            winner = dictGetKey(de);
        }
    }
    dictReleaseIterator(di);

    // Cast our own vote: for the current front-runner if there is one,
    // otherwise for ourselves.
    if (winner)
        myvote = sentinelVoteLeader(master,epoch,winner,&leader_epoch);
    else
        myvote = sentinelVoteLeader(master,epoch,sentinel.myid,&leader_epoch);

    // our recorded vote belongs to this epoch, so it counts
    if (myvote && leader_epoch == epoch) {
        // add our own vote
        uint64_t votes = sentinelLeaderIncr(counters,myvote);

        if (votes > max_votes) {
            max_votes = votes;
            winner = myvote;
        }
    }

    // The front-runner becomes leader only if it has
    // 1. at least voters/2+1 votes (a strict majority of all sentinels), and
    // 2. at least the configured quorum.
    voters_quorum = voters/2+1;
    if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
        winner = NULL;

    winner = winner ? sdsnew(winner) : NULL;
    sdsfree(myvote);
    dictRelease(counters);
    return winner;
}

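The counting and the two acceptance conditions can be tried out without the dict machinery. In this hypothetical sketch, `get_leader` receives one entry per vote cast in the current epoch (NULL for sentinels that have not voted) and uses a quadratic count instead of the source's `counters` dict. The tie-breaking between equal counts may differ from the dict iteration order.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* votes[i]: the runid that sentinel i (ourselves included) voted for in
 * the current epoch, or NULL. voters: total number of sentinels. */
const char *get_leader(const char *const *votes, size_t nvotes,
                       unsigned int voters, unsigned int quorum) {
    const char *winner = NULL;
    unsigned int max_votes = 0;
    size_t i, j;

    assert(voters > 0);
    for (i = 0; i < nvotes; i++) {
        unsigned int count = 0;
        if (votes[i] == NULL) continue;
        for (j = 0; j < nvotes; j++)
            if (votes[j] && strcmp(votes[i], votes[j]) == 0) count++;
        if (count > max_votes) {     /* strictly more votes than before */
            max_votes = count;
            winner = votes[i];
        }
    }
    /* Needs a strict majority of all voters AND at least the quorum. */
    if (winner && (max_votes < voters / 2 + 1 || max_votes < quorum))
        winner = NULL;
    return winner;
}
```

With 5 sentinels and quorum 2, a candidate with 3 votes wins. A 2–2 split elects nobody, and even 3 votes are not enough if the quorum is set to 4.
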
sentinelSendSlaveOf

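Sends SLAVEOF to an instance to change the master it replicates from. A NULL host means SLAVEOF NO ONE. The command is always wrapped in a MULTI/EXEC transaction together with CONFIG REWRITE, so the instance saves the new setting to its configuration file on disk, and CLIENT KILL TYPE normal, which disconnects its normal clients.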

int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) {
    char portstr[32];
    int retval;

    ll2string(portstr,sizeof(portstr),port);

    // a NULL host means SLAVEOF NO ONE
    if (host == NULL) {
        host = "NO";
        memcpy(portstr,"ONE",4);
    }

    // MULTI: start the transaction
    retval = redisAsyncCommand(ri->link->cc,
        sentinelDiscardReplyCallback, ri, "%s",
        sentinelInstanceMapCommand(ri,"MULTI"));
    if (retval == C_ERR) return retval;
    ri->link->pending_commands++;

    // SLAVEOF <host> <port>
    retval = redisAsyncCommand(ri->link->cc,
        sentinelDiscardReplyCallback, ri, "%s %s %s",
        sentinelInstanceMapCommand(ri,"SLAVEOF"),
        host, portstr);
    if (retval == C_ERR) return retval;
    ri->link->pending_commands++;

    // CONFIG REWRITE: persist the new setting to the config file
    retval = redisAsyncCommand(ri->link->cc,
        sentinelDiscardReplyCallback, ri, "%s REWRITE",
        sentinelInstanceMapCommand(ri,"CONFIG"));
    if (retval == C_ERR) return retval;
    ri->link->pending_commands++;

    // CLIENT KILL TYPE normal: disconnect all normal clients of the instance
    retval = redisAsyncCommand(ri->link->cc,
        sentinelDiscardReplyCallback, ri, "%s KILL TYPE normal",
        sentinelInstanceMapCommand(ri,"CLIENT"));
    if (retval == C_ERR) return retval;
    ri->link->pending_commands++;

    // EXEC: end of the transaction
    retval = redisAsyncCommand(ri->link->cc,
        sentinelDiscardReplyCallback, ri, "%s",
        sentinelInstanceMapCommand(ri,"EXEC"));
    if (retval == C_ERR) return retval;
    ri->link->pending_commands++;

    return C_OK;
}

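To make the queued transaction concrete, the sketch below builds the same five commands as plain strings, including the NULL host to "NO ONE" substitution. `build_slaveof_transaction` is a hypothetical helper. It ignores command renaming (`sentinelInstanceMapCommand`) and truncates anything that does not fit in 64 bytes.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Fills out[0..4] with MULTI, SLAVEOF, CONFIG REWRITE, CLIENT KILL, EXEC
 * and returns the number of commands. */
int build_slaveof_transaction(const char *host, int port, char out[5][64]) {
    char portstr[32];

    snprintf(portstr, sizeof(portstr), "%d", port);
    if (host == NULL) {            /* NULL host means SLAVEOF NO ONE */
        host = "NO";
        memcpy(portstr, "ONE", 4); /* 4 bytes: includes the terminator */
    }
    snprintf(out[0], 64, "MULTI");
    snprintf(out[1], 64, "SLAVEOF %s %s", host, portstr);
    snprintf(out[2], 64, "CONFIG REWRITE");
    snprintf(out[3], 64, "CLIENT KILL TYPE normal");
    snprintf(out[4], 64, "EXEC");
    return 5;
}
```

Calling it with `NULL, 0` yields `SLAVEOF NO ONE` as the second command. Calling it with `"10.0.0.5", 6380` yields `SLAVEOF 10.0.0.5 6380`.
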
sentinelStartFailover

Starts a failover.

void sentinelStartFailover(sentinelRedisInstance *master) {
    serverAssert(master->flags & SRI_MASTER);

    // failover state: waiting to start
    master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;

    // flag the master as being failed over
    master->flags |= SRI_FAILOVER_IN_PROGRESS;

    // bump the epoch; the whole failover runs in the new epoch
    master->failover_epoch = ++sentinel.current_epoch;
    sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
        (unsigned long long) sentinel.current_epoch);
    sentinelEvent(LL_WARNING,"+try-failover",master,"%@");

    // record the (randomly delayed) start time and the state change time
    master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
    master->failover_state_change_time = mstime();
}

sentinelStartFailoverIfNeeded

Starts a failover if the conditions are met.

int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
    // only an ODOWN master can be failed over
    if (!(master->flags & SRI_O_DOWN)) return 0;

    // a failover is already in progress
    if (master->flags & SRI_FAILOVER_IN_PROGRESS) return 0;

    // the previous attempt started less than 2 * failover_timeout ago
    if (mstime() - master->failover_start_time <
        master->failover_timeout*2)
    {
        if (master->failover_delay_logged != master->failover_start_time) {
            time_t clock = (master->failover_start_time +
                            master->failover_timeout*2) / 1000;
            char ctimebuf[26];

            ctime_r(&clock,ctimebuf);
            ctimebuf[24] = '\0'; /* Remove newline. */

            // log the delay only once per attempt
            master->failover_delay_logged = master->failover_start_time;
            serverLog(LL_WARNING,
                "Next failover delay: I will not start a failover before %s",
                ctimebuf);
        }
        return 0;
    }

    // enough time has passed since the previous attempt: start a new one
    sentinelStartFailover(master);
    return 1;
}

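The three guards form a simple predicate, shown here as a hypothetical standalone function, `failover_may_start`. It omits the one-time log message. Note that `last_failover_start` may lie slightly in the future, because a random delay is added to it.

```c
#include <assert.h>
#include <stdbool.h>

typedef long long mstime_t;

bool failover_may_start(bool odown, bool in_progress, mstime_t now,
                        mstime_t last_failover_start,
                        mstime_t failover_timeout) {
    assert(failover_timeout > 0);
    if (!odown) return false;       /* only an ODOWN master */
    if (in_progress) return false;  /* one failover at a time */
    /* back off for 2 * failover-timeout after the previous attempt */
    if (now - last_failover_start < failover_timeout * 2) return false;
    return true;
}
```

With `failover-timeout 180000` from the sample configuration, a sentinel therefore waits at least 360 s (6 minutes) before retrying a failover for the same master.
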
compareSlavesForPromotion

qsort comparator used to rank the replicas that are candidates for promotion to master.

int compareSlavesForPromotion(const void *a, const void *b) {
    sentinelRedisInstance **sa = (sentinelRedisInstance **)a,
                          **sb = (sentinelRedisInstance **)b;
    char *sa_runid, *sb_runid;

    // priority: the lower slave_priority value wins
    if ((*sa)->slave_priority != (*sb)->slave_priority)
        return (*sa)->slave_priority - (*sb)->slave_priority;

    // replication offset: the replica that received more data from the master wins
    if ((*sa)->slave_repl_offset > (*sb)->slave_repl_offset) {
        return -1; /* a < b */
    } else if ((*sa)->slave_repl_offset < (*sb)->slave_repl_offset) {
        return 1; /* a > b */
    }

    // Tie-breaker on the run ID. It carries no real meaning; it only makes
    // the order deterministic.
    sa_runid = (*sa)->runid;
    sb_runid = (*sb)->runid;
    if (sa_runid == NULL && sb_runid == NULL) return 0;
    else if (sa_runid == NULL) return 1;  /* a > b */
    else if (sb_runid == NULL) return -1; /* a < b */
    return strcasecmp(sa_runid, sb_runid);
}

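The same ordering can be exercised with a trimmed-down record. In this sketch, `replica`, `compare_replicas` and `best_replica` are hypothetical names, and only the three fields the comparator reads are kept. As in the source, the array holds pointers and is sorted in place with `qsort`.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <strings.h>

typedef struct {
    int slave_priority;
    long long slave_repl_offset;
    const char *runid;
} replica;

/* Lower priority value first, then larger offset, then smaller run ID. */
int compare_replicas(const void *a, const void *b) {
    const replica *sa = *(const replica *const *)a;
    const replica *sb = *(const replica *const *)b;

    if (sa->slave_priority != sb->slave_priority)
        return sa->slave_priority - sb->slave_priority;
    if (sa->slave_repl_offset > sb->slave_repl_offset) return -1;
    if (sa->slave_repl_offset < sb->slave_repl_offset) return 1;
    if (sa->runid == NULL && sb->runid == NULL) return 0;
    if (sa->runid == NULL) return 1;
    if (sb->runid == NULL) return -1;
    return strcasecmp(sa->runid, sb->runid);
}

/* Sorts the candidate pointers in place and returns the best one. */
const replica *best_replica(const replica **list, size_t n) {
    if (n == 0) return NULL;
    qsort(list, n, sizeof(list[0]), compare_replicas);
    return list[0];
}
```

A replica with priority 10 beats every replica with priority 100, however far behind it is. Among equal priorities the larger offset wins, and the run ID only settles exact ties.
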
sentinelSelectSlave

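Selects the new master from the replicas. A replica becomes a candidate only if it meets all of the following conditions:

  1. it is healthy, i.e. not flagged SDOWN or ODOWN;
  2. its link is connected;
  3. its last valid PING reply is less than 5 × SENTINEL_PING_PERIOD (1 s) old;
  4. its slave_priority is not 0;
  5. its INFO reply is recent enough;
  6. its own link to the master has not been down for too long.

The candidates are then sorted with compareSlavesForPromotion, and the first one is chosen.
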
sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
    sentinelRedisInstance **instance =
        zmalloc(sizeof(instance[0])*dictSize(master->slaves));
    sentinelRedisInstance *selected = NULL;
    int instances = 0;
    dictIterator *di;
    dictEntry *de;
    mstime_t max_master_down_time = 0;

    // longest tolerated master-link-down time for a candidate: how long the
    // master has been SDOWN, plus 10 * down_after_period
    if (master->flags & SRI_S_DOWN)
        max_master_down_time += mstime() - master->s_down_since_time;
    max_master_down_time += master->down_after_period * 10;

    // examine every replica of the master
    di = dictGetIterator(master->slaves);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);
        mstime_t info_validity_time;

        // skip replicas that are SDOWN or ODOWN
        if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue;

        // skip disconnected replicas
        if (slave->link->disconnected) continue;

        // skip replicas with no valid PING reply in the last 5 * SENTINEL_PING_PERIOD (1 s)
        if (mstime() - slave->link->last_avail_time > SENTINEL_PING_PERIOD*5) continue;

        // priority 0 means "never promote"
        if (slave->slave_priority == 0) continue;

        // While the master is SDOWN, replicas are sent INFO once per second
        // instead of at the normal rate, so the window in which an INFO
        // reply counts as fresh differs between the two cases.
        if (master->flags & SRI_S_DOWN)
            info_validity_time = SENTINEL_PING_PERIOD*5;
        else
            info_validity_time = SENTINEL_INFO_PERIOD*3;

        // skip replicas whose INFO is stale
        if (mstime() - slave->info_refresh > info_validity_time) continue;

        // skip replicas that have been disconnected from the master for too long
        if (slave->master_link_down_time > max_master_down_time) continue;
        instance[instances++] = slave;
    }
    dictReleaseIterator(di);

    // sort the candidates and pick the best one
    if (instances) {
        qsort(instance,instances,sizeof(sentinelRedisInstance*),
            compareSlavesForPromotion);
        selected = instance[0];
    }
    zfree(instance);
    return selected;
}
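The eligibility filter can be read as a single predicate. Below is a hypothetical standalone sketch. `replica_state` and `replica_is_eligible` are illustrative names. `PING_PERIOD_MS` (1000) and `INFO_PERIOD_MS` (10000) are assumed values for `SENTINEL_PING_PERIOD` and `SENTINEL_INFO_PERIOD`, so check them against your Redis version.

```c
#include <assert.h>
#include <stdbool.h>

typedef long long mstime_t;

/* Assumed values of the sentinel.c constants, in milliseconds. */
#define PING_PERIOD_MS 1000
#define INFO_PERIOD_MS 10000

typedef struct {
    bool sdown_or_odown;
    bool disconnected;
    mstime_t last_avail_time;       /* last valid PING reply */
    int slave_priority;             /* 0 = never promote */
    mstime_t info_refresh;          /* last INFO reply */
    mstime_t master_link_down_time; /* replica's own link-down time, from INFO */
} replica_state;

bool replica_is_eligible(const replica_state *s, mstime_t now,
                         bool master_sdown, mstime_t master_sdown_since,
                         mstime_t down_after_period) {
    mstime_t max_master_down_time = 0;
    mstime_t info_validity_time;

    assert(down_after_period > 0);
    if (master_sdown) max_master_down_time += now - master_sdown_since;
    max_master_down_time += down_after_period * 10;

    if (s->sdown_or_odown || s->disconnected) return false;
    if (now - s->last_avail_time > PING_PERIOD_MS * 5) return false;
    if (s->slave_priority == 0) return false;

    /* INFO arrives every second while the master is SDOWN. */
    info_validity_time = master_sdown ? PING_PERIOD_MS * 5 : INFO_PERIOD_MS * 3;
    if (now - s->info_refresh > info_validity_time) return false;

    return s->master_link_down_time <= max_master_down_time;
}
```
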

Failover state handlers

sentinelFailoverWaitStart

WAIT_START: wait for the leader election before starting.

void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
    char *leader;
    int isleader;

    // check whether we are the leader for this failover epoch
    leader = sentinelGetLeader(ri, ri->failover_epoch);
    isleader = leader && strcasecmp(leader,sentinel.myid) == 0;
    sdsfree(leader);

    // we are not the leader, and the failover is not forced
    if (!isleader && !(ri->flags & SRI_FORCE_FAILOVER)) {
        int election_timeout = SENTINEL_ELECTION_TIMEOUT;

        // the election timeout may not exceed the failover timeout
        if (election_timeout > ri->failover_timeout)
            election_timeout = ri->failover_timeout;

        // no suitable leader was elected in time: abort
        if (mstime() - ri->failover_start_time > election_timeout) {
            sentinelEvent(LL_WARNING,"-failover-abort-not-elected",ri,"%@");
            sentinelAbortFailover(ri);
        }
        return;
    }

    // we are the elected leader
    sentinelEvent(LL_WARNING,"+elected-leader",ri,"%@");

    // SENTINEL simulate-failure: crash right after winning the election
    if (sentinel.simfailure_flags & SENTINEL_SIMFAILURE_CRASH_AFTER_ELECTION)
        sentinelSimFailureCrash();

    // next step: choose the replica that will become the new master
    ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
    ri->failover_state_change_time = mstime();
    sentinelEvent(LL_WARNING,"+failover-state-select-slave",ri,"%@");
}

sentinelFailoverSelectSlave

SELECT_SLAVE: choose the new master from the replicas.

void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
    sentinelRedisInstance *slave = sentinelSelectSlave(ri);

    // no suitable replica
    if (slave == NULL) {
        sentinelEvent(LL_WARNING,"-failover-abort-no-good-slave",ri,"%@");
        sentinelAbortFailover(ri);
    } else {
        // replica selected: flag it as the one being promoted
        sentinelEvent(LL_WARNING,"+selected-slave",slave,"%@");
        slave->flags |= SRI_PROMOTED;
        ri->promoted_slave = slave;

        // next step: send SLAVEOF NO ONE to the selected replica
        ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
        ri->failover_state_change_time = mstime();
        sentinelEvent(LL_NOTICE,"+failover-state-send-slaveof-noone",
            slave, "%@");
    }
}

sentinelFailoverSendSlaveOfNoOne

SEND_SLAVEOF_NOONE: send SLAVEOF NO ONE to the selected replica to make it a master.

void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
    int retval;

    // the selected replica is disconnected
    if (ri->promoted_slave->link->disconnected) {
        // Still disconnected after failover_timeout: this attempt has
        // failed, so go back to the initial state.
        if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
            sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
            sentinelAbortFailover(ri);
        }

        // otherwise wait and retry on the next tick
        return;
    }

    // send SLAVEOF NO ONE
    retval = sentinelSendSlaveOf(ri->promoted_slave,NULL,0);
    if (retval != C_OK) return;
    sentinelEvent(LL_NOTICE, "+failover-state-wait-promotion",
        ri->promoted_slave,"%@");

    // next step: wait for it to be promoted to master
    ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
    ri->failover_state_change_time = mstime();
}

sentinelFailoverWaitPromotion

WAIT_PROMOTION: wait for the selected replica to be promoted.

void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
    // Only the timeout for this stage is checked here. The move to the next
    // state happens when an INFO reply shows the replica reporting the master role.
    if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
        sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
        sentinelAbortFailover(ri);
    }
}

sentinelFailoverDetectEnd

Wraps up the failover.

void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
    int not_reconfigured = 0, timeout = 0;
    dictIterator *di;
    dictEntry *de;
    mstime_t elapsed = mstime() - master->failover_state_change_time;

    // The failover cannot be considered finished while the promoted replica
    // is missing or SDOWN.
    if (master->promoted_slave == NULL ||
        master->promoted_slave->flags & SRI_S_DOWN) return;

    // count the replicas that have not been reconfigured yet
    di = dictGetIterator(master->slaves);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);

        if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
        if (slave->flags & SRI_S_DOWN) continue;
        not_reconfigured++;
    }
    dictReleaseIterator(di);

    // timed out: treat the failover as finished anyway
    if (elapsed > master->failover_timeout) {
        not_reconfigured = 0;
        timeout = 1;
        sentinelEvent(LL_WARNING,"+failover-end-for-timeout",master,"%@");
    }

    // every replica has been reconfigured (or we timed out)
    if (not_reconfigured == 0) {
        sentinelEvent(LL_WARNING,"+failover-end",master,"%@");
        master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
        master->failover_state_change_time = mstime();
    }

    // On timeout, make a best-effort attempt to point every replica that has
    // not been reconfigured yet at the promoted replica.
    if (timeout) {
        dictIterator *di;
        dictEntry *de;

        di = dictGetIterator(master->slaves);
        while((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *slave = dictGetVal(de);
            int retval;

            if (slave->flags & (SRI_RECONF_DONE|SRI_RECONF_SENT)) continue;
            if (slave->link->disconnected) continue;

            retval = sentinelSendSlaveOf(slave,
                    master->promoted_slave->addr->ip,
                    master->promoted_slave->addr->port);
            if (retval == C_OK) {
                sentinelEvent(LL_NOTICE,"+slave-reconf-sent-be",slave,"%@");
                slave->flags |= SRI_RECONF_SENT;
            }
        }
        dictReleaseIterator(di);
    }
}

sentinelFailoverReconfNextSlave

RECONF_SLAVES: point the replicas at the new master, in batches.

void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
    dictIterator *di;
    dictEntry *de;
    int in_progress = 0;

    // count the reconfigurations already in flight
    di = dictGetIterator(master->slaves);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *slave = dictGetVal(de);

        if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
            in_progress++;
    }
    dictReleaseIterator(di);

    // start more reconfigurations while below the parallel_syncs limit
    di = dictGetIterator(master->slaves);
    while(in_progress < master->parallel_syncs &&
          (de = dictNext(di)) != NULL)
    {
        sentinelRedisInstance *slave = dictGetVal(de);
        int retval;

        // skip the replica being promoted and replicas already reconfigured
        if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;

        // reconfiguration timed out: consider it done anyway
        if ((slave->flags & SRI_RECONF_SENT) &&
            (mstime() - slave->slave_reconf_sent_time) >
            SENTINEL_SLAVE_RECONF_TIMEOUT)
        {
            sentinelEvent(LL_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
            slave->flags &= ~SRI_RECONF_SENT;
            slave->flags |= SRI_RECONF_DONE;
        }

        // already sent or in progress: wait; also skip disconnected replicas
        if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)) continue;
        if (slave->link->disconnected) continue;

        // send SLAVEOF <new master ip> <port>
        retval = sentinelSendSlaveOf(slave,
                master->promoted_slave->addr->ip,
                master->promoted_slave->addr->port);
        if (retval == C_OK) {
            slave->flags |= SRI_RECONF_SENT;
            slave->slave_reconf_sent_time = mstime();
            sentinelEvent(LL_NOTICE,"+slave-reconf-sent",slave,"%@");
            in_progress++;
        }
    }
    dictReleaseIterator(di);

    // check whether every replica is done, and handle the failover timeout
    sentinelFailoverDetectEnd(master);
}

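The batching behaves like a small scheduler bounded by `parallel-syncs`. Below is a hypothetical one-pass sketch. The enum `reconf_state` stands in for the `SRI_PROMOTED`/`SRI_RECONF_*` flags, and the per-replica timeout and disconnected checks are left out.

```c
#include <assert.h>
#include <stddef.h>

typedef enum { R_PENDING, R_PROMOTED, R_SENT, R_INPROG, R_DONE } reconf_state;

/* One scheduling pass: count in-flight reconfigurations, then start new
 * ones until parallel_syncs are in flight. Returns how many were started. */
size_t reconf_next_batch(reconf_state *r, size_t n, size_t parallel_syncs) {
    size_t in_progress = 0, started = 0, i;

    assert(parallel_syncs > 0);
    for (i = 0; i < n; i++)
        if (r[i] == R_SENT || r[i] == R_INPROG) in_progress++;

    for (i = 0; i < n && in_progress < parallel_syncs; i++) {
        if (r[i] != R_PENDING) continue; /* promoted, done, or already sent */
        r[i] = R_SENT;                   /* stands for "send SLAVEOF" */
        in_progress++;
        started++;
    }
    return started;
}
```

With `parallel-syncs 1`, as in the sample configuration, replicas are switched one at a time. Each timer tick starts a new one only after the previous one has finished.
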
sentinelFailoverSwitchToPromotedSlave

Switches the master's recorded address to the promoted replica.

void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
    sentinelRedisInstance *ref = master->promoted_slave ?
                                 master->promoted_slave : master;

    sentinelEvent(LL_WARNING,"+switch-master",master,"%s %s %d %s %d",
        master->name, master->addr->ip, master->addr->port,
        ref->addr->ip, ref->addr->port);

    sentinelResetMasterAndChangeAddress(master,ref->addr->ip,ref->addr->port);
}

sentinelFailoverStateMachine

Runs the handler that matches the current failover state.

void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
    serverAssert(ri->flags & SRI_MASTER);

    // no failover in progress: nothing to do
    if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;

    switch(ri->failover_state) {
        case SENTINEL_FAILOVER_STATE_WAIT_START:
            sentinelFailoverWaitStart(ri);
            break;
        case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
            sentinelFailoverSelectSlave(ri);
            break;
        case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
            sentinelFailoverSendSlaveOfNoOne(ri);
            break;
        case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
            sentinelFailoverWaitPromotion(ri);
            break;
        case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
            sentinelFailoverReconfNextSlave(ri);
            break;
    }
}

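The handlers above form a linear state machine with an abort edge back to NONE from the early states. This is a hypothetical model of those transitions, not Redis code. `failover_step` and `step_result` are illustrative names, and the states are declared in the same order as the `SENTINEL_FAILOVER_STATE_*` constants, so `s + 1` is the next state. In the source, the WAIT_PROMOTION to RECONF_SLAVES step is triggered by the INFO reply handler rather than by the state machine itself.

```c
#include <assert.h>

typedef enum {
    FS_NONE, FS_WAIT_START, FS_SELECT_SLAVE, FS_SEND_SLAVEOF_NOONE,
    FS_WAIT_PROMOTION, FS_RECONF_SLAVES, FS_UPDATE_CONFIG
} failover_state;

typedef enum { STEP_WAITING, STEP_DONE, STEP_FAILED } step_result;

/* Next state after one timer tick, given the outcome of the current step. */
failover_state failover_step(failover_state s, step_result r) {
    assert(s <= FS_UPDATE_CONFIG);
    if (r == STEP_WAITING) return s;         /* stay, retry next tick */
    switch (s) {
    case FS_WAIT_START: case FS_SELECT_SLAVE:
    case FS_SEND_SLAVEOF_NOONE: case FS_WAIT_PROMOTION:
        if (r == STEP_FAILED) return FS_NONE; /* sentinelAbortFailover() */
        return (failover_state)(s + 1);
    case FS_RECONF_SLAVES:
        return FS_UPDATE_CONFIG;              /* all done, or timed out */
    default:
        return s;
    }
}
```

The abort edge stops at WAIT_PROMOTION, matching the `serverAssert` in `sentinelAbortFailover`. Once replicas are being reconfigured, a timeout ends the failover instead of undoing it.
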
sentinelAbortFailover

Aborts a failed failover.

void sentinelAbortFailover(sentinelRedisInstance *ri) {
    serverAssert(ri->flags & SRI_FAILOVER_IN_PROGRESS);
    serverAssert(ri->failover_state <= SENTINEL_FAILOVER_STATE_WAIT_PROMOTION);

    // clear the failover flags
    ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS|SRI_FORCE_FAILOVER);

    // reset the failover state
    ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
    ri->failover_state_change_time = mstime();

    // un-flag the replica that was selected for promotion
    if (ri->promoted_slave) {
        ri->promoted_slave->flags &= ~SRI_PROMOTED;
        ri->promoted_slave = NULL;
    }
}

Periodic tasks

sentinelHandleRedisInstance

Periodic processing of a single instance.

void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {

    // reconnect the links if needed
    sentinelReconnectInstance(ri);

    // periodic commands: PING, INFO and hello messages
    sentinelSendPeriodicCommands(ri);

    // In TILT mode, skip the checks below until SENTINEL_TILT_PERIOD has
    // passed, then leave TILT mode.
    if (sentinel.tilt) {
        if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
        sentinel.tilt = 0;
        sentinelEvent(LL_WARNING,"-tilt",NULL,"#tilt mode exited");
    }

    // check SDOWN
    sentinelCheckSubjectivelyDown(ri);

    /* Masters and slaves */
    if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
        /* Nothing so far. */
    }

    // master-only steps
    if (ri->flags & SRI_MASTER) {
        // check ODOWN
        sentinelCheckObjectivelyDown(ri);

        // start a failover if needed; if one starts, ask the other
        // sentinels right away (forced) so that the votes arrive quickly
        if (sentinelStartFailoverIfNeeded(ri))
            sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);

        // advance the failover state machine
        sentinelFailoverStateMachine(ri);

        // regular query of the other sentinels' view of this master
        sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
    }
}

sentinelHandleDictOfRedisInstances

Processes a dictionary of instances, recursing into each master's replicas and sentinels.

void sentinelHandleDictOfRedisInstances(dict *instances) {
    dictIterator *di;
    dictEntry *de;
    sentinelRedisInstance *switch_to_promoted = NULL;

    /* There are a number of things we need to perform against every master. */
    di = dictGetIterator(instances);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);

        sentinelHandleRedisInstance(ri);
        if (ri->flags & SRI_MASTER) {
            sentinelHandleDictOfRedisInstances(ri->slaves);
            sentinelHandleDictOfRedisInstances(ri->sentinels);
            if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
                switch_to_promoted = ri;
            }
        }
    }

    // after the loop, switch the master's address to the promoted replica
    if (switch_to_promoted)
        sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
    dictReleaseIterator(di);
}

sentinelCheckTiltCondition

Checks whether to enter TILT mode.

void sentinelCheckTiltCondition(void) {
    mstime_t now = mstime();
    mstime_t delta = now - sentinel.previous_time;

    if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) {
        sentinel.tilt = 1;
        sentinel.tilt_start_time = mstime();
        sentinelEvent(LL_WARNING,"+tilt",NULL,"#tilt mode entered");
    }
    sentinel.previous_time = mstime();
}

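The TILT trigger is a plain test on the gap between two timer calls. A negative gap means the clock went backwards. A very large gap means the process was blocked or the clock jumped forward. In either case the time-based checks are not trustworthy. This hypothetical standalone sketch assumes `SENTINEL_TILT_TRIGGER` is 2000 ms, so check the value in your Redis version.

```c
#include <assert.h>
#include <stdbool.h>

typedef long long mstime_t;

/* Assumption: mirrors SENTINEL_TILT_TRIGGER in sentinel.c. */
#define TILT_TRIGGER_MS 2000

/* previous_time: when the timer last ran; now: the current call. */
bool should_enter_tilt(mstime_t previous_time, mstime_t now) {
    mstime_t delta = now - previous_time;
    return delta < 0 || delta > TILT_TRIGGER_MS;
}
```
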
sentinelTimer

Entry point of the periodic timer.

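void sentinelTimer(void) {
    // check whether to enter TILT mode
    sentinelCheckTiltCondition();

    // process every master together with its replicas and sentinels
    sentinelHandleDictOfRedisInstances(sentinel.masters);

    // start queued scripts
    sentinelRunPendingScripts();

    // collect scripts that have exited
    sentinelCollectTerminatedScripts();

    // kill scripts that have run too long
    sentinelKillTimedoutScripts();

    // Randomize the timer frequency on every call, so that sentinels started
    // at the same time do not stay synchronized and keep asking for votes
    // at the same moment.
    server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ;
}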