redis-rdb

rdb作为redis的另一种数据持久化方式,其用途有两种。1、数据持久化,把数据存放到磁盘上。2、主从数据同步,当第一次同步或者从节点同步的信息不在主节点的缓冲区内时,会生成一个rdb文件,然后同步给从节点用于生成初始数据集。

开启条件

rdb的开启,分为主动和被动两种。

主动

redis 有一个配置项save seconds changes,表示数据库在seconds后,有changes个key改变了。redis默认有三个配置:1小时改变1次、5分钟改变100次、1分钟改变10000次,此配置可以在redis的配置文件中更改,也可以在运行期间使用config save seconds changes进行修改,config save “” 表示把配置项内容清空。在serverCron中,会去计算距离上一次数据保存过了多久以及数据库有多少个key改变了。然后和save的配置项依次比较,如果满足时间大于等于seconds、改变次数大于等于changes、上一次rdb成功或时间大于失败重试时间(默认5秒),则进行rdb数据备份。

被动

开启rdb也可以通过客户端发送命令savebgsave来手动开启,save没有创建子进程,而是在主进程进行rdb备份,因此会阻塞其他客户端的请求一直到rdb备份完成,所以需要慎用。bgsave则是正常的创建一个子进程,子进程去负责rdb数据备份。该命令有一个schedule参数,有无该参数影响点是在命令执行时,当有aof或者rdb正在进行的情况。没有该参数的话,会返回错误表明已经有一个子进程在备份数据了;有该参数的话,则会在下一次serverCron时判断是否曼满足没有aof在进行、没有rdb在进行、上一次rdb成功或时间大于失败重试时间(默认5秒),满足的话进行rdb数据备份。

还有一种情况是在主从数据同步的时候,在一开始亦或者从节点要求同步的数据在主节点的同步数据缓存中找不到,主节点会执行rdb备份,把该文件发送给从节点用于构建数据集。

RDB流程

主动的数据备份过程可以分为以下几步

  • 父进程fock子进程
  • 父进程记录开始时间、子进程id
  • 子进程关闭sockets监听
  • 子进程创建临时文件并进行db数据备份
  • 重命名临时文件为正式文件

文件格式

rdb为了让最后生成的文件尽可能的小,rdb定义了一系列的操作和数据格式,对于常用的几类数据类型,存储方式如下:

浮点型:浮点型数据有两种存储形式,一种是转换成字符串写入,一种按照二进制存储

字符串:分为压缩和不压缩两种,通用形式都是先存储字符串长度,然后存储内容

整形:采用特殊编码规则,根据一个字节的前两位,代表的含义如下

00|XXXXXX 剩余的6位表示数据长度
01|XXXXXX XXXXXXXX 剩余的6位+下个字节的8位共14位表示数据长度
10|000000 下个字节开始的32位表示数据长度
10|000001 下个字节开始的64位表示数据长度大小
11|XXXXXX 特殊编码,具体是那种编码,取决于剩余6位的值,根据值不同分为以下几种:

0:8位整形
1:16位整形
2:32位整形
3:压缩的字符串

文件构成

  • 以REDIS+4位RDB_VERSION开始,然后依次写入
  • 辅助信息,包括redis-verredis-bitsctimeused-memaof-preamble字段,如果是在主从同步的场景的话,会额外包含repl-stream-dbrepl-idrepl-offset字段。
  • 选择DB操作
  • DB索引
  • 设置DB大小操作
  • DB存储hash大小
  • DB过期hash大小
  • 遍历该DB下所有存储节点,依次写入
  • 过期时间操作&过期时间(如果设置过期)
  • lru操作&信息(如果redis达到最大内存时的淘汰策略为lru)
  • lfu操作&信息(如果redis达到最大内存时的淘汰策略为lfu)
  • value对象类型
  • Key对象(都是string)
  • value对象
  • lua脚本信息
  • 结束操作
  • 8个字节的数据校验

示例解析

接下来,以一个简单的rdb文件为例,执行rdb时,redis只有一个(key,value)为(a,100)

  • 前9个字节,是正常的REDIS+RDB_VERSION,通过后4位可以看出RDB_VERSION版本是9
  • 第10个字节是FA,表示写入辅助信息
  • 第11个字节是09,表示接下来是9个字节的字符串,为辅助信息的key
  • 第12-20个字节是redis-ver,确定key
  • 第21个字节是05,表示接下来是5个字节的字符串,为辅助信息的value
  • 第22-26个字节是5.0.3,确定value
  • 第27个字节是FA,表示写入辅助信息
  • 第28个字节是0A,表示接下来是10个字节的字符串,为辅助信息的value
  • 第29-38个字节是redis-bits,确定key
  • 第39个字节是C0,二进制表示为1100000011表示为特殊编码类型,后四位0000表示8位无符号整形,综合起来,下一个字节是无符号整形
  • 第40个字节是40,10进制表示为64,确定value,结合key意思为redis-bits是64位
  • 后面一直到第83个字节,都是如此,依次表示辅助信息ctimeused-memaof-preamble
  • 第68字节是FE,表示选择DB操作
  • 第69字节是00,表示DB索引是0
  • 第70个字节是FB,表示设置DB大小操作
  • 第71、72字节是0100,分别表示DB存储hash大小为1和过期hash大小为0
  • 第73个字节是00表示value类型是string
  • 第74个字节是01,表示key的长度
  • 第75个字节是61,确定key的值为a
  • 第76个字节是C0,表示8位无符号整形特殊编码
  • 第77个字节是64,确定值为100
  • 第78个字节是FF,表示RDB结束
  • 最后79-86共8个字节,是数据校验的结果

源码分析

rdbWriteRaw

往rdb中写入原生数据,各种类型的写入都是先进行转换,最后调用该方法

1
2
3
4
5
static int rdbWriteRaw(rio *rdb, void *p, size_t len) {
if (rdb && rioWrite(rdb,p,len) == 0)
return -1;
return len;
}

rdbLoadRaw

从rdb中读取原生数据

1
2
3
4
5
6
7
8
void rdbLoadRaw(rio *rdb, void *buf, uint64_t len) {
if (rioRead(rdb,buf,len) == 0) {
rdbExitReportCorruptRDB(
"Impossible to read %llu bytes in rdbLoadRaw()",
(unsigned long long) len);
return;
}
}

rdbSaveType

保存RDB定义的操作类型

1
2
3
int rdbSaveType(rio *rdb, unsigned char type) {
return rdbWriteRaw(rdb,&type,1);
}

rdbLoadType

读取RDB操作类型

1
2
3
4
5
int rdbLoadType(rio *rdb) {
unsigned char type;
if (rioRead(rdb,&type,1) == 0) return -1;
return type;
}

rdbLoadTime

读取时间,适用于老版本使用RDB_OPCODE_EXPIRETIME标识过期时间,因此只有读方法,不再提供写方法

1
2
3
4
5
time_t rdbLoadTime(rio *rdb) {
int32_t t32;
rdbLoadRaw(rdb,&t32,4);
return (time_t)t32;
}

rdbSaveMillisecondTime

保存毫秒时间

1
2
3
4
5
int rdbSaveMillisecondTime(rio *rdb, long long t) {
int64_t t64 = (int64_t) t;
memrev64ifbe(&t64); //小端存储
return rdbWriteRaw(rdb,&t64,8);
}

rdbLoadMillisecondTime

读取毫秒时间

1
2
3
4
5
6
7
long long rdbLoadMillisecondTime(rio *rdb, int rdbver) {
int64_t t64;
rdbLoadRaw(rdb,&t64,8);
if (rdbver >= 9) /* Check the top comment of this function. */
memrev64ifbe(&t64); /* Convert in big endian if the system is BE. */
return (long long)t64;
}

rdbSaveLen

保存长度值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
int rdbSaveLen(rio *rdb, uint64_t len) {
unsigned char buf[2];
size_t nwritten;

if (len < (1<<6)) {
/* Save a 6 bit len */
buf[0] = (len&0xFF)|(RDB_6BITLEN<<6);
if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
nwritten = 1;
} else if (len < (1<<14)) {
/* Save a 14 bit len */
buf[0] = ((len>>8)&0xFF)|(RDB_14BITLEN<<6);
buf[1] = len&0xFF;
if (rdbWriteRaw(rdb,buf,2) == -1) return -1;
nwritten = 2;
} else if (len <= UINT32_MAX) {
/* Save a 32 bit len */
buf[0] = RDB_32BITLEN;
if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
uint32_t len32 = htonl(len);
if (rdbWriteRaw(rdb,&len32,4) == -1) return -1;
nwritten = 1+4;
} else {
/* Save a 64 bit len */
buf[0] = RDB_64BITLEN;
if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
len = htonu64(len);
if (rdbWriteRaw(rdb,&len,8) == -1) return -1;
nwritten = 1+8;
}
return nwritten;
}

rdbLoadLenByRef

读取数据长度,isencoded存储是否是额外编码,lenptr表示数据长度或者额外编码的类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr) {
unsigned char buf[2];
int type;

if (isencoded) *isencoded = 0;
if (rioRead(rdb,buf,1) == 0) return -1;
type = (buf[0]&0xC0)>>6;
if (type == RDB_ENCVAL) {
/* Read a 6 bit encoding type. */
if (isencoded) *isencoded = 1;
*lenptr = buf[0]&0x3F;
} else if (type == RDB_6BITLEN) {
/* Read a 6 bit len. */
*lenptr = buf[0]&0x3F;
} else if (type == RDB_14BITLEN) {
/* Read a 14 bit len. */
if (rioRead(rdb,buf+1,1) == 0) return -1;
*lenptr = ((buf[0]&0x3F)<<8)|buf[1];
} else if (buf[0] == RDB_32BITLEN) {
/* Read a 32 bit len. */
uint32_t len;
if (rioRead(rdb,&len,4) == 0) return -1;
*lenptr = ntohl(len);
} else if (buf[0] == RDB_64BITLEN) {
/* Read a 64 bit len. */
uint64_t len;
if (rioRead(rdb,&len,8) == 0) return -1;
*lenptr = ntohu64(len);
} else {
rdbExitReportCorruptRDB(
"Unknown length encoding %d in rdbLoadLen()",type);
return -1; /* Never reached. */
}
return 0;
}

rdbLoadLen

读取数据长度

1
2
3
4
5
6
uint64_t rdbLoadLen(rio *rdb, int *isencoded) {
uint64_t len;

if (rdbLoadLenByRef(rdb,isencoded,&len) == -1) return RDB_LENERR;
return len;
}

rdbEncodeInteger

