serverLog(LL_WARNING, "Redis needs to enable the AOF but can't open the " "append only file %s (in server root dir %s): %s", server.aof_filename, cwdp ? cwdp : "unknown", strerror(errno)); return C_ERR; }
//rdb正在执行 if (server.rdb_child_pid != -1) { server.aof_rewrite_scheduled = 1; serverLog(LL_WARNING,"AOF was enabled but there is already a child process saving an RDB file on disk. An AOF background was scheduled to start when possible."); } else { // aof正在执行,kill子进程 if (server.aof_child_pid != -1) { serverLog(LL_WARNING,"AOF was enabled but there is already an AOF rewriting in background. Stopping background AOF and starting a rewrite now."); killAppendOnlyChild(); }
//开启aof重写 if (rewriteAppendOnlyFileBackground() == C_ERR) { close(newfd); serverLog(LL_WARNING,"Redis needs to enable the AOF but can't trigger a background AOF rewrite operation. Check the above logs for more info about the error."); return C_ERR; } } server.aof_state = AOF_WAIT_REWRITE; server.aof_last_fsync = server.unixtime; server.aof_fd = newfd; return C_OK; }
/* Excerpt: postponed-flush check, write-error handling and the fsync step. */
    /* With the everysec policy, and unless the flush is forced, a background
     * fsync that is still in progress postpones the write. */
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        if (sync_in_progress) {
            if (server.aof_flush_postponed_start == 0) {
                /* First postponement: remember when it started. */
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                /* Postponed for less than two seconds: keep waiting. */
                return;
            }
            /* Two seconds or more have passed (the disk may be busy): count
             * the delayed fsync and go on without waiting any longer. */
            server.aof_delayed_fsync++;
            serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
        }
    }

    /* We performed the write so reset the postponed flush sentinel to zero. */
    server.aof_flush_postponed_start = 0;

    if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
        static time_t last_write_error_log = 0;
        int can_log = 0;

        /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
        if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
            can_log = 1;
            last_write_error_log = server.unixtime;
        }

        /* Log the AOF write error and record the error code. */
        if (nwritten == -1) {
            /* Capture errno before any logging call can overwrite it, and
             * record it even when the rate limiter suppresses the log line,
             * so the stored code always describes the latest failure. */
            int write_errno = errno;

            if (can_log) {
                serverLog(LL_WARNING,"Error writing to the AOF file: %s",
                    strerror(write_errno));
            }
            server.aof_last_write_errno = write_errno;
        } else {
            if (can_log) {
                serverLog(LL_WARNING,"Short write while writing to "
                                     "the AOF file: (nwritten=%lld, "
                                     "expected=%lld)",
                                     (long long)nwritten,
                                     (long long)sdslen(server.aof_buf));
            }

            /* Try to cut the partial data back off the file. */
            if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
                if (can_log) {
                    serverLog(LL_WARNING, "Could not remove short write "
                             "from the append-only file. Redis may refuse "
                             "to load the AOF the next time it starts. "
                             "ftruncate: %s", strerror(errno));
                }
            } else {
                /* If the ftruncate() succeeded we can set nwritten to
                 * -1 since there is no longer partial data into the AOF. */
                nwritten = -1;
            }
            server.aof_last_write_errno = ENOSPC;
        }

        /* Handle the AOF write error. */
        if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
            /* We can't recover when the fsync policy is ALWAYS since the
             * reply for the client is already in the output buffers, and we
             * have the contract with the user that on acknowledged write data
             * is synced on disk. */
            serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
            exit(1);
        } else {
            /* Recover from failed write leaving data into the buffer. However
             * set an error to stop accepting writes as long as the error
             * condition is not cleared. */
            server.aof_last_write_status = C_ERR;

            /* Trim the sds buffer if there was a partial write, and there
             * was no way to undo it with ftruncate(2). */
            if (nwritten > 0) {
                server.aof_current_size += nwritten;
                sdsrange(server.aof_buf,nwritten,-1);
            }
            return; /* We'll try again on the next call... */
        }
    } else {
        /* The whole buffer was written: clear a previously recorded error. */
        if (server.aof_last_write_status == C_ERR) {
            serverLog(LL_WARNING,
                "AOF write error looks solved, Redis can write again.");
            server.aof_last_write_status = C_OK;
        }
    }
    server.aof_current_size += nwritten;

    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
     * children doing I/O in the background. */
    if (server.aof_no_fsync_on_rewrite &&
        (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
        return;

    /* Perform the fsync if needed. */
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        /* redis_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        latencyStartMonitor(latency);
        redis_fsync(server.aof_fd); /* Let's try to get this data on the disk */
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-fsync-always",latency);
        server.aof_last_fsync = server.unixtime;
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        /* everysec: hand the fsync to the background job unless one is
         * already in progress. */
        if (!sync_in_progress) aof_background_fsync(server.aof_fd);
        server.aof_last_fsync = server.unixtime;
    }
}
freeFakeClientArgv(fakeClient); fakeClient->cmd = NULL; if (server.aof_load_truncated) valid_up_to = ftello(fp); }
if (fakeClient->flags & CLIENT_MULTI) { serverLog(LL_WARNING, "Revert incomplete MULTI/EXEC transaction in AOF file"); valid_up_to = valid_before_multi; goto uxeof; }
loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */ fclose(fp); freeFakeClient(fakeClient); server.aof_state = old_aof_state; stopLoading(); aofUpdateCurrentSize(); server.aof_rewrite_base_size = server.aof_current_size; return C_OK;
readerr: /* Read error. If feof(fp) is true, fall through to unexpected EOF. */ if (!feof(fp)) { if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */ serverLog(LL_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno)); exit(1); }
uxeof: /* Unexpected AOF end of file. */ if (server.aof_load_truncated) { serverLog(LL_WARNING,"!!! Warning: short read while loading the AOF file !!!"); serverLog(LL_WARNING,"!!! Truncating the AOF at offset %llu !!!", (unsignedlonglong) valid_up_to); if (valid_up_to == -1 || truncate(filename,valid_up_to) == -1) { if (valid_up_to == -1) { serverLog(LL_WARNING,"Last valid command offset is invalid"); } else { serverLog(LL_WARNING,"Error truncating the AOF file: %s", strerror(errno)); } } else { /* Make sure the AOF file descriptor points to the end of the * file after the truncate call. */ if (server.aof_fd != -1 && lseek(server.aof_fd,0,SEEK_END) == -1) { serverLog(LL_WARNING,"Can't seek the end of the AOF file: %s", strerror(errno)); } else { serverLog(LL_WARNING, "AOF loaded anyway because aof-load-truncated is enabled"); goto loaded_ok; } } } if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */ serverLog(LL_WARNING,"Unexpected end of file reading the append only file. You can: 1) Make a backup of your AOF file, then use ./redis-check-aof --fix <filename>. 2) Alternatively you can set the 'aof-load-truncated' configuration option to yes and restart the server."); exit(1);
fmterr: /* Format error. */ if (fakeClient) freeFakeClient(fakeClient); /* avoid valgrind warning */ serverLog(LL_WARNING,"Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename>"); exit(1); }
rioWriteBulkObject
把redis对象按照redis协议输出成bulk string或者bulk long long字符串
/* Excerpt: emit a stream as XADD, XSETID, XGROUP CREATE and pending entries. */
    if (s->length) {
        /* Reconstruct the stream data using XADD commands. */
        while(streamIteratorGetID(&si,&id,&numfields)) {
            /* Emit the XADD <key> <id> ...fields... command: three fixed
             * arguments plus one field and one value per pair. */
            if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0;
            if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
            if (rioWriteBulkObject(r,key) == 0) return 0;
            if (rioWriteBulkStreamID(r,&id) == 0) return 0;
            while(numfields--) {
                unsigned char *field, *value;
                int64_t field_len, value_len;
                streamIteratorGetField(&si,&field,&value,
                                           &field_len,&value_len);
                if (rioWriteBulkString(r,(char*)field,field_len) == 0)
                    return 0;
                if (rioWriteBulkString(r,(char*)value,value_len) == 0)
                    return 0;
            }
        }
    } else {
        /* Use the XADD MAXLEN 0 trick to generate an empty stream when the
         * stream being serialized has no entries, which is possible for the
         * Stream type. */
        if (rioWriteBulkCount(r,'*',7) == 0) return 0;
        if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
        if (rioWriteBulkObject(r,key) == 0) return 0;
        if (rioWriteBulkString(r,"MAXLEN",6) == 0) return 0;
        if (rioWriteBulkString(r,"0",1) == 0) return 0;
        if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0;
        if (rioWriteBulkString(r,"x",1) == 0) return 0;
        if (rioWriteBulkString(r,"y",1) == 0) return 0;
    }

    /* Append XSETID after XADD, make sure lastid is correct,
     * in case of XDEL lastid. */
    if (rioWriteBulkCount(r,'*',3) == 0) return 0;
    if (rioWriteBulkString(r,"XSETID",6) == 0) return 0;
    if (rioWriteBulkObject(r,key) == 0) return 0;
    if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0;

    /* Create all the stream consumer groups. */
    if (s->cgroups) {
        raxIterator ri;
        raxStart(&ri,s->cgroups);
        raxSeek(&ri,"^",NULL,0);
        while(raxNext(&ri)) {
            streamCG *group = ri.data;

            /* Emit the XGROUP CREATE in order to create the group. The
             * checks short-circuit in emission order; on failure the group
             * iterator is stopped before bailing out. */
            if (rioWriteBulkCount(r,'*',5) == 0 ||
                rioWriteBulkString(r,"XGROUP",6) == 0 ||
                rioWriteBulkString(r,"CREATE",6) == 0 ||
                rioWriteBulkObject(r,key) == 0 ||
                rioWriteBulkString(r,(char*)ri.key,ri.key_len) == 0 ||
                rioWriteBulkStreamID(r,&group->last_id) == 0)
            {
                raxStop(&ri);
                return 0;
            }

            /* Generate XCLAIMs for each consumer that happens to
             * have pending entries. Empty consumers have no semantical
             * value so they are discarded. */
            raxIterator ri_cons;
            raxStart(&ri_cons,group->consumers);
            raxSeek(&ri_cons,"^",NULL,0);
            while(raxNext(&ri_cons)) {
                streamConsumer *consumer = ri_cons.data;

                /* For the current consumer, iterate all the PEL entries
                 * to emit the XCLAIM protocol. */
                raxIterator ri_pel;
                raxStart(&ri_pel,consumer->pel);
                raxSeek(&ri_pel,"^",NULL,0);
                while(raxNext(&ri_pel)) {
                    streamNACK *nack = ri_pel.data;
                    if (rioWriteStreamPendingEntry(r,key,(char*)ri.key,
                                                   ri.key_len,consumer,
                                                   ri_pel.key,nack) == 0)
                    {
                        /* Stop every iterator opened so far, innermost
                         * first, before reporting the failure. */
                        raxStop(&ri_pel);
                        raxStop(&ri_cons);
                        raxStop(&ri);
                        return 0;
                    }
                }
                raxStop(&ri_pel);
            }
            raxStop(&ri_cons);
        }
        raxStop(&ri);
    }
