redis-6.0-thread

其实在6.0之前,redis也是有多线程的,BIO这个模块就是使用多线程处理关闭文件刷新aof数据到磁盘惰性释放key这三件事情。在6.0版本中,redis引入的多线程,只是用来处理跟客户端的网络I/O,这里的多线程同时只在做一件事,只可能同时在读数据,或者同时在写。不会出现,一部分线程读数据一部分写数据的情况。

多线程仅仅是用在网络I/O这块,在执行命令阶段,还是跟之前一样,由主进程依次执行。在一个处理周期内,主线程把有读数据需求的客户端,平均分配到每个子线程的待处理池。然后打开开关,并设置此阶段所有线程的行为为读操作,之后所有线程开始读取数据,主线程会一直等到所有的子线程都处理完,然后在主线程中依次解析&执行命令。然后主线程把有写数据需求的客户端平均分配到每个子线程的待处理池,打开开关,并设置此阶段所有线程的操作行为为写操作,之后所有线程开始执行写数据。

上述是大概的多线程流程,实际情况下,并不是每一个处理周期都会用到多线程来处理网络I/O,详细的会在下面介绍。

redis多线程为什么只在网络I/O这块,因为redis的一个核心就是处理命令的时候,使用单线程处理,这样做避免了资源的竞争问题。如果执行命令的过程中改成多线程的话,则需要对大量的资源进行加锁来保证,这样对于redis来说改动是很大的,从DB,到里面的每一个key、value,再到每一个value的数据结构实现,都是需要考虑并发的问题的。

配置

redis多线程的配置有两个地方
io_threads_num : 表示线程的数量,有效值为[1,128]。为1的时候,redis的运行流程跟之前一样
io_threads_do_reads : 表示是否启用多线程进行读IO操作,默认是关闭的

多线程流程

流程如下图所示

初始化

main函数中,会调用一个InitServerLast方法,这个方法里除了调用bioInit初始化BIO多线程,也会调用initThreadedIO来初始化网络I/O多线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
void initThreadedIO(void) {
//先关闭多线程开关
io_threads_active = 0;

//只起一个线程的话,那就是主线程处理
if (server.io_threads_num == 1) return;

//线程数量最大值128校验
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
exit(1);
}

//创建子线程
for (int i = 0; i < server.io_threads_num; i++) {
//初始换 子线程后面需要处理的客户端池
io_threads_list[i] = listCreate();
//主线程的话,就是当前线程,不用创建
if (i == 0) continue;

//非主线程
pthread_t tid;
//初始化锁
pthread_mutex_init(&io_threads_mutex[i],NULL);
//子线程客户端池里的数量
io_threads_pending[i] = 0;

//锁住子进程,不让其启动
pthread_mutex_lock(&io_threads_mutex[i]);

//创建子线程,指定入口方法
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}

//记录子线程id
io_threads[i] = tid;
}
}

线程分配

当客户端跟服务端建立连接后,会指定处理客户端socket读的处理方法为readQueryFromClient,在这个方法中,会去调用postponeClientRead来判断,是否需要采用多线程的方法来处理。如果开启了多线程的话,会把当前需要读数据的客户端放在server.clients_pending_read列表中,然后等待后续分配。否则的话,就跟5.X的版本一样,直接从socket读数据,读取到一个完整命令后,直接解析执行。

读操作

redis在每次进入事件循环前,都会调用beforeSleep,在beforeSleep中,则会调用handleClientsWithPendingReadsUsingThreads,把上一步放在列表中的客户端按照轮询的方法,依次分给每个线程(io_threads_list[i])。然后设置多线程的统一操作为读操作,并打开处理开关(设置io_threads_pending[i]io_threads_list[i]的数量,子进程发现io_threads_pending[i]不为0的时候,就会开始工作)。主线程处理完后,会一直等待所有的子线程都处理完。

写操作

同样是在beforeSleep中,在读操作完成,以及其他一些操作完成后,会调用handleClientsWithPendingWritesUsingThreads利用多线程发送数据给客户端。
服务端需要给客户端发送数据时,该客户端会被放在server.clients_pending_write列表中。然后在这个方法中,依次把列表中的客户端按照轮询的方法,依次分给每个线程(io_threads_list[i])。然后设置多线程的统一操作为写操作,并打开处理开关,和读操作一样。主线程等待所有线程处理完成,然后进行后续操作。

当需要处理的客户端数量小于线程数的两倍时,则不会采用多线程处理,而是全部交由主线程处理。

源码分析

初始化线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
void initThreadedIO(void) {
//先关闭多线程开关
io_threads_active = 0;

//只起一个线程的话,那就是主线程处理
if (server.io_threads_num == 1) return;

//线程数量最大值128校验
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
exit(1);
}

//创建子线程
for (int i = 0; i < server.io_threads_num; i++) {
//初始换 子线程后面需要处理的客户端池
io_threads_list[i] = listCreate();
//主线程的话,就是当前线程,不用创建
if (i == 0) continue;

//非主线程
pthread_t tid;
//初始化锁
pthread_mutex_init(&io_threads_mutex[i],NULL);
//子线程客户端池里的数量
io_threads_pending[i] = 0;

//锁住子进程,不让其启动
pthread_mutex_lock(&io_threads_mutex[i]);

//创建子线程,指定入口方法
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}

//记录子线程id
io_threads[i] = tid;
}
}

客户端进入读列表

客户端有数据过来时,放入列表,等待后面统一分配给多线程。主要是两个方法readQueryFromClientpostponeClientRead。6.0之前都是在readQueryFromClient中直接读取数据。6.0之后在此方法中加入了postponeClientRead来适配多线程处理,如果postponeClientRead决定采用多线程处理,readQueryFromClient就直接返回了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread, readlen;
size_t qblen;

//6.0新增内容,由postponeClientRead决定要不要走多线程
if (postponeClientRead(c)) return;

//其余代码省略
//直接从fd读取数据
}

int postponeClientRead(client *c) {
//多线程开启&且是统一读,放入列表,等待分配
if (server.io_threads_active &&
server.io_threads_do_reads &&
!ProcessingEventsWhileBlocked &&
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
{
c->flags |= CLIENT_PENDING_READ;
listAddNodeHead(server.clients_pending_read,c);
return 1;
} else {
return 0;
}
}

客户端读过程-多线程分配&处理

beforeSleep中调用该方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
int handleClientsWithPendingReadsUsingThreads(void) {
//未开启多线程,或者此时多线程并不是用于读
if (!server.io_threads_active || !server.io_threads_do_reads) return 0;

//需要处理的客户端数量为0
int processed = listLength(server.clients_pending_read);
if (processed == 0) return 0;

if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);

//轮训分配给每个线程
listIter li;
listNode *ln;
listRewind(server.clients_pending_read,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}

//多线程开关置为读
io_threads_op = IO_THREADS_OP_READ;

//开启多线程处理
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}

//主线程处理分配给自己的
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
readQueryFromClient(c->conn);
}
listEmpty(io_threads_list[0]);

//等待其他线程都完成
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O READ All threads finshed\n");

//处理等待过程中,新进来的数据
while(listLength(server.clients_pending_read)) {
ln = listFirst(server.clients_pending_read);
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_READ;
listDelNode(server.clients_pending_read,ln);

if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~CLIENT_PENDING_COMMAND;
if (processCommandAndResetClient(c) == C_ERR) {
continue;
}
}
processInputBuffer(c);
}

//统计处理次数
server.stat_io_reads_processed += processed;

return processed;
}

客户端进入写列表

服务端通过addReply把数据发送给客户端,在6.0后,增加prepareClientToWrite来判断要不要采用多线程处理,最后调用clientInstallWriteHandler将客户端放入列表,等待后续分配。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
void addReply(client *c, robj *obj) {
if (prepareClientToWrite(c) != C_OK) return;

if (sdsEncodedObject(obj)) {
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
_addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr));
} else if (obj->encoding == OBJ_ENCODING_INT) {
/* For integer encoded strings we just convert it into a string
* using our optimized function, and attach the resulting string
* to the output buffer. */
char buf[32];
size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
if (_addReplyToBuffer(c,buf,len) != C_OK)
_addReplyProtoToList(c,buf,len);
} else {
serverPanic("Wrong obj->encoding in addReply()");
}
}

int prepareClientToWrite(client *c) {
if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;

if (c->flags & CLIENT_CLOSE_ASAP) return C_ERR;

if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;

if ((c->flags & CLIENT_MASTER) &&
!(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;

if (!c->conn) return C_ERR; /* Fake client for AOF loading. */

if (!clientHasPendingReplies(c)) clientInstallWriteHandler(c);

return C_OK;
}

void clientInstallWriteHandler(client *c) {

if (!(c->flags & CLIENT_PENDING_WRITE) &&
(c->replstate == REPL_STATE_NONE ||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
{
c->flags |= CLIENT_PENDING_WRITE;
listAddNodeHead(server.clients_pending_write,c);
}
}

客户端写过程-多线程分配&处理

beforeSleep中调用该方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
int handleClientsWithPendingWritesUsingThreads(void) {
//待写的客户端数量为0
int processed = listLength(server.clients_pending_write);
if (processed == 0) return 0;

//线程数量为1,或者待写客户端数量<线程数量*2,都交由主线程处理
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites();
}

//启用多线程
if (!server.io_threads_active) startThreadedIO();

if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);

//轮训分配客户端给每个线程
listIter li;
listNode *ln;
listRewind(server.clients_pending_write,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;

if (c->flags & CLIENT_CLOSE_ASAP) {
listDelNode(server.clients_pending_write, ln);
continue;
}

int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}

//开启每个线程
io_threads_op = IO_THREADS_OP_WRITE;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}

//处理主线程的那一份
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
writeToClient(c,0);
}
listEmpty(io_threads_list[0]);

//等待所有线程都结束
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O WRITE All threads finshed\n");

//处理等待阶段新产生的内容
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);

if (clientHasPendingReplies(c) &&
connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
{
freeClientAsync(c);
}
}
listEmpty(server.clients_pending_write);

//更新统计
server.stat_io_writes_processed += processed;

return processed;
}