对整形数字进行编码处理,根据值大小,结果存储在enc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
int rdbEncodeInteger(long long value, unsigned char *enc) {
if (value >= -(1<<7) && value <= (1<<7)-1) {
enc[0] = (RDB_ENCVAL<<6)|RDB_ENC_INT8;
enc[1] = value&0xFF;
return 2;
} else if (value >= -(1<<15) && value <= (1<<15)-1) {
enc[0] = (RDB_ENCVAL<<6)|RDB_ENC_INT16;
enc[1] = value&0xFF;
enc[2] = (value>>8)&0xFF;
return 3;
} else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
enc[0] = (RDB_ENCVAL<<6)|RDB_ENC_INT32;
enc[1] = value&0xFF;
enc[2] = (value>>8)&0xFF;
enc[3] = (value>>16)&0xFF;
enc[4] = (value>>24)&0xFF;
return 5;
} else {
return 0;
}
}

rdbLoadIntegerObject

从rdb文件中读取一个整形对象,enctype表示按照哪种编码读,根据flag的标志位决定返回类型,具体返回情况参照rdbGenericLoadStringObject说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
void *rdbLoadIntegerObject(rio *rdb, int enctype, int flags, size_t *lenptr) {
int plain = flags & RDB_LOAD_PLAIN;
int sds = flags & RDB_LOAD_SDS;
int encode = flags & RDB_LOAD_ENC;
unsigned char enc[4];
long long val;

if (enctype == RDB_ENC_INT8) {
if (rioRead(rdb,enc,1) == 0) return NULL;
val = (signed char)enc[0];
} else if (enctype == RDB_ENC_INT16) {
uint16_t v;
if (rioRead(rdb,enc,2) == 0) return NULL;
v = enc[0]|(enc[1]<<8);
val = (int16_t)v;
} else if (enctype == RDB_ENC_INT32) {
uint32_t v;
if (rioRead(rdb,enc,4) == 0) return NULL;
v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
val = (int32_t)v;
} else {
val = 0; /* anti-warning */
rdbExitReportCorruptRDB("Unknown RDB integer encoding type %d",enctype);
}
if (plain || sds) {
char buf[LONG_STR_SIZE], *p;
int len = ll2string(buf,sizeof(buf),val);
if (lenptr) *lenptr = len;
p = plain ? zmalloc(len) : sdsnewlen(SDS_NOINIT,len);
memcpy(p,buf,len);
return p;
} else if (encode) {
return createStringObjectFromLongLongForValue(val);
} else {
return createObject(OBJ_STRING,sdsfromlonglong(val));
}
}

rdbTryIntegerEncoding

尝试将字符串转换成integer类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int rdbTryIntegerEncoding(char *s, size_t len, unsigned char *enc) {
long long value;
char *endptr, buf[32];

// 尝试调用系统方法转换,当整个字符串都有效时,&endptr为'\0',通常&endptr存储的是第一个无效字符的地址
value = strtoll(s, &endptr, 10);
if (endptr[0] != '\0') return 0;
ll2string(buf,32,value);

// 将转换完之后的整形再转换成字符串和原始数据比对,如果不一致的话,也是失败
if (strlen(buf) != len || memcmp(buf,s,len)) return 0;

// 转换整形,结果存放在enc中
return rdbEncodeInteger(value,enc);
}

rdbSaveLzfBlob

保存lzf压缩过的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
ssize_t rdbSaveLzfBlob(rio *rdb, void *data, size_t compress_len,
size_t original_len) {
unsigned char byte;
ssize_t n, nwritten = 0;

// 保存编码类型
byte = (RDB_ENCVAL<<6)|RDB_ENC_LZF;
if ((n = rdbWriteRaw(rdb,&byte,1)) == -1) goto writeerr;
nwritten += n;

// 依次写入压缩后长度、原始长度、数据
if ((n = rdbSaveLen(rdb,compress_len)) == -1) goto writeerr;
nwritten += n;

if ((n = rdbSaveLen(rdb,original_len)) == -1) goto writeerr;
nwritten += n;

if ((n = rdbWriteRaw(rdb,data,compress_len)) == -1) goto writeerr;
nwritten += n;

return nwritten;

writeerr:
return -1;
}

rdbSaveLzfStringObject

保存lzf字符串对象,要求字符串长度不低于5,否则没有压缩的必要

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ssize_t rdbSaveLzfStringObject(rio *rdb, unsigned char *s, size_t len) {
size_t comprlen, outlen;
void *out;

// 要求至少4个字节
if (len <= 4) return 0;
outlen = len-4;
if ((out = zmalloc(outlen+1)) == NULL) return 0;

// 压缩数据
comprlen = lzf_compress(s, len, out, outlen);
if (comprlen == 0) {
zfree(out);
return 0;
}

// 写入数据
ssize_t nwritten = rdbSaveLzfBlob(rdb, out, comprlen, len);
zfree(out);
return nwritten;
}

rdbLoadLzfStringObject