voidbackgroundRewriteDoneHandler(int exitcode, int bysignal){
// 子进程正常退出 if (!bysignal && exitcode == 0) { int newfd, oldfd; char tmpfile[256]; longlong now = ustime(); mstime_t latency;
serverLog(LL_NOTICE, "Background AOF rewrite terminated with success");
latencyStartMonitor(latency); snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int)server.aof_child_pid); newfd = open(tmpfile,O_WRONLY|O_APPEND); if (newfd == -1) { serverLog(LL_WARNING, "Unable to open the temporary AOF produced by the child: %s", strerror(errno)); goto cleanup; }
// 写入差异数据 if (aofRewriteBufferWrite(newfd) == -1) { serverLog(LL_WARNING, "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno)); close(newfd); goto cleanup; } latencyEndMonitor(latency); latencyAddSampleIfNeeded("aof-rewrite-diff-write",latency);
serverLog(LL_NOTICE, "Residual parent diff successfully flushed to the rewritten AOF (%.2f MB)", (double) aofRewriteBufferSize() / (1024*1024));
// 异步关闭旧aof fd if (oldfd != -1) bioCreateBackgroundJob(BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);
serverLog(LL_VERBOSE, "Background AOF rewrite signal handler took %lldus", ustime()-now); } elseif (!bysignal && exitcode != 0) { if (bysignal != SIGUSR1) server.aof_lastbgrewrite_status = C_ERR; serverLog(LL_WARNING, "Background AOF rewrite terminated with error"); } else { server.aof_lastbgrewrite_status = C_ERR;
serverLog(LL_WARNING, "Background AOF rewrite terminated by signal %d", bysignal); }
cleanup: aofClosePipes(); aofRewriteBufferReset(); aofRemoveTempFile(server.aof_child_pid); server.aof_child_pid = -1; server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start; server.aof_rewrite_time_start = -1; /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */ if (server.aof_state == AOF_WAIT_REWRITE) server.aof_rewrite_scheduled = 1; }