概述 rio是redis实现的一个流式I/O抽象,针对不同的输入/输出(文件、内存、fd),提供统一的读、写、查看offset等方法,在aof、rdb中经常用到文件类型和内存类型。
源码分析 数据结构 从数据结构的union中可以看出,目前支持三种类型:buffer(内存)、file(文件)、fdset(描述符集)。提供通用的读、写、查看offset、数据flush、计算校验方法。
buffer 内存类型,只有两个属性,数据块(动态字符串sds)和offset
file 文件类型,包含fp、buffered、autosync三个属性。其中buffered、autosync是用来强制把数据flush到磁盘上的:autosync是指写入多少字节后执行flush,buffered则是记录上次flush后处理的字节数。
fdset fd集合类型,主要是管道或者socket,把内存块的数据依次写入集合中的每一个fd,失败的情况有且只有集合内所有的fd都失败。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 struct _rio { size_t (*read)(struct _rio *, void *buf, size_t len); size_t (*write)(struct _rio *, const void *buf, size_t len); off_t (*tell)(struct _rio *); int (*flush)(struct _rio *); void (*update_cksum)(struct _rio *, const void *buf, size_t len); uint64_t cksum; size_t processed_bytes; size_t max_processing_chunk; union { struct { sds ptr; off_t pos; } buffer; struct { FILE *fp; off_t buffered; off_t autosync; } file; struct { int *fds; int *state; int numfds; off_t pos; sds buf; } fdset; } io; };
通用接口 rioWrite 通用的写接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 static inline size_t rioWrite (rio *r, const void *buf, size_t len) { while (len) { size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len; if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write); if (r->write(r,buf,bytes_to_write) == 0 ) return 0 ; buf = (char *)buf + bytes_to_write; len -= bytes_to_write; r->processed_bytes += bytes_to_write; } return 1 ; }
rioRead 通用的读接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 static inline size_t rioRead (rio *r, void *buf, size_t len) { while (len) { size_t bytes_to_read = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len; if (r->read(r,buf,bytes_to_read) == 0 ) return 0 ; if (r->update_cksum) r->update_cksum(r,buf,bytes_to_read); buf = (char *)buf + bytes_to_read; len -= bytes_to_read; r->processed_bytes += bytes_to_read; } return 1 ; }
rioTell 通用返回offset
1 2 3 static inline off_t rioTell (rio *r) { return r->tell(r); }
rioFlush 通用flush
1 2 3 static inline int rioFlush (rio *r) { return r->flush(r); }
rioGenericUpdateChecksum 计算校验
1 2 3 void rioGenericUpdateChecksum (rio *r, const void *buf, size_t len) { r->cksum = crc64(r->cksum,buf,len); }
rioSetAutoSync 设置文件类型的autosync(写入多少字节后执行flush)
1 2 3 4 void rioSetAutoSync (rio *r, off_t bytes) { serverAssert(r->read == rioFileIO.read); r->io.file.autosync = bytes; }
AOF相关 因为aof操作也是写文件,所以在写文件的基础上,有一些特有的操作,如下
/* Write a count line of the form "<prefix><count>\r\n".
 * Returns the number of bytes written, or 0 on write error. */
size_t rioWriteBulkCount(rio *r, char prefix, long count) {
    char line[128];
    int used = 1;

    line[0] = prefix;
    used += ll2string(line + 1, sizeof(line) - 1, count);
    line[used] = '\r';
    line[used + 1] = '\n';
    used += 2;

    return rioWrite(r, line, used) ? (size_t)used : 0;
}

/* Write a bulk string of the form "$<len>\r\n<payload>\r\n".
 * Returns the number of bytes written, or 0 on write error. */
size_t rioWriteBulkString(rio *r, const char *buf, size_t len) {
    size_t header = rioWriteBulkCount(r, '$', len);

    if (header == 0) return 0;
    /* An empty payload is skipped, the trailing CRLF is still emitted. */
    if (len > 0 && rioWrite(r, buf, len) == 0) return 0;
    if (rioWrite(r, "\r\n", 2) == 0) return 0;
    return header + len + 2;
}