从rdb文件中读取lzf字符串对象,返回对象类型取决于flags标志位,参照rdbGenericLoadStringObject说明。此方法是建立在已经确认当前数据是lzf编码的基础上

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
void *rdbLoadLzfStringObject(rio *rdb, int flags, size_t *lenptr) {
int plain = flags & RDB_LOAD_PLAIN;
int sds = flags & RDB_LOAD_SDS;
uint64_t len, clen;
unsigned char *c = NULL;
char *val = NULL;

// 已经确认数据是lzf编码,因此直接读取压缩后长度、原始长度、压缩数据
if ((clen = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
if ((c = zmalloc(clen)) == NULL) goto err;

/* Allocate our target according to the uncompressed size. */
if (plain) {
val = zmalloc(len);
} else {
val = sdsnewlen(SDS_NOINIT,len);
}
if (lenptr) *lenptr = len;

// 读取数据&解压
if (rioRead(rdb,c,clen) == 0) goto err;
if (lzf_decompress(c,clen,val,len) == 0) {
if (rdbCheckMode) rdbCheckSetError("Invalid LZF compressed string");
goto err;
}
zfree(c);

if (plain || sds) {
return val;
} else {
return createObject(OBJ_STRING,val);
}
err:
zfree(c);
if (plain)
zfree(val);
else
sdsfree(val);
return NULL;
}

rdbSaveRawString

保存字符串内容,会先尝试把字符串转换成整形以节省空间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
ssize_t rdbSaveRawString(rio *rdb, unsigned char *s, size_t len) {
int enclen;
ssize_t n, nwritten = 0;

// 尝试转换整形
if (len <= 11) {
// 转换成整形的话,最多是5个字节(1字节表示编码类型32长度,4个字节存储内容)
unsigned char buf[5];
if ((enclen = rdbTryIntegerEncoding((char*)s,len,buf)) > 0) {
if (rdbWriteRaw(rdb,buf,enclen) == -1) return -1;
return enclen;
}
}

// 尝试压缩数据,数据长度低于20的话,跳过
if (server.rdb_compression && len > 20) {
n = rdbSaveLzfStringObject(rdb,s,len);
if (n == -1) return -1;
if (n > 0) return n;
// n等于0时,表示无法压缩,使用原始方法存储
}

// 原始方法,保存长度&原始内容
if ((n = rdbSaveLen(rdb,len)) == -1) return -1;
nwritten += n;
if (len > 0) {
if (rdbWriteRaw(rdb,s,len) == -1) return -1;
nwritten += len;
}
return nwritten;
}

rdbSaveLongLongAsStringObject

保存长整形数据,会先尝试能否按照整形存储,否则转换成字符串存储

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ssize_t rdbSaveLongLongAsStringObject(rio *rdb, long long value) {
unsigned char buf[32];
ssize_t n, nwritten = 0;
int enclen = rdbEncodeInteger(value,buf);

// 可以转成整形,按照整形存储
if (enclen > 0) {
return rdbWriteRaw(rdb,buf,enclen);
} else {
// 超出整形范围,转成成字符串存储
enclen = ll2string((char*)buf,32,value);
serverAssert(enclen < 32);
if ((n = rdbSaveLen(rdb,enclen)) == -1) return -1;
nwritten += n;
if ((n = rdbWriteRaw(rdb,buf,enclen)) == -1) return -1;
nwritten += n;
}
return nwritten;
}

rdbSaveStringObject

保存redis字符串对象,根据字符串编码类型不同,处理也有所不同。如果是数值型编码,按照整形,转换成字符串原始数据的顺序尝试存储,其余类型则按照整形压缩原始数据的顺序存储,二者之间的差距就是数值型编码不会尝试压缩处理。

1
2
3
4
5
6
7
8
9
10
ssize_t rdbSaveStringObject(rio *rdb, robj *obj) {
/* Avoid to decode the object, then encode it again, if the
* object is already integer encoded. */
if (obj->encoding == OBJ_ENCODING_INT) {
return rdbSaveLongLongAsStringObject(rdb,(long)obj->ptr);
} else {
serverAssertWithInfo(NULL,obj,sdsEncodedObject(obj));
return rdbSaveRawString(rdb,obj->ptr,sdslen(obj->ptr));
}
}

rdbGenericLoadStringObject

从rdb文件中读取字符串对象,flags决定了返回的对象类型,flags可以是一下几种的结合:

RDB_LOAD_NONE: 返回一个rdb对象,不用任何编码
RDB_LOAD_ENC: 如果返回是一个redis对象,尝试特殊对其进行特殊编码
RDB_LOAD_PLAIN: 返回一个字符串是zmalloc创建的而非redis对象
RDB_LOAD_SDS: 返回一个SDS字符串而非redis对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) {
int encode = flags & RDB_LOAD_ENC;
int plain = flags & RDB_LOAD_PLAIN;
int sds = flags & RDB_LOAD_SDS;
int isencoded;
uint64_t len;

// 先读取一个字节,看数据采用了何种编码
len = rdbLoadLen(rdb,&isencoded);

// 特殊编码情况,11XXXX开始
if (isencoded) {
switch(len) {
case RDB_ENC_INT8:
case RDB_ENC_INT16:
case RDB_ENC_INT32:
return rdbLoadIntegerObject(rdb,len,flags,lenptr);
case RDB_ENC_LZF:
return rdbLoadLzfStringObject(rdb,flags,lenptr);
default:
rdbExitReportCorruptRDB("Unknown RDB string encoding type %d",len);
}
}

// 非采用rdb规定的编码情况
if (len == RDB_LENERR) return NULL;

// 返回字符串
if (plain || sds) {
void *buf = plain ? zmalloc(len) : sdsnewlen(SDS_NOINIT,len);
if (lenptr) *lenptr = len;
if (len && rioRead(rdb,buf,len) == 0) {
if (plain)
zfree(buf);
else
sdsfree(buf);
return NULL;
}
return buf;
} else {
// 采用embstr编码还是原始数据
robj *o = encode ? createStringObject(SDS_NOINIT,len) :
createRawStringObject(SDS_NOINIT,len);
if (len && rioRead(rdb,o->ptr,len) == 0) {
decrRefCount(o);
return NULL;
}
return o;
}
}

rdbLoadStringObject

从rdb文件中读取string对象,不采用任何编码

1
2
3
robj *rdbLoadStringObject(rio *rdb) {
return rdbGenericLoadStringObject(rdb,RDB_LOAD_NONE,NULL);
}

rdbLoadEncodedStringObject

从rdb中读取string对象,尝试采用embstr编码

1
2
3
robj *rdbLoadEncodedStringObject(rio *rdb) {
return rdbGenericLoadStringObject(rdb,RDB_LOAD_ENC,NULL);
}

rdbSaveDoubleValue

往rdb中保存double类型数据,转换成字符串存储,其首个字节表示类型,如下:

253: 不是一个数字 NAN 0.0/0.0
254: 正无穷大 +INF 1.0/0.0
255: 负无穷大 -INF -1.0/0.0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
int rdbSaveDoubleValue(rio *rdb, double val) {
unsigned char buf[128];
int len;

if (isnan(val)) {
buf[0] = 253;
len = 1;
} else if (!isfinite(val)) {
len = 1;
buf[0] = (val < 0) ? 255 : 254;
} else {
#if (DBL_MANT_DIG >= 52) && (LLONG_MAX == 0x7fffffffffffffffLL)
/* Check if the float is in a safe range to be casted into a
* long long. We are assuming that long long is 64 bit here.
* Also we are assuming that there are no implementations around where
* double has precision < 52 bit.
*
* Under this assumptions we test if a double is inside an interval
* where casting to long long is safe. Then using two castings we
* make sure the decimal part is zero. If all this is true we use
* integer printing function that is much faster. */
double min = -4503599627370495; /* (2^52)-1 */
double max = 4503599627370496; /* -(2^52) */
if (val > min && val < max && val == ((double)((long long)val)))
ll2string((char*)buf+1,sizeof(buf)-1,(long long)val);
else
#endif
snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
buf[0] = strlen((char*)buf+1);
len = buf[0]+1;
}
return rdbWriteRaw(rdb,buf,len);
}

rdbLoadDoubleValue

从rdb读取double类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
int rdbLoadDoubleValue(rio *rdb, double *val) {
char buf[256];
unsigned char len;

// 先读取类型,处理NAN、INF的情况
if (rioRead(rdb,&len,1) == 0) return -1;
switch(len) {
case 255: *val = R_NegInf; return 0;
case 254: *val = R_PosInf; return 0;
case 253: *val = R_Nan; return 0;
default:
// 读取正常数值
if (rioRead(rdb,buf,len) == 0) return -1;
buf[len] = '\0';
sscanf(buf, "%lg", val);
return 0;
}
}

rdbSaveBinaryDoubleValue

保存二进制格式的double数据

1
2
3
4
int rdbSaveBinaryDoubleValue(rio *rdb, double val) {
memrev64ifbe(&val);
return rdbWriteRaw(rdb,&val,sizeof(val));
}

rdbLoadBinaryDoubleValue

从rdb读取二进制格式double类型

1
2
3
4
5
int rdbLoadBinaryDoubleValue(rio *rdb, double *val) {
if (rioRead(rdb,val,sizeof(*val)) == 0) return -1;
memrev64ifbe(val);
return 0;
}

rdbSaveBinaryFloatValue

保存二进制格式的float数据

1
2
3
4
int rdbSaveBinaryFloatValue(rio *rdb, float val) {
memrev32ifbe(&val);
return rdbWriteRaw(rdb,&val,sizeof(val));
}

rdbLoadBinaryFloatValue

从rdb读取二进制格式float类型

1
2
3
4
5
int rdbLoadBinaryFloatValue(rio *rdb, float *val) {
if (rioRead(rdb,val,sizeof(*val)) == 0) return -1;
memrev32ifbe(val);
return 0;
}

rdbSaveObjectType

保存redis对象类型,从redis对象类型映射到rdb对象类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
int rdbSaveObjectType(rio *rdb, robj *o) {
switch (o->type) {
case OBJ_STRING:
return rdbSaveType(rdb,RDB_TYPE_STRING);
case OBJ_LIST:
if (o->encoding == OBJ_ENCODING_QUICKLIST)
return rdbSaveType(rdb,RDB_TYPE_LIST_QUICKLIST);
else
serverPanic("Unknown list encoding");
case OBJ_SET:
if (o->encoding == OBJ_ENCODING_INTSET)
return rdbSaveType(rdb,RDB_TYPE_SET_INTSET);
else if (o->encoding == OBJ_ENCODING_HT)
return rdbSaveType(rdb,RDB_TYPE_SET);
else
serverPanic("Unknown set encoding");
case OBJ_ZSET:
if (o->encoding == OBJ_ENCODING_ZIPLIST)
return rdbSaveType(rdb,RDB_TYPE_ZSET_ZIPLIST);
else if (o->encoding == OBJ_ENCODING_SKIPLIST)
return rdbSaveType(rdb,RDB_TYPE_ZSET_2);
else
serverPanic("Unknown sorted set encoding");
case OBJ_HASH:
if (o->encoding == OBJ_ENCODING_ZIPLIST)
return rdbSaveType(rdb,RDB_TYPE_HASH_ZIPLIST);
else if (o->encoding == OBJ_ENCODING_HT)
return rdbSaveType(rdb,RDB_TYPE_HASH);
else
serverPanic("Unknown hash encoding");
case OBJ_STREAM:
return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS);
case OBJ_MODULE:
return rdbSaveType(rdb,RDB_TYPE_MODULE_2);
default:
serverPanic("Unknown object type");
}
return -1; /* avoid warning */
}

rdbLoadObjectType

从rdb中读取对象类型

1
2
3
4
5
6
int rdbLoadObjectType(rio *rdb) {
int type;
if ((type = rdbLoadType(rdb)) == -1) return -1;
if (!rdbIsObjectType(type)) return -1;
return type;
}

rdbSaveStreamPEL

序列化消费者待处理列表,并写入rdb文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
ssize_t rdbSaveStreamPEL(rio *rdb, rax *pel, int nacks) {
ssize_t n, nwritten = 0;

/* Number of entries in the PEL. */
if ((n = rdbSaveLen(rdb,raxSize(pel))) == -1) return -1;
nwritten += n;

/* Save each entry. */
raxIterator ri;
raxStart(&ri,pel);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
/* We store IDs in raw form as 128 big big endian numbers, like
* they are inside the radix tree key. */
if ((n = rdbWriteRaw(rdb,ri.key,sizeof(streamID))) == -1) return -1;
nwritten += n;

if (nacks) {
streamNACK *nack = ri.data;
if ((n = rdbSaveMillisecondTime(rdb,nack->delivery_time)) == -1)
return -1;
nwritten += n;
if ((n = rdbSaveLen(rdb,nack->delivery_count)) == -1) return -1;
nwritten += n;
/* We don't save the consumer name: we'll save the pending IDs
* for each consumer in the consumer PEL, and resolve the consumer
* at loading time. */
}
}
raxStop(&ri);
return nwritten;
}

rdbSaveStreamConsumers

序列化流式消费者,并保存到rdb中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
size_t rdbSaveStreamConsumers(rio *rdb, streamCG *cg) {
ssize_t n, nwritten = 0;

/* Number of consumers in this consumer group. */
if ((n = rdbSaveLen(rdb,raxSize(cg->consumers))) == -1) return -1;
nwritten += n;

/* Save each consumer. */
raxIterator ri;
raxStart(&ri,cg->consumers);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
streamConsumer *consumer = ri.data;

/* Consumer name. */
if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) return -1;
nwritten += n;

/* Last seen time. */
if ((n = rdbSaveMillisecondTime(rdb,consumer->seen_time)) == -1)
return -1;
nwritten += n;

/* Consumer PEL, without the ACKs (see last parameter of the function
* passed with value of 0), at loading time we'll lookup the ID
* in the consumer group global PEL and will put a reference in the
* consumer local PEL. */
if ((n = rdbSaveStreamPEL(rdb,consumer->pel,0)) == -1)
return -1;
nwritten += n;
}
raxStop(&ri);
return nwritten;
}

rdbSaveObject

