Redis 6 中的多线程是如何实现的 !
发布时间:2022-08-02 12:23:57 所属栏目:云计算 来源:互联网
导读:Redis 是一个高性能服务端的典范。它通过多路复用 epoll 来管理海量的用户连接,只使用一个线程来通过事件循环来处理所有用户请求,就可以达到每秒数万 QPS 的处理能力。下图是单线程版本 Redis 工作的核心原理图(详情参见:单线程 Redis 如何做到每秒数万
Redis 是一个高性能服务端的典范。它通过多路复用 epoll 来管理海量的用户连接,只使用一个线程来通过事件循环来处理所有用户请求,就可以达到每秒数万 QPS 的处理能力。下图是单线程版本 Redis 工作的核心原理图(详情参见:单线程 Redis 如何做到每秒数万 QPS 的超高处理能力!)。 单线程的 Redis 虽然性能很高,但是却有两个问题。一个问题是没有办法充分发挥现代 CPU 的多核处理能力,一个实例只能使用一个核的能力。二是如果某个用户请求的处理过程卡住一段时间,会导致其它所有的请求都会出现超时的情况。所以,在线上的 redis 使用过程时是明确禁止使用 keys * 等长耗时的操作的。 那如何改进呢,思路和方向其实很明确。那就是和其它的主流程序一样引入多线程,用更多的线程来分担这些可能耗时的操作。事实上 Redis 也确实这么干了,在 6.0 以后的版本里,开始支持了多线程。我们今天就来领略一下 Redis 的多线程是如何实现的。 一、多线程 Redis 服务启动 首先获取多线程版本 Redis 的源码 复制 # git clone https://github.com/redis/redis # cd redis # git checkout -b 6.2.0 6.2.0 1. 2. 3. 默认情况下多线程是默认关闭的。如果想要启动多线程,需要在配置文件中做适当的修改。相关的配置项是 io-threads 和 io-threads-do-reads 两个。 复制 #vi /usr/local/soft/redis6/conf/redis.conf io-threads 4 #启用的 io 线程数量 io-threads-do-reads yes #读请求也使用io线程 1. 2. 3. 其中 io-threads 表示要启动的 io 线程的数量。io-threads-do-reads 表示是否在读阶段也使用 io 线程,默认是只在写阶段使用 io 线程的。 现在假设我们已经打开了如上两项多线程配置。带着这个假设,让我们进入到 Redis 的 main 入口函数。 复制 //file: src/server.c int main(int argc, char **argv) { ...... // 1.1 主线程初始化 initServer(); // 1.2 启动 io 线程 InitServerLast(); // 进入事件循环 aeMain(server.el); } 1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 1.1 主线程初始化 在 initServer 这个函数内,Redis 主线程做了这么几件重要的事情。 初始化读任务队列、写任务队列 创建一个 epoll 对象 对配置的监听端口进行 listen 把 listen socket 让 epoll 给管理起来 复制 //file: src/server.c void initServer() { // 1 初始化 server 对象 server.clients_pending_write = listCreate(); server.clients_pending_read = listCreate(); ...... // 2 初始化回调 events,创建 epoll server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); // 3 绑定监听服务端口 listenToPort(server.port,server.ipfd,&server.ipfd_count); // 4 注册 accept 事件处理器 for (j = 0; j < server.ipfd_count; j++) { aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL); } ... } 1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17. 接下来我们分别来看。 初始化 server 对象 在 initServer 的一开头,先是对 server 的各种成员变量进行初始化。值得注意的是 clients_pending_write 和 clients_pending_read 这两个成员,它们分别是写任务队列和读任务队列。将来主线程产生的任务都会放在放在这两个任务队列里。 主线程会根据这两个任务队列来进行任务哈希散列,以将任务分配到多个线程中进行处理。 aeCreateEventLoop 处理 我们来看 aeCreateEventLoop 详细逻辑。它会初始化事件回调 event,并且创建了一个 epoll 对象出来。 复制 //file:src/ae.c aeEventLoop *aeCreateEventLoop(int setsize) { aeEventLoop *eventLoop; eventLoop = zmalloc(sizeof(*eventLoop); //将来的各种回调事件就都会存在这里 eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); ...... aeApiCreate(eventLoop); return eventLoop; } 1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 我们注意一下 eventLoop->events,将来在各种事件注册的时候都会保存到这个数组里。 复制 //file:src/ae.h typedef struct aeEventLoop { ...... aeFileEvent *events; /* Registered events */ } 1. 2. 3. 4. 5. 具体创建 epoll 的过程在 ae_epoll.c 文件下的 aeApiCreate 中。在这里,真正调用了 epoll_create 复制 //file:src/ae_epoll.c static int aeApiCreate(aeEventLoop *eventLoop) { aeApiState *state = zmalloc(sizeof(aeApiState)); state->epfd = epoll_create(1024); eventLoop->apidata = state; return 0; } 1. 2. 3. 4. 5. 6. 7. 绑定监听服务端口 我们再来看 Redis 中的 listen 过程,它在 listenToPort 函数中。调用链条很长,依次是 listenToPort => anetTcpServer => _anetTcpServer => anetListen。在 anetListen 中,就是简单的 bind 和 listen 的调用。 复制 //file:src/anet.c static int anetListen(......) { bind(s,sa,len); listen(s, backlog); ...... } 1. 2. 3. 4. 5. 6. 注册事件回调函数 前面我们调用 aeCreateEventLoop 创建了 epoll,调用 listenToPort 进行了服务端口的 bind 和 listen。接着就调用的 aeCreateFileEvent 就是来注册一个 accept 事件处理器。 我们来看 aeCreateFileEvent 具体代码。 复制 //file: src/ae.c int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { // 取出一个文件事件结构 aeFileEvent *fe = &eventLoop->events[fd]; // 监听指定 fd 的指定事件 aeApiAddEvent(eventLoop, fd, mask); // 设置文件事件类型,以及事件的处理器 fe->mask |= mask; if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; // 私有数据 fe->clientData = clientData; } 1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 函数 aeCreateFileEvent 一开始,从 eventLoop->events 获取了一个 aeFileEvent 对象。 接下来调用 aeApiAddEvent。这个函数其实就是对 epoll_ctl 的一个封装。主要就是实际执行 epoll_ctl EPOLL_CTL_ADD。 复制 //file:src/ae_epoll.c static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { // add or mod int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; ...... // epoll_ctl 添加事件 epoll_ctl(state->epfd,op,fd,&ee); return 0; } 1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 每一个 eventLoop->events 元素都指向一个 aeFileEvent 对象。在这个对象上,设置了三个关键东西 rfileProc:读事件回调 wfileProc:写事件回调 clientData:一些额外的扩展数据 将来 当 epoll_wait 发现某个 fd 上有事件发生的时候,这样 redis 首先根据 fd 到 eventLoop->events 中查找 aeFileEvent 对象,然后再看 rfileProc、wfileProc 就可以找到读、写回调处理函数。 回头看 initServer 调用 aeCreateFileEvent 时传参来看。 复制 //file: src/server.c void initServer() { ...... for (j = 0; j < server.ipfd_count; j++) { aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL); } } 1. 2. 3. 4. 5. 6. 7. 8. listen fd 对应的读回调函数 rfileProc 事实上就被设置成了 acceptTcpHandler,写回调没有设置,私有数据 client_data 也为 null。 1.2 io 线程启动 在主线程启动以后,会调用 InitServerLast => initThreadedIO 来创建多个 io 线程。 将来这些 IO 线程会配合主线程一起共同来处理所有的 read 和 write 任务。 我们来看 InitServerLast 创建 IO 线程的过程。 复制 //file:src/server.c void InitServerLast() { initThreadedIO(); ...... } 1. 2. 3. 4. 5. 复制 //file:src/networking.c void initThreadedIO(void) { //如果没开启多 io 线程配置就不创建了 if (server.io_threads_num == 1) return; //开始 io 线程的创建 for (int i = 0; i < server.io_threads_num; i++) { pthread_t tid; pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) io_threads[i] = tid; } } 在 initThreadedIO 中调用 pthread_create 库函数创建线程,并且注册线程回调函数 IOThreadMain。 复制 //file:src/networking.c void *IOThreadMain(void *myid) { long id = (unsigned long)myid; while(1) { //循环等待任务 for (int j = 0; j < 1000000; j++) { if (getIOPendingCount(id) != 0) break; } //允许主线程来关闭自己 ...... //遍历当前线程等待队列里的请求 client listIter li; listNode *ln; listRewind(io_threads_list[id],&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); if (io_threads_op == IO_THREADS_OP_WRITE) { writeToClient(c,0); } else if (io_threads_op == IO_THREADS_OP_READ) { readQueryFromClient(c->conn); } else { serverPanic("io_threads_op value is unknown"); } } listEmpty(io_threads_list[id]); } } 是将当前线程等待队列 io_threads_list[id] 里所有的请求 client,依次取出处理。其中读操作通过 readQueryFromClient 处理, 写操作通过 writeToClient 处理。其中 io_threads_list[id] 中的任务是主线程分配过来的,后面我们将会看到。 二、主线程事件循环 接着我们进入到 Redis 最重要的 aeMain,这个函数就是一个死循环(Redis 不退出的话),不停地执行 aeProcessEvents 函数。 复制 void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { aeProcessEvents(eventLoop, AE_ALL_EVENTS| AE_CALL_BEFORE_SLEEP| AE_CALL_AFTER_SLEEP); } } 1. 2. 3. 4. 5. 6. 7. 8. 其中 aeProcessEvents 就是所谓的事件分发器。它通过调用 epoll_wait 来发现所发生的各种事件,然后调用事先注册好的处理函数进行处理。 a (编辑:肇庆站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |