redis-aof

概述

aof是redis数据的一种持久化方式,本质是记录服务器处理的每一个写操作,使用redis协议进行追加存储。当redis重启时,通过还原操作,重建数据集。aof分为两个阶段,一是正常的操作追加;另一种则是aof重写。

aof流程

正常写入aof文件的时机有两点,一是服务端接收到写命令,二是数据库里某个key过期。当确定之后,会把命令按照redis协议,写入到aof缓存中,等到serverCronbeforeSleepprepareForShutdown 三个过程时写入文件。

服务端接收到写命令

当服务端接收到命令处理后,会根据这个命令执行前后数据库改变次数来判断此命令是否更改了数据库,来初步决定是否需要把改命令写入aof。一个命令能否写入aof是有三个因素决定的

  • 客户端 客户端有一个标志位,其中4位用来标识强制aof强制repl阻止aof阻止repl
  • 处理标志 处理标志有两个标志位标识执行aof执行repl
  • 命令本身 通过命令执行前后数据库改变次数变化判断该命令是否应该执行aof/repl

repl是主从同步

在此种情况下,一个命令最终产生的同步(aof/repl)命令来源于两方面,一个是命令本身产生的,一个是处理命令所经过的模块产生的,这两部分的同步条件时不一样的。

命令本身产生的同步命令要想同步的话,需要满足两个前提条件和优先级过滤限制

前提

  • 处理标志允许全部传播
  • 客户端未禁止所有的传播

在正常的执行过程中,会先清除客户端的强制aof强制repl阻止aof阻止repl标志位,交由具体的处理标志和命令本身决定。处理标志通常情况下都是1,也是满足的。(当模块发起的命令执行,此处是不太一样的)

优先级

  • 命令本身如果更改了数据库,aof,repl传播标志位置为1
  • 客户端标志中强制aof强制repl为1,对应的传播标志位置为1
  • 客户端有阻止aof阻止repl,对应的传播标志位置为0
  • 如果命令是模块相关的话,跳过

模块产生的处理逻辑:

  • 模块生成命令自带一个标志位
  • 根据处理标志,剔除aof/repl

数据库key过期

当数据库里某个key过期的时候,会生成一条del操作,放到aof的缓冲区内,并分发从节点进行同步。

aof重写

当aof文件增长速率达到一定规模,或者接收到客户端的请求,需要对aof文件进行重写。aof重写的过程是创建一个子进程,该进程将db中的数据写入临时文件,期间从父进程读取aof的增量差异数据,在写完db的数据后,把差异数据追加到文件后面。父进程则是在子进程工作结束之后,最后一次把差异数据追加写入,然后用临时文件重命名为正式文件,完成整个过程。

整个流程的关键点是,在aof重写开始后,父进程把aof数据不写入原先的buffer,改为写入server.aof_rewrite_buf_blocksaof重写数据块中。这个过程一直持续到子进程处理完成,父进程处理完所有后续。父进程把重写数据块的数据通过管道写入,子进程读取该差异数据存储。当子进程处理完db的数据时,会把从父进程读取的差异追加到文件。在一段时间内或者20次内没有读取到差异数据的话,子进程工作算是完成了大半。然后通过管道发送通知给父进程请求停止发送差异数据,父进程确认后停止发送。子进程收到反馈,最后一次读取差异数据并写入,然后子进程结束。父进程把差异数据追加到文件,重命名临时文件为正式文件,切换aof文件句柄,更改aof状态为正常,后续aof数据恢复正常,写入aof缓冲区。

整个流程如下图所示

aof文件

按照aof文件组成区分的话,有两种形式。一种是按照rdb的格式,使用RDB_OPCODE_XXXX去标识各种操作,另一种是使用set,rpush,sadd等具体的命令字符串去标识各种操作。依照此划分的话,aof的构成也是分为两部分,第一部分是基础数据,也就是redis数据库的数据,可以用以上两种方式之一表示;二是在基础数据之上的增量操作,这部分主要是执行aof重写开始到结束这段时间的操作增量,这部分数据都是具体的命令字符串表示的。

下面说一下使用rdb格式和不使用rdb格式的流程

使用rdb

以’REDIS’+具体RDB_VERSION开始,如版本9’REDIS0009’,然后依次写入
redis-ver+redis版本
redis-bits+64位还是/32位
ctime+当前时间
used-mem+使用内存
aof-preamble+1/0 是否来自aof

再遍历每个数据库
写入一个字节RDB_OPCODE_SELECTDB表示selectdb
写入一个字节表示数据库的索引
写入一个字节RDB_OPCODE_RESIZEDB表示数据库大小
写入数据库hash的大小
写入数据库过期hash的大小

遍历hash表里的每一个节点

写入RDB_OPCODE_EXPIRETIME_MS表示过期时间(如果未过期)
写入过期时间
写入lru 如果server.maxmemory_policy & MAXMEMORY_FLAG_LRU
写入lfu 如果server.maxmemory_policy & MAXMEMORY_FLAG_LFU
写入对象类型RDB_TYPE_XXXXX
写入key
写入value

当前rio文件已经处理的字节大于AOF_READ_DIFF_INTERVAL_BYTES,尝试从父进程读取差异数据,为了最后写入的量小一些

写入RDB_OPCODE_EOF表示EOF
写入校验值,如果值为0的话,加载数据的时候,会跳过校验

不使用rdb

依次遍历每个数据库
写入select
写入数据库索引

依次遍历hash里的每一个节点,进行以下操作
根据值的类型翻译成对应的redis命令,如
string : SET key value
list:RPUSH key 然后写入value
set: SADD key 然后写入value
……..

像list,set这种数据类型,一次写入的数量是AOF_REWRITE_ITEMS_PER_CMD(64),也就是说RPUSH,SADD 后跟的value最多是64个,如果多的话就再起一个RPUSH,SADD

写入过期时间,如果没有过期的话
PEXPIREAT key expiretime

当前rio文件已经处理的字节大于AOF_READ_DIFF_INTERVAL_BYTES,尝试从父进程读取差异数据,为了最后写入的量小一些

源码分析

数据格式

1
2
3
4
5
6
7
#define AOF_RW_BUF_BLOCK_SIZE (1024*1024*10) //每个数据块的大小

//数据块,用于重写过程中存放aof数据
typedef struct aofrwblock {
unsigned long used, free; //使用大小,剩余大小
char buf[AOF_RW_BUF_BLOCK_SIZE]; //数据存放
} aofrwblock;

aofRewriteBufferReset

释放旧的数据块,初始化新的,初始化以及aof重写结束的时候用到

1
2
3
4
5
6
7
void aofRewriteBufferReset(void) {
if (server.aof_rewrite_buf_blocks)
listRelease(server.aof_rewrite_buf_blocks);

server.aof_rewrite_buf_blocks = listCreate();
listSetFreeMethod(server.aof_rewrite_buf_blocks,zfree);
}