保存redis对象内容,分为string,list,set,zset,hash,stream,module类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
ssize_t rdbSaveObject(rio *rdb, robj *o) {
ssize_t n = 0, nwritten = 0;

if (o->type == OBJ_STRING) {
/* Save a string value */
if ((n = rdbSaveStringObject(rdb,o)) == -1) return -1;
nwritten += n;
} else if (o->type == OBJ_LIST) {
/* Save a list value */
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
quicklist *ql = o->ptr;
quicklistNode *node = ql->head;

if ((n = rdbSaveLen(rdb,ql->len)) == -1) return -1;
nwritten += n;

while(node) {
if (quicklistNodeIsCompressed(node)) {
void *data;
size_t compress_len = quicklistGetLzf(node, &data);
if ((n = rdbSaveLzfBlob(rdb,data,compress_len,node->sz)) == -1) return -1;
nwritten += n;
} else {
if ((n = rdbSaveRawString(rdb,node->zl,node->sz)) == -1) return -1;
nwritten += n;
}
node = node->next;
}
} else {
serverPanic("Unknown list encoding");
}
} else if (o->type == OBJ_SET) {
/* Save a set value */
if (o->encoding == OBJ_ENCODING_HT) {
dict *set = o->ptr;
dictIterator *di = dictGetIterator(set);
dictEntry *de;

if ((n = rdbSaveLen(rdb,dictSize(set))) == -1) {
dictReleaseIterator(di);
return -1;
}
nwritten += n;

while((de = dictNext(di)) != NULL) {
sds ele = dictGetKey(de);
if ((n = rdbSaveRawString(rdb,(unsigned char*)ele,sdslen(ele)))
== -1)
{
dictReleaseIterator(di);
return -1;
}
nwritten += n;
}
dictReleaseIterator(di);
} else if (o->encoding == OBJ_ENCODING_INTSET) {
size_t l = intsetBlobLen((intset*)o->ptr);

if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
nwritten += n;
} else {
serverPanic("Unknown set encoding");
}
} else if (o->type == OBJ_ZSET) {
/* Save a sorted set value */
if (o->encoding == OBJ_ENCODING_ZIPLIST) {
size_t l = ziplistBlobLen((unsigned char*)o->ptr);

if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
nwritten += n;
} else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr;
zskiplist *zsl = zs->zsl;

if ((n = rdbSaveLen(rdb,zsl->length)) == -1) return -1;
nwritten += n;

/* We save the skiplist elements from the greatest to the smallest
* (that's trivial since the elements are already ordered in the
* skiplist): this improves the load process, since the next loaded
* element will always be the smaller, so adding to the skiplist
* will always immediately stop at the head, making the insertion
* O(1) instead of O(log(N)). */
zskiplistNode *zn = zsl->tail;
while (zn != NULL) {
if ((n = rdbSaveRawString(rdb,
(unsigned char*)zn->ele,sdslen(zn->ele))) == -1)
{
return -1;
}
nwritten += n;
if ((n = rdbSaveBinaryDoubleValue(rdb,zn->score)) == -1)
return -1;
nwritten += n;
zn = zn->backward;
}
} else {
serverPanic("Unknown sorted set encoding");
}
} else if (o->type == OBJ_HASH) {
/* Save a hash value */
if (o->encoding == OBJ_ENCODING_ZIPLIST) {
size_t l = ziplistBlobLen((unsigned char*)o->ptr);

if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
nwritten += n;

} else if (o->encoding == OBJ_ENCODING_HT) {
dictIterator *di = dictGetIterator(o->ptr);
dictEntry *de;

if ((n = rdbSaveLen(rdb,dictSize((dict*)o->ptr))) == -1) {
dictReleaseIterator(di);
return -1;
}
nwritten += n;

while((de = dictNext(di)) != NULL) {
sds field = dictGetKey(de);
sds value = dictGetVal(de);

if ((n = rdbSaveRawString(rdb,(unsigned char*)field,
sdslen(field))) == -1)
{
dictReleaseIterator(di);
return -1;
}
nwritten += n;
if ((n = rdbSaveRawString(rdb,(unsigned char*)value,
sdslen(value))) == -1)
{
dictReleaseIterator(di);
return -1;
}
nwritten += n;
}
dictReleaseIterator(di);
} else {
serverPanic("Unknown hash encoding");
}
} else if (o->type == OBJ_STREAM) {
/* Store how many listpacks we have inside the radix tree. */
stream *s = o->ptr;
rax *rax = s->rax;
if ((n = rdbSaveLen(rdb,raxSize(rax))) == -1) return -1;
nwritten += n;

/* Serialize all the listpacks inside the radix tree as they are,
* when loading back, we'll use the first entry of each listpack
* to insert it back into the radix tree. */
raxIterator ri;
raxStart(&ri,rax);
raxSeek(&ri,"^",NULL,0);
while (raxNext(&ri)) {
unsigned char *lp = ri.data;
size_t lp_bytes = lpBytes(lp);
if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) return -1;
nwritten += n;
if ((n = rdbSaveRawString(rdb,lp,lp_bytes)) == -1) return -1;
nwritten += n;
}
raxStop(&ri);

/* Save the number of elements inside the stream. We cannot obtain
* this easily later, since our macro nodes should be checked for
* number of items: not a great CPU / space tradeoff. */
if ((n = rdbSaveLen(rdb,s->length)) == -1) return -1;
nwritten += n;
/* Save the last entry ID. */
if ((n = rdbSaveLen(rdb,s->last_id.ms)) == -1) return -1;
nwritten += n;
if ((n = rdbSaveLen(rdb,s->last_id.seq)) == -1) return -1;
nwritten += n;

/* The consumer groups and their clients are part of the stream
* type, so serialize every consumer group. */

/* Save the number of groups. */
size_t num_cgroups = s->cgroups ? raxSize(s->cgroups) : 0;
if ((n = rdbSaveLen(rdb,num_cgroups)) == -1) return -1;
nwritten += n;

if (num_cgroups) {
/* Serialize each consumer group. */
raxStart(&ri,s->cgroups);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
streamCG *cg = ri.data;

/* Save the group name. */
if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1)
return -1;
nwritten += n;

/* Last ID. */
if ((n = rdbSaveLen(rdb,cg->last_id.ms)) == -1) return -1;
nwritten += n;
if ((n = rdbSaveLen(rdb,cg->last_id.seq)) == -1) return -1;
nwritten += n;

/* Save the global PEL. */
if ((n = rdbSaveStreamPEL(rdb,cg->pel,1)) == -1) return -1;
nwritten += n;

/* Save the consumers of this group. */
if ((n = rdbSaveStreamConsumers(rdb,cg)) == -1) return -1;
nwritten += n;
}
raxStop(&ri);
}
} else if (o->type == OBJ_MODULE) {
/* Save a module-specific value. */
RedisModuleIO io;
moduleValue *mv = o->ptr;
moduleType *mt = mv->type;
moduleInitIOContext(io,mt,rdb);

/* Write the "module" identifier as prefix, so that we'll be able
* to call the right module during loading. */
int retval = rdbSaveLen(rdb,mt->id);
if (retval == -1) return -1;
io.bytes += retval;

/* Then write the module-specific representation + EOF marker. */
mt->rdb_save(&io,mv->value);
retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF);
if (retval == -1) return -1;
io.bytes += retval;

if (io.ctx) {
moduleFreeContext(io.ctx);
zfree(io.ctx);
}
return io.error ? -1 : (ssize_t)io.bytes;
} else {
serverPanic("Unknown object type");
}
return nwritten;
}

rdbSavedObjectLen

获取redis对象如果写入文件时,所需要的长度,为了懒省事就用rdbSaveObject方法,rio参数传了NULL,这样只返回长度,实际上是没有写入到文件的

1
2
3
4
5
size_t rdbSavedObjectLen(robj *o) {
ssize_t len = rdbSaveObject(NULL,o);
serverAssertWithInfo(NULL,o,len != -1);
return len;
}

rdbSaveKeyValuePair

写入一对key、value数据到rdb中,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;
int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;

// 保存过期时间
if (expiretime != -1) {
if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
}

// 保存LRU信息
if (savelru) {
uint64_t idletime = estimateObjectIdleTime(val);
idletime /= 1000; /* Using seconds is enough and requires less space.*/
if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1;
if (rdbSaveLen(rdb,idletime) == -1) return -1;
}

// 保存LFU信息
if (savelfu) {
uint8_t buf[1];
buf[0] = LFUDecrAndReturn(val);
/* We can encode this in exactly two bytes: the opcode and an 8
* bit counter, since the frequency is logarithmic with a 0-255 range.
* Note that we do not store the halving time because to reset it
* a single time when loading does not affect the frequency much. */
if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1) return -1;
if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
}

//保存value类型,key,value
if (rdbSaveObjectType(rdb,val) == -1) return -1;
if (rdbSaveStringObject(rdb,key) == -1) return -1;
if (rdbSaveObject(rdb,val) == -1) return -1;
return 1;
}

rdbSaveAuxField

写入一组辅助信息到rdb中

1
2
3
4
5
6
7
8
9
10
11
12
13
ssize_t rdbSaveAuxField(rio *rdb, void *key, size_t keylen, void *val, size_t vallen) {
ssize_t ret, len = 0;
// 先写入标识标识次信息是辅助信息
if ((ret = rdbSaveType(rdb,RDB_OPCODE_AUX)) == -1) return -1;
len += ret;
// 写入key
if ((ret = rdbSaveRawString(rdb,key,keylen)) == -1) return -1;
len += ret;
// 写入value
if ((ret = rdbSaveRawString(rdb,val,vallen)) == -1) return -1;
len += ret;
return len;
}

rdbSaveAuxFieldStrStr

写入一对辅助信息到rdb文件,此方法应用于key,value可以用strlen确定长度的场景,否则用rdbSaveAuxField

1
2
3
ssize_t rdbSaveAuxFieldStrStr(rio *rdb, char *key, char *val) {
return rdbSaveAuxField(rdb,key,strlen(key),val,strlen(val));
}

rdbSaveAuxFieldStrInt

写入一对辅助信息到rdb文件,如果辅助信息的value是long long类型,最终还是转换成string存储

1
2
3
4
5
ssize_t rdbSaveAuxFieldStrInt(rio *rdb, char *key, long long val) {
char buf[LONG_STR_SIZE];
int vlen = ll2string(buf,sizeof(buf),val);
return rdbSaveAuxField(rdb,key,strlen(key),buf,vlen);
}

rdbSaveInfoAuxFields

写入所有辅助信息到rdb文件中,写入辅助信息的入口,写入的信息包括redis-ver,redis-bits,ctime,used-mem,repl-stream-db,repl-id,repl-offset,aof-preamble

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
int redis_bits = (sizeof(void*) == 8) ? 64 : 32;
int aof_preamble = (flags & RDB_SAVE_AOF_PREAMBLE) != 0;

/* Add a few fields about the state when the RDB was created. */
if (rdbSaveAuxFieldStrStr(rdb,"redis-ver",REDIS_VERSION) == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"redis-bits",redis_bits) == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"ctime",time(NULL)) == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"used-mem",zmalloc_used_memory()) == -1) return -1;

/* Handle saving options that generate aux fields. */
if (rsi) {
if (rdbSaveAuxFieldStrInt(rdb,"repl-stream-db",rsi->repl_stream_db)
== -1) return -1;
if (rdbSaveAuxFieldStrStr(rdb,"repl-id",server.replid)
== -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",server.master_repl_offset)
== -1) return -1;
}
if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1;
return 1;
}

rdbSaveRio

rdb真正干事情的入口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
dictIterator *di = NULL;
dictEntry *de;
char magic[10];
int j;
uint64_t cksum;
size_t processed = 0;

if (server.rdb_checksum)
rdb->update_cksum = rioGenericUpdateChecksum;
//写入 REDISXXXX(版本号) 开始
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;

//写入辅助信息
if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;

//遍历所有DB,依次写入每个DB
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
dict *d = db->dict;
if (dictSize(d) == 0) continue;
di = dictGetSafeIterator(d);

// 写入select标识
if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
if (rdbSaveLen(rdb,j) == -1) goto werr;

// 写入DB大小标识
uint64_t db_size, expires_size;
db_size = dictSize(db->dict);
expires_size = dictSize(db->expires);
if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
if (rdbSaveLen(rdb,db_size) == -1) goto werr;
if (rdbSaveLen(rdb,expires_size) == -1) goto werr;

