
Lesson 09: [Hands-On] Redis Network Communication Module Source Code Analysis (2)


How the Listening fd and Client fds Are Attached to the EPFD

  As before, attaching an fd to the EPFD requires the system API epoll_ctl, so let's search for that function name. In the file ae_epoll.c we find the aeApiAddEvent function:

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}


  When binding an fd to the EPFD, the code first checks the eventLoop (an aeEventLoop) to see whether events are already being monitored for this fd. If so, the epoll_ctl call modifies the event set of an already-bound fd (EPOLL_CTL_MOD); otherwise it adds the fd to the EPFD (EPOLL_CTL_ADD). Note also that the new mask is merged with any existing one, so registering AE_WRITABLE on an fd already watched for AE_READABLE yields EPOLLIN|EPOLLOUT.
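
  To make the mapping concrete, here is a minimal, self-contained sketch (plain C, not Redis code) of the same epoll pattern: create an epfd, attach a listening fd for read events, and wait. Error handling is omitted for brevity and the port number is arbitrary.

#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void) {
    int lfd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(6400);                 /* arbitrary demo port */
    bind(lfd, (struct sockaddr *)&addr, sizeof(addr));
    listen(lfd, 511);                            /* 511 matches Redis's default tcp-backlog */

    int epfd = epoll_create(1024);               /* aeApiCreate() makes the same call */
    struct epoll_event ee = {0};
    ee.events = EPOLLIN;                         /* AE_READABLE maps to EPOLLIN */
    ee.data.fd = lfd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ee);    /* what aeApiAddEvent() boils down to */

    struct epoll_event fired[16];
    int n = epoll_wait(epfd, fired, 16, -1);     /* blocks until a connection arrives */
    for (int i = 0; i < n; i++)
        if (fired[i].data.fd == lfd)
            printf("listening fd %d is readable: a client is connecting\n", lfd);

    close(epfd);
    close(lfd);
    return 0;
}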

  Set a breakpoint in aeApiAddEvent and restart redis-server. When the breakpoint is hit, the call stack looks like this (aeApiAddEvent is a static function and may be inlined into its caller, which is why the innermost frame shows aeCreateFileEvent):

#0  aeCreateFileEvent (eventLoop=0x7ffff083a0a0, fd=15, mask=mask@entry=1, proc=0x437f50 <acceptTcpHandler>, clientData=clientData@entry=0x0) at ae.c:145
#1  0x000000000042f83b in initServer () at server.c:1927
#2  0x0000000000423803 in main (argc=<optimized out>, argv=0x7fffffffe588) at server.c:3857

  We land in the initServer function again. Combining this with the creation of the listening fd analyzed earlier, and stripping out the unrelated code, the main thread of this function boils down to the following pseudocode:

void initServer(void) {

    // Record the process ID
    server.pid = getpid();

    // Create the program's aeEventLoop object (and, inside it, the epfd)
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);

    // Create the listening fd
    listenToPort(server.port,server.ipfd,&server.ipfd_count);

    // Set the listening fd to non-blocking mode
    anetNonBlock(NULL,server.sofd);

    // Create the Redis timer, used to run the periodic cron tasks
    /* Create the timer callback, this is our way to process many background
     * operations incrementally, like clients timeout, eviction of unaccessed
     * expired keys and so forth. */
    aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);

    // Bind the listening fd to the epfd
    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. */
    aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL);

    // Create a pipe used to wake up the event loop when it is blocked
    // in epoll_wait
    /* Register a readable event for the pipe used to awake the event loop
     * when a blocked client in a module needs attention. */
    aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE, moduleBlockedClientPipeReadable,NULL);
}


  Note: the "main thread" here means the main flow of the network-communication logic we are interested in; it does not mean the rest of the code in this function is unimportant.
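
  To see where these registrations lead at runtime, the following is a simplified sketch (condensed from aeMain/aeProcessEvents in ae.c, not the literal code) of how the event loop dispatches them:

/* Simplified from aeMain()/aeProcessEvents() in ae.c; not the literal code. */
while (!eventLoop->stop) {
    /* aeApiPoll() is the epoll backend: it calls epoll_wait() and fills
     * eventLoop->fired[] with the fds that have pending events. */
    int numevents = aeApiPoll(eventLoop, tvp /* timeout from the nearest timer */);

    for (int j = 0; j < numevents; j++) {
        int fd = eventLoop->fired[j].fd;
        int mask = eventLoop->fired[j].mask;
        aeFileEvent *fe = &eventLoop->events[fd];

        /* For the listening fd this invokes acceptTcpHandler();
         * for a client fd it invokes readQueryFromClient(). */
        if (fe->mask & mask & AE_READABLE)
            fe->rfileProc(eventLoop, fd, fe->clientData, mask);
        if (fe->mask & mask & AE_WRITABLE)
            fe->wfileProc(eventLoop, fd, fe->clientData, mask);
    }

    /* Time events (serverCron, registered above) are processed after
     * the file events. */
}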

  How do we verify that the fd attached to the EPFD at this breakpoint is really the listening fd? Simple: when the listening fd is created, record its value with GDB. For instance, in one run on my machine the listening fd was 15, as shown below (the debugger shown is CGDB):

[Figure: CGDB session showing the listening fd with value 15 at creation time]

Then let the program run on to the point where the fd is bound, and check the value of the fd being attached to the EPFD:

[Figure: CGDB session showing fd = 15 at the aeApiAddEvent breakpoint]
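
  If you would rather script this check than read it off the CGDB panes, a plain GDB session along these lines performs the same verification (a sketch; function names are from the Redis 4.0 source discussed in part 1, and your fd value may differ):

(gdb) b anetListen          # the listening fd is bound and listened on here (anet.c)
(gdb) r
...
(gdb) p s                   # anetListen's socket parameter: the listening fd
$1 = 15
(gdb) b aeApiAddEvent
(gdb) c
...
(gdb) p fd                  # the fd now being attached to the EPFD
$2 = 15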

  The fd here is also 15, so the fd being bound is indeed the listening fd. Note that when the listening fd is bound, only read events are monitored, and the event callback is set to acceptTcpHandler. For a listening fd, monitoring readable events is generally all that is needed: a readable event on it means a new connection has arrived.

aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler, NULL);

  The acceptTcpHandler function is defined as follows (in networking.c):

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(cfd,0,cip);
    }
}

  The anetTcpAccept function calls the anetGenericAccept function we discussed above.

int anetTcpAccept(char *err, int s, char *ip, size_t ip_len, int *port) {
    int fd;
    struct sockaddr_storage sa;
    socklen_t salen = sizeof(sa);
    if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == -1)
        return ANET_ERR;

    if (sa.ss_family == AF_INET) {
        struct sockaddr_in *s = (struct sockaddr_in *)&sa;
        if (ip) inet_ntop(AF_INET,(void*)&(s->sin_addr),ip,ip_len);
        if (port) *port = ntohs(s->sin_port);
    } else {
        struct sockaddr_in6 *s = (struct sockaddr_in6 *)&sa;
        if (ip) inet_ntop(AF_INET6,(void*)&(s->sin6_addr),ip,ip_len);
        if (port) *port = ntohs(s->sin6_port);
    }
    return fd;
}
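
  As a refresher from the previous lesson, anetGenericAccept (in anet.c) is essentially a loop around accept(2) that retries on EINTR; it is quoted here for convenience:

static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
    int fd;
    while(1) {
        fd = accept(s,sa,len);
        if (fd == -1) {
            if (errno == EINTR)
                continue;       /* interrupted by a signal: retry */
            else {
                anetSetError(err, "accept: %s", strerror(errno));
                return ANET_ERR;
            }
        }
        break;
    }
    return fd;                  /* the newly accepted client fd */
}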


  At this point the whole flow finally links up. Set a breakpoint on acceptTcpHandler, rerun redis-server, and then start a redis-cli to connect to the server. If the breakpoint fires, our analysis is correct.

  Sure enough, the breakpoint fires.

[Figure: GDB hitting the acceptTcpHandler breakpoint when redis-cli connects]

  Once acceptTcpHandler has successfully accepted the new connection, it has a client fd. It then calls acceptCommonHandler, which in turn calls createClient. createClient first sets the client fd to non-blocking mode, then attaches the fd to the EPFD, recording it in the program-wide aeEventLoop object.

client *createClient(int fd) {
    // (abbreviated; the full function is shown further below)
    // Set the client fd to non-blocking mode
    anetNonBlock(NULL,fd);
    // Enable the TCP_NODELAY option
    anetEnableTcpNoDelay(NULL,fd);
    // Depending on the configuration, enable TCP keepalive
    if (server.tcpkeepalive)
        anetKeepAlive(NULL,fd,server.tcpkeepalive);
    // Bind the client fd to the epfd and record it on the aeEventLoop;
    // the monitored event is AE_READABLE and the callback is readQueryFromClient
    aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c);

    return c;
}
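
  Since non-blocking fds are central to this whole design, here is anetNonBlock itself, a thin wrapper over fcntl(2), as it appears in the Redis 4.0 anet.c:

int anetNonBlock(char *err, int fd)
{
    int flags;

    /* Set the socket non-blocking.
     * Note that fcntl(2) for F_GETFL and F_SETFL can't be
     * interrupted by a signal. */
    if ((flags = fcntl(fd, F_GETFL)) == -1) {
        anetSetError(err, "fcntl(F_GETFL): %s", strerror(errno));
        return ANET_ERR;
    }
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
        anetSetError(err, "fcntl(F_SETFL,O_NONBLOCK): %s", strerror(errno));
        return ANET_ERR;
    }
    return ANET_OK;
}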


How fd Readable Events Are Handled

  When a client fd triggers a readable event, the callback is readQueryFromClient. It is implemented as follows (in networking.c):

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata;
    int nread, readlen;
    size_t qblen;
    UNUSED(el);
    UNUSED(mask);

    readlen = PROTO_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);

        if (remaining < readlen) readlen = remaining;
    }

    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = read(fd, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (errno == EAGAIN) {
            return;
        } else {
            serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
        /* Append the query buffer to the pending (not applied) buffer
     * of the master. We'll use this buffer later in order to have a
         * copy of the string applied by the last command executed. */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }

    sdsIncrLen(c->querybuf,nread);
    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    server.stat_net_input_bytes += nread;
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }

    /* Time to process the buffer. If the client is a master we need to
     * compute the difference between the applied offset before and after
     * processing the buffer, to understand how much of the replication stream
     * was actually applied to the master state: this quantity, and its
     * corresponding part of the replication stream, will be propagated to
     * the sub-slaves and to the replication backlog. */
    if (!(c->flags & CLIENT_MASTER)) {
        processInputBuffer(c);
    } else {
        size_t prev_offset = c->reploff;
        processInputBuffer(c);
        size_t applied = c->reploff - prev_offset;
        if (applied) {
            replicationFeedSlavesFromMasterStream(server.slaves,
                    c->pending_querybuf, applied);
            sdsrange(c->pending_querybuf,applied,-1);
        }
    }
}

  Set a breakpoint on this function, restart redis-server, start a client, and try sending the server the command "set hello world". In practice, though, GDB hits the breakpoint as soon as redis-cli connects, before we send any command of our own. Single-stepping through readQueryFromClient and printing the received data yields the following string:

(gdb) p c->querybuf 
$8 = (sds) 0x7ffff09b8685 "*1\r\n$7\r\nCOMMAND\r\n"
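
  That string is the RESP (Redis Serialization Protocol) encoding of the COMMAND command, which redis-cli sends automatically on connect to learn the server's command table. Commands we type arrive in the same format; for example, "set hello world" reaches querybuf as:

*3\r\n$3\r\nset\r\n$5\r\nhello\r\n$5\r\nworld\r\n

  (*3 announces an array of three bulk strings; each $N gives the byte length of the string that follows.)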

  So what is c->querybuf? Here c is of type client: it is the object produced when the newly accepted client fd was bound to its callback, and it is handed to readQueryFromClient as the privdata parameter. Its definition can be found in server.h:

/* With multiplexing we need to take per-client state.
 * Clients are taken in a linked list. */
typedef struct client {
    uint64_t id;            /* Client incremental unique ID. */
    int fd;                 /* Client socket. */
    redisDb *db;            /* Pointer to currently SELECTed DB. */
    robj *name;             /* As set by CLIENT SETNAME. */
    sds querybuf;           /* Buffer we use to accumulate client queries. */
    /* ... some fields omitted ... */
} client;


  client, then, is the object that stores the state of one client connection: its fd field is the connection's fd, and its querybuf field is the connection's receive buffer. Every new client connection gets one of these objects, and data read from the fd is accumulated in its querybuf field.
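
  It is worth connecting the dots between registration and callback here: the client object is passed as the clientData argument of aeCreateFileEvent, and the event loop hands the same pointer back as privdata whenever the fd turns readable (both lines are taken from the code shown in this article):

/* In createClient(): the client object rides along as clientData. */
aeCreateFileEvent(server.el, fd, AE_READABLE, readQueryFromClient, c);

/* In readQueryFromClient(): the event loop passes the pointer back,
 * so the handler can reach this connection's querybuf. */
client *c = (client*) privdata;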

  For completeness, here is the full createClient function:

client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));

    /* passing -1 as fd it is possible to create a non connected client.
     * This is useful since all the commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    if (fd != -1) {
        anetNonBlock(NULL,fd);
        anetEnableTcpNoDelay(NULL,fd);
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }

    selectDb(c,0);
    uint64_t client_id;
    atomicGetIncr(server.next_client_id,client_id,1);
    c->id = client_id;
    c->fd = fd;
    c->name = NULL;
    c->bufpos = 0;
    c->querybuf = sdsempty();
    c->pending_querybuf = sdsempty();
    c->querybuf_peak = 0;
    c->reqtype = 0;
    c->argc = 0;
    c->argv = NULL;
    c->cmd = c->lastcmd = NULL;
    c->multibulklen = 0;
    c->bulklen = -1;
    c->sentlen = 0;
    c->flags = 0;
    c->ctime = c->lastinteraction = server.unixtime;
    c->authenticated = 0;
    c->replstate = REPL_STATE_NONE;
    c->repl_put_online_on_ack = 0;
    c->reploff = 0;
    c->read_reploff = 0;
    c->repl_ack_off = 0;
    c->repl_ack_time = 0;
    c->slave_listening_port = 0;
    c->slave_ip[0] = '\0';
    c->slave_capa = SLAVE_CAPA_NONE;
    c->reply = listCreate();
    c->reply_bytes = 0;
    c->obuf_soft_limit_reached_time = 0;
    listSetFreeMethod(c->reply,freeClientReplyValue);
    listSetDupMethod(c->reply,dupClientReplyValue);
    c->btype = BLOCKED_NONE;
    c->bpop.timeout = 0;
    c->bpop.keys = dictCreate(&objectKeyPointerValueDictType,NULL);
    c->bpop.target = NULL;
    c->bpop.numreplicas = 0;
    c->bpop.reploffset = 0;
    c->woff = 0;
    c->watched_keys = listCreate();
    c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL);
    c->pubsub_patterns = listCreate();
    c->peerid = NULL;
    listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
    listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
    if (fd != -1) listAddNodeTail(server.clients,c);
    initClientMultiState(c);
    return c;
}