aofRewriteBufferSize

返回重写buffer已使用的大小,因为数据块是使用链表串接的,所以循环累加

1
2
3
4
5
6
7
8
9
10
11
12
unsigned long aofRewriteBufferSize(void) {
listNode *ln;
listIter li;
unsigned long size = 0;

listRewind(server.aof_rewrite_buf_blocks,&li);
while((ln = listNext(&li))) {
aofrwblock *block = listNodeValue(ln);
size += block->used;
}
return size;
}

aofChildWriteDiffData

父进程通过管道发送增量数据到子进程,当子进程发送一个消息 ‘!’,要求停止发送,aof_stop_sending_diff 会被置为1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
listNode *ln;
aofrwblock *block;
ssize_t nwritten;
UNUSED(el);
UNUSED(fd);
UNUSED(privdata);
UNUSED(mask);

while(1) {
ln = listFirst(server.aof_rewrite_buf_blocks);
block = ln ? ln->value : NULL;

// 停止发送 或者 没有增量数据,删除管道写事件的监听
if (server.aof_stop_sending_diff || !block) {
aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
AE_WRITABLE);
return;
}

// 发送数据 维护block的使用,释放大小
if (block->used > 0) {
nwritten = write(server.aof_pipe_write_data_to_child,
block->buf,block->used);
if (nwritten <= 0) return;
memmove(block->buf,block->buf+nwritten,block->used-nwritten);
block->used -= nwritten;
block->free += nwritten;
}

// 删除当前数据块节点,如果数据都处理完
if (block->used == 0) listDelNode(server.aof_rewrite_buf_blocks,ln);
}
}

aofRewriteBufferAppend

往aof重写数据块中写入数据,在重写的过程中,父进程有新的aof数据来的时候会调用此方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
listNode *ln = listLast(server.aof_rewrite_buf_blocks);
aofrwblock *block = ln ? ln->value : NULL;

while(len) {
// 已经有数据块了
if (block) {
unsigned long thislen = (block->free < len) ? block->free : len;
if (thislen) { /* The current block is not already full. */
memcpy(block->buf+block->used, s, thislen);
block->used += thislen;
block->free -= thislen;
s += thislen;
len -= thislen;
}
}

// 还有数据没处理,两种情况:数据块链表为空||上一个数据块已经满了
if (len) { /* First block to allocate, or need another block. */
int numblocks;

block = zmalloc(sizeof(*block));
block->free = AOF_RW_BUF_BLOCK_SIZE;
block->used = 0;
listAddNodeTail(server.aof_rewrite_buf_blocks,block);

// 数据块每到10或100 增加一条warning,notice级别日志
numblocks = listLength(server.aof_rewrite_buf_blocks);
if (((numblocks+1) % 10) == 0) {
int level = ((numblocks+1) % 100) == 0 ? LL_WARNING :
LL_NOTICE;
serverLog(level,"Background AOF buffer size: %lu MB",
aofRewriteBufferSize()/(1024*1024));
}
}
}

// 创建管道的写监听事件
if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
AE_WRITABLE, aofChildWriteDiffData, NULL);
}
}

aofRewriteBufferWrite

把aof重写数据块里的数据写入到fd中,此方法会在子进程结束后,父进程最后把差异数据写进临时文件用到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ssize_t aofRewriteBufferWrite(int fd) {
listNode *ln;
listIter li;
ssize_t count = 0;

listRewind(server.aof_rewrite_buf_blocks,&li);
while((ln = listNext(&li))) {
aofrwblock *block = listNodeValue(ln);
ssize_t nwritten;

if (block->used) {
nwritten = write(fd,block->buf,block->used);
if (nwritten != (ssize_t)block->used) {
if (nwritten == 0) errno = EIO;
return -1;
}
count += nwritten;
}
}
return count;
}

aof_background_fsync

创建bio任务,执行flush

1
2
3
void aof_background_fsync(int fd) {
bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
}

killAppendOnlyChild

杀掉aof重写子进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static void killAppendOnlyChild(void) {
int statloc;
if (server.aof_child_pid == -1) return;
/* Kill AOFRW child, wait for child exit. */
serverLog(LL_NOTICE,"Killing running AOF rewrite child: %ld",
(long) server.aof_child_pid);
if (kill(server.aof_child_pid,SIGUSR1) != -1) {
while(wait3(&statloc,0,NULL) != server.aof_child_pid);
}

// 重置buffer、删除临时文件、aof进程id、关闭管道
aofRewriteBufferReset();
aofRemoveTempFile(server.aof_child_pid);
server.aof_child_pid = -1;
server.aof_rewrite_time_start = -1;

aofClosePipes();
}

stopAppendOnly

关闭aof,重置aof相关状态,当客户端使用config命令执行appendonly no

1
2
3
4
5
6
7
8
9
10
11
void stopAppendOnly(void) {
serverAssert(server.aof_state != AOF_OFF);
flushAppendOnlyFile(1);
redis_fsync(server.aof_fd);
close(server.aof_fd);

server.aof_fd = -1;
server.aof_selected_db = -1;
server.aof_state = AOF_OFF;
killAppendOnlyChild();
}

startAppendOnly

开启aof,当客户端使用config命令执行appendonly yes

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
int startAppendOnly(void) {
char cwd[MAXPATHLEN];
int newfd;

newfd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT,0644);
serverAssert(server.aof_state == AOF_OFF);
if (newfd == -1) {
char *cwdp = getcwd(cwd,MAXPATHLEN);

serverLog(LL_WARNING,
"Redis needs to enable the AOF but can't open the "
"append only file %s (in server root dir %s): %s",
server.aof_filename,
cwdp ? cwdp : "unknown",
strerror(errno));
return C_ERR;
}

//rdb正在执行
if (server.rdb_child_pid != -1) {
server.aof_rewrite_scheduled = 1;
serverLog(LL_WARNING,"AOF was enabled but there is already a child process saving an RDB file on disk. An AOF background was scheduled to start when possible.");
} else {
// aof正在执行,kill子进程
if (server.aof_child_pid != -1) {
serverLog(LL_WARNING,"AOF was enabled but there is already an AOF rewriting in background. Stopping background AOF and starting a rewrite now.");
killAppendOnlyChild();
}

//开启aof重写
if (rewriteAppendOnlyFileBackground() == C_ERR) {
close(newfd);
serverLog(LL_WARNING,"Redis needs to enable the AOF but can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.");
return C_ERR;
}
}
server.aof_state = AOF_WAIT_REWRITE;
server.aof_last_fsync = server.unixtime;
server.aof_fd = newfd;
return C_OK;
}

aofWrite