// 遍历DB保存数据的hash表,保存每一对key,value
while((de = dictNext(di)) != NULL) {
sds keystr = dictGetKey(de);
robj key, *o = dictGetVal(de);
long long expire;

initStaticStringObject(key,keystr);
expire = getExpire(db,&key);
if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;

// 如果此操作是aof需要的,处理一定数据量之后需要去读取一下aof父进程发送的差异数据
if (flags & RDB_SAVE_AOF_PREAMBLE &&
rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
{
processed = rdb->processed_bytes;
aofReadDiffFromParent();
}
}
dictReleaseIterator(di);
di = NULL; /* So that we don't release it again on error. */
}

// 保存lua脚本信息
if (rsi && dictSize(server.lua_scripts)) {
di = dictGetIterator(server.lua_scripts);
while((de = dictNext(di)) != NULL) {
robj *body = dictGetVal(de);
if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
goto werr;
}
dictReleaseIterator(di);
di = NULL; /* So that we don't release it again on error. */
}

// 写入RDB结束标识
if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;

// 写入校验码
cksum = rdb->cksum;
memrev64ifbe(&cksum);
if (rioWrite(rdb,&cksum,8) == 0) goto werr;
return C_OK;

werr:
if (error) *error = errno;
if (di) dictReleaseIterator(di);
return C_ERR;
}

rdbSaveRioWithEOFMark

在正常的rdb内容前后加入一个前缀和后缀,前缀是$EOF:<40 bytes unguessable hex string>\r\n,后缀是<40 bytes unguessable hex string>,后缀是用来标识结束位置。这个方法主要是用在主从同步的场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) {
char eofmark[RDB_EOF_MARK_SIZE];

getRandomHexChars(eofmark,RDB_EOF_MARK_SIZE);
if (error) *error = 0;
if (rioWrite(rdb,"$EOF:",5) == 0) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
if (rioWrite(rdb,"\r\n",2) == 0) goto werr;
if (rdbSaveRio(rdb,error,RDB_SAVE_NONE,rsi) == C_ERR) goto werr;
if (rioWrite(rdb,eofmark,RDB_EOF_MARK_SIZE) == 0) goto werr;
return C_OK;

werr: /* Write error. */
/* Set 'error' only if not already set by rdbSaveRio() call. */
if (error && *error == 0) *error = errno;
return C_ERR;
}

rdbSave

往文件中写入rdb内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
int rdbSave(char *filename, rdbSaveInfo *rsi) {
char tmpfile[256];
char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
FILE *fp;
rio rdb;
int error = 0;

// 定义临时文件
snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
char *cwdp = getcwd(cwd,MAXPATHLEN);
serverLog(LL_WARNING,
"Failed opening the RDB file %s (in server root dir %s) "
"for saving: %s",
filename,
cwdp ? cwdp : "unknown",
strerror(errno));
return C_ERR;
}

// 用文件初始化rio
rioInitWithFile(&rdb,fp);

// 设置sync策略
if (server.rdb_save_incremental_fsync)
rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);

// 执行rdb操作
if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
errno = error;
goto werr;
}

// 手动flush,确保os层面的buffer都写入
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
if (fclose(fp) == EOF) goto werr;

// 重命名文件 临时->要求
if (rename(tmpfile,filename) == -1) {
char *cwdp = getcwd(cwd,MAXPATHLEN);
serverLog(LL_WARNING,
"Error moving temp DB file %s on the final "
"destination %s (in server root dir %s): %s",
tmpfile,
filename,
cwdp ? cwdp : "unknown",
strerror(errno));
unlink(tmpfile);
return C_ERR;
}

serverLog(LL_NOTICE,"DB saved on disk");
server.dirty = 0;
server.lastsave = time(NULL);
server.lastbgsave_status = C_OK;
return C_OK;

werr:
serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
fclose(fp);
unlink(tmpfile);
return C_ERR;
}

rdbSaveBackground

创建子进程进行rdb

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
pid_t childpid;
long long start;

if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;

server.dirty_before_bgsave = server.dirty;
server.lastbgsave_try = time(NULL);
openChildInfoPipe();

start = ustime();
if ((childpid = fork()) == 0) {
int retval;

// 子进程
closeListeningSockets(0);
redisSetProcTitle("redis-rdb-bgsave");
retval = rdbSave(filename,rsi);
if (retval == C_OK) {
size_t private_dirty = zmalloc_get_private_dirty(-1);

if (private_dirty) {
serverLog(LL_NOTICE,
"RDB: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}

server.child_info_data.cow_size = private_dirty;
sendChildInfo(CHILD_INFO_TYPE_RDB);
}
exitFromChild((retval == C_OK) ? 0 : 1);
} else {
// 父进程
server.stat_fork_time = ustime()-start;
server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
if (childpid == -1) {
closeChildInfoPipe();
server.lastbgsave_status = C_ERR;
serverLog(LL_WARNING,"Can't save in background: fork: %s",
strerror(errno));
return C_ERR;
}
serverLog(LL_NOTICE,"Background saving started by pid %d",childpid);
server.rdb_save_time_start = time(NULL);
server.rdb_child_pid = childpid;
server.rdb_child_type = RDB_CHILD_TYPE_DISK;
updateDictResizePolicy();
return C_OK;
}
return C_OK; /* unreached */
}

rdbRemoveTempFile

删除子进程生成的临时文件

1
2
3
4
5
6
void rdbRemoveTempFile(pid_t childpid) {
char tmpfile[256];

snprintf(tmpfile,sizeof(tmpfile),"temp-%d.rdb", (int) childpid);
unlink(tmpfile);
}

rdbLoadCheckModuleValue

读取check module数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename) {
uint64_t opcode;
while((opcode = rdbLoadLen(rdb,NULL)) != RDB_MODULE_OPCODE_EOF) {
if (opcode == RDB_MODULE_OPCODE_SINT ||
opcode == RDB_MODULE_OPCODE_UINT)
{
uint64_t len;
if (rdbLoadLenByRef(rdb,NULL,&len) == -1) {
rdbExitReportCorruptRDB(
"Error reading integer from module %s value", modulename);
}
} else if (opcode == RDB_MODULE_OPCODE_STRING) {
robj *o = rdbGenericLoadStringObject(rdb,RDB_LOAD_NONE,NULL);
if (o == NULL) {
rdbExitReportCorruptRDB(
"Error reading string from module %s value", modulename);
}
decrRefCount(o);
} else if (opcode == RDB_MODULE_OPCODE_FLOAT) {
float val;
if (rdbLoadBinaryFloatValue(rdb,&val) == -1) {
rdbExitReportCorruptRDB(
"Error reading float from module %s value", modulename);
}
} else if (opcode == RDB_MODULE_OPCODE_DOUBLE) {
double val;
if (rdbLoadBinaryDoubleValue(rdb,&val) == -1) {
rdbExitReportCorruptRDB(
"Error reading double from module %s value", modulename);
}
}
}
return createStringObject("module-dummy-value",18);
}

rdbLoadObject

从特殊文件读取redis对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
robj *rdbLoadObject(int rdbtype, rio *rdb) {
robj *o = NULL, *ele, *dec;
uint64_t len;
unsigned int i;

if (rdbtype == RDB_TYPE_STRING) {
/* Read string value */
if ((o = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
o = tryObjectEncoding(o);
} else if (rdbtype == RDB_TYPE_LIST) {
/* Read list value */
if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;

o = createQuicklistObject();
quicklistSetOptions(o->ptr, server.list_max_ziplist_size,
server.list_compress_depth);

/* Load every single element of the list */
while(len--) {
if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
dec = getDecodedObject(ele);
size_t len = sdslen(dec->ptr);
quicklistPushTail(o->ptr, dec->ptr, len);
decrRefCount(dec);
decrRefCount(ele);
}
} else if (rdbtype == RDB_TYPE_SET) {
/* Read Set value */
if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;

/* Use a regular set when there are too many entries. */
if (len > server.set_max_intset_entries) {
o = createSetObject();
/* It's faster to expand the dict to the right size asap in order
* to avoid rehashing */
if (len > DICT_HT_INITIAL_SIZE)
dictExpand(o->ptr,len);
} else {
o = createIntsetObject();
}

/* Load every single element of the set */
for (i = 0; i < len; i++) {
long long llval;
sds sdsele;

if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))
== NULL) return NULL;

if (o->encoding == OBJ_ENCODING_INTSET) {
/* Fetch integer value from element. */
if (isSdsRepresentableAsLongLong(sdsele,&llval) == C_OK) {
o->ptr = intsetAdd(o->ptr,llval,NULL);
} else {
setTypeConvert(o,OBJ_ENCODING_HT);
dictExpand(o->ptr,len);
}
}

/* This will also be called when the set was just converted
* to a regular hash table encoded set. */
if (o->encoding == OBJ_ENCODING_HT) {
dictAdd((dict*)o->ptr,sdsele,NULL);
} else {
sdsfree(sdsele);
}
}
} else if (rdbtype == RDB_TYPE_ZSET_2 || rdbtype == RDB_TYPE_ZSET) {
/* Read list/set value. */
uint64_t zsetlen;
size_t maxelelen = 0;
zset *zs;

if ((zsetlen = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
o = createZsetObject();
zs = o->ptr;

if (zsetlen > DICT_HT_INITIAL_SIZE)
dictExpand(zs->dict,zsetlen);

/* Load every single element of the sorted set. */
while(zsetlen--) {
sds sdsele;
double score;
zskiplistNode *znode;

if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))
== NULL) return NULL;

if (rdbtype == RDB_TYPE_ZSET_2) {
if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) return NULL;
} else {
if (rdbLoadDoubleValue(rdb,&score) == -1) return NULL;
}

/* Don't care about integer-encoded strings. */
if (sdslen(sdsele) > maxelelen) maxelelen = sdslen(sdsele);

znode = zslInsert(zs->zsl,score,sdsele);
dictAdd(zs->dict,sdsele,&znode->score);
}

/* Convert *after* loading, since sorted sets are not stored ordered. */
if (zsetLength(o) <= server.zset_max_ziplist_entries &&
maxelelen <= server.zset_max_ziplist_value)
zsetConvert(o,OBJ_ENCODING_ZIPLIST);
} else if (rdbtype == RDB_TYPE_HASH) {
uint64_t len;
int ret;
sds field, value;

len = rdbLoadLen(rdb, NULL);
if (len == RDB_LENERR) return NULL;

o = createHashObject();

/* Too many entries? Use a hash table. */
if (len > server.hash_max_ziplist_entries)
hashTypeConvert(o, OBJ_ENCODING_HT);

/* Load every field and value into the ziplist */
while (o->encoding == OBJ_ENCODING_ZIPLIST && len > 0) {
len--;
/* Load raw strings */
if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))
== NULL) return NULL;
if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))
== NULL) return NULL;

