Redis Master-Slave Replication

To improve Redis performance by scaling out the number of instances, you can use Redis's master-slave replication. The topology can be configured before startup or changed dynamically at runtime. Replication is driven by two commands, sync and psync. psync is an upgraded version of sync that supports resuming a broken transfer, solving the problem that sync can only perform a full resynchronization.

Synchronization Scenarios

Redis replication mainly involves three scenarios:

  1. master and slave are already connected: the master sends the slave a continuous stream of commands whenever its data changes, covering client writes, key expiration, and eviction
  2. master and slave reconnect after a disconnect: the slave attempts a partial resynchronization, fetching only the data produced since the link dropped
  3. partial resynchronization is not possible: the slave requests a full resynchronization, after which it falls back to case 1

The topology formed by a master and its slaves can have multiple levels: a slave can itself have slaves, called sub-slaves to distinguish them. The data a sub-slave receives is always the stream propagated down from the topmost master.

How Synchronization Works

Every master maintains the following three elements:

  1. Replication ID (replid): a 40-byte random string identifying the instance
  2. Replication backlog: a memory block, 1 MB by default, used as a ring buffer. The master processes every piece of replication data twice: it sends it to the slaves, and it appends it to the backlog so that a slave that reconnects can find the history it missed
  3. Offset: the number of bytes of replication stream produced so far, i.e. the total length of all data ever written to the backlog. A client requesting a partial resynchronization must ask for an offset no greater than this

The related commands are slaveof and replicaof:

slaveof/replicaof no one //stop replicating; the current instance becomes a master
slaveof/replicaof hostname port //change this node's master; if the node is already a replica of some master, its current dataset is discarded

The word slave is a sensitive term, evoking slavery. Under heavy criticism, the Redis author added the alias replica, with replicaof gradually replacing slaveof.

Data Structures

The fields of the redisServer struct involved in replication are described below:

struct redisServer {
//master side
char replid[CONFIG_RUN_ID_SIZE+1]; //replication ID of this instance
char replid2[CONFIG_RUN_ID_SIZE+1]; //secondary replication ID; when the node is promoted or rejoins, replid changes, and the old replid is kept here so replicas can still partially resync
long long master_repl_offset; //global replication offset
long long second_replid_offset; //offset up to which replid2 is valid, same scenario as replid2
int slaveseldb; //db index used by the last propagated command, so SELECT need not be sent every time
int repl_ping_slave_period; //how often the master PINGs its slaves, used as a heartbeat
char *repl_backlog; //replication backlog (used as a ring buffer)
long long repl_backlog_size; //size of the replication backlog
long long repl_backlog_histlen; //actual data length in the backlog, capped at repl_backlog_size once the ring has wrapped
long long repl_backlog_idx; //next write position inside the backlog
long long repl_backlog_off; //replication offset of the oldest byte still in the backlog
time_t repl_backlog_time_limit; //free the backlog after this much time with zero slaves
time_t repl_no_slaves_since; //time at which the slave count dropped to zero
int repl_min_slaves_to_write; //minimum number of healthy slaves; if fewer are healthy, the master refuses writes
int repl_min_slaves_max_lag; //maximum interval between two slave ACKs; beyond it the slave is considered unhealthy (used together with repl_min_slaves_to_write)
int repl_good_slaves_count; //number of healthy slaves, computed from repl_min_slaves_to_write and repl_min_slaves_max_lag
int repl_diskless_sync; //diskless replication: stream the rdb contents directly over the socket instead of using an rdb file
int repl_diskless_sync_delay; //in diskless mode, wait this long before starting the rdb, so more slaves can share the same rdb transfer

//slave side
char *masterauth; //password used to authenticate with the master
char *masterhost; //host of the master
int masterport; //port of the master
int repl_timeout; //replication timeout
client *master; //client representing the master, created once the connection is established
client *cached_master; //backup of the master client, saved on disconnect so it can be reused after reconnecting
int repl_syncio_timeout; //io timeout during the rdb transfer
int repl_state; //replication state, see REPL_STATE_*
off_t repl_transfer_size; //size of the rdb being transferred
off_t repl_transfer_read; //bytes of the rdb read so far
off_t repl_transfer_last_fsync_off; //offset at the last fsync
int repl_transfer_s; //socket used for the rdb transfer
int repl_transfer_fd; //file descriptor the received rdb data is written to
char *repl_transfer_tmpfile; //temporary file used during the transfer
time_t repl_transfer_lastio; //time of the last read, used for timeout detection
int repl_slave_ro; //whether the slave is read-only
int repl_slave_ignore_maxmemory; //whether the slave ignores the maxmemory limit
time_t repl_down_since; //time at which the link to the master went down
int repl_disable_tcp_nodelay; //whether to disable TCP_NODELAY after sync
int slave_announce_port; //port announced for replication
char *slave_announce_ip; //ip announced for replication
char master_replid[CONFIG_RUN_ID_SIZE+1]; //the master's replid
long long master_initial_offset; //the master's initial replication offset
}

Schematic of the replication backlog

Synchronization Flow

Most of the interaction between master and slave happens in the periodic task replicationCron, whose parent is serverCron and which runs once per second. The overall flow is:

  • A Redis instance about to become a slave starts in state REPL_STATE_NONE, meaning no replication is needed
  • After receiving slaveof/replicaof, it records the master's ip and port and the state becomes REPL_STATE_CONNECT, waiting to connect (if the master is set in the Redis config file, the instance starts directly in this state)
  • It sends PING and the state becomes REPL_STATE_RECEIVE_PONG, waiting for the PONG reply
  • On receiving PONG, the state becomes REPL_STATE_SEND_AUTH, meaning the password is sent next. The PING/PONG round trip verifies that the network link works
  • It sends the password and the state becomes REPL_STATE_RECEIVE_AUTH, waiting for confirmation. If masterauth is empty, this stage is skipped
  • On an OK reply, the state becomes REPL_STATE_SEND_PORT, meaning the port used for replication is sent next
  • It sends replconf listening-port xxxx to announce its port, and the state becomes REPL_STATE_RECEIVE_PORT, waiting for confirmation
  • On an OK reply, the state becomes REPL_STATE_SEND_IP, meaning the ip used for replication is sent next
  • It sends replconf ip-address xxxx to announce its ip, and the state becomes REPL_STATE_RECEIVE_IP, waiting for confirmation
  • On an OK reply, the state becomes REPL_STATE_SEND_CAPA, meaning its capabilities are announced next
  • It sends replconf capa eof capa psync2, declaring support for the eof and psync2 protocols, and the state becomes REPL_STATE_RECEIVE_CAPA, waiting for confirmation
  • On an OK reply, the state becomes REPL_STATE_SEND_PSYNC, meaning the psync command is sent next
  • It sends psync and the state becomes REPL_STATE_RECEIVE_PSYNC, waiting for the reply
  • Depending on the psync reply, there are several cases:
  1. CONTINUE: the link was dropped and re-established; a partial resynchronization follows
  2. FULLRESYNC: a full resynchronization follows
  3. The master does not support psync; the slave falls back to sending sync
  4. Any other error: the master cannot serve the request right now, e.g. it is still loading its db, or it is itself a slave that has not yet connected to its own master

In case 1, the state becomes REPL_STATE_CONNECTED, meaning the link is fully established
In cases 2 and 3, the state becomes REPL_STATE_TRANSFER, waiting for the master to send the rdb data
Any failure during the whole process is handled like case 4: the connection is closed and the state falls back to REPL_STATE_CONNECT, so the next run of replicationCron restarts the flow from the beginning

  • After receiving the complete rdb data from the master, the slave uses it to initialize its db and the state becomes REPL_STATE_CONNECTED
  • It then receives and executes the incremental stream from the master; this is the steady state

Besides the main flow, the periodic task also does the following:

  • Timeout detection, covering both connect timeouts and transfer timeouts
  • Heartbeats: send PING, expect PONG
  • ACKs: the slave reports its replication offset to the master
  • Freeing the replication backlog when it is no longer needed
  • Counting the healthy slaves, which may affect whether the master accepts writes
  • In diskless mode, creating one shared rdb for all the slaves waiting for it

Source Code Analysis

replicationGetSlaveName

Returns the slave's name as ip:port, or the client id if that is unavailable; mainly used for logging.

char *replicationGetSlaveName(client *c) {
static char buf[NET_PEER_ID_LEN];
char ip[NET_IP_STR_LEN];

ip[0] = '\0';
buf[0] = '\0';
if (c->slave_ip[0] != '\0' ||
anetPeerToString(c->fd,ip,sizeof(ip),NULL) != -1)
{
/* Note that the 'ip' buffer is always larger than 'c->slave_ip' */
if (c->slave_ip[0] != '\0') memcpy(ip,c->slave_ip,sizeof(c->slave_ip));

if (c->slave_listening_port)
anetFormatAddr(buf,sizeof(buf),ip,c->slave_listening_port);
else
snprintf(buf,sizeof(buf),"%s:<unknown-replica-port>",ip);
} else {
snprintf(buf,sizeof(buf),"client id #%llu",
(unsigned long long) c->id);
}
return buf;
}

createReplicationBacklog

Creates the replication backlog.

void createReplicationBacklog(void) {
serverAssert(server.repl_backlog == NULL);
server.repl_backlog = zmalloc(server.repl_backlog_size);
server.repl_backlog_histlen = 0;
server.repl_backlog_idx = 0;

//although the buffer holds no data yet, treat it as if it were full: its first byte then maps to master_repl_offset+1, the next offset to be produced
server.repl_backlog_off = server.master_repl_offset+1;
}

resizeReplicationBacklog

Resizes the replication backlog.

void resizeReplicationBacklog(long long newsize) {
// enforce the minimum size
if (newsize < CONFIG_REPL_BACKLOG_MIN_SIZE)
newsize = CONFIG_REPL_BACKLOG_MIN_SIZE;
if (server.repl_backlog_size == newsize) return;

server.repl_backlog_size = newsize;
if (server.repl_backlog != NULL) {

zfree(server.repl_backlog);
server.repl_backlog = zmalloc(server.repl_backlog_size);
server.repl_backlog_histlen = 0;
server.repl_backlog_idx = 0;

//although the buffer holds no data yet, treat it as if it were full: its first byte then maps to master_repl_offset+1, the next offset to be produced
server.repl_backlog_off = server.master_repl_offset+1;
}
}

freeReplicationBacklog

Frees the replication backlog.

void freeReplicationBacklog(void) {
serverAssert(listLength(server.slaves) == 0);
zfree(server.repl_backlog);
server.repl_backlog = NULL;
}

feedReplicationBacklog

Writes data into the replication backlog; this is the lowest-level entry point.

void feedReplicationBacklog(void *ptr, size_t len) {
unsigned char *p = ptr;

// advance the master's global replication offset
server.master_repl_offset += len;

// the buffer is used as a ring, so write in chunks, each at most up to the end of the ring
while(len) {
size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
if (thislen > len) thislen = len;
memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);

//advance the write position; wrap around to the start once the end is reached
server.repl_backlog_idx += thislen;
if (server.repl_backlog_idx == server.repl_backlog_size)
server.repl_backlog_idx = 0;
len -= thislen;
p += thislen;

//track the actual data length held in the buffer
server.repl_backlog_histlen += thislen;
}

//the actual data length is capped at the buffer size
if (server.repl_backlog_histlen > server.repl_backlog_size)
server.repl_backlog_histlen = server.repl_backlog_size;

//replication offset of the first (oldest) byte in the buffer, i.e. where the data starts
server.repl_backlog_off = server.master_repl_offset -
server.repl_backlog_histlen + 1;
}

feedReplicationBacklogWithObject

Writes an object (robj) into the replication backlog.

void feedReplicationBacklogWithObject(robj *o) {
char llstr[LONG_STR_SIZE];
void *p;
size_t len;

// integer-encoded objects are converted to their string representation
if (o->encoding == OBJ_ENCODING_INT) {
len = ll2string(llstr,sizeof(llstr),(long)o->ptr);
p = llstr;
} else {
len = sdslen(o->ptr);
p = o->ptr;
}
feedReplicationBacklog(p,len);
}

replicationFeedSlaves

Propagates data to the slaves.

void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
int j, len;
char llstr[LONG_STR_SIZE];

//not a top-level master: ignore
if (server.masterhost != NULL) return;

//no backlog and no slaves: nothing to do
if (server.repl_backlog == NULL && listLength(slaves) == 0) return;

serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));

//if the db used by the last propagated command differs from this one, emit a SELECT command first
if (server.slaveseldb != dictid) {
robj *selectcmd;

// SELECT commands for a small range of db indexes are pre-built and shared
if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) {
selectcmd = shared.select[dictid];
} else {
int dictid_len;

dictid_len = ll2string(llstr,sizeof(llstr),dictid);
selectcmd = createObject(OBJ_STRING,
sdscatprintf(sdsempty(),
"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
dictid_len, llstr));
}

//append the SELECT command to the backlog
if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);

//send it to the slaves
listRewind(slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
addReply(slave,selectcmd);
}

//drop the reference if the object was created on the fly
if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
decrRefCount(selectcmd);
}

//remember the db the latest propagated data belongs to
server.slaveseldb = dictid;

//write the command being propagated into the backlog
if (server.repl_backlog) {
char aux[LONG_STR_SIZE+3];

//build the RESP array header *<argc>\r\n
aux[0] = '*';
len = ll2string(aux+1,sizeof(aux)-1,argc);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3);

//build each array element as a bulk string $<len>\r\n<payload>\r\n
for (j = 0; j < argc; j++) {
long objlen = stringObjectLen(argv[j]);

/* We need to feed the buffer with the object as a bulk reply
* not just as a plain string, so create the $..CRLF payload len
* and add the final CRLF */
aux[0] = '$';
len = ll2string(aux+1,sizeof(aux)-1,objlen);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3);
feedReplicationBacklogWithObject(argv[j]);
feedReplicationBacklog(aux+len+1,2);
}
}

//send the command to the slaves
listRewind(slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;

if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;

addReplyMultiBulkLen(slave,argc);

for (j = 0; j < argc; j++)
addReplyBulk(slave,argv[j]);
}
}

replicationFeedSlavesFromMasterStream

Forwards the stream received from the master to this node's own slaves; used by nodes in the middle of the topology.

void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen) {
listNode *ln;
listIter li;

//debugging only
if (0) {
printf("%zu:",buflen);
for (size_t j = 0; j < buflen; j++) {
printf("%c", isprint(buf[j]) ? buf[j] : '.');
}
printf("\n");
}

//write into the backlog
if (server.repl_backlog) feedReplicationBacklog(buf,buflen);

//send to the slaves
listRewind(slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;

if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
addReplyString(slave,buf,buflen);
}
}

replicationFeedMonitors

Sends the command stream to clients in MONITOR mode.

void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) {
listNode *ln;
listIter li;
int j;
sds cmdrepr = sdsnew("+");
robj *cmdobj;
struct timeval tv;

gettimeofday(&tv,NULL);
cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec);
if (c->flags & CLIENT_LUA) {
cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid);
} else if (c->flags & CLIENT_UNIX_SOCKET) {
cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket);
} else {
cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(c));
}

for (j = 0; j < argc; j++) {
if (argv[j]->encoding == OBJ_ENCODING_INT) {
cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)argv[j]->ptr);
} else {
cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr,
sdslen(argv[j]->ptr));
}
if (j != argc-1)
cmdrepr = sdscatlen(cmdrepr," ",1);
}
cmdrepr = sdscatlen(cmdrepr,"\r\n",2);
cmdobj = createObject(OBJ_STRING,cmdrepr);

listRewind(monitors,&li);
while((ln = listNext(&li))) {
client *monitor = ln->value;
addReply(monitor,cmdobj);
}
decrRefCount(cmdobj);
}

addReplyReplicationBacklog

Sends the backlog data starting at offset to the client; used for partial resynchronization after a disconnect.

long long addReplyReplicationBacklog(client *c, long long offset) {
long long j, skip, len;

serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset);

if (server.repl_backlog_histlen == 0) {
serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero");
return 0;
}

serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld",
server.repl_backlog_size);
serverLog(LL_DEBUG, "[PSYNC] First byte: %lld",
server.repl_backlog_off);
serverLog(LL_DEBUG, "[PSYNC] History len: %lld",
server.repl_backlog_histlen);
serverLog(LL_DEBUG, "[PSYNC] Current index: %lld",
server.repl_backlog_idx);

//number of bytes to skip
skip = offset - server.repl_backlog_off;
serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip);

//index of the oldest byte in the buffer
j = (server.repl_backlog_idx +
(server.repl_backlog_size-server.repl_backlog_histlen)) %
server.repl_backlog_size;
serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j);

//final start position inside the buffer
j = (j + skip) % server.repl_backlog_size;

//send the data in chunks, each at most up to the end of the ring
len = server.repl_backlog_histlen - skip;
serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len);
while(len) {
long long thislen =
((server.repl_backlog_size - j) < len) ?
(server.repl_backlog_size - j) : len;

serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen);
addReplySds(c,sdsnewlen(server.repl_backlog + j, thislen));
len -= thislen;
j = 0;
}
return server.repl_backlog_histlen - skip;
}

getPsyncInitialOffset

Returns this node's current replication offset; used when replying to the psync command.

long long getPsyncInitialOffset(void) {
return server.master_repl_offset;
}

replicationSetupSlaveForFullResync

Replies to the client that a full resynchronization is needed; after this call the slave is waiting for the rdb save to finish.

int replicationSetupSlaveForFullResync(client *slave, long long offset) {
char buf[128];
int buflen;

//record the slave's offset
slave->psync_initial_offset = offset;

//mark the slave as waiting for the rdb save to complete
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;

//force re-sending SELECT for the first command propagated after the rdb
server.slaveseldb = -1;

//tell the client a full resync is coming; the legacy sync command skips this step, since after sending sync the client is already waiting for the rdb data
if (!(slave->flags & CLIENT_PRE_PSYNC)) {
buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
server.replid,offset);
if (write(slave->fd,buf,buflen) != buflen) {
freeClientAsync(slave);
return C_ERR;
}
}
return C_OK;
}

masterTryPartialResynchronization

The master tries a partial resynchronization; a return value of C_ERR means a full resynchronization is required.

int masterTryPartialResynchronization(client *c) {
long long psync_offset, psync_len;
char *master_replid = c->argv[1]->ptr;
char buf[128];
int buflen;

//parse the starting offset of the partial resync: psync <id> <offset>
if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) !=
C_OK) goto need_full_resync;

//check whether the replication id matches
if (strcasecmp(master_replid, server.replid) &&
(strcasecmp(master_replid, server.replid2) ||
psync_offset > server.second_replid_offset))
{
// psync ? -1 means the slave is forcing a full resync
if (master_replid[0] != '?') {
if (strcasecmp(master_replid, server.replid) &&
strcasecmp(master_replid, server.replid2))
{
serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
"Replication ID mismatch (Replica asked for '%s', my "
"replication IDs are '%s' and '%s')",
master_replid, server.replid, server.replid2);
} else {
serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
"Requested offset for second ID was %lld, but I can reply "
"up to %lld", psync_offset, server.second_replid_offset);
}
} else {
serverLog(LL_NOTICE,"Full resync requested by replica %s",
replicationGetSlaveName(c));
}
goto need_full_resync;
}

//check whether the requested offset is still inside the backlog; repl_backlog_off is the offset of the first byte, repl_backlog_off + repl_backlog_histlen that of the last
if (!server.repl_backlog ||
psync_offset < server.repl_backlog_off ||
psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
{
serverLog(LL_NOTICE,
"Unable to partial resync with replica %s for lack of backlog (Replica request was: %lld).", replicationGetSlaveName(c), psync_offset);
if (psync_offset > server.master_repl_offset) {
serverLog(LL_WARNING,
"Warning: replica %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
}
goto need_full_resync;
}

//partial resync accepted: mark the client as a slave, set its state to online, record the ack time
c->flags |= CLIENT_SLAVE;
c->replstate = SLAVE_STATE_ONLINE;
c->repl_ack_time = server.unixtime;
c->repl_put_online_on_ack = 0;
listAddNodeTail(server.slaves,c);

//depending on which psync version the client supports, +CONTINUE may carry the replid
if (c->slave_capa & SLAVE_CAPA_PSYNC2) {
buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", server.replid);
} else {
buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
}

//tell the client to continue
if (write(c->fd,buf,buflen) != buflen) {
freeClientAsync(c);
return C_OK;
}

//send the backlog data after offset to the client
psync_len = addReplyReplicationBacklog(c,psync_offset);
serverLog(LL_NOTICE,
"Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.",
replicationGetSlaveName(c),
psync_len, psync_offset);

//refresh the count of healthy slaves
refreshGoodSlavesCount();
return C_OK;

need_full_resync:
//return C_ERR so the caller performs a full resynchronization
return C_ERR;
}

startBgsaveForReplication

Starts an rdb save for a full resynchronization.

int startBgsaveForReplication(int mincapa) {
int retval;

//use diskless mode only if, one, the master has it enabled and, two, the slave supports the EOF-delimited stream format
int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
listIter li;
listNode *ln;

serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s",
socket_target ? "replicas sockets" : "disk");

rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);

//choose the rdb method depending on diskless mode
if (rsiptr) {
if (socket_target)
retval = rdbSaveToSlavesSockets(rsiptr);
else
retval = rdbSaveBackground(server.rdb_filename,rsiptr);
} else {
serverLog(LL_WARNING,"BGSAVE for replication: replication information not available, can't generate the RDB file right now. Try later.");
retval = C_ERR;
}

//rdb failed: tell the slaves why
if (retval == C_ERR) {
serverLog(LL_WARNING,"BGSAVE for replication failed");
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;

if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
slave->flags &= ~CLIENT_SLAVE;
listDelNode(server.slaves,ln);
addReplyError(slave,
"BGSAVE failed, replication can't continue");
slave->flags |= CLIENT_CLOSE_AFTER_REPLY;
}
}
return retval;
}

//in the file-based case, reply fullresync <id> <offset> here; in diskless mode rdbSaveToSlavesSockets has already done it
if (!socket_target) {
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;

if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
replicationSetupSlaveForFullResync(slave,
getPsyncInitialOffset());
}
}
}

if (retval == C_OK) replicationScriptCacheFlush();
return retval;
}

syncCommand

Handles the sync/psync command from a client.

void syncCommand(client *c) {

//if the client is already marked as a slave, ignore
if (c->flags & CLIENT_SLAVE) return;

//if this node is itself a slave and not yet connected to its master, fail
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
addReplySds(c,sdsnew("-NOMASTERLINK Can't SYNC while not connected with my master\r\n"));
return;
}

//if the client still has pending output, fail
if (clientHasPendingReplies(c)) {
addReplyError(c,"SYNC and PSYNC are invalid with pending output");
return;
}

serverLog(LL_NOTICE,"Replica %s asks for synchronization",
replicationGetSlaveName(c));

//for psync, try a partial resynchronization first
if (!strcasecmp(c->argv[0]->ptr,"psync")) {
if (masterTryPartialResynchronization(c) == C_OK) {
server.stat_sync_partial_ok++;
return;
} else {
char *master_replid = c->argv[1]->ptr;

//partial resync failed although the client asked for one: bump the counter
if (master_replid[0] != '?') server.stat_sync_partial_err++;
}
} else {
//legacy sync command: mark the client
c->flags |= CLIENT_PRE_PSYNC;
}

//count a full synchronization
server.stat_sync_full++;

//mark the client as waiting for the rdb save to start
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
if (server.repl_disable_tcp_nodelay)
anetDisableTcpNoDelay(NULL, c->fd);
c->repldbfd = -1;
c->flags |= CLIENT_SLAVE;
listAddNodeTail(server.slaves,c);

//create the replication backlog
if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {
//generate a new replid
changeReplicationId();
clearReplicationId2();
createReplicationBacklog();
}

//an rdb save is in progress, with a disk target
if (server.rdb_child_pid != -1 &&
server.rdb_child_type == RDB_CHILD_TYPE_DISK)
{
client *slave;
listNode *ln;
listIter li;

//check whether this rdb was started for another slave's sync
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
}

//if so, and that slave's capabilities fully cover this one's, the rdb can be reused
if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {
copyClientOutputBuffer(c,slave);
replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
} else {
//cannot reuse it: wait for the next one
serverLog(LL_NOTICE,"Can't attach the replica to the current BGSAVE. Waiting for next BGSAVE for SYNC");
}

//an rdb save is in progress with a socket target (diskless): just wait for the next one
} else if (server.rdb_child_pid != -1 &&
server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
{
serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");

//no rdb save in progress
} else {

//master and slave both support diskless mode: wait, so the diskless rdb started by the next replicationCron can serve more slaves at once
if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
if (server.repl_diskless_sync_delay)
serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
} else {
//file-based rdb is the only option: start it now
if (server.aof_child_pid == -1) {
startBgsaveForReplication(c->slave_capa);
} else {
serverLog(LL_NOTICE,
"No BGSAVE in progress, but an AOF rewrite is active. "
"BGSAVE for replication delayed");
}
}
}
return;
}

replconfCommand

Handles the replconf command from a client: ip/port/capability exchange during the handshake, and ack offsets afterwards.

void replconfCommand(client *c) {
int j;

//the argument count must be odd, otherwise error: REPLCONF <option> <value> <option> <value> ...
if ((c->argc % 2) == 0) {
addReply(c,shared.syntaxerr);
return;
}

//dispatch on the option: listening-port, ip-address, capa (eof/psync2), ack, getack
for (j = 1; j < c->argc; j+=2) {
if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
long port;

if ((getLongFromObjectOrReply(c,c->argv[j+1],
&port,NULL) != C_OK))
return;
c->slave_listening_port = port;
} else if (!strcasecmp(c->argv[j]->ptr,"ip-address")) {
sds ip = c->argv[j+1]->ptr;
if (sdslen(ip) < sizeof(c->slave_ip)) {
memcpy(c->slave_ip,ip,sdslen(ip)+1);
} else {
addReplyErrorFormat(c,"REPLCONF ip-address provided by "
"replica instance is too long: %zd bytes", sdslen(ip));
return;
}
} else if (!strcasecmp(c->argv[j]->ptr,"capa")) {
if (!strcasecmp(c->argv[j+1]->ptr,"eof"))
c->slave_capa |= SLAVE_CAPA_EOF;
else if (!strcasecmp(c->argv[j+1]->ptr,"psync2"))
c->slave_capa |= SLAVE_CAPA_PSYNC2;
} else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
long long offset;

//only slaves send this message
if (!(c->flags & CLIENT_SLAVE)) return;
if ((getLongLongFromObject(c->argv[j+1], &offset) != C_OK))
return;
if (offset > c->repl_ack_off)
c->repl_ack_off = offset;
c->repl_ack_time = server.unixtime;

if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE)
putSlaveOnline(c);
//ack does not get a reply
return;
} else if (!strcasecmp(c->argv[j]->ptr,"getack")) {
if (server.masterhost && server.master) replicationSendAck();
return;
} else {
addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
(char*)c->argv[j]->ptr);
return;
}
}
addReply(c,shared.ok);
}

putSlaveOnline

Puts the slave online and re-installs the write handler on its connection, since output to the client was suppressed during the sync.

void putSlaveOnline(client *slave) {
slave->replstate = SLAVE_STATE_ONLINE;
slave->repl_put_online_on_ack = 0;
slave->repl_ack_time = server.unixtime;

//re-enable sending data to the client
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
sendReplyToClient, slave) == AE_ERR) {
serverLog(LL_WARNING,"Unable to register writable event for replica bulk transfer: %s", strerror(errno));
freeClient(slave);
return;
}
refreshGoodSlavesCount();
serverLog(LL_NOTICE,"Synchronization with replica %s succeeded",
replicationGetSlaveName(slave));
}

sendBulkToSlave

Sends the rdb file to the slave.

void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
client *slave = privdata;
UNUSED(el);
UNUSED(mask);
char buf[PROTO_IOBUF_LEN];
ssize_t nwritten, buflen;

//first send the file-length preamble
if (slave->replpreamble) {
nwritten = write(fd,slave->replpreamble,sdslen(slave->replpreamble));
if (nwritten == -1) {
serverLog(LL_VERBOSE,"Write error sending RDB preamble to replica: %s",
strerror(errno));
freeClient(slave);
return;
}
server.stat_net_output_bytes += nwritten;
sdsrange(slave->replpreamble,nwritten,-1);
if (sdslen(slave->replpreamble) == 0) {
sdsfree(slave->replpreamble);
slave->replpreamble = NULL;
} else {
return;
}
}

//seek to the current transfer position
lseek(slave->repldbfd,slave->repldboff,SEEK_SET);

//read a chunk into buf
buflen = read(slave->repldbfd,buf,PROTO_IOBUF_LEN);
if (buflen <= 0) {
serverLog(LL_WARNING,"Read error sending DB to replica: %s",
(buflen == 0) ? "premature EOF" : strerror(errno));
freeClient(slave);
return;
}

//send buf to the slave
if ((nwritten = write(fd,buf,buflen)) == -1) {
if (errno != EAGAIN) {
serverLog(LL_WARNING,"Write error sending DB to replica: %s",
strerror(errno));
freeClient(slave);
}
return;
}

//account for the bytes sent
slave->repldboff += nwritten;
server.stat_net_output_bytes += nwritten;

//transfer complete: close the file and remove the write event
if (slave->repldboff == slave->repldbsize) {
close(slave->repldbfd);
slave->repldbfd = -1;
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
putSlaveOnline(slave);
}
}

updateSlavesWaitingBgsave

Post-rdb processing: 1. send the contents to the waiting slaves; 2. if other slaves are still waiting for an rdb file, start a new rdb run.

void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
listNode *ln;
int startbgsave = 0;
int mincapa = -1;
listIter li;

listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;

// count the slaves waiting for an rdb to start; mincapa is their common capability mask
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
startbgsave = 1;
mincapa = (mincapa == -1) ? slave->slave_capa :
(mincapa & slave->slave_capa);
} else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
//slaves the finished rdb applies to
struct redis_stat buf;

//diskless: the rdb was streamed directly, no file involved
if (type == RDB_CHILD_TYPE_SOCKET) {
serverLog(LL_NOTICE,
"Streamed RDB transfer with replica %s succeeded (socket). Waiting for REPLCONF ACK from slave to enable streaming",
replicationGetSlaveName(slave));
slave->replstate = SLAVE_STATE_ONLINE;
slave->repl_put_online_on_ack = 1;
slave->repl_ack_time = server.unixtime;
} else {
//file-based
if (bgsaveerr != C_OK) {
freeClient(slave);
serverLog(LL_WARNING,"SYNC failed. BGSAVE child returned an error");
continue;
}

//open the rdb file
if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
redis_fstat(slave->repldbfd,&buf) == -1) {
freeClient(slave);
serverLog(LL_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
continue;
}

//set this slave's transfer start position and the file size
slave->repldboff = 0;
slave->repldbsize = buf.st_size;
slave->replstate = SLAVE_STATE_SEND_BULK;
slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
(unsigned long long) slave->repldbsize);

//install the write event handler that sends the rdb file contents
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
freeClient(slave);
continue;
}
}
}
}

//if some slaves are still waiting for an rdb, start a new one
if (startbgsave) startBgsaveForReplication(mincapa);
}

changeReplicationId

Generates the main replication id, which identifies the data source.

void changeReplicationId(void) {
getRandomHexChars(server.replid,CONFIG_RUN_ID_SIZE);
server.replid[CONFIG_RUN_ID_SIZE] = '\0';
}

clearReplicationId2

Clears the secondary replication id.

void clearReplicationId2(void) {
memset(server.replid2,'0',sizeof(server.replid));
server.replid2[CONFIG_RUN_ID_SIZE] = '\0';
server.second_replid_offset = -1;
}

shiftReplicationId

Moves the main replid into the secondary slot.

void shiftReplicationId(void) {

memcpy(server.replid2,server.replid,sizeof(server.replid));
server.second_replid_offset = server.master_repl_offset+1;
changeReplicationId();
serverLog(LL_WARNING,"Setting secondary replication ID to %s, valid up to offset: %lld. New replication ID is %s", server.replid2, server.second_replid_offset, server.replid);
}

slaveIsInHandshakeState

Returns whether the slave is in the handshake phase.

int slaveIsInHandshakeState(void) {
return server.repl_state >= REPL_STATE_RECEIVE_PONG &&
server.repl_state <= REPL_STATE_RECEIVE_PSYNC;
}

replicationSendNewlineToMaster

Sends a newline to the master to avoid timing out; used in two cases: 1. while flushing the db, 2. while initializing the db from the received rdb file.

void replicationSendNewlineToMaster(void) {
static time_t newline_sent;
if (time(NULL) != newline_sent) {
newline_sent = time(NULL);
if (write(server.repl_transfer_s,"\n",1) == -1) {
}
}
}

replicationEmptyDbCallback

Callback invoked while emptying the database, to keep the connection from timing out.

void replicationEmptyDbCallback(void *privdata) {
UNUSED(privdata);
replicationSendNewlineToMaster();
}

replicationCreateMasterClient

Creates the master client, once the connection is fully established.

void replicationCreateMasterClient(int fd, int dbid) {
server.master = createClient(fd);
server.master->flags |= CLIENT_MASTER;
server.master->authenticated = 1;
server.master->reploff = server.master_initial_offset;
server.master->read_reploff = server.master->reploff;
memcpy(server.master->replid, server.master_replid,
sizeof(server.master_replid));

//the master does not support psync
if (server.master->reploff == -1)
server.master->flags |= CLIENT_PRE_PSYNC;
if (dbid != -1) selectDb(server.master,dbid);
}

restartAOF

Restarts AOF; after receiving and loading the rdb file, the slave re-enables AOF if it was previously configured.

void restartAOF() {
int retry = 10;
while (retry-- && startAppendOnly() == C_ERR) {
serverLog(LL_WARNING,"Failed enabling the AOF after successful master synchronization! Trying it again in one second.");
sleep(1);
}
if (!retry) {
serverLog(LL_WARNING,"FATAL: this replica instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now.");
exit(1);
}
}

readSyncBulkPayload

The slave reads the data sent by the master. There are two formats: a plain rdb file transfer, whose total length is known up front, and the diskless stream, which is delimited by special start and end markers; accordingly, the two are read differently.

#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
char buf[4096];
ssize_t nread, readlen, nwritten;
off_t left;
UNUSED(el);
UNUSED(privdata);
UNUSED(mask);

//master使用无盘模式发送的rdb是用eofmark包住的
static char eofmark[CONFIG_RUN_ID_SIZE];
static char lastbytes[CONFIG_RUN_ID_SIZE];

//whether the diskless format is in use
static int usemark = 0;

//读取前置信息,获取类型区别
if (server.repl_transfer_size == -1) {
if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
serverLog(LL_WARNING,
"I/O error reading bulk count from MASTER: %s",
strerror(errno));
goto error;
}

if (buf[0] == '-') {
serverLog(LL_WARNING,
"MASTER aborted replication with an error: %s",
buf+1);
goto error;
} else if (buf[0] == '\0') {
//用于保持连接的空信息
server.repl_transfer_lastio = server.unixtime;
return;
} else if (buf[0] != '$') {
serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
goto error;
}

//master发回来的数据有两种可能,一种是使用文件,开头是文件的长度$<count>,另一种是无盘模式,开头是$EOF:<40 bytes delimiter>

//无盘模式
if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= CONFIG_RUN_ID_SIZE) {
usemark = 1;
memcpy(eofmark,buf+5,CONFIG_RUN_ID_SIZE);
memset(lastbytes,0,CONFIG_RUN_ID_SIZE);

//更新size避免下次再进入这里
server.repl_transfer_size = 0;
serverLog(LL_NOTICE,
"MASTER <-> REPLICA sync: receiving streamed RDB from master");
} else {
//rdb文件模式
usemark = 0;
server.repl_transfer_size = strtol(buf+1,NULL,10);
serverLog(LL_NOTICE,
"MASTER <-> REPLICA sync: receiving %lld bytes from master",
(long long) server.repl_transfer_size);
}
return;
}

//读取rdb内容
if (usemark) {
//无盘模式下,每次读取固定长度
readlen = sizeof(buf);
} else {
//rdb文件下,实时计算剩余长度
left = server.repl_transfer_size - server.repl_transfer_read;
readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
}

nread = read(fd,buf,readlen);
if (nread <= 0) {
serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
(nread == -1) ? strerror(errno) : "connection lost");
cancelReplicationHandshake();
return;
}
server.stat_net_input_bytes += nread;

//是否读到无盘模式下的结尾
int eof_reached = 0;

if (usemark) {
//记录下最后的读取的内容,用来匹配是否到结尾了
if (nread >= CONFIG_RUN_ID_SIZE) {
memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE);
} else {
int rem = CONFIG_RUN_ID_SIZE-nread;
memmove(lastbytes,lastbytes+nread,rem);
memcpy(lastbytes+rem,buf,nread);
}

//比对开头和结尾的标识是否一致
if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1;
}

//更新传输时间,用于超时检测
server.repl_transfer_lastio = server.unixtime;
if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) {
serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> REPLICA synchronization: %s",
(nwritten == -1) ? strerror(errno) : "short write");
goto error;
}

//记录读取的传输字节
server.repl_transfer_read += nread;

//删除最后的40个标识结束字节,通过指定文件长度
if (usemark && eof_reached) {
if (ftruncate(server.repl_transfer_fd,
server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1)
{
serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno));
goto error;
}
}

//把数据落到磁盘上,避免最后一次性大量的数据落地
if (server.repl_transfer_read >=
server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
{
off_t sync_size = server.repl_transfer_read -
server.repl_transfer_last_fsync_off;
rdb_fsync_range(server.repl_transfer_fd,
server.repl_transfer_last_fsync_off, sync_size);
server.repl_transfer_last_fsync_off += sync_size;
}

//rdb文件格式下,是否读完了
if (!usemark) {
if (server.repl_transfer_read == server.repl_transfer_size)
eof_reached = 1;
}

//数据读完了,开始基于此初始化
if (eof_reached) {

//记录aof状态,初始化期间aof关闭
int aof_is_enabled = server.aof_state != AOF_OFF;

//关闭正在进行的rdb,马上就用新的数据了,没用了
if (server.rdb_child_pid != -1) {
serverLog(LL_NOTICE,
"Replica is about to load the RDB file received from the "
"master, but there is a pending RDB child running. "
"Killing process %ld and removing its temp file to avoid "
"any race",
(long) server.rdb_child_pid);
kill(server.rdb_child_pid,SIGUSR1);
rdbRemoveTempFile(server.rdb_child_pid);
}

//传输的文件转为rdb正式文件
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
serverLog(LL_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> REPLICA synchronization: %s", strerror(errno));
cancelReplicationHandshake();
return;
}
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");

//关闭aof
if(aof_is_enabled) stopAppendOnly();

//数据清空
signalFlushedDb(-1);
emptyDb(
-1,
server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS,
replicationEmptyDbCallback);

//删除可读事件,防止初始化期间,接受新的数据
aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;

//load the RDB file into memory
if (rdbLoad(server.rdb_filename,&rsi) != C_OK) {
serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
cancelReplicationHandshake();

//恢复aof
if (aof_is_enabled) restartAOF();
return;
}

zfree(server.repl_transfer_tmpfile);
close(server.repl_transfer_fd);

//创建master客户端,设置同步状态为连接上了
replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db);
server.repl_state = REPL_STATE_CONNECTED;
server.repl_down_since = 0;

//把master的replid设置成自己的replid,在多级拓扑中有用
memcpy(server.replid,server.master->replid,sizeof(server.replid));
server.master_repl_offset = server.master->reploff;
clearReplicationId2();

//创建数据缓冲区
if (server.repl_backlog == NULL) createReplicationBacklog();

serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success");

if (aof_is_enabled) restartAOF();
}
return;

error:
cancelReplicationHandshake();
return;
}
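The `lastbytes` bookkeeping above — keep a sliding window of the final 40 bytes seen and compare it against the delimiter after every read — can be isolated into a tiny detector. A sketch with assumed names (`eof_detector`, `MARK_SIZE`), not the Redis implementation:

```c
#include <assert.h>
#include <string.h>

#define MARK_SIZE 40

/* Rolling window over the last MARK_SIZE bytes of the stream. */
typedef struct {
    char mark[MARK_SIZE];
    char last[MARK_SIZE];
} eof_detector;

static void eof_detector_init(eof_detector *d, const char *mark) {
    memcpy(d->mark, mark, MARK_SIZE);
    memset(d->last, 0, MARK_SIZE);
}

/* Feed one chunk; returns 1 once the stream currently ends with the mark. */
static int eof_detector_feed(eof_detector *d, const char *buf, size_t n) {
    if (n >= MARK_SIZE) {
        /* The chunk alone covers the window: keep its tail. */
        memcpy(d->last, buf + n - MARK_SIZE, MARK_SIZE);
    } else {
        /* Shift the old window left and append the new bytes. */
        size_t rem = MARK_SIZE - n;
        memmove(d->last, d->last + n, rem);
        memcpy(d->last + rem, buf, n);
    }
    return memcmp(d->last, d->mark, MARK_SIZE) == 0;
}
```

The mark can straddle read boundaries, which is why the window must be maintained on every chunk rather than only checking whole reads.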

sendSynchronousCommand

Reads from and writes to the master's fd; all handshake traffic before the bulk-transfer phase goes through this function.
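The write path produces plain RESP framing: an `*<argc>` header followed by one `$<len>`-prefixed bulk string per argument. A self-contained sketch without sds — `resp_command` and its fixed-size buffers are assumptions for illustration:

```c
#include <assert.h>
#include <stdarg.h>
#include <stdio.h>
#include <string.h>

/* Serialize a NULL-terminated argument list into a RESP multi-bulk command:
 * *<argc>\r\n then $<len>\r\n<arg>\r\n per argument.
 * Returns the length written, or -1 on overflow. */
static int resp_command(char *out, size_t outlen, ...) {
    char body[512];
    size_t used = 0;
    int argc = 0, n;
    const char *arg;
    va_list ap;

    va_start(ap, outlen);
    while ((arg = va_arg(ap, const char *)) != NULL) {
        n = snprintf(body + used, sizeof(body) - used,
                     "$%zu\r\n%s\r\n", strlen(arg), arg);
        if (n < 0 || used + (size_t)n >= sizeof(body)) { va_end(ap); return -1; }
        used += (size_t)n;
        argc++;
    }
    va_end(ap);
    n = snprintf(out, outlen, "*%d\r\n%s", argc, body);
    return (n < 0 || (size_t)n >= outlen) ? -1 : n;
}
```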

#define SYNC_CMD_READ (1<<0)
#define SYNC_CMD_WRITE (1<<1)
#define SYNC_CMD_FULL (SYNC_CMD_READ|SYNC_CMD_WRITE)
char *sendSynchronousCommand(int flags, int fd, ...) {

//write the command to the master using the Redis (RESP) protocol
if (flags & SYNC_CMD_WRITE) {
char *arg;
va_list ap;
sds cmd = sdsempty();
sds cmdargs = sdsempty();
size_t argslen = 0;
va_start(ap,fd);

while(1) {
arg = va_arg(ap, char*);
if (arg == NULL) break;

cmdargs = sdscatprintf(cmdargs,"$%zu\r\n%s\r\n",strlen(arg),arg);
argslen++;
}

va_end(ap);

cmd = sdscatprintf(cmd,"*%zu\r\n",argslen);
cmd = sdscatsds(cmd,cmdargs);
sdsfree(cmdargs);

//发送数据
if (syncWrite(fd,cmd,sdslen(cmd),server.repl_syncio_timeout*1000)
== -1)
{
sdsfree(cmd);
return sdscatprintf(sdsempty(),"-Writing to master: %s",
strerror(errno));
}
sdsfree(cmd);
}

//读数据
if (flags & SYNC_CMD_READ) {
char buf[256];

if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout*1000)
== -1)
{
return sdscatprintf(sdsempty(),"-Reading from master: %s",
strerror(errno));
}
server.repl_transfer_lastio = server.unixtime;
return sdsnew(buf);
}
return NULL;
}

slaveTryPartialResynchronization

The replica initiates synchronization with PSYNC; this function both sends the request and reads the reply.

When sending, the possible return values are:
PSYNC_WRITE_ERROR: the send failed
PSYNC_WAIT_REPLY: the send succeeded; call again later to read the reply

When reading, the possible return values are:
PSYNC_CONTINUE: the master replied +CONTINUE, so a partial resynchronization can proceed
PSYNC_FULLRESYNC: the master replied that a full resynchronization is required
PSYNC_NOT_SUPPORTED: the master does not support the PSYNC command
PSYNC_TRY_LATER: no usable reply yet; try again later
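When a full resync is granted, the reply has the shape `+FULLRESYNC <40-char replid> <offset>`, and the replica validates the replid length before trusting it. A hedged standalone sketch of that validation — `parse_fullresync` is an illustrative helper, not the Redis function:

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define RUN_ID_SIZE 40

/* Parse "+FULLRESYNC <40-char replid> <offset>".
 * Returns 0 on success, -1 on a malformed reply. */
static int parse_fullresync(const char *reply,
                            char replid[RUN_ID_SIZE + 1],
                            long long *offset) {
    if (strncmp(reply, "+FULLRESYNC", 11) != 0) return -1;
    const char *id = strchr(reply, ' ');
    if (!id) return -1;
    id++;
    const char *off = strchr(id, ' ');
    /* the replid must be exactly 40 characters wide */
    if (!off || (off - id) != RUN_ID_SIZE) return -1;
    memcpy(replid, id, RUN_ID_SIZE);
    replid[RUN_ID_SIZE] = '\0';
    *offset = strtoll(off + 1, NULL, 10);
    return 0;
}
```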

#define PSYNC_WRITE_ERROR 0
#define PSYNC_WAIT_REPLY 1
#define PSYNC_CONTINUE 2
#define PSYNC_FULLRESYNC 3
#define PSYNC_NOT_SUPPORTED 4
#define PSYNC_TRY_LATER 5

int slaveTryPartialResynchronization(int fd, int read_reply) {
char *psync_replid;
char psync_offset[32];
sds reply;

//写数据
if (!read_reply) {
server.master_initial_offset = -1;

//如果是断开后又连上,cached_master中就是之前连上的信息
if (server.cached_master) {
psync_replid = server.cached_master->replid;
snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
} else {
//全新的话,直接强制请求全量的 psync ? -1
serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
psync_replid = "?";
memcpy(psync_offset,"-1",3);
}

//发送命令
reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_replid,psync_offset,NULL);
if (reply != NULL) {
serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
sdsfree(reply);
aeDeleteFileEvent(server.el,fd,AE_READABLE);
return PSYNC_WRITE_ERROR;
}
return PSYNC_WAIT_REPLY;
}

//读数据
reply = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
if (sdslen(reply) == 0) {
//空数据,只是为了保持连接,防止超时被剔除
sdsfree(reply);
return PSYNC_WAIT_REPLY;
}

//we have the PSYNC reply now; later traffic (e.g. the RDB payload) is handled elsewhere, so remove the readable handler
aeDeleteFileEvent(server.el,fd,AE_READABLE);

//fullresync表示master将要发送全量的数据
if (!strncmp(reply,"+FULLRESYNC",11)) {
char *replid = NULL, *offset = NULL;

//校验fullresync是否符合格式 +FULLRESYNC replid offset
replid = strchr(reply,' ');
if (replid) {
replid++;
offset = strchr(replid,' ');
if (offset) offset++;
}
if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) {
serverLog(LL_WARNING,
"Master replied with wrong +FULLRESYNC syntax.");
memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1);
} else {

//校验通过,设置master的replid和offset
memcpy(server.master_replid, replid, offset-replid-1);
server.master_replid[CONFIG_RUN_ID_SIZE] = '\0';
server.master_initial_offset = strtoll(offset,NULL,10);
serverLog(LL_NOTICE,"Full resync from master: %s:%lld",
server.master_replid,
server.master_initial_offset);
}
//释放备用master
replicationDiscardCachedMaster();
sdsfree(reply);
return PSYNC_FULLRESYNC;
}

//continue表示master接下来要发送增量数据
if (!strncmp(reply,"+CONTINUE",9)) {
/* Partial resync was accepted. */
serverLog(LL_NOTICE,
"Successful partial resynchronization with master.");

//可能有两种形式 1.+CONTINUE\r\n 2.+CONTINUE replid\r\n
char *start = reply+10;
char *end = reply+9;
while(end[0] != '\r' && end[0] != '\n' && end[0] != '\0') end++;

//第二种形式
if (end-start == CONFIG_RUN_ID_SIZE) {
char new[CONFIG_RUN_ID_SIZE+1];
memcpy(new,start,CONFIG_RUN_ID_SIZE);
new[CONFIG_RUN_ID_SIZE] = '\0';

//断开重连之后,master的replid已经发生变化了
if (strcmp(new,server.cached_master->replid)) {

serverLog(LL_WARNING,"Master replication ID changed to %s",new);

//原来master的replid只能是可读的了,用于自己下面的从节点继续从自己这同步数据
memcpy(server.replid2,server.cached_master->replid,
sizeof(server.replid2));
server.second_replid_offset = server.master_repl_offset+1;

//主replid更新成现在的master的
memcpy(server.replid,new,sizeof(server.replid));
memcpy(server.cached_master->replid,new,sizeof(server.replid));

//断开所有从节点的连接
disconnectSlaves();
}
}

//把断开前缓存的cache_master再回归到master,可以继续用
sdsfree(reply);
replicationResurrectCachedMaster(fd);

//创建数据缓冲区
if (server.repl_backlog == NULL) createReplicationBacklog();
return PSYNC_CONTINUE;
}

//master目前不提供服务,如NOMASTERLINK:master是一个中间拓扑节点,它还没连上它的master,LOADING:加载中
if (!strncmp(reply,"-NOMASTERLINK",13) ||
!strncmp(reply,"-LOADING",8))
{
serverLog(LL_NOTICE,
"Master is currently unable to PSYNC "
"but should be in the future: %s", reply);
sdsfree(reply);
return PSYNC_TRY_LATER;
}

//其余报错
if (strncmp(reply,"-ERR",4)) {
/* If it's not an error, log the unexpected event. */
serverLog(LL_WARNING,
"Unexpected reply to PSYNC from master: %s", reply);
} else {
serverLog(LL_NOTICE,
"Master does not support PSYNC or is in "
"error state (reply: %s)", reply);
}
sdsfree(reply);
replicationDiscardCachedMaster();
return PSYNC_NOT_SUPPORTED;
}

syncWithMaster

Drives the handshake with the master, step by step, while the connection is being established.
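The handshake is effectively a linear state machine with two optional detours (AUTH when a password is configured, ip-address when slave-announce-ip is set). A simplified standalone sketch — the `hs_*` names are invented for illustration and do not map one-to-one onto the real REPL_STATE_* values:

```c
#include <assert.h>

/* Replica-side handshake stages, in the order they are walked through. */
typedef enum {
    HS_CONNECTING, HS_RECEIVE_PONG, HS_SEND_AUTH, HS_RECEIVE_AUTH,
    HS_SEND_PORT, HS_RECEIVE_PORT, HS_SEND_IP, HS_RECEIVE_IP,
    HS_SEND_CAPA, HS_RECEIVE_CAPA, HS_SEND_PSYNC, HS_RECEIVE_PSYNC,
    HS_TRANSFER
} hs_state;

/* Advance one step; the AUTH and ip-address stages are skipped when
 * no password / no announce ip is configured. */
static hs_state hs_next(hs_state s, int have_auth, int have_announce_ip) {
    switch (s) {
    case HS_CONNECTING:   return HS_RECEIVE_PONG;
    case HS_RECEIVE_PONG: return have_auth ? HS_SEND_AUTH : HS_SEND_PORT;
    case HS_SEND_AUTH:    return HS_RECEIVE_AUTH;
    case HS_RECEIVE_AUTH: return HS_SEND_PORT;
    case HS_SEND_PORT:    return HS_RECEIVE_PORT;
    case HS_RECEIVE_PORT: return have_announce_ip ? HS_SEND_IP : HS_SEND_CAPA;
    case HS_SEND_IP:      return HS_RECEIVE_IP;
    case HS_RECEIVE_IP:   return HS_SEND_CAPA;
    case HS_SEND_CAPA:    return HS_RECEIVE_CAPA;
    case HS_RECEIVE_CAPA: return HS_SEND_PSYNC;
    case HS_SEND_PSYNC:   return HS_RECEIVE_PSYNC;
    default:              return HS_TRANSFER;
    }
}
```

In the real code each SEND state returns to the event loop after writing, and the matching RECEIVE state runs on the next readable event; the sketch only captures the ordering.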

void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
char tmpfile[256], *err = NULL;
int dfd = -1, maxtries = 5;
int sockerr = 0, psync_result;
socklen_t errlen = sizeof(sockerr);
UNUSED(el);
UNUSED(privdata);
UNUSED(mask);

//handle the case where REPLICAOF NO ONE arrived in the meantime
if (server.repl_state == REPL_STATE_NONE) {
close(fd);
return;
}

//检查socket状态
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
sockerr = errno;
if (sockerr) {
serverLog(LL_WARNING,"Error condition on socket for SYNC: %s",
strerror(sockerr));
goto error;
}

//发送一个PING,检查连接情况
if (server.repl_state == REPL_STATE_CONNECTING) {
serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event.");

//在收到回复之前,杜绝其他的写
aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
server.repl_state = REPL_STATE_RECEIVE_PONG;

//发送命令
err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PING",NULL);
if (err) goto write_error;
return;
}

//等待接收PONG,对PING的确认
if (server.repl_state == REPL_STATE_RECEIVE_PONG) {
err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);

if (err[0] != '+' &&
strncmp(err,"-NOAUTH",7) != 0 &&
strncmp(err,"-ERR operation not permitted",28) != 0)
{
serverLog(LL_WARNING,"Error reply to PING from master: '%s'",err);
sdsfree(err);
goto error;
} else {
serverLog(LL_NOTICE,
"Master replied to PING, replication can continue...");
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_AUTH;
}

//发送密码
if (server.repl_state == REPL_STATE_SEND_AUTH) {
if (server.masterauth) {
err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"AUTH",server.masterauth,NULL);
if (err) goto write_error;
server.repl_state = REPL_STATE_RECEIVE_AUTH;
return;
} else {
server.repl_state = REPL_STATE_SEND_PORT;
}
}

//等待密码确认
if (server.repl_state == REPL_STATE_RECEIVE_AUTH) {
err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
if (err[0] == '-') {
serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err);
sdsfree(err);
goto error;
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_PORT;
}

//发送端口
if (server.repl_state == REPL_STATE_SEND_PORT) {
sds port = sdsfromlonglong(server.slave_announce_port ?
server.slave_announce_port : server.port);
err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
"listening-port",port, NULL);
sdsfree(port);
if (err) goto write_error;
sdsfree(err);
server.repl_state = REPL_STATE_RECEIVE_PORT;
return;
}

//等待端口确认
if (server.repl_state == REPL_STATE_RECEIVE_PORT) {
err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
if (err[0] == '-') {
serverLog(LL_NOTICE,"(Non critical) Master does not understand "
"REPLCONF listening-port: %s", err);
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_IP;
}

//如果没有配置ip,跳过发送ip阶段
if (server.repl_state == REPL_STATE_SEND_IP &&
server.slave_announce_ip == NULL)
{
server.repl_state = REPL_STATE_SEND_CAPA;
}

//发送ip
if (server.repl_state == REPL_STATE_SEND_IP) {
err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
"ip-address",server.slave_announce_ip, NULL);
if (err) goto write_error;
sdsfree(err);
server.repl_state = REPL_STATE_RECEIVE_IP;
return;
}

//等待确认ip
if (server.repl_state == REPL_STATE_RECEIVE_IP) {
err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
if (err[0] == '-') {
serverLog(LL_NOTICE,"(Non critical) Master does not understand "
"REPLCONF ip-address: %s", err);
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_CAPA;
}

//advertise capabilities: capa eof = diskless transfer, capa psync2 = +CONTINUE carrying a replid
if (server.repl_state == REPL_STATE_SEND_CAPA) {
err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
"capa","eof","capa","psync2",NULL);
if (err) goto write_error;
sdsfree(err);
server.repl_state = REPL_STATE_RECEIVE_CAPA;
return;
}

//等待确认能力
if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {
err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
if (err[0] == '-') {
serverLog(LL_NOTICE,"(Non critical) Master does not understand "
"REPLCONF capa: %s", err);
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_PSYNC;
}

//发送psync
if (server.repl_state == REPL_STATE_SEND_PSYNC) {
if (slaveTryPartialResynchronization(fd,0) == PSYNC_WRITE_ERROR) {
err = sdsnew("Write error sending the PSYNC command.");
goto write_error;
}
server.repl_state = REPL_STATE_RECEIVE_PSYNC;
return;
}

if (server.repl_state != REPL_STATE_RECEIVE_PSYNC) {
serverLog(LL_WARNING,"syncWithMaster(): state machine error, "
"state should be RECEIVE_PSYNC but is %d",
server.repl_state);
goto error;
}

//读取psync结果
psync_result = slaveTryPartialResynchronization(fd,1);

//no reply yet
if (psync_result == PSYNC_WAIT_REPLY) return;

//the master cannot serve PSYNC for now; retry later
if (psync_result == PSYNC_TRY_LATER) goto error;

//收到了continue,进行断点重传,在slaveTryPartialResynchronization内已经设置好了server.master和当前的连接状态为已连接,所以就没有额外的代码,返回就行了
if (psync_result == PSYNC_CONTINUE) {
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization.");
return;
}

//psync失败或不支持
disconnectSlaves();
freeReplicationBacklog();

//尝试sync命令,sync命令不需要任何回复确认
if (psync_result == PSYNC_NOT_SUPPORTED) {
serverLog(LL_NOTICE,"Retrying with SYNC...");
if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
serverLog(LL_WARNING,"I/O error writing to MASTER: %s",
strerror(errno));
goto error;
}
}

//创建接收rdb的临时文件
while(maxtries--) {
snprintf(tmpfile,256,
"temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
if (dfd != -1) break;
sleep(1);
}
if (dfd == -1) {
serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno));
goto error;
}

//把该连接接下来的交互交给readSyncBulkPayload,接收rdb数据
if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
== AE_ERR)
{
serverLog(LL_WARNING,
"Can't create readable event for SYNC: %s (fd=%d)",
strerror(errno),fd);
goto error;
}

//更新同步信息
server.repl_state = REPL_STATE_TRANSFER;
server.repl_transfer_size = -1;
server.repl_transfer_read = 0;
server.repl_transfer_last_fsync_off = 0;
server.repl_transfer_fd = dfd;
server.repl_transfer_lastio = server.unixtime;
server.repl_transfer_tmpfile = zstrdup(tmpfile);
return;

error:
aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
if (dfd != -1) close(dfd);
close(fd);
server.repl_transfer_s = -1;
server.repl_state = REPL_STATE_CONNECT;
return;

write_error:
serverLog(LL_WARNING,"Sending command to master in replication handshake: %s", err);
sdsfree(err);
goto error;
}

connectWithMaster

Creates the socket connection to the master.

int connectWithMaster(void) {
int fd;

fd = anetTcpNonBlockBestEffortBindConnect(NULL,
server.masterhost,server.masterport,NET_FIRST_BIND_ADDR);
if (fd == -1) {
serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
strerror(errno));
return C_ERR;
}

//握手过程的处理交给syncWithMaster
if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
AE_ERR)
{
close(fd);
serverLog(LL_WARNING,"Can't create readable event for SYNC");
return C_ERR;
}

//记录连接的信息和同步状态
server.repl_transfer_lastio = server.unixtime;
server.repl_transfer_s = fd;
server.repl_state = REPL_STATE_CONNECTING;
return C_OK;
}

undoConnectWithMaster

Closes the socket connection to the master.

void undoConnectWithMaster(void) {
int fd = server.repl_transfer_s;

aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
close(fd);
server.repl_transfer_s = -1;
}

replicationAbortSyncTransfer

Aborts the transfer from the master, used when the data turns out to be invalid mid-transfer.

void replicationAbortSyncTransfer(void) {
serverAssert(server.repl_state == REPL_STATE_TRANSFER);
undoConnectWithMaster();
close(server.repl_transfer_fd);
unlink(server.repl_transfer_tmpfile);
zfree(server.repl_transfer_tmpfile);
}

cancelReplicationHandshake

Cancels the handshake with the master, covering both the connecting phase and the transfer phase.

int cancelReplicationHandshake(void) {

//传输阶段
if (server.repl_state == REPL_STATE_TRANSFER) {
replicationAbortSyncTransfer();
server.repl_state = REPL_STATE_CONNECT;
} else if (server.repl_state == REPL_STATE_CONNECTING ||
slaveIsInHandshakeState())
{
//连接阶段
undoConnectWithMaster();
server.repl_state = REPL_STATE_CONNECT;
} else {
return 0;
}
return 1;
}

replicationSetMaster

Sets the master's IP address and port.

void replicationSetMaster(char *ip, int port) {
int was_master = server.masterhost == NULL;

sdsfree(server.masterhost);
server.masterhost = sdsnew(ip);
server.masterport = port;
if (server.master) {
freeClient(server.master);
}
disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */

//断开所有从节点,为了后面接受新的数据
disconnectSlaves();
cancelReplicationHandshake();

//自己本身当做cache_master,备用
if (was_master) replicationCacheMasterUsingMyself();

//初始化状态,下一次定时任务会建立连接
server.repl_state = REPL_STATE_CONNECT;
}

replicationUnsetMaster

Stops replicating and turns this instance into a master.

void replicationUnsetMaster(void) {
if (server.masterhost == NULL) return;
sdsfree(server.masterhost);
server.masterhost = NULL;

shiftReplicationId();
if (server.master) freeClient(server.master);
replicationDiscardCachedMaster();
cancelReplicationHandshake();

//自身replid变更了,让从节点知道该信息
disconnectSlaves();
server.repl_state = REPL_STATE_NONE;

server.slaveseldb = -1;

server.repl_no_slaves_since = server.unixtime;
}

replicationHandleMasterDisconnection

Handles the state changes needed after the link to the master is lost.

void replicationHandleMasterDisconnection(void) {
server.master = NULL;
server.repl_state = REPL_STATE_CONNECT;
server.repl_down_since = server.unixtime;
}

replicaofCommand

Implements the replicaof and slaveof commands.

void replicaofCommand(client *c) {

//集群模式下禁止
if (server.cluster_enabled) {
addReplyError(c,"REPLICAOF not allowed in cluster mode.");
return;
}

//特殊处理 slaveof/replicaof no one,把自己变成master
if (!strcasecmp(c->argv[1]->ptr,"no") &&
!strcasecmp(c->argv[2]->ptr,"one")) {
if (server.masterhost) {
replicationUnsetMaster();
sds client = catClientInfoString(sdsempty(),c);
serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
client);
sdsfree(client);
}
} else {
//slaveof/replicaof ip port
long port;

if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK))
return;

//判断是否是已经连上的master
if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
&& server.masterport == port) {
serverLog(LL_NOTICE,"REPLICAOF would result into synchronization with the master we are already connected with. No operation performed.");
addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
return;
}

//设置master ip port
replicationSetMaster(c->argv[1]->ptr, port);
sds client = catClientInfoString(sdsempty(),c);
serverLog(LL_NOTICE,"REPLICAOF %s:%d enabled (user request from '%s')",
server.masterhost, server.masterport, client);
sdsfree(client);
}
addReply(c,shared.ok);
}

roleCommand

Implements the ROLE command: reports whether the instance is a master or a slave, plus related details.

void roleCommand(client *c) {

//master,获取所有从节点的ip port offset
if (server.masterhost == NULL) {
listIter li;
listNode *ln;
void *mbcount;
int slaves = 0;

addReplyMultiBulkLen(c,3);
addReplyBulkCBuffer(c,"master",6);
addReplyLongLong(c,server.master_repl_offset);
mbcount = addDeferredMultiBulkLength(c);
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
char ip[NET_IP_STR_LEN], *slaveip = slave->slave_ip;

if (slaveip[0] == '\0') {
if (anetPeerToString(slave->fd,ip,sizeof(ip),NULL) == -1)
continue;
slaveip = ip;
}
if (slave->replstate != SLAVE_STATE_ONLINE) continue;
addReplyMultiBulkLen(c,3);
addReplyBulkCString(c,slaveip);
addReplyBulkLongLong(c,slave->slave_listening_port);
addReplyBulkLongLong(c,slave->repl_ack_off);
slaves++;
}
setDeferredMultiBulkLength(c,mbcount,slaves);
} else {
//从节点
char *slavestate = NULL;

addReplyMultiBulkLen(c,5);
addReplyBulkCBuffer(c,"slave",5);
addReplyBulkCString(c,server.masterhost);
addReplyLongLong(c,server.masterport);
if (slaveIsInHandshakeState()) {
slavestate = "handshake";
} else {
switch(server.repl_state) {
case REPL_STATE_NONE: slavestate = "none"; break;
case REPL_STATE_CONNECT: slavestate = "connect"; break;
case REPL_STATE_CONNECTING: slavestate = "connecting"; break;
case REPL_STATE_TRANSFER: slavestate = "sync"; break;
case REPL_STATE_CONNECTED: slavestate = "connected"; break;
default: slavestate = "unknown"; break;
}
}
addReplyBulkCString(c,slavestate);
addReplyLongLong(c,server.master ? server.master->reploff : -1);
}
}

replicationSendAck

Sends REPLCONF ACK to report our replication offset to the master.

void replicationSendAck(void) {
client *c = server.master;

if (c != NULL) {
c->flags |= CLIENT_MASTER_FORCE_REPLY;
addReplyMultiBulkLen(c,3);
addReplyBulkCString(c,"REPLCONF");
addReplyBulkCString(c,"ACK");
addReplyBulkLongLong(c,c->reploff);
c->flags &= ~CLIENT_MASTER_FORCE_REPLY;
}
}

replicationCacheMaster

Caches the master client when the link to the master drops, so that a later reconnect can resume with it directly.

void replicationCacheMaster(client *c) {
serverAssert(server.master != NULL && server.cached_master == NULL);
serverLog(LL_NOTICE,"Caching the disconnected master state.");

unlinkClient(c);

sdsclear(server.master->querybuf);
sdsclear(server.master->pending_querybuf);
server.master->read_reploff = server.master->reploff;
if (c->flags & CLIENT_MULTI) discardTransaction(c);
listEmpty(c->reply);
c->sentlen = 0;
c->reply_bytes = 0;
c->bufpos = 0;
resetClient(c);

server.cached_master = server.master;

if (c->peerid) {
sdsfree(c->peerid);
c->peerid = NULL;
}

replicationHandleMasterDisconnection();
}

replicationCacheMasterUsingMyself

Uses the current instance's own state to synthesize a cached_master.

void replicationCacheMasterUsingMyself(void) {

server.master_initial_offset = server.master_repl_offset;
replicationCreateMasterClient(-1,-1);

memcpy(server.master->replid, server.replid, sizeof(server.replid));

unlinkClient(server.master);
server.cached_master = server.master;
server.master = NULL;
serverLog(LL_NOTICE,"Before turning into a replica, using my master parameters to synthesize a cached master: I may be able to synchronize with the new master with just a partial transfer.");
}

replicationDiscardCachedMaster

Discards the cached master.

void replicationDiscardCachedMaster(void) {
if (server.cached_master == NULL) return;

serverLog(LL_NOTICE,"Discarding previously cached master state.");

//clear the master flag first, otherwise freeClient will not truly free it
server.cached_master->flags &= ~CLIENT_MASTER;
freeClient(server.cached_master);
server.cached_master = NULL;
}

replicationResurrectCachedMaster

Promotes cached_master back to being the master and updates its fd; used after a reconnect.

void replicationResurrectCachedMaster(int newfd) {
server.master = server.cached_master;
server.cached_master = NULL;
server.master->fd = newfd;
server.master->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP);
server.master->authenticated = 1;
server.master->lastinteraction = server.unixtime;
server.repl_state = REPL_STATE_CONNECTED;
server.repl_down_since = 0;

linkClient(server.master);
//re-install the readable event handler
if (aeCreateFileEvent(server.el, newfd, AE_READABLE,
readQueryFromClient, server.master)) {
serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
freeClientAsync(server.master);
}

//继续发送之前的数据
if (clientHasPendingReplies(server.master)) {
if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE,
sendReplyToClient, server.master)) {
serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
freeClientAsync(server.master); /* Close ASAP. */
}
}
}

refreshGoodSlavesCount

Recounts the replicas that are in good condition.

void refreshGoodSlavesCount(void) {
listIter li;
listNode *ln;
int good = 0;

//参数参照上面的数数据结构部分说明
if (!server.repl_min_slaves_to_write ||
!server.repl_min_slaves_max_lag) return;

listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
time_t lag = server.unixtime - slave->repl_ack_time;

if (slave->replstate == SLAVE_STATE_ONLINE &&
lag <= server.repl_min_slaves_max_lag) good++;
}
server.repl_good_slaves_count = good;
}

replicationScriptCacheInit

Initializes the replication script cache.

void replicationScriptCacheInit(void) {
server.repl_scriptcache_size = 10000;
server.repl_scriptcache_dict = dictCreate(&replScriptCacheDictType,NULL);
server.repl_scriptcache_fifo = listCreate();
}

replicationScriptCacheFlush

Flushes the script cache.

void replicationScriptCacheFlush(void) {
dictEmpty(server.repl_scriptcache_dict,NULL);
listRelease(server.repl_scriptcache_fifo);
server.repl_scriptcache_fifo = listCreate();
}

replicationScriptCacheAdd

Adds a script hash to the cache.

void replicationScriptCacheAdd(sds sha1) {
int retval;
sds key = sdsdup(sha1);

if (listLength(server.repl_scriptcache_fifo) == server.repl_scriptcache_size)
{
listNode *ln = listLast(server.repl_scriptcache_fifo);
sds oldest = listNodeValue(ln);

retval = dictDelete(server.repl_scriptcache_dict,oldest);
serverAssert(retval == DICT_OK);
listDelNode(server.repl_scriptcache_fifo,ln);
}

retval = dictAdd(server.repl_scriptcache_dict,key,NULL);
listAddNodeHead(server.repl_scriptcache_fifo,key);
serverAssert(retval == DICT_OK);
}
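The dict plus FIFO list pair above implements a bounded, evict-oldest set of script hashes. A miniature array-based sketch of the same policy — `script_cache`, `CACHE_SIZE`, and `SHA_LEN` are illustrative names, and the capacity here is tiny only to keep the example visible (Redis's default is 10000):

```c
#include <assert.h>
#include <string.h>

#define CACHE_SIZE 3
#define SHA_LEN 40

typedef struct {
    char entries[CACHE_SIZE][SHA_LEN + 1];
    int head;   /* next slot to overwrite: the oldest entry once full */
    int count;
} script_cache;

static void cache_init(script_cache *c) { memset(c, 0, sizeof(*c)); }

/* When the cache is full, the slot at head holds the oldest hash and is
 * simply overwritten: the same evict-then-add order as
 * replicationScriptCacheAdd. */
static void cache_add(script_cache *c, const char *sha) {
    strncpy(c->entries[c->head], sha, SHA_LEN);
    c->entries[c->head][SHA_LEN] = '\0';
    c->head = (c->head + 1) % CACHE_SIZE;
    if (c->count < CACHE_SIZE) c->count++;
}

static int cache_exists(const script_cache *c, const char *sha) {
    for (int i = 0; i < c->count; i++)
        if (strcmp(c->entries[i], sha) == 0) return 1;
    return 0;
}
```

The real code pairs a dict (O(1) lookup) with a list (eviction order); the array sketch trades the O(1) lookup for brevity.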

replicationScriptCacheExists

Checks whether a script hash is already cached.

int replicationScriptCacheExists(sds sha1) {
return dictFind(server.repl_scriptcache_dict,sha1) != NULL;
}

replicationRequestAckFromSlaves

Flags that ACKs should be requested from the replicas; on the next beforeSleep the server will send them REPLCONF GETACK.

void replicationRequestAckFromSlaves(void) {
server.get_ack_from_slaves = 1;
}

replicationCountAcksByOffset

Counts the replicas whose acknowledged offset has reached a given threshold.

int replicationCountAcksByOffset(long long offset) {
listIter li;
listNode *ln;
int count = 0;

listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;

if (slave->replstate != SLAVE_STATE_ONLINE) continue;
if (slave->repl_ack_off >= offset) count++;
}
return count;
}

waitCommand

Implements the WAIT command.

void waitCommand(client *c) {
mstime_t timeout;
long numreplicas, ackreplicas;
long long offset = c->woff;

if (server.masterhost) {
addReplyError(c,"WAIT cannot be used with replica instances. Please also note that since Redis 4.0 if a replica is configured to be writable (which is not the default) writes to replicas are just local and are not propagated.");
return;
}

/* Argument parsing. */
if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != C_OK)
return;
if (getTimeoutFromObjectOrReply(c,c->argv[2],&timeout,UNIT_MILLISECONDS)
!= C_OK) return;

ackreplicas = replicationCountAcksByOffset(c->woff);
if (ackreplicas >= numreplicas || c->flags & CLIENT_MULTI) {
addReplyLongLong(c,ackreplicas);
return;
}

c->bpop.timeout = timeout;
c->bpop.reploffset = offset;
c->bpop.numreplicas = numreplicas;
listAddNodeTail(server.clients_waiting_acks,c);
blockClient(c,BLOCKED_WAIT);

replicationRequestAckFromSlaves();
}

unblockClientWaitingReplicas

Removes a client from the list of clients blocked waiting for replica ACKs.

void unblockClientWaitingReplicas(client *c) {
listNode *ln = listSearchKey(server.clients_waiting_acks,c);
serverAssert(ln != NULL);
listDelNode(server.clients_waiting_acks,ln);
}

processClientsWaitingReplicas

Scans the clients blocked by WAIT and unblocks each one whose required number of replica acknowledgements has been reached.

void processClientsWaitingReplicas(void) {
    long long last_offset = 0;
    int last_numreplicas = 0;

    listIter li;
    listNode *ln;

    listRewind(server.clients_waiting_acks,&li);
    while((ln = listNext(&li))) {
        client *c = ln->value;

        if (last_offset && last_offset > c->bpop.reploffset &&
            last_numreplicas > c->bpop.numreplicas)
        {
            unblockClient(c);
            addReplyLongLong(c,last_numreplicas);
        } else {
            int numreplicas = replicationCountAcksByOffset(c->bpop.reploffset);

            if (numreplicas >= c->bpop.numreplicas) {
                last_offset = c->bpop.reploffset;
                last_numreplicas = numreplicas;
                unblockClient(c);
                addReplyLongLong(c,numreplicas);
            }
        }
    }
}

replicationGetSlaveOffset

Returns the replication offset this replica has processed from the master stream (taken from the cached master when the link is currently down); returns 0 on a master.

long long replicationGetSlaveOffset(void) {
    long long offset = 0;

    if (server.masterhost != NULL) {
        if (server.master) {
            offset = server.master->reploff;
        } else if (server.cached_master) {
            offset = server.cached_master->reploff;
        }
    }

    if (offset < 0) offset = 0;
    return offset;
}

replicationCron

The replication cron job, run roughly once per second: it handles connect/transfer timeouts, heartbeats in both directions, disconnecting stale replicas, releasing the backlog, and starting deferred BGSAVEs for waiting replicas.

void replicationCron(void) {
    static long long replication_cron_loops = 0;

    /* Non-blocking connect or handshake with the master timed out */
    if (server.masterhost &&
        (server.repl_state == REPL_STATE_CONNECTING ||
         slaveIsInHandshakeState()) &&
        (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
    {
        serverLog(LL_WARNING,"Timeout connecting to the MASTER...");
        cancelReplicationHandshake();
    }

    /* Bulk RDB transfer from the master timed out */
    if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER &&
        (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
    {
        serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
        cancelReplicationHandshake();
    }

    /* Heartbeat check: the master has been silent for too long */
    if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED &&
        (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
    {
        serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");
        freeClient(server.master);
    }

    /* A connection with the master must be established */
    if (server.repl_state == REPL_STATE_CONNECT) {
        serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
            server.masterhost, server.masterport);
        if (connectWithMaster() == C_OK) {
            serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
        }
    }

    /* Periodically report our offset to the master (this also serves as a
     * heartbeat); skipped for pre-PSYNC masters that don't understand
     * REPLCONF ACK */
    if (server.masterhost && server.master &&
        !(server.master->flags & CLIENT_PRE_PSYNC))
        replicationSendAck();

    listIter li;
    listNode *ln;
    robj *ping_argv[1];

    /* PING the replicas every repl_ping_slave_period seconds to keep the
     * links alive */
    if ((replication_cron_loops % server.repl_ping_slave_period) == 0 &&
        listLength(server.slaves))
    {
        ping_argv[0] = createStringObject("PING",4);
        replicationFeedSlaves(server.slaves, server.slaveseldb,
            ping_argv, 1);
        decrRefCount(ping_argv[0]);
    }

    /* For replicas waiting for the BGSAVE to start (or for a disk-backed
     * RDB save to complete), send a newline as the heartbeat instead */
    listRewind(server.slaves,&li);
    while((ln = listNext(&li))) {
        client *slave = ln->value;

        int is_presync =
            (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
             (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
              server.rdb_child_type != RDB_CHILD_TYPE_SOCKET));

        if (is_presync) {
            if (write(slave->fd, "\n", 1) == -1) {
                /* Don't worry about socket errors, it's just a heartbeat */
            }
        }
    }

    /* Disconnect replicas that have not acknowledged for too long */
    if (listLength(server.slaves)) {
        listIter li;
        listNode *ln;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;

            if (slave->replstate != SLAVE_STATE_ONLINE) continue;
            if (slave->flags & CLIENT_PRE_PSYNC) continue;
            if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout)
            {
                serverLog(LL_WARNING, "Disconnecting timedout replica: %s",
                    replicationGetSlaveName(slave));
                freeClient(slave);
            }
        }
    }

    /* If there have been no replicas for repl_backlog_time_limit seconds,
     * change the replication ID and free the backlog */
    if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
        server.repl_backlog && server.masterhost == NULL)
    {
        time_t idle = server.unixtime - server.repl_no_slaves_since;

        if (idle > server.repl_backlog_time_limit) {
            changeReplicationId();
            clearReplicationId2();
            freeReplicationBacklog();
            serverLog(LL_NOTICE,
                "Replication backlog freed after %d seconds "
                "without connected replicas.",
                (int) server.repl_backlog_time_limit);
        }
    }

    /* Flush the replication script cache when there are no replicas and
     * AOF is off */
    if (listLength(server.slaves) == 0 &&
        server.aof_state == AOF_OFF &&
        listLength(server.repl_scriptcache_fifo) != 0)
    {
        replicationScriptCacheFlush();
    }

    /* If no background save is in progress, check whether a BGSAVE should
     * be started for the replicas waiting for one */
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
        time_t idle, max_idle = 0;
        int slaves_waiting = 0;
        int mincapa = -1;
        listNode *ln;
        listIter li;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
                idle = server.unixtime - slave->lastinteraction;
                if (idle > max_idle) max_idle = idle;
                slaves_waiting++;
                mincapa = (mincapa == -1) ? slave->slave_capa :
                                            (mincapa & slave->slave_capa);
            }
        }

        /* Start right away for disk-backed sync; for diskless sync wait
         * until the oldest waiter exceeds repl_diskless_sync_delay, so
         * more replicas can be batched into a single transfer */
        if (slaves_waiting &&
            (!server.repl_diskless_sync ||
             max_idle > server.repl_diskless_sync_delay))
        {
            startBgsaveForReplication(mincapa);
        }
    }

    /* Refresh the number of "good" replicas */
    refreshGoodSlavesCount();

    /* Bump the cron loop counter */
    replication_cron_loops++;
}