往fd中写入数据,在aof非重写阶段,会调用此方法写入命令到aof文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ssize_t aofWrite(int fd, const char *buf, size_t len) {
ssize_t nwritten = 0, totwritten = 0;

while(len) {
nwritten = write(fd, buf, len);

if (nwritten < 0) {
if (errno == EINTR) {
continue;
}
return totwritten ? totwritten : -1;
}

len -= nwritten;
buf += nwritten;
totwritten += nwritten;
}

return totwritten;
}

flushAppendOnlyFile

把aof缓冲区的内容,输出到磁盘上,取决于flus策略,和参数是否强制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
void flushAppendOnlyFile(int force) {
ssize_t nwritten;
int sync_in_progress = 0;
mstime_t latency;

//缓冲区无数据
if (sdslen(server.aof_buf) == 0) return;

// 每秒flush情况
if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;

// 如果上一次flush的时间到现在超了
if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
if (sync_in_progress) {
if (server.aof_flush_postponed_start == 0) {
// 记录开始
server.aof_flush_postponed_start = server.unixtime;
return;
} else if (server.unixtime - server.aof_flush_postponed_start < 2) {
// 2秒内继续等
return;
}
// 超过两秒,可能磁盘忙
server.aof_delayed_fsync++;
serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
}
}

latencyStartMonitor(latency);
//写入数据到文件
nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
latencyEndMonitor(latency);

if (sync_in_progress) {
latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
} else if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) {
latencyAddSampleIfNeeded("aof-write-active-child",latency);
} else {
latencyAddSampleIfNeeded("aof-write-alone",latency);
}
latencyAddSampleIfNeeded("aof-write",latency);

/* We performed the write so reset the postponed flush sentinel to zero. */
server.aof_flush_postponed_start = 0;

if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
static time_t last_write_error_log = 0;
int can_log = 0;

/* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
can_log = 1;
last_write_error_log = server.unixtime;
}

/* Log the AOF write error and record the error code. */
if (nwritten == -1) {
if (can_log) {
serverLog(LL_WARNING,"Error writing to the AOF file: %s",
strerror(errno));
server.aof_last_write_errno = errno;
}
} else {
if (can_log) {
serverLog(LL_WARNING,"Short write while writing to "
"the AOF file: (nwritten=%lld, "
"expected=%lld)",
(long long)nwritten,
(long long)sdslen(server.aof_buf));
}

if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
if (can_log) {
serverLog(LL_WARNING, "Could not remove short write "
"from the append-only file. Redis may refuse "
"to load the AOF the next time it starts. "
"ftruncate: %s", strerror(errno));
}
} else {
/* If the ftruncate() succeeded we can set nwritten to
* -1 since there is no longer partial data into the AOF. */
nwritten = -1;
}
server.aof_last_write_errno = ENOSPC;
}

/* Handle the AOF write error. */
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
/* We can't recover when the fsync policy is ALWAYS since the
* reply for the client is already in the output buffers, and we
* have the contract with the user that on acknowledged write data
* is synced on disk. */
serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
exit(1);
} else {
/* Recover from failed write leaving data into the buffer. However
* set an error to stop accepting writes as long as the error
* condition is not cleared. */
server.aof_last_write_status = C_ERR;

/* Trim the sds buffer if there was a partial write, and there
* was no way to undo it with ftruncate(2). */
if (nwritten > 0) {
server.aof_current_size += nwritten;
sdsrange(server.aof_buf,nwritten,-1);
}
return; /* We'll try again on the next call... */
}
} else {
if (server.aof_last_write_status == C_ERR) {
serverLog(LL_WARNING,
"AOF write error looks solved, Redis can write again.");
server.aof_last_write_status = C_OK;
}
}
server.aof_current_size += nwritten;

if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
sdsclear(server.aof_buf);
} else {
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
}

/* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
* children doing I/O in the background. */
if (server.aof_no_fsync_on_rewrite &&
(server.aof_child_pid != -1 || server.rdb_child_pid != -1))
return;

/* Perform the fsync if needed. */
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
/* redis_fsync is defined as fdatasync() for Linux in order to avoid
* flushing metadata. */
latencyStartMonitor(latency);
redis_fsync(server.aof_fd); /* Let's try to get this data on the disk */
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-fsync-always",latency);
server.aof_last_fsync = server.unixtime;
} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) {
if (!sync_in_progress) aof_background_fsync(server.aof_fd);
server.aof_last_fsync = server.unixtime;
}
}

catAppendOnlyGenericCommand

按照redis协议,把命令翻译成字符串,拼接起来,例如del操作,深入了解的话需要先去阅读redis协议这一章

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
char buf[32];
int len, j;
robj *o;

// 数组标识开头,计算转换后长度,写入结尾
buf[0] = '*';
len = 1+ll2string(buf+1,sizeof(buf)-1,argc);
buf[len++] = '\r';
buf[len++] = '\n';
dst = sdscatlen(dst,buf,len);

// 依次写入数组中每一个元素
for (j = 0; j < argc; j++) {
o = getDecodedObject(argv[j]);
buf[0] = '$';
len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr));
buf[len++] = '\r';
buf[len++] = '\n';
dst = sdscatlen(dst,buf,len);
dst = sdscatlen(dst,o->ptr,sdslen(o->ptr));
dst = sdscatlen(dst,"\r\n",2);
decrRefCount(o);
}
return dst;
}

catAppendOnlyExpireAtCommand

把expire、pexpire命令转换成pexpireat(时间是绝对的),然后转化成字符串

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
sds catAppendOnlyExpireAtCommand(sds buf, struct redisCommand *cmd, robj *key, robj *seconds) {
long long when;
robj *argv[3];

seconds = getDecodedObject(seconds);
when = strtoll(seconds->ptr,NULL,10);

// 转换成ms
if (cmd->proc == expireCommand || cmd->proc == setexCommand ||
cmd->proc == expireatCommand)
{
when *= 1000;
}

// 转换成绝对时间
if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
cmd->proc == setexCommand || cmd->proc == psetexCommand)
{
when += mstime();
}
decrRefCount(seconds);

argv[0] = createStringObject("PEXPIREAT",9);
argv[1] = key;
argv[2] = createStringObjectFromLongLong(when);
buf = catAppendOnlyGenericCommand(buf, 3, argv);
decrRefCount(argv[0]);
decrRefCount(argv[2]);
return buf;
}

feedAppendOnlyFile

把命令按照redis协议翻译到成字符串,写入aof

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
sds buf = sdsempty();
robj *tmpargv[3];

// 确定db,避免和上一次操作db不同
if (dictid != server.aof_selected_db) {
char seldb[64];

snprintf(seldb,sizeof(seldb),"%d",dictid);
buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
(unsigned long)strlen(seldb),seldb);
server.aof_selected_db = dictid;
}

