加入收藏 | 设为首页 | 会员中心 | 我要投稿 肇庆站长网 (https://www.0758zz.cn/)- 数据分析、分布式云、安全管理、云计算、物联设备!
当前位置: 首页 > 云计算 > 正文

Redis 6 中的多线程是如何实现的 !

发布时间:2022-08-02 12:23:57 所属栏目:云计算 来源:互联网
导读:Redis 是一个高性能服务端的典范。它通过多路复用 epoll 来管理海量的用户连接,只使用一个线程来通过事件循环来处理所有用户请求,就可以达到每秒数万 QPS 的处理能力。下图是单线程版本 Redis 工作的核心原理图(详情参见:单线程 Redis 如何做到每秒数万
    Redis 是一个高性能服务端的典范。它通过多路复用 epoll 来管理海量的用户连接,只使用一个线程来通过事件循环来处理所有用户请求,就可以达到每秒数万 QPS 的处理能力。下图是单线程版本 Redis 工作的核心原理图(详情参见:单线程 Redis 如何做到每秒数万 QPS 的超高处理能力!)。
 
 
 
    单线程的 Redis 虽然性能很高,但是却有两个问题。一个问题是没有办法充分发挥现代 CPU 的多核处理能力,一个实例只能使用一个核的能力。二是如果某个用户请求的处理过程卡住一段时间,会导致其它所有的请求都会出现超时的情况。所以,在线上的 redis 使用过程时是明确禁止使用 keys * 等长耗时的操作的。
 
    那如何改进呢,思路和方向其实很明确。那就是和其它的主流程序一样引入多线程,用更多的线程来分担这些可能耗时的操作。事实上 Redis 也确实这么干了,在 6.0 以后的版本里,开始支持了多线程。我们今天就来领略一下 Redis 的多线程是如何实现的。
 
    一、多线程 Redis 服务启动
    首先获取多线程版本 Redis 的源码
 
    复制
    # git clone https://github.com/redis/redis
    # cd redis
    # git checkout -b 6.2.0 6.2.0
    1.
    2.
    3.
    默认情况下多线程是默认关闭的。如果想要启动多线程,需要在配置文件中做适当的修改。相关的配置项是 io-threads 和 io-threads-do-reads 两个。
 
    复制
    #vi /usr/local/soft/redis6/conf/redis.conf  
    io-threads 4 #启用的 io 线程数量
    io-threads-do-reads yes #读请求也使用io线程
    1.
    2.
    3.
    其中 io-threads 表示要启动的 io 线程的数量。io-threads-do-reads 表示是否在读阶段也使用 io 线程,默认是只在写阶段使用 io 线程的。
 
    现在假设我们已经打开了如上两项多线程配置。带着这个假设,让我们进入到 Redis 的 main 入口函数。
 
    复制
    //file: src/server.c
    int main(int argc, char **argv) {
       ......
       // 1.1 主线程初始化
       initServer();
       // 1.2 启动 io 线程
       InitServerLast();
       // 进入事件循环
       aeMain(server.el);
    }
    1.
    2.
    3.
    4.
    5.
    6.
    7.
    8.
    9.
    10.
    1.1 主线程初始化
    在 initServer 这个函数内,Redis 主线程做了这么几件重要的事情。
 
 
 
    初始化读任务队列、写任务队列
    创建一个 epoll 对象
    对配置的监听端口进行 listen
    把 listen socket 让 epoll 给管理起来
    复制
    //file: src/server.c
    void initServer() {
       // 1 初始化 server 对象
       server.clients_pending_write = listCreate();
       server.clients_pending_read = listCreate();
       ......
       // 2 初始化回调 events,创建 epoll
       server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
       // 3 绑定监听服务端口
       listenToPort(server.port,server.ipfd,&server.ipfd_count);
       // 4 注册 accept 事件处理器
       for (j = 0; j < server.ipfd_count; j++) {
           aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
               acceptTcpHandler,NULL);
       }
       ...
    }
    1.
    2.
    3.
    4.
    5.
    6.
    7.
    8.
    9.
    10.
    11.
    12.
    13.
    14.
    15.
    16.
    17.
    接下来我们分别来看。
 
    初始化 server 对象
 
    在 initServer 的一开头,先是对 server 的各种成员变量进行初始化。值得注意的是 clients_pending_write 和 clients_pending_read 这两个成员,它们分别是写任务队列和读任务队列。将来主线程产生的任务都会放在放在这两个任务队列里。
 
    主线程会根据这两个任务队列来进行任务哈希散列,以将任务分配到多个线程中进行处理。
 
    aeCreateEventLoop 处理
 
    我们来看 aeCreateEventLoop 详细逻辑。它会初始化事件回调 event,并且创建了一个 epoll 对象出来。
 
    复制
    //file:src/ae.c
    aeEventLoop *aeCreateEventLoop(int setsize) {
       aeEventLoop *eventLoop;
       eventLoop = zmalloc(sizeof(*eventLoop);
       //将来的各种回调事件就都会存在这里
       eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
       ......
       aeApiCreate(eventLoop);
       return eventLoop;
    }
    1.
    2.
    3.
    4.
    5.
    6.
    7.
    8.
    9.
    10.
    我们注意一下 eventLoop->events,将来在各种事件注册的时候都会保存到这个数组里。
 
    复制
    //file:src/ae.h
    typedef struct aeEventLoop {
       ......
       aeFileEvent *events; /* Registered events */
    }
    1.
    2.
    3.
    4.
    5.
    具体创建 epoll 的过程在 ae_epoll.c 文件下的 aeApiCreate 中。在这里,真正调用了 epoll_create
 
    复制
    //file:src/ae_epoll.c
    static int aeApiCreate(aeEventLoop *eventLoop) {
       aeApiState *state = zmalloc(sizeof(aeApiState));
        state->epfd = epoll_create(1024);  
       eventLoop->apidata = state;
       return 0;
    }
    1.
    2.
    3.
    4.
    5.
    6.
    7.
    绑定监听服务端口
 
    我们再来看 Redis 中的 listen 过程,它在 listenToPort 函数中。调用链条很长,依次是 listenToPort => anetTcpServer => _anetTcpServer => anetListen。在 anetListen 中,就是简单的 bind 和 listen 的调用。
 
    复制
    //file:src/anet.c
    static int anetListen(......) {
       bind(s,sa,len);
       listen(s, backlog);
       ......
    }
    1.
    2.
    3.
    4.
    5.
    6.
    注册事件回调函数
 
    前面我们调用 aeCreateEventLoop 创建了 epoll,调用 listenToPort 进行了服务端口的 bind 和 listen。接着就调用的 aeCreateFileEvent 就是来注册一个 accept 事件处理器。
 
    我们来看 aeCreateFileEvent 具体代码。
 
    复制
    //file: src/ae.c
    int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
           aeFileProc *proc, void *clientData)
    {
       // 取出一个文件事件结构
       aeFileEvent *fe = &eventLoop->events[fd];
       // 监听指定 fd 的指定事件
       aeApiAddEvent(eventLoop, fd, mask);
       // 设置文件事件类型,以及事件的处理器
       fe->mask |= mask;
       if (mask & AE_READABLE) fe->rfileProc = proc;
       if (mask & AE_WRITABLE) fe->wfileProc = proc;
       // 私有数据
       fe->clientData = clientData;
    }
    1.
    2.
    3.
    4.
    5.
    6.
    7.
    8.
    9.
    10.
    11.
    12.
    13.
    14.
    15.
    函数 aeCreateFileEvent 一开始,从 eventLoop->events 获取了一个 aeFileEvent 对象。
 
    接下来调用 aeApiAddEvent。这个函数其实就是对 epoll_ctl 的一个封装。主要就是实际执行 epoll_ctl EPOLL_CTL_ADD。
 
    复制
    //file:src/ae_epoll.c
    static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
       // add or mod
       int op = eventLoop->events[fd].mask == AE_NONE ?
               EPOLL_CTL_ADD : EPOLL_CTL_MOD;
       ......
       // epoll_ctl 添加事件
       epoll_ctl(state->epfd,op,fd,&ee);
       return 0;
    }
    1.
    2.
    3.
    4.
    5.
    6.
    7.
    8.
    9.
    10.
    每一个 eventLoop->events 元素都指向一个 aeFileEvent 对象。在这个对象上,设置了三个关键东西
 
    rfileProc:读事件回调
    wfileProc:写事件回调
    clientData:一些额外的扩展数据
    将来 当 epoll_wait 发现某个 fd 上有事件发生的时候,这样 redis 首先根据 fd 到 eventLoop->events 中查找 aeFileEvent 对象,然后再看 rfileProc、wfileProc 就可以找到读、写回调处理函数。
 
    回头看 initServer 调用 aeCreateFileEvent 时传参来看。
 
    复制
    //file: src/server.c
    void initServer() {
       ......
       for (j = 0; j < server.ipfd_count; j++) {
           aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
               acceptTcpHandler,NULL);
       }
    }
    1.
    2.
    3.
    4.
    5.
    6.
    7.
    8.
    listen fd 对应的读回调函数 rfileProc 事实上就被设置成了 acceptTcpHandler,写回调没有设置,私有数据 client_data 也为 null。
 
    1.2 io 线程启动
    在主线程启动以后,会调用 InitServerLast => initThreadedIO 来创建多个 io 线程。
 
 
 
    将来这些 IO 线程会配合主线程一起共同来处理所有的 read 和 write 任务。
 
 
 
    我们来看 InitServerLast 创建 IO 线程的过程。
 
    复制
    //file:src/server.c
    void InitServerLast() {
       initThreadedIO();
       ......
    }
    1.
    2.
    3.
    4.
    5.
    复制
    //file:src/networking.c
    void initThreadedIO(void) {
       //如果没开启多 io 线程配置就不创建了
       if (server.io_threads_num == 1) return;
       //开始 io 线程的创建
       for (int i = 0; i < server.io_threads_num; i++) {
           pthread_t tid;
           pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i)
           io_threads[i] = tid;
       }
    }
     在 initThreadedIO 中调用 pthread_create 库函数创建线程,并且注册线程回调函数 IOThreadMain。
 
    复制
    //file:src/networking.c
    void *IOThreadMain(void *myid) {
       long id = (unsigned long)myid;
       while(1) {
           //循环等待任务
           for (int j = 0; j < 1000000; j++) {
               if (getIOPendingCount(id) != 0) break;
           }
           //允许主线程来关闭自己
           ......
           //遍历当前线程等待队列里的请求 client
           listIter li;
           listNode *ln;
           listRewind(io_threads_list[id],&li);
           while((ln = listNext(&li))) {
               client *c = listNodeValue(ln);
               if (io_threads_op == IO_THREADS_OP_WRITE) {
                   writeToClient(c,0);
               } else if (io_threads_op == IO_THREADS_OP_READ) {
                   readQueryFromClient(c->conn);
               } else {
                   serverPanic("io_threads_op value is unknown");
               }
           }
           listEmpty(io_threads_list[id]);
       }
    }
  
    是将当前线程等待队列 io_threads_list[id] 里所有的请求 client,依次取出处理。其中读操作通过 readQueryFromClient 处理, 写操作通过 writeToClient 处理。其中 io_threads_list[id] 中的任务是主线程分配过来的,后面我们将会看到。
 
    二、主线程事件循环
    接着我们进入到 Redis 最重要的 aeMain,这个函数就是一个死循环(Redis 不退出的话),不停地执行 aeProcessEvents 函数。
 
    复制
    void aeMain(aeEventLoop *eventLoop) {
       eventLoop->stop = 0;
       while (!eventLoop->stop) {
           aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                      AE_CALL_BEFORE_SLEEP|
                                      AE_CALL_AFTER_SLEEP);
       }
    }
    1.
    2.
    3.
    4.
    5.
    6.
    7.
    8.
    其中 aeProcessEvents 就是所谓的事件分发器。它通过调用 epoll_wait 来发现所发生的各种事件,然后调用事先注册好的处理函数进行处理。
 
    a

(编辑:肇庆站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    热点阅读