Redis actually had multithreading before 6.0: the BIO module uses background threads to handle three jobs, closing files, flushing AOF data to disk, and lazily freeing keys. The multithreading introduced in 6.0 is used only for network I/O with clients, and at any given moment these threads are all doing the same kind of work: they are either all reading or all writing. It never happens that some threads are reading while others are writing.
Multithreading applies only to the network I/O stage; command execution still happens sequentially on the main thread, just as before. Within one processing cycle, the main thread distributes the clients that have data to read evenly across the worker threads' pending lists, sets the operation for this phase to read, and flips the start switch; all threads then begin reading. The main thread waits until every worker has finished, then parses and executes the commands one by one itself. Next, the main thread distributes the clients that have data to write evenly across the workers' pending lists, sets the operation for this phase to write, flips the switch, and all threads begin writing.

That is the rough multithreaded flow. In practice, not every processing cycle uses multithreading for network I/O; the details are covered below.
Why is multithreading limited to network I/O? Because a core design decision of Redis is to execute commands on a single thread, which avoids contention over shared resources. Making command execution multithreaded would require locking a large amount of state, a huge change for Redis: from the DB itself, to every key and value, down to the data structure implementation behind each value, everything would have to account for concurrency.
Configuration

Redis multithreading has two configuration options:

- io_threads_num: the number of I/O threads, with a valid range of [1, 128]. When set to 1, Redis runs exactly as it did before.
- io_threads_do_reads: whether the I/O threads are also used for read operations; disabled by default.
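In redis.conf the two options are spelled io-threads and io-threads-do-reads. A minimal example (the values here are illustrative):

```
# Use 4 threads for client writes.
io-threads 4
# Also use the threads for reads and protocol parsing.
io-threads-do-reads yes
```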
Multithreaded flow

(figure: overall flow of Redis 6.0 threaded I/O)
Initialization

In the main function, InitServerLast is called. Besides calling bioInit to initialize the BIO threads, it also calls initThreadedIO to initialize the network I/O threads (the code is listed in the source analysis section below).
Thread assignment

When a client connects to the server, readQueryFromClient is installed as the read handler for the client socket. This method calls postponeClientRead to decide whether the read should be handed to the I/O threads. If multithreading is enabled, the client that has data to read is placed on the server.clients_pending_read list and waits to be assigned. Otherwise, just as in the 5.x versions, the data is read from the socket directly, and once a complete command has arrived it is parsed and executed immediately.
Read phase

Before entering the event loop, Redis always calls beforeSleep, which in turn calls handleClientsWithPendingReadsUsingThreads. This method assigns the clients collected in the previous step to the threads in round-robin order (into io_threads_list[i]), sets the shared operation to read, and flips the start switch: it sets io_threads_pending[i] to the length of io_threads_list[i], and when a worker thread sees that io_threads_pending[i] is non-zero, it starts working. After handling its own share, the main thread waits until all worker threads have finished.
Write phase

Also in beforeSleep, after the reads and some other work have completed, handleClientsWithPendingWritesUsingThreads is called to send data to clients using the I/O threads. When the server has data to send to a client, that client is placed on the server.clients_pending_write list. This method then assigns the clients on the list to the threads in round-robin order (into io_threads_list[i]), sets the shared operation to write, and flips the start switch, exactly as in the read phase. The main thread waits for all threads to finish and then proceeds.
When the number of clients to handle is less than twice the number of threads, threaded I/O is not used at all and everything is handled by the main thread, as the sketch below shows.
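The threshold check lives in stopThreadedIOIfNeeded; a lightly simplified version from the Redis 6.0 sources:

```c
int stopThreadedIOIfNeeded(void) {
    int pending = listLength(server.clients_pending_write);

    /* Return ASAP if I/O threads are disabled (single threaded mode). */
    if (server.io_threads_num == 1) return 1;

    /* With fewer than two pending clients per thread, the fan-out
     * overhead outweighs the benefit: pause the threads and tell the
     * caller to fall back to the single-threaded path. */
    if (pending < (server.io_threads_num * 2)) {
        if (server.io_threads_active) stopThreadedIO();
        return 1;
    } else {
        return 0;
    }
}
```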
Source analysis

Thread initialization

```c
void initThreadedIO(void) {
    io_threads_active = 0; /* Threads start out inactive. */

    /* With a single thread, all I/O is handled by the main thread. */
    if (server.io_threads_num == 1) return;

    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }

    /* Spawn and initialize the I/O threads. */
    for (int i = 0; i < server.io_threads_num; i++) {
        /* Done for all threads, including the main thread. */
        io_threads_list[i] = listCreate();
        if (i == 0) continue; /* Thread 0 is the main thread itself. */

        /* Done only for the additional worker threads. */
        pthread_t tid;
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        io_threads_pending[i] = 0;
        /* Hold the mutex so the thread stays parked until needed. */
        pthread_mutex_lock(&io_threads_mutex[i]);
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}
```
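Each worker created above runs IOThreadMain, which spins until the main thread raises its io_threads_pending[id] counter, then performs reads or writes for every client on its list depending on io_threads_op. The version below is lightly trimmed from the Redis 6.0 sources (thread naming and CPU affinity setup omitted):

```c
void *IOThreadMain(void *myid) {
    /* The id identifies this thread's sub-array of clients
     * (thread 0 is the main thread and is never created here). */
    long id = (unsigned long)myid;

    while(1) {
        /* Busy-wait for a while for the start condition. */
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }

        /* Still nothing to do: block on the mutex so the main thread
         * has a chance to park this thread via stopThreadedIO(). */
        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        /* Process every client assigned to this thread. The main thread
         * never touches this list while the pending count is non-zero. */
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        /* Dropping the counter to 0 signals completion to the main thread. */
        io_threads_pending[id] = 0;
    }
}
```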
Client enters the read list

When a client has data available, it is put on a list and waits to be assigned to the I/O threads later. Two methods are involved: readQueryFromClient and postponeClientRead. Before 6.0, readQueryFromClient read the data directly. In 6.0, a call to postponeClientRead was added to accommodate multithreading: if postponeClientRead decides to defer to the I/O threads, readQueryFromClient simply returns.
```c
void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, readlen;
    size_t qblen;

    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
    if (postponeClientRead(c)) return;
    /* ... otherwise read from the socket directly, as before ... */
}

int postponeClientRead(client *c) {
    if (server.io_threads_active &&
        server.io_threads_do_reads &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
    {
        c->flags |= CLIENT_PENDING_READ;
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    } else {
        return 0;
    }
}
```
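Note the flag test: connections to a master or to replicas (CLIENT_MASTER, CLIENT_SLAVE) are never deferred to the I/O threads, and a client already flagged CLIENT_PENDING_READ is not queued a second time.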
Client read path: thread assignment & processing

handleClientsWithPendingReadsUsingThreads is called from beforeSleep:
```c
int handleClientsWithPendingReadsUsingThreads(void) {
    /* Only relevant when threaded reads are enabled. */
    if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;

    if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);

    /* Distribute the clients across the thread lists, round robin. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Give the start condition to the waiting threads: a non-zero
     * pending count wakes up the workers. */
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    /* Also use the main thread to process a share of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);

    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O READ All threads finshed\n");

    /* Run the list of clients again to parse and execute the commands
     * that the threads read into the buffers. */
    while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        listDelNode(server.clients_pending_read,ln);

        if (c->flags & CLIENT_PENDING_COMMAND) {
            c->flags &= ~CLIENT_PENDING_COMMAND;
            if (processCommandAndResetClient(c) == C_ERR) {
                /* The client is no longer valid; move on to the next. */
                continue;
            }
        }
        processInputBuffer(c);
    }

    /* Update the processed count on the server. */
    server.stat_io_reads_processed += processed;

    return processed;
}
```
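Two details stand out. First, the main thread keeps a share of the work for itself (io_threads_list[0]) and processes it inline. Second, waiting for the workers is a busy loop polling the io_threads_pending counters rather than blocking on a condition variable, which burns some CPU in exchange for lower synchronization latency.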
Client enters the write list

The server sends data to a client via addReply. There, prepareClientToWrite decides whether the reply can be queued, and clientInstallWriteHandler then puts the client on the server.clients_pending_write list, where it waits to be assigned to the threads.
```c
void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        /* Integer encoded strings are converted to a string and
         * attached to the output buffer. */
        char buf[32];
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        if (_addReplyToBuffer(c,buf,len) != C_OK)
            _addReplyProtoToList(c,buf,len);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}

int prepareClientToWrite(client *c) {
    /* The Lua/module client has no socket; always OK. */
    if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;

    /* If the client is about to be closed, write nothing. */
    if (c->flags & CLIENT_CLOSE_ASAP) return C_ERR;

    /* CLIENT REPLY OFF / SKIP: don't send replies. */
    if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;

    /* Masters don't receive replies unless explicitly forced. */
    if ((c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;

    if (!c->conn) return C_ERR; /* Fake client for AOF loading. */

    /* Schedule the client to flush its output buffers, unless it is
     * already set up to do so (it already has pending replies). */
    if (!clientHasPendingReplies(c)) clientInstallWriteHandler(c);

    /* Authorize the caller to queue data in this client's buffer. */
    return C_OK;
}

void clientInstallWriteHandler(client *c) {
    /* Flag the client only if not already flagged and, for replicas,
     * only if the replica can actually receive writes at this stage. */
    if (!(c->flags & CLIENT_PENDING_WRITE) &&
        (c->replstate == REPL_STATE_NONE ||
         (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
    {
        /* Instead of installing a write handler, just flag the client
         * and put it on the list of clients with pending output. */
        c->flags |= CLIENT_PENDING_WRITE;
        listAddNodeHead(server.clients_pending_write,c);
    }
}
```
Client write path: thread assignment & processing

handleClientsWithPendingWritesUsingThreads is called from beforeSleep:
```c
int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */

    /* If I/O threads are disabled, or there are too few clients to be
     * worth the fan-out, use the plain synchronous path. */
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }

    /* Start the threads if needed. */
    if (!server.io_threads_active) startThreadedIO();

    if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);

    /* Distribute the clients across the thread lists, round robin. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;

        /* Skip clients that are going to be closed ASAP. */
        if (c->flags & CLIENT_CLOSE_ASAP) {
            listDelNode(server.clients_pending_write, ln);
            continue;
        }

        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Give the start condition to the waiting threads: a non-zero
     * pending count wakes up the workers. */
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }

    /* Also use the main thread to process a share of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);

    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O WRITE All threads finshed\n");

    /* Run the list again to install the write handler where needed. */
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        /* Install the write handler for clients that still have
         * pending replies after the threaded pass. */
        if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);

    /* Update the processed count on the server. */
    server.stat_io_writes_processed += processed;

    return processed;
}
```
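One difference from the read path is the tail end: after the threads have finished, any client that still has pending replies gets sendReplyToClient installed as its write handler, so the remaining data is flushed by the normal event loop; if installing the handler fails, the client is freed asynchronously.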