/* Write a long long as a bulk string holding its decimal representation.
 * Returns whatever rioWriteBulkString() returns. */
size_t rioWriteBulkLongLong(rio *r, long long l) {
    char digits[32];
    unsigned int ndigits = ll2string(digits, sizeof(digits), l);

    return rioWriteBulkString(r, digits, ndigits);
}

/* Write a double as a bulk string formatted with "%.17g".
 * Returns whatever rioWriteBulkString() returns. */
size_t rioWriteBulkDouble(rio *r, double d) {
    char text[128];
    unsigned int textlen = snprintf(text, sizeof(text), "%.17g", d);

    return rioWriteBulkString(r, text, textlen);
}
内存类型 快速填充数据 快速填充rio结构体用到的数据,指定内存类型的读方法、写方法、tell方法、flush方法,其余的都为空
1 2 3 4 5 6 7 8 9 10 11 static const rio rioBufferIO = { rioBufferRead, rioBufferWrite, rioBufferTell, rioBufferFlush, NULL , 0 , 0 , 0 , { { NULL , 0 } } };
rioInitWithBuffer 初始化,使用内存快速填充数据给rio的各个字段赋值,指定操作内存数据源为s
1 2 3 4 5 void rioInitWithBuffer (rio *r, sds s) { *r = rioBufferIO; r->io.buffer.ptr = s; r->io.buffer.pos = 0 ; }
rioBufferWrite 写操作,把要写入的数据追加到原有sds后面
1 2 3 4 5 static size_t rioBufferWrite (rio *r, const void *buf, size_t len) { r->io.buffer.ptr = sdscatlen(r->io.buffer.ptr,(char *)buf,len); r->io.buffer.pos += len; return 1 ; }
rioBufferRead 读操作,剩余长度不满足读取长度len时直接返回错误,正常情况从offset位置开始往buf里拷贝len长度
1 2 3 4 5 6 7 static size_t rioBufferRead (rio *r, void *buf, size_t len) { if (sdslen(r->io.buffer.ptr)-r->io.buffer.pos < len) return 0 ; memcpy (buf,r->io.buffer.ptr+r->io.buffer.pos,len); r->io.buffer.pos += len; return 1 ; }
rioBufferTell 返回offset
1 2 3 static off_t rioBufferTell (rio *r) { return r->io.buffer.pos; }
rioBufferFlush 数据落实flush,由于操作的对象是内存,不存在flush这一说
1 2 3 4 static int rioBufferFlush (rio *r) { UNUSED(r); return 1 ; }
文件类型 快速填充数据 快速填充rio结构体用到的数据,指定文件类型的读方法、写方法、tell方法、flush方法,其余的都为空
1 2 3 4 5 6 7 8 9 10 11 static const rio rioFileIO = { rioFileRead, rioFileWrite, rioFileTell, rioFileFlush, NULL , 0 , 0 , 0 , { { NULL , 0 } } };
rioInitWithFile 初始化,使用文件快速填充数据给rio的各个字段赋值,指定操作文件为fp
1 2 3 4 5 6 void rioInitWithFile (rio *r, FILE *fp) { *r = rioFileIO; r->io.file.fp = fp; r->io.file.buffered = 0 ; r->io.file.autosync = 0 ; }
rioFileWrite 写操作,往文件写入buf数据;当设置了autosync且自上次同步以来累计写入的字节数达到该阈值时,执行fflush和fsync
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 static size_t rioFileWrite (rio *r, const void *buf, size_t len) { size_t retval; retval = fwrite(buf,len,1 ,r->io.file.fp); r->io.file.buffered += len; if (r->io.file.autosync && r->io.file.buffered >= r->io.file.autosync) { fflush(r->io.file.fp); redis_fsync(fileno(r->io.file.fp)); r->io.file.buffered = 0 ; } return retval; }
rioFileRead 读操作,从文件读取数据,存入buf中
1 2 3 static size_t rioFileRead (rio *r, void *buf, size_t len) { return fread(buf,len,1 ,r->io.file.fp); }
rioFileTell 返回位置offset
1 2 3 static off_t rioFileTell (rio *r) { return ftello(r->io.file.fp); }
rioFileFlush flush数据到磁盘
1 2 3 static int rioFileFlush (rio *r) { return (fflush(r->io.file.fp) == 0 ) ? 1 : 0 ; }
fd类型 快速填充数据 快速填充rio结构体用到的数据,指定fd类型的读方法、写方法、tell方法、flush方法,其余的都为空
1 2 3 4 5 6 7 8 9 10 11 static const rio rioFdsetIO = { rioFdsetRead, rioFdsetWrite, rioFdsetTell, rioFdsetFlush, NULL , 0 , 0 , 0 , { { NULL , 0 } } };
rioInitWithFdset 初始化,使用fd快速填充数据给rio的各个字段赋值,使用参数fds填充rio的fd
1 2 3 4 5 6 7 8 9 10 11 12 void rioInitWithFdset (rio *r, int *fds, int numfds) { int j; *r = rioFdsetIO; r->io.fdset.fds = zmalloc(sizeof (int )*numfds); r->io.fdset.state = zmalloc(sizeof (int )*numfds); memcpy (r->io.fdset.fds,fds,sizeof (int )*numfds); for (j = 0 ; j < numfds; j++) r->io.fdset.state[j] = 0 ; r->io.fdset.numfds = numfds; r->io.fdset.pos = 0 ; r->io.fdset.buf = sdsempty(); }
rioFdsetWrite 写数据,往rio的fd集合中的每一个fd写入长度为len的数据,数据为buf,如果在这些fds集合中的有一个成功了,整个方法就算是成功。如果buf为NULL并且len为0,会去看rio的buf中是否还有未处理的数据,有的话就处理,这种情况下就相当于flush的功能了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 static size_t rioFdsetWrite (rio *r, const void *buf, size_t len) { ssize_t retval; int j; unsigned char *p = (unsigned char *) buf; int doflush = (buf == NULL && len == 0 ); if (len) { r->io.fdset.buf = sdscatlen(r->io.fdset.buf,buf,len); len = 0 ; if (sdslen(r->io.fdset.buf) > PROTO_IOBUF_LEN) doflush = 1 ; } if (doflush) { p = (unsigned char *) r->io.fdset.buf; len = sdslen(r->io.fdset.buf); } while (len) { size_t count = len < 1024 ? len : 1024 ; int broken = 0 ; for (j = 0 ; j < r->io.fdset.numfds; j++) { if (r->io.fdset.state[j] != 0 ) { broken++; continue ; } size_t nwritten = 0 ; while (nwritten != count) { retval = write(r->io.fdset.fds[j],p+nwritten,count-nwritten); if (retval <= 0 ) { if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT; break ; } nwritten += retval; } if (nwritten != count) { r->io.fdset.state[j] = errno; if (r->io.fdset.state[j] == 0 ) r->io.fdset.state[j] = EIO; } } if (broken == r->io.fdset.numfds) return 0 ; p += count; len -= count; r->io.fdset.pos += count; } if (doflush) sdsclear(r->io.fdset.buf); return 1 ; }
rioFdsetRead 该方法没有实际操作
1 2 3 4 5 6 static size_t rioFdsetRead (rio *r, void *buf, size_t len) { UNUSED(r); UNUSED(buf); UNUSED(len); return 0 ; }
rioFdsetTell 返回位置offset
1 2 3 static off_t rioFdsetTell (rio *r) { return r->io.fdset.pos; }
rioFdsetFlush flush数据
1 2 3 static int rioFdsetFlush (rio *r) { return rioFdsetWrite(r,NULL ,0 ); }
rioFreeFdset 释放fdset
1 2 3 4 5 void rioFreeFdset (rio *r) { zfree(r->io.fdset.fds); zfree(r->io.fdset.state); sdsfree(r->io.fdset.buf); }