redis-rio

概述

rio是redis实现的一个流式I/O抽象,针对不同的输入/输出(文件、内存、fd),提供统一的读、写、查看offset等方法,在aofrdb经常用到文件类型和内存内类。

源码分析

数据结构

从数据结构的union中可以看出,目前支持三种类型,buffer内存、file文件、fdset描述符集。提供通用的读、写、查看offst、数据flush、计算校验方法。

buffer
内存类型,只有两个属性,数据块(动态字符串sds)和offset

file
文件类型,包含fdbufferedautosync三个属性。其中bufferedautosync是用来强制把数据flush到磁盘上的,autosync是指写入多少字节后执行flush,buffered则是记录上次flush后处理的字节数。

fdset
fd集合类型,主要是管道或者socket,把内存块的数据依次写入集合中的每一个fd,失败的情况有且只有集合内所有的fd都失败。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
struct _rio {
size_t (*read)(struct _rio *, void *buf, size_t len); // 读
size_t (*write)(struct _rio *, const void *buf, size_t len); //写
off_t (*tell)(struct _rio *); //查看offset
int (*flush)(struct _rio *); //数据flush

void (*update_cksum)(struct _rio *, const void *buf, size_t len); //计算数据校验

uint64_t cksum; //当前校验

size_t processed_bytes; //已处理的字节数(读/写)

size_t max_processing_chunk; //一次处理最大的数据量

union {
/* 内存 */
struct {
sds ptr; //数据
off_t pos; //处理offset
} buffer;
/* 标准文件 */
struct {
FILE *fp; //文件句柄
off_t buffered; //自从上次fsync后写入的字节数
off_t autosync; /* 当写入autosync字节后,开始执行fsync */
} file;
/* 多个fd socket */
struct {
int *fds; //文件描述符
int *state; // fds对应的状态 0表示ok,其余error
int numfds; //fds数量
off_t pos; //数据处理offset
sds buf; //操作的数据
} fdset;
} io;
};

通用接口

rioWrite

通用的写接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
while (len) {
// 控制每次写入的大小
size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
//计算校验值
if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write);
//数据具体写入
if (r->write(r,buf,bytes_to_write) == 0)
return 0;
//移动写入指针和数据长度
buf = (char*)buf + bytes_to_write;
len -= bytes_to_write;
//已处理字节数增加
r->processed_bytes += bytes_to_write;
}
return 1;
}
rioRead

通用的读接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static inline size_t rioRead(rio *r, void *buf, size_t len) {
while (len) {
// 控制每次写入的大小
size_t bytes_to_read = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
//数据具体读取
if (r->read(r,buf,bytes_to_read) == 0)
return 0;
//计算校验值
if (r->update_cksum) r->update_cksum(r,buf,bytes_to_read);
//移动读取指针和数据长度
buf = (char*)buf + bytes_to_read;
len -= bytes_to_read;
//已处理字节数增加
r->processed_bytes += bytes_to_read;
}
return 1;
}
rioTell

通用返回offset

1
2
3
static inline off_t rioTell(rio *r) {
return r->tell(r);
}
rioFlush

通用flush

1
2
3
static inline int rioFlush(rio *r) {
return r->flush(r);
}
rioGenericUpdateChecksum

计算校验

1
2
3
void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
r->cksum = crc64(r->cksum,buf,len);
}
rioSetAutoSync

设置文件类型的autosync(写入多少字节后执行flush)

1
2
3
4
void rioSetAutoSync(rio *r, off_t bytes) {
serverAssert(r->read == rioFileIO.read);
r->io.file.autosync = bytes;
}
AOF相关

因为aof操作也是写文件,所以在写文件的基础上,有一些特有的操作,如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 写bulkcount prefix为协议中的一种,详见redis协议,最终表现为 eg:*<count>\r\n *为prefix *5\r\n
size_t rioWriteBulkCount(rio *r, char prefix, long count) {
char cbuf[128];
int clen;

cbuf[0] = prefix;

// 数字count转字符串
clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,count);
cbuf[clen++] = '\r';
cbuf[clen++] = '\n';
if (rioWrite(r,cbuf,clen) == 0) return 0;
return clen;
}

// 写bulkstring 分三步,bulkstring组成详见redis协议
size_t rioWriteBulkString(rio *r, const char *buf, size_t len) {
size_t nwritten;

// step1 先写 bulkstring开始 $<len>\r\n
if ((nwritten = rioWriteBulkCount(r,'$',len)) == 0) return 0;

// step2 再写中间真正字符串内容
if (len > 0 && rioWrite(r,buf,len) == 0) return 0;

//step3 写结尾的\r\n
if (rioWrite(r,"\r\n",2) == 0) return 0;
return nwritten+len+2;
}