// 转换过期命令,绝对时间
if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
cmd->proc == expireatCommand) {
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
} else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
// 转换setex,psetex到set+pexpireat两条命令
tmpargv[0] = createStringObject("SET",3);
tmpargv[1] = argv[1];
tmpargv[2] = argv[3];
buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
decrRefCount(tmpargv[0]);
buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
} else if (cmd->proc == setCommand && argc > 3) {
// 转换set ex px 到 set + pexpireat两条命令
int i;
robj *exarg = NULL, *pxarg = NULL;
buf = catAppendOnlyGenericCommand(buf,3,argv);
for (i = 3; i < argc; i ++) {
if (!strcasecmp(argv[i]->ptr, "ex")) exarg = argv[i+1];
if (!strcasecmp(argv[i]->ptr, "px")) pxarg = argv[i+1];
}
serverAssert(!(exarg && pxarg));
if (exarg)
buf = catAppendOnlyExpireAtCommand(buf,server.expireCommand,argv[1],
exarg);
if (pxarg)
buf = catAppendOnlyExpireAtCommand(buf,server.pexpireCommand,argv[1],
pxarg);
} else {
//其余的正常按照redis协议转换成字符串就行
buf = catAppendOnlyGenericCommand(buf,argc,argv);
}

// 如果aof开启&非重写阶段,写入aof缓存
if (server.aof_state == AOF_ON)
server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

// aof正在重写,写入aof重写数据块
if (server.aof_child_pid != -1)
aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));

sdsfree(buf);
}

createFakeClient

创建一个假的客户端,用来加载aof文件,因为所有的命令执行都是基于客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
struct client *createFakeClient(void) {
struct client *c = zmalloc(sizeof(*c));

selectDb(c,0);
c->fd = -1;
c->name = NULL;
c->querybuf = sdsempty();
c->querybuf_peak = 0;
c->argc = 0;
c->argv = NULL;
c->bufpos = 0;
c->flags = 0;
c->btype = BLOCKED_NONE;
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
c->reply = listCreate();
c->reply_bytes = 0;
c->obuf_soft_limit_reached_time = 0;
c->watched_keys = listCreate();
c->peerid = NULL;
listSetFreeMethod(c->reply,freeClientReplyValue);
listSetDupMethod(c->reply,dupClientReplyValue);
initClientMultiState(c);
return c;
}

freeFakeClientArgv

释放假客户端参数

1
2
3
4
5
6
7
void freeFakeClientArgv(struct client *c) {
int j;

for (j = 0; j < c->argc; j++)
decrRefCount(c->argv[j]);
zfree(c->argv);
}

freeFakeClient

释放假客户端

1
2
3
4
5
6
7
void freeFakeClient(struct client *c) {
sdsfree(c->querybuf);
listRelease(c->reply);
listRelease(c->watched_keys);
freeClientMultiState(c);
zfree(c);
}

loadAppendOnlyFile