/* Add pair to ziplist */
o->ptr = ziplistPush(o->ptr, (unsigned char*)field,
sdslen(field), ZIPLIST_TAIL);
o->ptr = ziplistPush(o->ptr, (unsigned char*)value,
sdslen(value), ZIPLIST_TAIL);

/* Convert to hash table if size threshold is exceeded */
if (sdslen(field) > server.hash_max_ziplist_value ||
sdslen(value) > server.hash_max_ziplist_value)
{
sdsfree(field);
sdsfree(value);
hashTypeConvert(o, OBJ_ENCODING_HT);
break;
}
sdsfree(field);
sdsfree(value);
}

if (o->encoding == OBJ_ENCODING_HT && len > DICT_HT_INITIAL_SIZE)
dictExpand(o->ptr,len);

/* Load remaining fields and values into the hash table */
while (o->encoding == OBJ_ENCODING_HT && len > 0) {
len--;
/* Load encoded strings */
if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))
== NULL) return NULL;
if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))
== NULL) return NULL;

/* Add pair to hash table */
ret = dictAdd((dict*)o->ptr, field, value);
if (ret == DICT_ERR) {
rdbExitReportCorruptRDB("Duplicate keys detected");
}
}

/* All pairs should be read by now */
serverAssert(len == 0);
} else if (rdbtype == RDB_TYPE_LIST_QUICKLIST) {
if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
o = createQuicklistObject();
quicklistSetOptions(o->ptr, server.list_max_ziplist_size,
server.list_compress_depth);

while (len--) {
unsigned char *zl =
rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL);
if (zl == NULL) return NULL;
quicklistAppendZiplist(o->ptr, zl);
}
} else if (rdbtype == RDB_TYPE_HASH_ZIPMAP ||
rdbtype == RDB_TYPE_LIST_ZIPLIST ||
rdbtype == RDB_TYPE_SET_INTSET ||
rdbtype == RDB_TYPE_ZSET_ZIPLIST ||
rdbtype == RDB_TYPE_HASH_ZIPLIST)
{
unsigned char *encoded =
rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL);
if (encoded == NULL) return NULL;
o = createObject(OBJ_STRING,encoded); /* Obj type fixed below. */

/* Fix the object encoding, and make sure to convert the encoded
* data type into the base type if accordingly to the current
* configuration there are too many elements in the encoded data
* type. Note that we only check the length and not max element
* size as this is an O(N) scan. Eventually everything will get
* converted. */
switch(rdbtype) {
case RDB_TYPE_HASH_ZIPMAP:
/* Convert to ziplist encoded hash. This must be deprecated
* when loading dumps created by Redis 2.4 gets deprecated. */
{
unsigned char *zl = ziplistNew();
unsigned char *zi = zipmapRewind(o->ptr);
unsigned char *fstr, *vstr;
unsigned int flen, vlen;
unsigned int maxlen = 0;

while ((zi = zipmapNext(zi, &fstr, &flen, &vstr, &vlen)) != NULL) {
if (flen > maxlen) maxlen = flen;
if (vlen > maxlen) maxlen = vlen;
zl = ziplistPush(zl, fstr, flen, ZIPLIST_TAIL);
zl = ziplistPush(zl, vstr, vlen, ZIPLIST_TAIL);
}

zfree(o->ptr);
o->ptr = zl;
o->type = OBJ_HASH;
o->encoding = OBJ_ENCODING_ZIPLIST;

if (hashTypeLength(o) > server.hash_max_ziplist_entries ||
maxlen > server.hash_max_ziplist_value)
{
hashTypeConvert(o, OBJ_ENCODING_HT);
}
}
break;
case RDB_TYPE_LIST_ZIPLIST:
o->type = OBJ_LIST;
o->encoding = OBJ_ENCODING_ZIPLIST;
listTypeConvert(o,OBJ_ENCODING_QUICKLIST);
break;
case RDB_TYPE_SET_INTSET:
o->type = OBJ_SET;
o->encoding = OBJ_ENCODING_INTSET;
if (intsetLen(o->ptr) > server.set_max_intset_entries)
setTypeConvert(o,OBJ_ENCODING_HT);
break;
case RDB_TYPE_ZSET_ZIPLIST:
o->type = OBJ_ZSET;
o->encoding = OBJ_ENCODING_ZIPLIST;
if (zsetLength(o) > server.zset_max_ziplist_entries)
zsetConvert(o,OBJ_ENCODING_SKIPLIST);
break;
case RDB_TYPE_HASH_ZIPLIST:
o->type = OBJ_HASH;
o->encoding = OBJ_ENCODING_ZIPLIST;
if (hashTypeLength(o) > server.hash_max_ziplist_entries)
hashTypeConvert(o, OBJ_ENCODING_HT);
break;
default:
rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
break;
}
} else if (rdbtype == RDB_TYPE_STREAM_LISTPACKS) {
o = createStreamObject();
stream *s = o->ptr;
uint64_t listpacks = rdbLoadLen(rdb,NULL);

while(listpacks--) {
/* Get the master ID, the one we'll use as key of the radix tree
* node: the entries inside the listpack itself are delta-encoded
* relatively to this ID. */
sds nodekey = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
if (sdslen(nodekey) != sizeof(streamID)) {
rdbExitReportCorruptRDB("Stream node key entry is not the "
"size of a stream ID");
}

/* Load the listpack. */
unsigned char *lp =
rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL);
if (lp == NULL) return NULL;
unsigned char *first = lpFirst(lp);
if (first == NULL) {
/* Serialized listpacks should never be empty, since on
* deletion we should remove the radix tree key if the
* resulting listpack is empty. */
rdbExitReportCorruptRDB("Empty listpack inside stream");
}

/* Insert the key in the radix tree. */
int retval = raxInsert(s->rax,
(unsigned char*)nodekey,sizeof(streamID),lp,NULL);
sdsfree(nodekey);
if (!retval)
rdbExitReportCorruptRDB("Listpack re-added with existing key");
}
/* Load total number of items inside the stream. */
s->length = rdbLoadLen(rdb,NULL);
/* Load the last entry ID. */
s->last_id.ms = rdbLoadLen(rdb,NULL);
s->last_id.seq = rdbLoadLen(rdb,NULL);

/* Consumer groups loading */
size_t cgroups_count = rdbLoadLen(rdb,NULL);
while(cgroups_count--) {
/* Get the consumer group name and ID. We can then create the
* consumer group ASAP and populate its structure as
* we read more data. */
streamID cg_id;
sds cgname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
if (cgname == NULL) {
rdbExitReportCorruptRDB(
"Error reading the consumer group name from Stream");
}
cg_id.ms = rdbLoadLen(rdb,NULL);
cg_id.seq = rdbLoadLen(rdb,NULL);
streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id);
if (cgroup == NULL)
rdbExitReportCorruptRDB("Duplicated consumer group name %s",
cgname);
sdsfree(cgname);

/* Load the global PEL for this consumer group, however we'll
* not yet populate the NACK structures with the message
* owner, since consumers for this group and their messages will
* be read as a next step. So for now leave them not resolved
* and later populate it. */
size_t pel_size = rdbLoadLen(rdb,NULL);
while(pel_size--) {
unsigned char rawid[sizeof(streamID)];
rdbLoadRaw(rdb,rawid,sizeof(rawid));
streamNACK *nack = streamCreateNACK(NULL);
nack->delivery_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
nack->delivery_count = rdbLoadLen(rdb,NULL);
if (!raxInsert(cgroup->pel,rawid,sizeof(rawid),nack,NULL))
rdbExitReportCorruptRDB("Duplicated gobal PEL entry "
"loading stream consumer group");
}

/* Now that we loaded our global PEL, we need to load the
* consumers and their local PELs. */
size_t consumers_num = rdbLoadLen(rdb,NULL);
while(consumers_num--) {
sds cname = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
if (cname == NULL) {
rdbExitReportCorruptRDB(
"Error reading the consumer name from Stream group");
}
streamConsumer *consumer = streamLookupConsumer(cgroup,cname,
1);
sdsfree(cname);
consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);

/* Load the PEL about entries owned by this specific
* consumer. */
pel_size = rdbLoadLen(rdb,NULL);
while(pel_size--) {
unsigned char rawid[sizeof(streamID)];
rdbLoadRaw(rdb,rawid,sizeof(rawid));
streamNACK *nack = raxFind(cgroup->pel,rawid,sizeof(rawid));
if (nack == raxNotFound)
rdbExitReportCorruptRDB("Consumer entry not found in "
"group global PEL");

/* Set the NACK consumer, that was left to NULL when
* loading the global PEL. Then set the same shared
* NACK structure also in the consumer-specific PEL. */
nack->consumer = consumer;
if (!raxInsert(consumer->pel,rawid,sizeof(rawid),nack,NULL))
rdbExitReportCorruptRDB("Duplicated consumer PEL entry "
" loading a stream consumer "
"group");
}
}
}
} else if (rdbtype == RDB_TYPE_MODULE || rdbtype == RDB_TYPE_MODULE_2) {
uint64_t moduleid = rdbLoadLen(rdb,NULL);
moduleType *mt = moduleTypeLookupModuleByID(moduleid);
char name[10];

if (rdbCheckMode && rdbtype == RDB_TYPE_MODULE_2) {
moduleTypeNameByID(name,moduleid);
return rdbLoadCheckModuleValue(rdb,name);
}

if (mt == NULL) {
moduleTypeNameByID(name,moduleid);
serverLog(LL_WARNING,"The RDB file contains module data I can't load: no matching module '%s'", name);
exit(1);
}
RedisModuleIO io;
moduleInitIOContext(io,mt,rdb);
io.ver = (rdbtype == RDB_TYPE_MODULE) ? 1 : 2;
/* Call the rdb_load method of the module providing the 10 bit
* encoding version in the lower 10 bits of the module ID. */
void *ptr = mt->rdb_load(&io,moduleid&1023);
if (io.ctx) {
moduleFreeContext(io.ctx);
zfree(io.ctx);
}

/* Module v2 serialization has an EOF mark at the end. */
if (io.ver == 2) {
uint64_t eof = rdbLoadLen(rdb,NULL);
if (eof != RDB_MODULE_OPCODE_EOF) {
serverLog(LL_WARNING,"The RDB file contains module data for the module '%s' that is not terminated by the proper module value EOF marker", name);
exit(1);
}
}

if (ptr == NULL) {
moduleTypeNameByID(name,moduleid);
serverLog(LL_WARNING,"The RDB file contains module data for the module type '%s', that the responsible module is not able to load. Check for modules log above for additional clues.", name);
exit(1);
}
o = createModuleObject(mt,ptr);
} else {
rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
}
return o;
}