// longlong当 bulkstring 写入
size_t rioWriteBulkLongLong(rio *r, long long l) {
char lbuf[32];
unsigned int llen;

llen = ll2string(lbuf,sizeof(lbuf),l);
return rioWriteBulkString(r,lbuf,llen);
}

// double 当 bulkstring 写入
size_t rioWriteBulkDouble(rio *r, double d) {
char dbuf[128];
unsigned int dlen;

dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
return rioWriteBulkString(r,dbuf,dlen);
}

内存类型

快速填充数据

快速填充rio结构体用到的数据,指定内存类型的读方法、写方法、tell方法、flush方法,其余的都为空

1
2
3
4
5
6
7
8
9
10
11
static const rio rioBufferIO = {
rioBufferRead,
rioBufferWrite,
rioBufferTell,
rioBufferFlush,
NULL, /* update_checksum */
0, /* current checksum */
0, /* bytes read or written */
0, /* read/write chunk size */
{ { NULL, 0 } } /* union for io-specific vars */
};
rioInitWithBuffer

初始化,使用内存快速填充数据给rio的各个字段赋值,指定操作内存数据源为s

1
2
3
4
5
void rioInitWithBuffer(rio *r, sds s) {
*r = rioBufferIO;
r->io.buffer.ptr = s;
r->io.buffer.pos = 0;
}
rioBufferWrite

写操作,把要写入的数据追加到原有sds后面

1
2
3
4
5
static size_t rioBufferWrite(rio *r, const void *buf, size_t len) {
r->io.buffer.ptr = sdscatlen(r->io.buffer.ptr,(char*)buf,len);
r->io.buffer.pos += len;
return 1;
}
rioBufferRead

读操作,剩余长度不满足读取长度len时直接返回错误,正常情况从offset位置 开始往buf里拷贝len长度

1
2
3
4
5
6
7
static size_t rioBufferRead(rio *r, void *buf, size_t len) {
if (sdslen(r->io.buffer.ptr)-r->io.buffer.pos < len)
return 0; /* not enough buffer to return len bytes. */
memcpy(buf,r->io.buffer.ptr+r->io.buffer.pos,len);
r->io.buffer.pos += len;
return 1;
}
rioBufferTell

返回offset

1
2
3
static off_t rioBufferTell(rio *r) {
return r->io.buffer.pos;
}
rioBufferFlush

数据落实flush,由于操作的对象是内存,不存在flush这一说

1
2
3
4
static int rioBufferFlush(rio *r) {
UNUSED(r);
return 1;
}

文件类型

快速填充数据

快速填充rio结构体用到的数据,指定文件类型的读方法、写方法、tell方法、flush方法,其余的都为空

1
2
3
4
5
6
7
8
9
10
11
static const rio rioFileIO = {
rioFileRead,
rioFileWrite,
rioFileTell,
rioFileFlush,
NULL, /* update_checksum */
0, /* current checksum */
0, /* bytes read or written */
0, /* read/write chunk size */
{ { NULL, 0 } } /* union for io-specific vars */
};
rioInitWithFile

初始化,使用文件快速填充数据给rio的各个字段赋值,指定操作文件为fp

1
2
3
4
5
6
void rioInitWithFile(rio *r, FILE *fp) {
*r = rioFileIO;
r->io.file.fp = fp;
r->io.file.buffered = 0;
r->io.file.autosync = 0;
}
rioFileWrite

写操作,往文件写入buf数据,并flush

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
size_t retval;

retval = fwrite(buf,len,1,r->io.file.fp);
r->io.file.buffered += len;

// 如果从上次fsync之后写入的大小达到了autosync,这时候需要执行flush操作
if (r->io.file.autosync &&
r->io.file.buffered >= r->io.file.autosync)
{
//强制把缓冲区内容写入磁盘
fflush(r->io.file.fp);
//更新文件的修改时间
redis_fsync(fileno(r->io.file.fp));
r->io.file.buffered = 0;
}
return retval;
}
rioFileRead

读操作,从文件读取数据,存入buf中

1
2
3
static size_t rioFileRead(rio *r, void *buf, size_t len) {
return fread(buf,len,1,r->io.file.fp);
}
rioFileTell

返回位置offset

1
2
3
static off_t rioFileTell(rio *r) {
return ftello(r->io.file.fp);
}
rioFileFlush

flush数据到磁盘

1
2
3
static int rioFileFlush(rio *r) {
return (fflush(r->io.file.fp) == 0) ? 1 : 0;
}