加载aof文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
int loadAppendOnlyFile(char *filename) {
struct client *fakeClient;
FILE *fp = fopen(filename,"r");
struct redis_stat sb;
int old_aof_state = server.aof_state;
long loops = 0;
off_t valid_up_to = 0; /* Offset of latest well-formed command loaded. */
off_t valid_before_multi = 0; /* Offset before MULTI command loaded. */

if (fp == NULL) {
serverLog(LL_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
exit(1);
}

// 处理文件长度为0的情况
if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
server.aof_current_size = 0;
fclose(fp);
return C_ERR;
}

// 关闭aof功能
server.aof_state = AOF_OFF;

fakeClient = createFakeClient();
startLoading(fp);

// 检查aof文件是以哪种格式存储的,rdb还是非rdb
char sig[5]; /* "REDIS" */
if (fread(sig,1,5,fp) != 5 || memcmp(sig,"REDIS",5) != 0) {
// 非rdb模式,回退指针到起始位置
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
} else {
// rdb模式
rio rdb;

serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;

// 创建文件类型rio,指定数据源为aof文件fd
rioInitWithFile(&rdb,fp);

// 读取数据,rdb格式的
if (rdbLoadRio(&rdb,NULL,1) != C_OK) {
serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted");
goto readerr;
} else {
serverLog(LL_NOTICE,"Reading the remaining AOF tail...");
}
}

// 读取字符串命令格式数据,追加操作
while(1) {
int argc, j;
unsigned long len;
robj **argv;
char buf[128];
sds argsds;
struct redisCommand *cmd;

// 更新读取字节&处理客户端相应,这里和main流程的事件循环不一样,是非阻塞的
if (!(loops++ % 1000)) {
loadingProgress(ftello(fp));
processEventsWhileBlocked();
}

if (fgets(buf,sizeof(buf),fp) == NULL) {
if (feof(fp))
break;
else
goto readerr;
}
if (buf[0] != '*') goto fmterr;
if (buf[1] == '\0') goto readerr;
argc = atoi(buf+1);
if (argc < 1) goto fmterr;

argv = zmalloc(sizeof(robj*)*argc);
fakeClient->argc = argc;
fakeClient->argv = argv;

// 解析参数
for (j = 0; j < argc; j++) {
if (fgets(buf,sizeof(buf),fp) == NULL) {
fakeClient->argc = j; /* Free up to j-1. */
freeFakeClientArgv(fakeClient);
goto readerr;
}
if (buf[0] != '$') goto fmterr;
len = strtol(buf+1,NULL,10);
argsds = sdsnewlen(SDS_NOINIT,len);
if (len && fread(argsds,len,1,fp) == 0) {
sdsfree(argsds);
fakeClient->argc = j; /* Free up to j-1. */
freeFakeClientArgv(fakeClient);
goto readerr;
}
argv[j] = createObject(OBJ_STRING,argsds);
if (fread(buf,2,1,fp) == 0) {
fakeClient->argc = j+1; /* Free up to j. */
freeFakeClientArgv(fakeClient);
goto readerr; /* discard CRLF */
}
}

// 查找命令
cmd = lookupCommand(argv[0]->ptr);
if (!cmd) {
serverLog(LL_WARNING,
"Unknown command '%s' reading the append only file",
(char*)argv[0]->ptr);
exit(1);
}

if (cmd == server.multiCommand) valid_before_multi = valid_up_to;

// 执行命令
fakeClient->cmd = cmd;
if (fakeClient->flags & CLIENT_MULTI &&
fakeClient->cmd->proc != execCommand)
{
queueMultiCommand(fakeClient);
} else {
cmd->proc(fakeClient);
}

serverAssert(fakeClient->bufpos == 0 &&
listLength(fakeClient->reply) == 0);

serverAssert((fakeClient->flags & CLIENT_BLOCKED) == 0);

freeFakeClientArgv(fakeClient);
fakeClient->cmd = NULL;
if (server.aof_load_truncated) valid_up_to = ftello(fp);
}

if (fakeClient->flags & CLIENT_MULTI) {
serverLog(LL_WARNING,
"Revert incomplete MULTI/EXEC transaction in AOF file");
valid_up_to = valid_before_multi;
goto uxeof;
}

loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */
fclose(fp);
freeFakeClient(fakeClient);
server.aof_state = old_aof_state;
stopLoading();
aofUpdateCurrentSize();
server.aof_rewrite_base_size = server.aof_current_size;
return C_OK;

readerr: /* Read error. If feof(fp) is true, fall through to unexpected EOF. */
if (!feof(fp)) {
if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */
serverLog(LL_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
exit(1);
}

uxeof: /* Unexpected AOF end of file. */
if (server.aof_load_truncated) {
serverLog(LL_WARNING,"!!! Warning: short read while loading the AOF file !!!");
serverLog(LL_WARNING,"!!! Truncating the AOF at offset %llu !!!",
(unsigned long long) valid_up_to);
if (valid_up_to == -1 || truncate(filename,valid_up_to) == -1) {
if (valid_up_to == -1) {
serverLog(LL_WARNING,"Last valid command offset is invalid");
} else {
serverLog(LL_WARNING,"Error truncating the AOF file: %s",
strerror(errno));
}
} else {
/* Make sure the AOF file descriptor points to the end of the
* file after the truncate call. */
if (server.aof_fd != -1 && lseek(server.aof_fd,0,SEEK_END) == -1) {
serverLog(LL_WARNING,"Can't seek the end of the AOF file: %s",
strerror(errno));
} else {
serverLog(LL_WARNING,
"AOF loaded anyway because aof-load-truncated is enabled");
goto loaded_ok;
}
}
}
if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */
serverLog(LL_WARNING,"Unexpected end of file reading the append only file. You can: 1) Make a backup of your AOF file, then use ./redis-check-aof --fix <filename>. 2) Alternatively you can set the 'aof-load-truncated' configuration option to yes and restart the server.");
exit(1);

fmterr: /* Format error. */
if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */
serverLog(LL_WARNING,"Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename>");
exit(1);
}

rioWriteBulkObject

把redis对象按照redis协议输出成bulk string或者bulk long long字符串

1
2
3
4
5
6
7
8
9
int rioWriteBulkObject(rio *r, robj *obj) {
if (obj->encoding == OBJ_ENCODING_INT) {
return rioWriteBulkLongLong(r,(long)obj->ptr);
} else if (sdsEncodedObject(obj)) {
return rioWriteBulkString(r,obj->ptr,sdslen(obj->ptr));
} else {
serverPanic("Unknown string encoding");
}
}

rewriteListObject

把列表对象按照redis协议,转换成rpush命令写入文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
int rewriteListObject(rio *r, robj *key, robj *o) {
long long count = 0, items = listTypeLength(o);

if (o->encoding == OBJ_ENCODING_QUICKLIST) {
quicklist *list = o->ptr;
quicklistIter *li = quicklistGetIterator(list, AL_START_HEAD);
quicklistEntry entry;

while (quicklistNext(li,&entry)) {

// 控制一次rpush操作64个,避免一次性拥挤
if (count == 0) {
int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
AOF_REWRITE_ITEMS_PER_CMD : items;
if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
if (rioWriteBulkString(r,"RPUSH",5) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
}

if (entry.value) {
if (rioWriteBulkString(r,(char*)entry.value,entry.sz) == 0) return 0;
} else {
if (rioWriteBulkLongLong(r,entry.longval) == 0) return 0;
}
if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
items--;
}
quicklistReleaseIterator(li);
} else {
serverPanic("Unknown list encoding");
}
return 1;
}

rewriteSetObject

把集合对象按照redis协议,转换成sadd命令写入文件,区分两种编码类型,本质和上面list类型一致,也控制一次sadd数量为64

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
int rewriteSetObject(rio *r, robj *key, robj *o) {
long long count = 0, items = setTypeSize(o);

if (o->encoding == OBJ_ENCODING_INTSET) {
int ii = 0;
int64_t llval;

while(intsetGet(o->ptr,ii++,&llval)) {
if (count == 0) {
int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
AOF_REWRITE_ITEMS_PER_CMD : items;

if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
if (rioWriteBulkString(r,"SADD",4) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
}
if (rioWriteBulkLongLong(r,llval) == 0) return 0;
if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
items--;
}
} else if (o->encoding == OBJ_ENCODING_HT) {
dictIterator *di = dictGetIterator(o->ptr);
dictEntry *de;

while((de = dictNext(di)) != NULL) {
sds ele = dictGetKey(de);
if (count == 0) {
int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
AOF_REWRITE_ITEMS_PER_CMD : items;

if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
if (rioWriteBulkString(r,"SADD",4) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
}
if (rioWriteBulkString(r,ele,sdslen(ele)) == 0) return 0;
if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
items--;
}
dictReleaseIterator(di);
} else {
serverPanic("Unknown set encoding");
}
return 1;
}

rewriteSortedSetObject

把有序集合按照redis协议,转换成zadd命令写入文件,区分使用压缩表还是跳跃表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
long long count = 0, items = zsetLength(o);

//压缩表
if (o->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *zl = o->ptr;
unsigned char *eptr, *sptr;
unsigned char *vstr;
unsigned int vlen;
long long vll;
double score;

eptr = ziplistIndex(zl,0);
serverAssert(eptr != NULL);
sptr = ziplistNext(zl,eptr);
serverAssert(sptr != NULL);

while (eptr != NULL) {
serverAssert(ziplistGet(eptr,&vstr,&vlen,&vll));
score = zzlGetScore(sptr);

if (count == 0) {
int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
AOF_REWRITE_ITEMS_PER_CMD : items;

if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
if (rioWriteBulkString(r,"ZADD",4) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
}
if (rioWriteBulkDouble(r,score) == 0) return 0;
if (vstr != NULL) {
if (rioWriteBulkString(r,(char*)vstr,vlen) == 0) return 0;
} else {
if (rioWriteBulkLongLong(r,vll) == 0) return 0;
}
zzlNext(zl,&eptr,&sptr);
if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
items--;
}
} else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
//跳跃表
zset *zs = o->ptr;
dictIterator *di = dictGetIterator(zs->dict);
dictEntry *de;

while((de = dictNext(di)) != NULL) {
sds ele = dictGetKey(de);
double *score = dictGetVal(de);

if (count == 0) {
int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
AOF_REWRITE_ITEMS_PER_CMD : items;

if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
if (rioWriteBulkString(r,"ZADD",4) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
}
if (rioWriteBulkDouble(r,*score) == 0) return 0;
if (rioWriteBulkString(r,ele,sdslen(ele)) == 0) return 0;
if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
items--;
}
dictReleaseIterator(di);
} else {
serverPanic("Unknown sorted zset encoding");
}
return 1;
}

rioWriteHashIteratorCursor

写入hash迭代器的值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) {
if (hi->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *vstr = NULL;
unsigned int vlen = UINT_MAX;
long long vll = LLONG_MAX;

hashTypeCurrentFromZiplist(hi, what, &vstr, &vlen, &vll);
if (vstr)
return rioWriteBulkString(r, (char*)vstr, vlen);
else
return rioWriteBulkLongLong(r, vll);
} else if (hi->encoding == OBJ_ENCODING_HT) {
sds value = hashTypeCurrentFromHashTable(hi, what);
return rioWriteBulkString(r, value, sdslen(value));
}

serverPanic("Unknown hash encoding");
return 0;
}

rewriteHashObject

把哈希表按照redis协议,转换成hmset命令写入文件,一次写入64个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
int rewriteHashObject(rio *r, robj *key, robj *o) {
hashTypeIterator *hi;
long long count = 0, items = hashTypeLength(o);

hi = hashTypeInitIterator(o);
while (hashTypeNext(hi) != C_ERR) {
if (count == 0) {
int cmd_items = (items > AOF_REWRITE_ITEMS_PER_CMD) ?
AOF_REWRITE_ITEMS_PER_CMD : items;

if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
if (rioWriteBulkString(r,"HMSET",5) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
}

if (rioWriteHashIteratorCursor(r, hi, OBJ_HASH_KEY) == 0) return 0;
if (rioWriteHashIteratorCursor(r, hi, OBJ_HASH_VALUE) == 0) return 0;
if (++count == AOF_REWRITE_ITEMS_PER_CMD) count = 0;
items--;
}

hashTypeReleaseIterator(hi);

return 1;
}

rioWriteBulkStreamID

写入流id

1
2
3
4
5
6
7
8
int rioWriteBulkStreamID(rio *r,streamID *id) {
int retval;

sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq);
if ((retval = rioWriteBulkString(r,replyid,sdslen(replyid))) == 0) return 0;
sdsfree(replyid);
return retval;
}

rioWriteStreamPendingEntry

写入xclaim 命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
int rioWriteStreamPendingEntry(rio *r, robj *key, const char *groupname, size_t groupname_len, streamConsumer *consumer, unsigned char *rawid, streamNACK *nack) {
/* XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time>
RETRYCOUNT <count> JUSTID FORCE. */
streamID id;
streamDecodeID(rawid,&id);
if (rioWriteBulkCount(r,'*',12) == 0) return 0;
if (rioWriteBulkString(r,"XCLAIM",6) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkString(r,groupname,groupname_len) == 0) return 0;
if (rioWriteBulkString(r,consumer->name,sdslen(consumer->name)) == 0) return 0;
if (rioWriteBulkString(r,"0",1) == 0) return 0;
if (rioWriteBulkStreamID(r,&id) == 0) return 0;
if (rioWriteBulkString(r,"TIME",4) == 0) return 0;
if (rioWriteBulkLongLong(r,nack->delivery_time) == 0) return 0;
if (rioWriteBulkString(r,"RETRYCOUNT",10) == 0) return 0;
if (rioWriteBulkLongLong(r,nack->delivery_count) == 0) return 0;
if (rioWriteBulkString(r,"JUSTID",6) == 0) return 0;
if (rioWriteBulkString(r,"FORCE",5) == 0) return 0;
return 1;
}

rewriteStreamObject

写入流对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
int rewriteStreamObject(rio *r, robj *key, robj *o) {
stream *s = o->ptr;
streamIterator si;
streamIteratorStart(&si,s,NULL,NULL,0);
streamID id;
int64_t numfields;

if (s->length) {
/* Reconstruct the stream data using XADD commands. */
while(streamIteratorGetID(&si,&id,&numfields)) {
/* Emit a two elements array for each item. The first is
* the ID, the second is an array of field-value pairs. */

/* Emit the XADD <key> <id> ...fields... command. */
if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0;
if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkStreamID(r,&id) == 0) return 0;
while(numfields--) {
unsigned char *field, *value;
int64_t field_len, value_len;
streamIteratorGetField(&si,&field,&value,&field_len,&value_len);
if (rioWriteBulkString(r,(char*)field,field_len) == 0) return 0;
if (rioWriteBulkString(r,(char*)value,value_len) == 0) return 0;
}
}
} else {
/* Use the XADD MAXLEN 0 trick to generate an empty stream if
* the key we are serializing is an empty string, which is possible
* for the Stream type. */
if (rioWriteBulkCount(r,'*',7) == 0) return 0;
if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkString(r,"MAXLEN",6) == 0) return 0;
if (rioWriteBulkString(r,"0",1) == 0) return 0;
if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0;
if (rioWriteBulkString(r,"x",1) == 0) return 0;
if (rioWriteBulkString(r,"y",1) == 0) return 0;
}

/* Append XSETID after XADD, make sure lastid is correct,
* in case of XDEL lastid. */
if (rioWriteBulkCount(r,'*',3) == 0) return 0;
if (rioWriteBulkString(r,"XSETID",6) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0;


/* Create all the stream consumer groups. */
if (s->cgroups) {
raxIterator ri;
raxStart(&ri,s->cgroups);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
streamCG *group = ri.data;
/* Emit the XGROUP CREATE in order to create the group. */
if (rioWriteBulkCount(r,'*',5) == 0) return 0;
if (rioWriteBulkString(r,"XGROUP",6) == 0) return 0;
if (rioWriteBulkString(r,"CREATE",6) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkString(r,(char*)ri.key,ri.key_len) == 0) return 0;
if (rioWriteBulkStreamID(r,&group->last_id) == 0) return 0;

/* Generate XCLAIMs for each consumer that happens to
* have pending entries. Empty consumers have no semantical
* value so they are discarded. */
raxIterator ri_cons;
raxStart(&ri_cons,group->consumers);
raxSeek(&ri_cons,"^",NULL,0);
while(raxNext(&ri_cons)) {
streamConsumer *consumer = ri_cons.data;
/* For the current consumer, iterate all the PEL entries
* to emit the XCLAIM protocol. */
raxIterator ri_pel;
raxStart(&ri_pel,consumer->pel);
raxSeek(&ri_pel,"^",NULL,0);
while(raxNext(&ri_pel)) {
streamNACK *nack = ri_pel.data;
if (rioWriteStreamPendingEntry(r,key,(char*)ri.key,
ri.key_len,consumer,
ri_pel.key,nack) == 0)
{
return 0;
}
}
raxStop(&ri_pel);
}
raxStop(&ri_cons);
}
raxStop(&ri);
}

streamIteratorStop(&si);
return 1;
}

rewriteModuleObject

写入模块对象,此种类型非redis所能处理

1
2
3
4
5
6
7
8
9
10
11
12
int rewriteModuleObject(rio *r, robj *key, robj *o) {
RedisModuleIO io;
moduleValue *mv = o->ptr;
moduleType *mt = mv->type;
moduleInitIOContext(io,mt,r);
mt->aof_rewrite(&io,key,mv->value);
if (io.ctx) {
moduleFreeContext(io.ctx);
zfree(io.ctx);
}
return io.error ? 0 : 1;
}

aofReadDiffFromParent

子进程从管道读取父进程发送的差异数据

1
2
3
4
5
6
7
8
9
10
11
ssize_t aofReadDiffFromParent(void) {
char buf[65536]; /* Default pipe buffer size on most Linux systems. */
ssize_t nread, total = 0;

while ((nread =
read(server.aof_pipe_read_data_from_parent,buf,sizeof(buf))) > 0) {
server.aof_child_diff = sdscatlen(server.aof_child_diff,buf,nread);
total += nread;
}
return total;
}

rewriteAppendOnlyFileRio

不使用rdb协议把数据库按照命令的方式依次写入到文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
int rewriteAppendOnlyFileRio(rio *aof) {
dictIterator *di = NULL;
dictEntry *de;
size_t processed = 0;
int j;

// 遍历数据库
for (j = 0; j < server.dbnum; j++) {
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
redisDb *db = server.db+j;
dict *d = db->dict;
if (dictSize(d) == 0) continue;
di = dictGetSafeIterator(d);

// 写入selectdb操作
if (rioWrite(aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
if (rioWriteBulkLongLong(aof,j) == 0) goto werr;

// 依次写入此db中所有的值
while((de = dictNext(di)) != NULL) {
sds keystr;
robj key, *o;
long long expiretime;

keystr = dictGetKey(de);
o = dictGetVal(de);
initStaticStringObject(key,keystr);

expiretime = getExpire(db,&key);

/* Save the key and associated value */
if (o->type == OBJ_STRING) {
/* Emit a SET command */
char cmd[]="*3\r\n$3\r\nSET\r\n";
if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
/* Key and value */
if (rioWriteBulkObject(aof,&key) == 0) goto werr;
if (rioWriteBulkObject(aof,o) == 0) goto werr;
} else if (o->type == OBJ_LIST) {
if (rewriteListObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_SET) {
if (rewriteSetObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_ZSET) {
if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_HASH) {
if (rewriteHashObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_STREAM) {
if (rewriteStreamObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_MODULE) {
if (rewriteModuleObject(aof,&key,o) == 0) goto werr;
} else {
serverPanic("Unknown object type");
}

// 写入过期时间
if (expiretime != -1) {
char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWriteBulkObject(aof,&key) == 0) goto werr;
if (rioWriteBulkLongLong(aof,expiretime) == 0) goto werr;
}

// 处理一定量级后,读取差异数据,因为此过程可能耗时很长,避免最后读取大量数据
if (aof->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) {
processed = aof->processed_bytes;
aofReadDiffFromParent();
}
}
dictReleaseIterator(di);
di = NULL;
}
return C_OK;

werr:
if (di) dictReleaseIterator(di);
return C_ERR;
}

rewriteAppendOnlyFile

重写aof文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
int rewriteAppendOnlyFile(char *filename) {
rio aof;
FILE *fp;
char tmpfile[256];
char byte;

// 临时文件
snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
return C_ERR;
}

// 初始化子进程的差异数据存储
server.aof_child_diff = sdsempty();
// 初始化rio对象,用临时文件fd
rioInitWithFile(&aof,fp);

// 设置rio对象的flush大小
if (server.aof_rewrite_incremental_fsync)
rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);

// 写入当前db数据,是否采用rdb协议
if (server.aof_use_rdb_preamble) {
int error;
if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {
errno = error;
goto werr;
}
} else {
if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
}

// 为了让下一次flush快一些,先执行一次
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;

// db数据处理完了,只剩下差异数据了,尝试1s内读取,同时限制次数,如果20次都没有读取到数据的话,就不浪费时间了
int nodata = 0;
mstime_t start = mstime();
while(mstime()-start < 1000 && nodata < 20) {
if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
{
nodata++;
continue;
}
nodata = 0;
aofReadDiffFromParent();
}

// 通过管道,给父进程发送请求,停止发送差异数据
if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)
goto werr;

// 从管道收到父进程停止发送的确定
if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
byte != '!') goto werr;
serverLog(LL_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF...");

// 再读取一边差异数据,防止在请求停止的过程中,依旧有数据过来
aofReadDiffFromParent();

// 写入差异数据
serverLog(LL_NOTICE,
"Concatenating %.2f MB of AOF diff received from parent.",
(double) sdslen(server.aof_child_diff) / (1024*1024));
if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
goto werr;

// flush数据到磁盘
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
if (fclose(fp) == EOF) goto werr;

// 重命名文件
if (rename(tmpfile,filename) == -1) {
serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
unlink(tmpfile);
return C_ERR;
}
serverLog(LL_NOTICE,"SYNC append only file rewrite performed");
return C_OK;

werr:
serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
fclose(fp);
unlink(tmpfile);
return C_ERR;
}

aofChildPipeReadable

父进程收到子进程的停止发送差异请求,发送确认给子进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
char byte;
UNUSED(el);
UNUSED(privdata);
UNUSED(mask);

if (read(fd,&byte,1) == 1 && byte == '!') {
serverLog(LL_NOTICE,"AOF rewrite child asks to stop sending diffs.");
server.aof_stop_sending_diff = 1;
if (write(server.aof_pipe_write_ack_to_child,"!",1) != 1) {
serverLog(LL_WARNING,"Can't send ACK to AOF child: %s",
strerror(errno));
}
}

aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE);
}

aofCreatePipes

创建父进程和子进程通信的管道,有三个,分别用来:传输差异数据、请求停止发送、确认停止发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
int aofCreatePipes(void) {
int fds[6] = {-1, -1, -1, -1, -1, -1};
int j;

if (pipe(fds) == -1) goto error; // 父进程->子进程发送数据
if (pipe(fds+2) == -1) goto error; // 子进程请求停止->父进程
if (pipe(fds+4) == -1) goto error; //父进程确认停止->子进程
// 管道都是非阻塞的
if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;

server.aof_pipe_write_data_to_child = fds[1];
server.aof_pipe_read_data_from_parent = fds[0];
server.aof_pipe_write_ack_to_parent = fds[3];
server.aof_pipe_read_ack_from_child = fds[2];
server.aof_pipe_write_ack_to_child = fds[5];
server.aof_pipe_read_ack_from_parent = fds[4];
server.aof_stop_sending_diff = 0;
return C_OK;

error:
serverLog(LL_WARNING,"Error opening /setting AOF rewrite IPC pipes: %s",
strerror(errno));
for (j = 0; j < 6; j++) if(fds[j] != -1) close(fds[j]);
return C_ERR;
}

aofClosePipes

关闭管道

1
2
3
4
5
6
7
8
9
10
void aofClosePipes(void) {
aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE);
aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,AE_WRITABLE);
close(server.aof_pipe_write_data_to_child);
close(server.aof_pipe_read_data_from_parent);
close(server.aof_pipe_write_ack_to_parent);
close(server.aof_pipe_read_ack_from_child);
close(server.aof_pipe_write_ack_to_child);
close(server.aof_pipe_read_ack_from_parent);
}