startLoading

开始加载,设置全局变量

1
2
3
4
5
6
7
8
9
10
11
12
13
void startLoading(FILE *fp) {
struct stat sb;

/* Load the DB */
server.loading = 1;
server.loading_start_time = time(NULL);
server.loading_loaded_bytes = 0;
if (fstat(fileno(fp), &sb) == -1) {
server.loading_total_bytes = 0;
} else {
server.loading_total_bytes = sb.st_size;
}
}

loadingProgress

更新加载进度

1
2
3
4
5
void loadingProgress(off_t pos) {
server.loading_loaded_bytes = pos;
if (server.stat_peak_memory < zmalloc_used_memory())
server.stat_peak_memory = zmalloc_used_memory();
}

stopLoading

停止加载,设置全局变量

1
2
3
void stopLoading(void) {
server.loading = 0;
}

rdbLoadProgressCallback

rdb加载回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
if (server.rdb_checksum)
rioGenericUpdateChecksum(r, buf, len);
if (server.loading_process_events_interval_bytes &&
(r->processed_bytes + len)/server.loading_process_events_interval_bytes > r->processed_bytes/server.loading_process_events_interval_bytes)
{
/* The DB can take some non trivial amount of time to load. Update
* our cached time since it is used to create and update the last
* interaction time with clients and for other important things. */
updateCachedTime();
if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER)
replicationSendNewlineToMaster();
loadingProgress(r->processed_bytes);
processEventsWhileBlocked();
}
}

rdbLoadRio

真正加载rdb文件的入口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
uint64_t dbid;
int type, rdbver;
redisDb *db = server.db+0;
char buf[1024];

rdb->update_cksum = rdbLoadProgressCallback;
rdb->max_processing_chunk = server.loading_process_events_interval_bytes;

// 读取开头REDISXXXX 验证版本信息
if (rioRead(rdb,buf,9) == 0) goto eoferr;
buf[9] = '\0';
if (memcmp(buf,"REDIS",5) != 0) {
serverLog(LL_WARNING,"Wrong signature trying to load DB from file");
errno = EINVAL;
return C_ERR;
}
rdbver = atoi(buf+5);
if (rdbver < 1 || rdbver > RDB_VERSION) {
serverLog(LL_WARNING,"Can't handle RDB format version %d",rdbver);
errno = EINVAL;
return C_ERR;
}

/* Key-specific attributes, set by opcodes before the key type. */
long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime();
long long lru_clock = LRU_CLOCK();

while(1) {
robj *key, *val;

// 读取操作标识
if ((type = rdbLoadType(rdb)) == -1) goto eoferr;

// 判断操作类型

// 过期时间 格式:时间,key,value
if (type == RDB_OPCODE_EXPIRETIME) {
expiretime = rdbLoadTime(rdb);
expiretime *= 1000;
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_EXPIRETIME_MS) {
// 过期时间毫秒
expiretime = rdbLoadMillisecondTime(rdb,rdbver);
continue;
} else if (type == RDB_OPCODE_FREQ) {
// LFU
uint8_t byte;
if (rioRead(rdb,&byte,1) == 0) goto eoferr;
lfu_freq = byte;
continue;
} else if (type == RDB_OPCODE_IDLE) {
// LRU
uint64_t qword;
if ((qword = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr;
lru_idle = qword;
continue;
} else if (type == RDB_OPCODE_EOF) {
// RDB结束
break;
} else if (type == RDB_OPCODE_SELECTDB) {
// 选择DB,接下来是db索引
if ((dbid = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr;
if (dbid >= (unsigned)server.dbnum) {
serverLog(LL_WARNING,
"FATAL: Data file was created with a Redis "
"server configured to handle more than %d "
"databases. Exiting\n", server.dbnum);
exit(1);
}
db = server.db+dbid;
continue;
} else if (type == RDB_OPCODE_RESIZEDB) {
// 设置DB大小
uint64_t db_size, expires_size;
if ((db_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
goto eoferr;
if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
goto eoferr;
dictExpand(db->dict,db_size);
dictExpand(db->expires,expires_size);
continue;
} else if (type == RDB_OPCODE_AUX) {
// 辅助信息,格式:key value
robj *auxkey, *auxval;
if ((auxkey = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
if ((auxval = rdbLoadStringObject(rdb)) == NULL) goto eoferr;

if (((char*)auxkey->ptr)[0] == '%') {
/* All the fields with a name staring with '%' are considered
* information fields and are logged at startup with a log
* level of NOTICE. */
serverLog(LL_NOTICE,"RDB '%s': %s",
(char*)auxkey->ptr,
(char*)auxval->ptr);
} else if (!strcasecmp(auxkey->ptr,"repl-stream-db")) {
if (rsi) rsi->repl_stream_db = atoi(auxval->ptr);
} else if (!strcasecmp(auxkey->ptr,"repl-id")) {
if (rsi && sdslen(auxval->ptr) == CONFIG_RUN_ID_SIZE) {
memcpy(rsi->repl_id,auxval->ptr,CONFIG_RUN_ID_SIZE+1);
rsi->repl_id_is_set = 1;
}
} else if (!strcasecmp(auxkey->ptr,"repl-offset")) {
if (rsi) rsi->repl_offset = strtoll(auxval->ptr,NULL,10);
} else if (!strcasecmp(auxkey->ptr,"lua")) {
/* Load the script back in memory. */
if (luaCreateFunction(NULL,server.lua,auxval) == NULL) {
rdbExitReportCorruptRDB(
"Can't load Lua script from RDB file! "
"BODY: %s", auxval->ptr);
}
} else {
/* We ignore fields we don't understand, as by AUX field
* contract. */
serverLog(LL_DEBUG,"Unrecognized RDB AUX field: '%s'",
(char*)auxkey->ptr);
}

decrRefCount(auxkey);
decrRefCount(auxval);
continue;
} else if (type == RDB_OPCODE_MODULE_AUX) {
// 模块辅助信息
uint64_t moduleid = rdbLoadLen(rdb,NULL);
moduleType *mt = moduleTypeLookupModuleByID(moduleid);
char name[10];
moduleTypeNameByID(name,moduleid);

if (!rdbCheckMode && mt == NULL) {
/* Unknown module. */
serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load: no matching module '%s'", name);
exit(1);
} else if (!rdbCheckMode && mt != NULL) {
/* This version of Redis actually does not know what to do
* with modules AUX data... */
serverLog(LL_WARNING,"The RDB file contains AUX module data I can't load for the module '%s'. Probably you want to use a newer version of Redis which implements aux data callbacks", name);
exit(1);
} else {
/* RDB check mode. */
robj *aux = rdbLoadCheckModuleValue(rdb,name);
decrRefCount(aux);
}
}

// 读取key
if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
// 读取value
if ((val = rdbLoadObject(type,rdb)) == NULL) goto eoferr;
// 检查是否过期,在redis启动使用rdb时
if (server.masterhost == NULL && !loading_aof && expiretime != -1 && expiretime < now) {
decrRefCount(key);
decrRefCount(val);
} else {
//key是可用的,插入
dbAdd(db,key,val);

/* Set the expire time if needed */
if (expiretime != -1) setExpire(NULL,db,key,expiretime);

/* Set usage information (for eviction). */
objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock);

/* Decrement the key refcount since dbAdd() will take its
* own reference. */
decrRefCount(key);
}

/* Reset the state that is key-specified and is populated by
* opcodes before the key, so that we start from scratch again. */
expiretime = -1;
lfu_freq = -1;
lru_idle = -1;
}

// 检查数据完整性
if (rdbver >= 5) {
uint64_t cksum, expected = rdb->cksum;

if (rioRead(rdb,&cksum,8) == 0) goto eoferr;
if (server.rdb_checksum) {
memrev64ifbe(&cksum);
if (cksum == 0) {
serverLog(LL_WARNING,"RDB file was saved with checksum disabled: no check performed.");
} else if (cksum != expected) {
serverLog(LL_WARNING,"Wrong RDB checksum. Aborting now.");
rdbExitReportCorruptRDB("RDB CRC error");
}
}
}
return C_OK;

eoferr: /* unexpected end of file is handled here with a fatal exit */
serverLog(LL_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
rdbExitReportCorruptRDB("Unexpected EOF reading RDB file");
return C_ERR; /* Just to avoid warning */
}

rdbLoad

从指定文件中加载rdb

1
2
3
4
5
6
7
8
9
10
11
12
13
int rdbLoad(char *filename, rdbSaveInfo *rsi) {
FILE *fp;
rio rdb;
int retval;

if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
startLoading(fp);
rioInitWithFile(&rdb,fp);
retval = rdbLoadRio(&rdb,rsi,0);
fclose(fp);
stopLoading();
return retval;
}

backgroundSaveDoneHandlerDisk

以子进程执行rdb持久化时,子进程结束后,处理后续工作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) {
// 成功:重新计算数据库更改次数`server.dirty`,归零保存开始时间,标记上一次操作状态为成功
if (!bysignal && exitcode == 0) {
serverLog(LL_NOTICE,
"Background saving terminated with success");
server.dirty = server.dirty - server.dirty_before_bgsave;
server.lastsave = time(NULL);
server.lastbgsave_status = C_OK;
} else if (!bysignal && exitcode != 0) {
serverLog(LL_WARNING, "Background saving error");
server.lastbgsave_status = C_ERR;
} else {
mstime_t latency;

serverLog(LL_WARNING,
"Background saving terminated by signal %d", bysignal);
latencyStartMonitor(latency);
rdbRemoveTempFile(server.rdb_child_pid);
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("rdb-unlink-temp-file",latency);
/* SIGUSR1 is whitelisted, so we have a way to kill a child without
* tirggering an error condition. */
if (bysignal != SIGUSR1)
server.lastbgsave_status = C_ERR;
}
server.rdb_child_pid = -1;
server.rdb_child_type = RDB_CHILD_TYPE_NONE;
server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;
server.rdb_save_time_start = -1;
/* Possibly there are slaves waiting for a BGSAVE in order to be served
* (the first stage of SYNC is a bulk transfer of dump.rdb) */
updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? C_OK : C_ERR, RDB_CHILD_TYPE_DISK);
}

backgroundSaveDoneHandlerSocket

以子进程执行rdb,为了主从同步,子进程结束后,处理后续工作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
uint64_t *ok_slaves;

if (!bysignal && exitcode == 0) {
serverLog(LL_NOTICE,
"Background RDB transfer terminated with success");
} else if (!bysignal && exitcode != 0) {
serverLog(LL_WARNING, "Background transfer error");
} else {
serverLog(LL_WARNING,
"Background transfer terminated by signal %d", bysignal);
}
server.rdb_child_pid = -1;
server.rdb_child_type = RDB_CHILD_TYPE_NONE;
server.rdb_save_time_start = -1;

/* If the child returns an OK exit code, read the set of slave client
* IDs and the associated status code. We'll terminate all the slaves
* in error state.
*
* If the process returned an error, consider the list of slaves that
* can continue to be empty, so that it's just a special case of the
* normal code path. */
ok_slaves = zmalloc(sizeof(uint64_t)); /* Make space for the count. */
ok_slaves[0] = 0;
if (!bysignal && exitcode == 0) {
int readlen = sizeof(uint64_t);

if (read(server.rdb_pipe_read_result_from_child, ok_slaves, readlen) ==
readlen)
{
readlen = ok_slaves[0]*sizeof(uint64_t)*2;

/* Make space for enough elements as specified by the first
* uint64_t element in the array. */
ok_slaves = zrealloc(ok_slaves,sizeof(uint64_t)+readlen);
if (readlen &&
read(server.rdb_pipe_read_result_from_child, ok_slaves+1,
readlen) != readlen)
{
ok_slaves[0] = 0;
}
}
}

close(server.rdb_pipe_read_result_from_child);
close(server.rdb_pipe_write_result_to_parent);

/* We can continue the replication process with all the slaves that
* correctly received the full payload. Others are terminated. */
listNode *ln;
listIter li;

listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;

if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
uint64_t j;
int errorcode = 0;

/* Search for the slave ID in the reply. In order for a slave to
* continue the replication process, we need to find it in the list,
* and it must have an error code set to 0 (which means success). */
for (j = 0; j < ok_slaves[0]; j++) {
if (slave->id == ok_slaves[2*j+1]) {
errorcode = ok_slaves[2*j+2];
break; /* Found in slaves list. */
}
}
if (j == ok_slaves[0] || errorcode != 0) {
serverLog(LL_WARNING,
"Closing slave %s: child->slave RDB transfer failed: %s",
replicationGetSlaveName(slave),
(errorcode == 0) ? "RDB transfer child aborted"
: strerror(errorcode));
freeClient(slave);
} else {
serverLog(LL_WARNING,
"Slave %s correctly received the streamed RDB file.",
replicationGetSlaveName(slave));
/* Restore the socket as non-blocking. */
anetNonBlock(NULL,slave->fd);
anetSendTimeout(NULL,slave->fd,0);
}
}
}
zfree(ok_slaves);

updateSlavesWaitingBgsave((!bysignal && exitcode == 0) ? C_OK : C_ERR, RDB_CHILD_TYPE_SOCKET);
}

backgroundSaveDoneHandler

以子进程执行rdb,进程结束后,处理后续工作总的入口,分为两类持久化到磁盘&主从同步

1
2
3
4
5
6
7
8
9
10
11
12
13
void backgroundSaveDoneHandler(int exitcode, int bysignal) {
switch(server.rdb_child_type) {
case RDB_CHILD_TYPE_DISK:
backgroundSaveDoneHandlerDisk(exitcode,bysignal);
break;
case RDB_CHILD_TYPE_SOCKET:
backgroundSaveDoneHandlerSocket(exitcode,bysignal);
break;
default:
serverPanic("Unknown RDB child type.");
break;
}
}

rdbSaveToSlavesSockets

生成rdb文件,把该文件同步给从节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
int *fds;
uint64_t *clientids;
int numfds;
listNode *ln;
listIter li;
pid_t childpid;
long long start;
int pipefds[2];

if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;

/* Before to fork, create a pipe that will be used in order to
* send back to the parent the IDs of the slaves that successfully
* received all the writes. */
if (pipe(pipefds) == -1) return C_ERR;
server.rdb_pipe_read_result_from_child = pipefds[0];
server.rdb_pipe_write_result_to_parent = pipefds[1];

/* Collect the file descriptors of the slaves we want to transfer
* the RDB to, which are i WAIT_BGSAVE_START state. */
fds = zmalloc(sizeof(int)*listLength(server.slaves));
/* We also allocate an array of corresponding client IDs. This will
* be useful for the child process in order to build the report
* (sent via unix pipe) that will be sent to the parent. */
clientids = zmalloc(sizeof(uint64_t)*listLength(server.slaves));
numfds = 0;

listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;

if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
clientids[numfds] = slave->id;
fds[numfds++] = slave->fd;
replicationSetupSlaveForFullResync(slave,getPsyncInitialOffset());
/* Put the socket in blocking mode to simplify RDB transfer.
* We'll restore it when the children returns (since duped socket
* will share the O_NONBLOCK attribute with the parent). */
anetBlock(NULL,slave->fd);
anetSendTimeout(NULL,slave->fd,server.repl_timeout*1000);
}
}