fd类型

快速填充数据

快速填充rio结构体用到的数据,指定fd类型的读方法、写方法、tell方法、flush方法,其余的都为空

1
2
3
4
5
6
7
8
9
10
11
static const rio rioFdsetIO = {
rioFdsetRead,
rioFdsetWrite,
rioFdsetTell,
rioFdsetFlush,
NULL, /* update_checksum */
0, /* current checksum */
0, /* bytes read or written */
0, /* read/write chunk size */
{ { NULL, 0 } } /* union for io-specific vars */
};
rioInitWithFdset

初始化,使用fd快速填充数据给rio的各个字段赋值,使用参数fds填充rio的fd

1
2
3
4
5
6
7
8
9
10
11
12
void rioInitWithFdset(rio *r, int *fds, int numfds) {
int j;

*r = rioFdsetIO;
r->io.fdset.fds = zmalloc(sizeof(int)*numfds);
r->io.fdset.state = zmalloc(sizeof(int)*numfds);
memcpy(r->io.fdset.fds,fds,sizeof(int)*numfds);
for (j = 0; j < numfds; j++) r->io.fdset.state[j] = 0;
r->io.fdset.numfds = numfds;
r->io.fdset.pos = 0;
r->io.fdset.buf = sdsempty();
}
rioFdsetWrite

写数据,往rio的fd集合中的每一个fd写入长度为len的数据,数据为buf,如果在这些fds集合中的有一个成功了,整个方法就算是成功。如果buf为NULL并且len为0,会去看rio的buf中是否还有未处理的数据,有的话就处理,这种情况下就相当于flush的功能了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
static size_t rioFdsetWrite(rio *r, const void *buf, size_t len) {
ssize_t retval;
int j;
unsigned char *p = (unsigned char*) buf;

// 是否是flush
int doflush = (buf == NULL && len == 0);

// 把buf内容追加到rio.fd.buf中,如果rio.fd.buf内容超出PROTO_IOBUF_LEN大小,flush也为1
if (len) {
r->io.fdset.buf = sdscatlen(r->io.fdset.buf,buf,len);
len = 0; /* Prevent entering the while below if we don't flush. */
if (sdslen(r->io.fdset.buf) > PROTO_IOBUF_LEN) doflush = 1;
}

// 如果flush为1,重定向写数据的开始点为rio.fd.buf
if (doflush) {
p = (unsigned char*) r->io.fdset.buf;
len = sdslen(r->io.fdset.buf);
}

// 分块发送最大1024字节
while(len) {
size_t count = len < 1024 ? len : 1024;
int broken = 0;

// 跳过fds结合中error的fd
for (j = 0; j < r->io.fdset.numfds; j++) {
if (r->io.fdset.state[j] != 0) {
/* Skip FDs alraedy in error. */
broken++;
continue;
}

// 写数据
size_t nwritten = 0;
while(nwritten != count) {
retval = write(r->io.fdset.fds[j],p+nwritten,count-nwritten);
if (retval <= 0) {
if (retval == -1 && errno == EWOULDBLOCK) errno = ETIMEDOUT;
break;
}
nwritten += retval;
}

// 更新fd状态error
if (nwritten != count) {
/* Mark this FD as broken. */
r->io.fdset.state[j] = errno;
if (r->io.fdset.state[j] == 0) r->io.fdset.state[j] = EIO;
}
}

// 如果所有的fd都error了 整个方法返回0(error)
if (broken == r->io.fdset.numfds) return 0; /* All the FDs in error. */

//移动发送数据指针
p += count;
len -= count;

// 处理rio处理offset
r->io.fdset.pos += count;
}

// 执行flush
if (doflush) sdsclear(r->io.fdset.buf);
return 1;
}
rioFdsetRead

该方法没有实际操作

1
2
3
4
5
6
static size_t rioFdsetRead(rio *r, void *buf, size_t len) {
UNUSED(r);
UNUSED(buf);
UNUSED(len);
return 0; /* Error, this target does not support reading. */
}
rioFdsetTell

返回位置offset

1
2
3
static off_t rioFdsetTell(rio *r) {
return r->io.fdset.pos;
}
rioFdsetFlush

flush数据

1
2
3
static int rioFdsetFlush(rio *r) {
return rioFdsetWrite(r,NULL,0);
}
rioFreeFdset

释放fdset

1
2
3
4
5
void rioFreeFdset(rio *r) {
zfree(r->io.fdset.fds);
zfree(r->io.fdset.state);
sdsfree(r->io.fdset.buf);
}