rewriteAppendOnlyFileBackground

创建子进程执行aof重写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
int rewriteAppendOnlyFileBackground(void) {
pid_t childpid;
long long start;

// 已经有rdb或aof子进程
if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
if (aofCreatePipes() != C_OK) return C_ERR;
openChildInfoPipe();
start = ustime();
if ((childpid = fork()) == 0) {
char tmpfile[256];

// 子进程
closeListeningSockets(0);
redisSetProcTitle("redis-aof-rewrite");

// 创建临时文件
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
// 执行重写
if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
size_t private_dirty = zmalloc_get_private_dirty(-1);

if (private_dirty) {
serverLog(LL_NOTICE,
"AOF rewrite: %zu MB of memory used by copy-on-write",
private_dirty/(1024*1024));
}

server.child_info_data.cow_size = private_dirty;
sendChildInfo(CHILD_INFO_TYPE_AOF);
exitFromChild(0);
} else {
exitFromChild(1);
}
} else {
// 父进程
server.stat_fork_time = ustime()-start;
server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
if (childpid == -1) {
closeChildInfoPipe();
serverLog(LL_WARNING,
"Can't rewrite append only file in background: fork: %s",
strerror(errno));
aofClosePipes();
return C_ERR;
}
serverLog(LL_NOTICE,
"Background append only file rewriting started by pid %d",childpid);
server.aof_rewrite_scheduled = 0;
server.aof_rewrite_time_start = time(NULL);
server.aof_child_pid = childpid;
updateDictResizePolicy();

server.aof_selected_db = -1;
replicationScriptCacheFlush();
return C_OK;
}
return C_OK;
}