/* Create the child process. */
openChildInfoPipe();
start = ustime();
if ((childpid = fork()) == 0) {
/* Child */
int retval;
rio slave_sockets;

rioInitWithFdset(&slave_sockets,fds,numfds);
zfree(fds);

closeListeningSockets(0);
redisSetProcTitle("redis-rdb-to-slaves");

retval = rdbSaveRioWithEOFMark(&slave_sockets,NULL,rsi);
if (retval == C_OK && rioFlush(&slave_sockets) == 0)
retval = C_ERR;

if (retval == C_OK) {
size_t private_dirty = zmalloc_get_private_dirty(-1);

if (private_dirty) {
serverLog(LL_NOTICE,
"RDB: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}

server.child_info_data.cow_size = private_dirty;
sendChildInfo(CHILD_INFO_TYPE_RDB);

/* If we are returning OK, at least one slave was served
* with the RDB file as expected, so we need to send a report
* to the parent via the pipe. The format of the message is:
*
* <len> <slave[0].id> <slave[0].error> ...
*
* len, slave IDs, and slave errors, are all uint64_t integers,
* so basically the reply is composed of 64 bits for the len field
* plus 2 additional 64 bit integers for each entry, for a total
* of 'len' entries.
*
* The 'id' represents the slave's client ID, so that the master
* can match the report with a specific slave, and 'error' is
* set to 0 if the replication process terminated with a success
* or the error code if an error occurred. */
void *msg = zmalloc(sizeof(uint64_t)*(1+2*numfds));
uint64_t *len = msg;
uint64_t *ids = len+1;
int j, msglen;

*len = numfds;
for (j = 0; j < numfds; j++) {
*ids++ = clientids[j];
*ids++ = slave_sockets.io.fdset.state[j];
}

/* Write the message to the parent. If we have no good slaves or
* we are unable to transfer the message to the parent, we exit
* with an error so that the parent will abort the replication
* process with all the childre that were waiting. */
msglen = sizeof(uint64_t)*(1+2*numfds);
if (*len == 0 ||
write(server.rdb_pipe_write_result_to_parent,msg,msglen)
!= msglen)
{
retval = C_ERR;
}
zfree(msg);
}
zfree(clientids);
rioFreeFdset(&slave_sockets);
exitFromChild((retval == C_OK) ? 0 : 1);
} else {
/* Parent */
if (childpid == -1) {
serverLog(LL_WARNING,"Can't save in background: fork: %s",
strerror(errno));

/* Undo the state change. The caller will perform cleanup on
* all the slaves in BGSAVE_START state, but an early call to
* replicationSetupSlaveForFullResync() turned it into BGSAVE_END */
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
int j;

for (j = 0; j < numfds; j++) {
if (slave->id == clientids[j]) {
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
break;
}
}
}
close(pipefds[0]);
close(pipefds[1]);
closeChildInfoPipe();
} else {
server.stat_fork_time = ustime()-start;
server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);

serverLog(LL_NOTICE,"Background RDB transfer started by pid %d",
childpid);
server.rdb_save_time_start = time(NULL);
server.rdb_child_pid = childpid;
server.rdb_child_type = RDB_CHILD_TYPE_SOCKET;
updateDictResizePolicy();
}
zfree(clientids);
zfree(fds);
return (childpid == -1) ? C_ERR : C_OK;
}
return C_OK; /* Unreached. */
}

saveCommand

客户端执行save命令,同步执行rdb

1
2
3
4
5
6
7
8
9
10
11
12
13
void saveCommand(client *c) {
if (server.rdb_child_pid != -1) {
addReplyError(c,"Background save already in progress");
return;
}
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
if (rdbSave(server.rdb_filename,rsiptr) == C_OK) {
addReply(c,shared.ok);
} else {
addReply(c,shared.err);
}
}

bgsaveCommand

客户端执行bgsave命令,分两种情况,一种是现在立即执行,一种是计划任务(如果现在不能执行的话,在后面serverCron中执行,如果满足条件的话)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
void bgsaveCommand(client *c) {
int schedule = 0;

/* The SCHEDULE option changes the behavior of BGSAVE when an AOF rewrite
* is in progress. Instead of returning an error a BGSAVE gets scheduled. */
if (c->argc > 1) {
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"schedule")) {
schedule = 1;
} else {
addReply(c,shared.syntaxerr);
return;
}
}

rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);

if (server.rdb_child_pid != -1) {
addReplyError(c,"Background save already in progress");
} else if (server.aof_child_pid != -1) {
if (schedule) {
server.rdb_bgsave_scheduled = 1;
addReplyStatus(c,"Background saving scheduled");
} else {
addReplyError(c,
"An AOF log rewriting in progress: can't BGSAVE right now. "
"Use BGSAVE SCHEDULE in order to schedule a BGSAVE whenever "
"possible.");
}
} else if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK) {
addReplyStatus(c,"Background saving started");
} else {
addReply(c,shared.err);
}
}

rdbPopulateSaveInfo

填充rdbSaveInfo结构体,结构体的信息影响到rdb的动作,主要是主从同步的附带信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) {
rdbSaveInfo rsi_init = RDB_SAVE_INFO_INIT;
*rsi = rsi_init;

// 当前实例是主节点,且主节点同步缓冲区有内容
if (!server.masterhost && server.repl_backlog) {
// slaveseldb==-1 表示主节点从上次全量同步之后没有响应任何写命令。这时候就写-1,等下次响应写命令后,就会变化
rsi->repl_stream_db = server.slaveseldb == -1 ? 0 : server.slaveseldb;
return rsi;
}

// 当前节点是从节点,带上主节点的dbId
if (server.master) {
rsi->repl_stream_db = server.master->db->id;
return rsi;
}

if (server.cached_master) {
rsi->repl_stream_db = server.cached_master->db->id;
return rsi;
}
return NULL;
}