bgrewriteaofCommand

执行客户端的aof重写命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void bgrewriteaofCommand(client *c) {
// aof重写进程中
if (server.aof_child_pid != -1) {
addReplyError(c,"Background append only file rewriting already in progress");
} else if (server.rdb_child_pid != -1) {
// rdb进行中
server.aof_rewrite_scheduled = 1;
addReplyStatus(c,"Background append only file rewriting scheduled");
} else if (rewriteAppendOnlyFileBackground() == C_OK) {
addReplyStatus(c,"Background append only file rewriting started");
} else {
addReply(c,shared.err);
}
}

aofRemoveTempFile

删除临时文件

1
2
3
4
5
6
void aofRemoveTempFile(pid_t childpid) {
char tmpfile[256];

snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) childpid);
unlink(tmpfile);
}

aofUpdateCurrentSize

更新aof大小

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void aofUpdateCurrentSize(void) {
struct redis_stat sb;
mstime_t latency;

latencyStartMonitor(latency);
if (redis_fstat(server.aof_fd,&sb) == -1) {
serverLog(LL_WARNING,"Unable to obtain the AOF file length. stat: %s",
strerror(errno));
} else {
server.aof_current_size = sb.st_size;
}
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-fstat",latency);
}

backgroundRewriteDoneHandler

父进程根据子进程的退出码和信号处理后续,正常是在子进程重写aof文件的基础上,写入差异数据,重命名为正式aof文件,切换aof状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {

// 子进程正常退出
if (!bysignal && exitcode == 0) {
int newfd, oldfd;
char tmpfile[256];
long long now = ustime();
mstime_t latency;

serverLog(LL_NOTICE,
"Background AOF rewrite terminated with success");

latencyStartMonitor(latency);
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
(int)server.aof_child_pid);
newfd = open(tmpfile,O_WRONLY|O_APPEND);
if (newfd == -1) {
serverLog(LL_WARNING,
"Unable to open the temporary AOF produced by the child: %s", strerror(errno));
goto cleanup;
}

// 写入差异数据
if (aofRewriteBufferWrite(newfd) == -1) {
serverLog(LL_WARNING,
"Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
close(newfd);
goto cleanup;
}
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-rewrite-diff-write",latency);

serverLog(LL_NOTICE,
"Residual parent diff successfully flushed to the rewritten AOF (%.2f MB)", (double) aofRewriteBufferSize() / (1024*1024));


if (server.aof_fd == -1) {
// aof未开始
oldfd = open(server.aof_filename,O_RDONLY|O_NONBLOCK);
} else {
// aof开启
oldfd = -1;
}

// 临时->正式
latencyStartMonitor(latency);
if (rename(tmpfile,server.aof_filename) == -1) {
serverLog(LL_WARNING,
"Error trying to rename the temporary AOF file %s into %s: %s",
tmpfile,
server.aof_filename,
strerror(errno));
close(newfd);
if (oldfd != -1) close(oldfd);
goto cleanup;
}
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-rename",latency);

// aof未开启,就不用切换fd,下次开启时,直接打开就行
if (server.aof_fd == -1) {
close(newfd);
} else {
// aof开启情况,需要切换一下fd
oldfd = server.aof_fd;
server.aof_fd = newfd;
if (server.aof_fsync == AOF_FSYNC_ALWAYS)
redis_fsync(newfd);
else if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
aof_background_fsync(newfd);
server.aof_selected_db = -1; /* Make sure SELECT is re-issued */
aofUpdateCurrentSize();
server.aof_rewrite_base_size = server.aof_current_size;

//清空数据
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
}

server.aof_lastbgrewrite_status = C_OK;

serverLog(LL_NOTICE, "Background AOF rewrite finished successfully");

// 更新aof状态 重写->正常
if (server.aof_state == AOF_WAIT_REWRITE)
server.aof_state = AOF_ON;

// 异步关闭旧aof fd
if (oldfd != -1) bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);

serverLog(LL_VERBOSE,
"Background AOF rewrite signal handler took %lldus", ustime()-now);
} else if (!bysignal && exitcode != 0) {
if (bysignal != SIGUSR1)
server.aof_lastbgrewrite_status = C_ERR;
serverLog(LL_WARNING,
"Background AOF rewrite terminated with error");
} else {
server.aof_lastbgrewrite_status = C_ERR;

serverLog(LL_WARNING,
"Background AOF rewrite terminated by signal %d", bysignal);
}

cleanup:
aofClosePipes();
aofRewriteBufferReset();
aofRemoveTempFile(server.aof_child_pid);
server.aof_child_pid = -1;
server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
server.aof_rewrite_time_start = -1;
/* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
if (server.aof_state == AOF_WAIT_REWRITE)
server.aof_rewrite_scheduled = 1;
}