1. 程式人生 > >Redis原始碼剖析和註釋(二十)--- 網路連線庫剖析(client的建立/釋放、命令接收/回覆、Redis通訊協議分析等)

Redis原始碼剖析和註釋(二十)--- 網路連線庫剖析(client的建立/釋放、命令接收/回覆、Redis通訊協議分析等)

Redis 網路連線庫剖析

1. Redis網路連線庫介紹

Redis網路連線庫對應的檔案是networking.c。這個檔案主要負責

  • 客戶端的建立與釋放
  • 命令接收與命令回覆
  • Redis通訊協議分析
  • CLIENT 命令的實現

我們接下來就這幾塊內容分別列出原始碼,進行剖析。

2. 客戶端的建立與釋放

2.1客戶端的建立

Redis 伺服器是一個同時與多個客戶端建立連線的程式。當客戶端連線上伺服器時,伺服器會建立一個server.h/client結構來儲存客戶端的狀態資訊。所以在客戶端建立時,就會初始化這樣一個結構,客戶端的建立原始碼如下:

client *createClient(int
fd) { client *c = zmalloc(sizeof(client)); //分配空間 // 如果fd為-1,表示建立的是一個無網路連線的偽客戶端,用於執行lua指令碼的時候。 // 如果fd不等於-1,表示建立一個有網路連線的客戶端 if (fd != -1) { // 設定fd為非阻塞模式 anetNonBlock(NULL,fd); // 禁止使用 Nagle 演算法,client向核心遞交的每個資料包都會立即傳送給server出去,TCP_NODELAY anetEnableTcpNoDelay(NULL,fd); // 如果開啟了tcpkeepalive,則設定 SO_KEEPALIVE
if (server.tcpkeepalive) // 設定tcp連線的keep alive選項 anetKeepAlive(NULL,fd,server.tcpkeepalive); // 建立一個檔案事件狀態el,且監聽讀事件,開始接受命令的輸入 if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR) { close(fd); zfree(c); return
NULL; } } // 預設選0號資料庫 selectDb(c,0); // 設定client的ID c->id = server.next_client_id++; // client的套接字 c->fd = fd; // client的名字 c->name = NULL; // 回覆固定(靜態)緩衝區的偏移量 c->bufpos = 0; // 輸入快取區 c->querybuf = sdsempty(); // 輸入快取區的峰值 c->querybuf_peak = 0; // 請求協議型別,內聯或者多條命令,初始化為0 c->reqtype = 0; // 引數個數 c->argc = 0; // 引數列表 c->argv = NULL; // 當前執行的命令和最近一次執行的命令 c->cmd = c->lastcmd = NULL; // 查詢緩衝區剩餘未讀取命令的數量 c->multibulklen = 0; // 讀入引數的長度 c->bulklen = -1; // 已發的位元組數 c->sentlen = 0; // client的狀態 c->flags = 0; // 設定建立client的時間和最後一次互動的時間 c->ctime = c->lastinteraction = server.unixtime; // 認證狀態 c->authenticated = 0; // replication複製的狀態,初始為無 c->replstate = REPL_STATE_NONE; // 設定從節點的寫處理器為ack,是否在slave向master傳送ack c->repl_put_online_on_ack = 0; // replication複製的偏移量 c->reploff = 0; // 通過ack命令接收到的偏移量 c->repl_ack_off = 0; // 通過ack命令接收到的偏移量所用的時間 c->repl_ack_time = 0; // 從節點的埠號 c->slave_listening_port = 0; // 從節點IP地址 c->slave_ip[0] = '\0'; // 從節點的功能 c->slave_capa = SLAVE_CAPA_NONE; // 回覆連結串列 c->reply = listCreate(); // 回覆連結串列的位元組數 c->reply_bytes = 0; // 回覆緩衝區的記憶體大小軟限制 c->obuf_soft_limit_reached_time = 0; // 回覆連結串列的釋放和複製方法 listSetFreeMethod(c->reply,decrRefCountVoid); listSetDupMethod(c->reply,dupClientReplyValue); // 阻塞型別 c->btype = BLOCKED_NONE; // 阻塞超過時間 c->bpop.timeout = 0; // 造成阻塞的鍵字典 c->bpop.keys = dictCreate(&setDictType,NULL); // 儲存解除阻塞的鍵,用於儲存PUSH入元素的鍵,也就是dstkey c->bpop.target = NULL; // 阻塞狀態 c->bpop.numreplicas = 0; // 要達到的複製偏移量 c->bpop.reploffset = 0; // 全域性的複製偏移量 c->woff = 0; // 監控的鍵 c->watched_keys = listCreate(); // 訂閱頻道 c->pubsub_channels = dictCreate(&setDictType,NULL); // 訂閱模式 c->pubsub_patterns = listCreate(); // 被快取的peerid,peerid就是 ip:port c->peerid = NULL; // 訂閱釋出模式的釋放和比較方法 listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid); listSetMatchMethod(c->pubsub_patterns,listMatchObjects); // 將真正的client放在伺服器的客戶端連結串列中 if (fd != -1) listAddNodeTail(server.clients,c); // 初始化client的事物狀態 initClientMultiState(c); return c; }

根據傳入的檔案描述符fd,可以建立用於不同情景下的client。這個fd就是伺服器接收客戶端connect後所返回的檔案描述符。

  • fd == -1。表示建立一個無網路連線的客戶端。主要用於執行 lua 指令碼時。
  • fd != -1。表示接收到一個正常的客戶端連線,則會建立一個有網路連線的客戶端,也就是建立一個檔案事件,來監聽這個fd是否可讀,當客戶端傳送資料,則事件被觸發。建立客戶端時,還會禁用Nagle演算法。

Nagle演算法能自動連線許多的小緩衝器訊息,這一過程(稱為nagling)通過減少必須傳送包的個數來增加網路軟體系統的效率。但是伺服器和客戶端的對即使通訊性有很高的要求,因此禁止使用 Nagle 演算法,客戶端向核心遞交的每個資料包都會立即傳送給伺服器。

建立客戶端的過程,會將server.h/client結構的所有成員初始化,接下里會介紹部分重點的成員。

  • int id:伺服器對於每一個連線進來的都會建立一個ID,客戶端的ID從1開始。每次重啟伺服器會重新整理。
  • int fd:當前客戶端狀態描述符。分為無網路連線的客戶端和有網路連線的客戶端。
  • int flags:客戶端狀態的標誌。Redis 3.2.8 中在server.h中定義了23種狀態。
  • robj *name:預設建立的客戶端是沒有名字的,可以通過CLIENT SETNAME命令設定名字。後面會介紹該命令的實現。
  • int reqtype:請求協議的型別。因為Redis伺服器支援Telnet的連線,因此Telnet命令請求協議型別是PROTO_REQ_INLINE,而redis-cli命令請求的協議型別是PROTO_REQ_MULTIBULK

用於儲存伺服器接受客戶端命令的成員:

  • sds querybuf:儲存客戶端發來命令請求的輸入緩衝區。以Redis通訊協議的方式儲存。
  • size_t querybuf_peak:儲存輸入緩衝區的峰值。
  • int argc:命令引數個數。
  • robj *argv:命令引數列表。

用於儲存伺服器給客戶端回覆的成員:

  • char buf[16*1024]:儲存執行完命令所得命令回覆資訊的靜態緩衝區,它的大小是固定的,所以主要儲存的是一些比較短的回覆。分配client結構空間時,就會分配一個16K的大小。
  • int bufpos:記錄靜態緩衝區的偏移量,也就是buf陣列已經使用的位元組數。
  • list *reply:儲存命令回覆的連結串列。因為靜態緩衝區大小固定,主要儲存固定長度的命令回覆,當處理一些返回大量回復的命令,則會將命令回覆以連結串列的形式連線起來。
  • unsigned long long reply_bytes:儲存回覆連結串列的位元組數。
  • size_t sentlen:已傳送回覆的位元組數。

2.2 客戶端的釋放

客戶端的釋放freeClient()函式主要就是釋放各種資料結構和清空一些緩衝區等等操作,這裡就不列出原始碼。但是我們關注一下非同步釋放客戶端。原始碼如下:

// 非同步釋放client
void freeClientAsync(client *c) {
    // 如果是已經即將關閉或者是lua指令碼的偽client,則直接返回
    if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return;
    c->flags |= CLIENT_CLOSE_ASAP;
    // 將client加入到即將關閉的client連結串列中
    listAddNodeTail(server.clients_to_close,c);
}
  • server.clients_to_close:是伺服器儲存所有待關閉的client連結串列。

設定非同步釋放客戶端的目的主要是:防止底層函式正在向客戶端的輸出緩衝區寫資料的時候,關閉客戶端,這樣是不安全的。Redis會安排客戶端在serverCron()函式的安全時間釋放它。

當然也可以取消非同步釋放,那麼就會呼叫freeClient()函式立即釋放。原始碼如下:

// 取消設定非同步釋放的client
void freeClientsInAsyncFreeQueue(void) {
    // 遍歷所有即將關閉的client
    while (listLength(server.clients_to_close)) {
        listNode *ln = listFirst(server.clients_to_close);
        client *c = listNodeValue(ln);

        // 取消立即關閉的標誌
        c->flags &= ~CLIENT_CLOSE_ASAP;
        freeClient(c);
        // 從即將關閉的client連結串列中刪除
        listDelNode(server.clients_to_close,ln);
    }
}

3. 命令接收與命令回覆

3.1 命令接收

當客戶端連線上Redis伺服器後,伺服器會得到一個檔案描述符fd,而且伺服器會監聽該檔案描述符的讀事件,這些在createClient()函式中,我們有分析。那麼當客戶端傳送了命令,觸發了AE_READABLE事件,那麼就會呼叫回撥函式readQueryFromClient()來從檔案描述符fd中讀發來的命令,並儲存在輸入緩衝區中querybuf。而這個回撥函式就是我們在Redis 事件處理實現一文中所提到的指向回撥函式的指標rfileProcwfileProc。那麼,我們先來分析sendReplyToClient()函式。

// 讀取client的輸入緩衝區的內容
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata;
    int nread, readlen;
    size_t qblen;
    UNUSED(el);
    UNUSED(mask);

    // 讀入的長度,預設16MB
    readlen = PROTO_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    // 如果是多條請求,根據請求的大小,設定讀入的長度readlen
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);

        if (remaining < readlen) readlen = remaining;
    }

    // 輸入緩衝區的長度
    qblen = sdslen(c->querybuf);
    // 更新緩衝區的峰值
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    // 擴充套件緩衝區的大小
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    // 將client發來的命令,讀入到輸入緩衝區中
    nread = read(fd, c->querybuf+qblen, readlen);
    // 讀操作出錯
    if (nread == -1) {
        if (errno == EAGAIN) {
            return;
        } else {
            serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    // 讀操作完成
    } else if (nread == 0) {
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    }
    // 更新輸入緩衝區的已用大小和未用大小。
    sdsIncrLen(c->querybuf,nread);
    // 設定最後一次伺服器和client互動的時間
    c->lastinteraction = server.unixtime;
    // 如果是主節點,則更新複製操作的偏移量
    if (c->flags & CLIENT_MASTER) c->reploff += nread;
    // 更新從網路輸入的位元組數
    server.stat_net_input_bytes += nread;
    // 如果輸入緩衝區長度超過伺服器設定的最大緩衝區長度
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        // 將client資訊轉換為sds
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        // 輸入緩衝區儲存在bytes中
        bytes = sdscatrepr(bytes,c->querybuf,64);
        // 列印到日誌
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        // 釋放空間
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }
    // 處理client輸入的命令內容
    processInputBuffer(c);
}

實際上,這個readQueryFromClient()函式是read函式的封裝,從檔案描述符fd中讀出資料到輸入緩衝區querybuf中,並更新輸入緩衝區的峰值querybuf_peak,而且會檢查讀的長度,如果大於了server.client_max_querybuf_len則會退出,而這個閥值在伺服器初始化為PROTO_MAX_QUERYBUF_LEN (1024*1024*1024)也就是1G大小。

回憶之前的各種命令實現,都是通過client的argvargc這兩個成員來處理的。因此,伺服器還需要將輸入緩衝區querybuf中的資料,處理成引數列表的物件,也就是上面的processInputBuffer()函式。原始碼如下:

// 處理client輸入的命令內容
void processInputBuffer(client *c) {
    server.current_client = c;
    /* Keep processing while there is something in the input buffer */
    // 一直讀輸入緩衝區的內容
    while(sdslen(c->querybuf)) {
        /* Return if clients are paused. */
        // 如果處於暫停狀態,直接返回
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;

        /* Immediately abort if the client is in the middle of something. */
        // 如果client處於被阻塞狀態,直接返回
        if (c->flags & CLIENT_BLOCKED) break;

        // 如果client處於關閉狀態,則直接返回
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;

        /* Determine request type when unknown. */
        // 如果是未知的請求型別,則判定請求型別
        if (!c->reqtype) {
            // 如果是"*"開頭,則是多條請求,是client發來的
            if (c->querybuf[0] == '*') {
                c->reqtype = PROTO_REQ_MULTIBULK;
            // 否則就是內聯請求,是Telnet發來的
            } else {
                c->reqtype = PROTO_REQ_INLINE;
            }
        }

        // 如果是內聯請求
        if (c->reqtype == PROTO_REQ_INLINE) {
            // 處理Telnet發來的內聯命令,並建立成物件,儲存在client的引數列表中
            if (processInlineBuffer(c) != C_OK) break;
        // 如果是多條請求
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            // 將client的querybuf中的協議內容轉換為client的引數列表中的物件
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        /* Multibulk processing could see a <= 0 length. */
        // 如果引數為0,則重置client
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* Only reset the client when the command was executed. */
            // 執行命令成功後重置client
            if (processCommand(c) == C_OK)
                resetClient(c);
            /* freeMemoryIfNeeded may flush slave output buffers. This may result
             * into a slave, that may be the active client, to be freed. */
            if (server.current_client == NULL) break;
        }
    }
    // 執行成功,則將用於崩潰報告的client設定為NULL
    server.current_client = NULL;
}

這個processInputBuffer()函式只要根據reqtype來判斷和設定請求的型別,之前提過,因為Redis伺服器支援Telnet的連線,因此Telnet命令請求協議型別是PROTO_REQ_INLINE,進而呼叫processInlineBuffer()函式處理,而redis-cli命令請求的協議型別是PROTO_REQ_MULTIBULK,進而呼叫processMultibulkBuffer()函式來處理。我們只要看processMultibulkBuffer()函式,是如果將Redis協議的命令,處理成引數列表的物件的。原始碼如下:

// 將client的querybuf中的協議內容轉換為client的引數列表中的物件
int processMultibulkBuffer(client *c) {
    char *newline = NULL;
    int pos = 0, ok;
    long long ll;

    // 引數列表中命令數量為0
    if (c->multibulklen == 0) {
        /* The client should have been reset */
        serverAssertWithInfo(c,NULL,c->argc == 0);

        /* Multi bulk length cannot be read without a \r\n */
        // 查詢第一個換行符
        newline = strchr(c->querybuf,'\r');
        // 沒有找到\r\n,表示不符合協議,返回錯誤
        if (newline == NULL) {
            if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) {
                addReplyError(c,"Protocol error: too big mbulk count string");
                setProtocolError(c,0);
            }
            return C_ERR;
        }

        /* Buffer should also contain \n */
        // 檢查格式
        if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
            return C_ERR;

        /* We know for sure there is a whole line since newline != NULL,
         * so go ahead and find out the multi bulk length. */
        // 保證第一個字元為'*'
        serverAssertWithInfo(c,NULL,c->querybuf[0] == '*');
        // 將'*'之後的數字轉換為整數。*3\r\n
        ok = string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll);
        if (!ok || ll > 1024*1024) {
            addReplyError(c,"Protocol error: invalid multibulk length");
            setProtocolError(c,pos);
            return C_ERR;
        }

        // 指向"*3\r\n"的"\r\n"之後的位置
        pos = (newline-c->querybuf)+2;
        // 空白命令,則將之前的刪除,保留未閱讀的部分
        if (ll <= 0) {
            sdsrange(c->querybuf,pos,-1);
            return C_OK;
        }

        // 引數數量
        c->multibulklen = ll;

        /* Setup argv array on client structure */
        // 分配client引數列表的空間
        if (c->argv) zfree(c->argv);
        c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
    }

    serverAssertWithInfo(c,NULL,c->multibulklen > 0);
    // 讀入multibulklen個引數,並建立物件儲存在引數列表中
    while(c->multibulklen) {
        /* Read bulk length if unknown */
        // 讀入引數的長度
        if (c->bulklen == -1) {
            // 找到換行符,確保"\r\n"存在
            newline = strchr(c->querybuf+pos,'\r');
            if (newline == NULL) {
                if (sdslen(c->querybuf) > PROTO_INLINE_MAX_SIZE) {
                    addReplyError(c,
                        "Protocol error: too big bulk count string");
                    setProtocolError(c,0);
                    return C_ERR;
                }
                break;
            }

            /* Buffer should also contain \n */
            // 檢查格式
            if (newline-(c->querybuf) > ((signed)sdslen(c->querybuf)-2))
                break;

            // $3\r\nSET\r\n...,確保是'$'字元,保證格式
            if (c->querybuf[pos] != '$') {
                addReplyErrorFormat(c,
                    "Protocol error: expected '$', got '%c'",
                    c->querybuf[pos]);
                setProtocolError(c,pos);
                return C_ERR;
            }

            // 將命令長度儲存到ll。
            ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll);
            if (!ok || ll < 0 || ll > 512*1024*1024) {
                addReplyError(c,"Protocol error: invalid bulk length");
                setProtocolError(c,pos);
                return C_ERR;
            }

            // 定位第一個引數的位置,也就是SET的S
            pos += newline-(c->querybuf+pos)+2;
            // 引數太長,進行優化
            if (ll >= PROTO_MBULK_BIG_ARG) {
                size_t qblen;

                /* If we are going to read a large object from network
                 * try to make it likely that it will start at c->querybuf
                 * boundary so that we can optimize object creation
                 * avoiding a large copy of data. */
                // 如果我們要從網路中讀取一個大的物件,嘗試使它可能從c-> querybuf邊界開始,以便我們可以優化物件建立,避免大量的資料副本
                // 儲存未讀取的部分
                sdsrange(c->querybuf,pos,-1);
                // 重置偏移量
                pos = 0;
                // 獲取querybuf中已使用的長度
                qblen = sdslen(c->querybuf);
                /* Hint the sds library about the amount of bytes this string is
                 * going to contain. */
                // 擴充套件querybuf的大小
                if (qblen < (size_t)ll+2)
                    c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2-qblen);
            }
            // 儲存引數的長度
            c->bulklen = ll;
        }

        /* Read bulk argument */
        // 因為只讀了multibulklen位元組的資料,讀到的資料不夠,則直接跳出迴圈,執行processInputBuffer()函式迴圈讀取
        if (sdslen(c->querybuf)-pos < (unsigned)(c->bulklen+2)) {
            /* Not enough data (+2 == trailing \r\n) */
            break;
        // 為引數建立了物件
        } else {
            /* Optimization: if the buffer contains JUST our bulk element
             * instead of creating a new object by *copying* the sds we
             * just use the current sds string. */
            // 如果讀入的長度大於32k
            if (pos == 0 &&
                c->bulklen >= PROTO_MBULK_BIG_ARG &&
                (signed) sdslen(c->querybuf) == c->bulklen+2)
            {
                c->argv[c->argc++] = createObject(OBJ_STRING,c->querybuf);
                // 跳過換行
                sdsIncrLen(c->querybuf,-2); /* remove CRLF */
                /* Assume that if we saw a fat argument we'll see another one
                 * likely... */
                // 設定一個新長度
                c->querybuf = sdsnewlen(NULL,c->bulklen+2);
                sdsclear(c->querybuf);
                pos = 0;
            // 建立物件儲存在client的引數列表中
            } else {
                c->argv[c->argc++] =
                    createStringObject(c->querybuf+pos,c->bulklen);
                pos += c->bulklen+2;
            }
            // 清空命令內容的長度
            c->bulklen = -1;
            // 未讀取命令引數的數量,讀取一個,該值減1
            c->multibulklen--;
        }
    }

    /* Trim to pos */
    // 刪除已經讀取的,保留未讀取的
    if (pos) sdsrange(c->querybuf,pos,-1);

    /* We're done when c->multibulk == 0 */
    // 命令的引數全部被讀取完
    if (c->multibulklen == 0) return C_OK;

    /* Still not read to process the command */
    return C_ERR;
}

我們結合一個多條批量回復進行分析。一個多條批量回復以 *<argc>\r\n 為字首,後跟多條不同的批量回復,其中 argc 為這些批量回復的數量。那麼SET nmykey nmyvalue命令轉換為Redis協議內容如下:

"*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n"

當進入processMultibulkBuffer()函式之後,如果是第一次執行該函式,那麼argv中未讀取的命令數量為0,也就是說引數列表為空,那麼會執行if (c->multibulklen == 0)的程式碼,這裡的程式碼會解析*3\r\n,將3儲存到multibulklen中,表示後面的引數個數,然後根據引數個數,為argv分配空間。

接著,執行multibulklen次while迴圈,每次讀一個引數,例如$3\r\nSET\r\n,也是先讀出引數長度,儲存在bulklen中,然後將引數SET儲存構建成物件儲存到引數列表中。每次讀一個引數,multibulklen就會減1,當等於0時,就表示命令的引數全部讀取到引數列表完畢。

於是命令接收的整個過程完成。

3.2 命令回覆

命令回覆的函式,也是事件處理程式的回撥函式之一。當伺服器的client的回覆緩衝區有資料,那麼就會呼叫aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,sendReplyToClient, c)函式,將檔案描述符fdAE_WRITABLE事件關聯起來,當客戶端可寫時,就會觸發事件,呼叫sendReplyToClient()函式,執行寫事件。我們重點看這個函式的程式碼:

// 寫事件處理程式,只是傳送回覆給client
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    UNUSED(el);
    UNUSED(mask);
    // 傳送完資料會刪除fd的可讀事件
    writeToClient(fd,privdata,1);
}

這個函式直接呼叫了writeToClient()函式,該函式原始碼如下:

// 將輸出緩衝區的資料寫給client,如果client被釋放則返回C_ERR,沒被釋放則返回C_OK
int writeToClient(int fd, client *c, int handler_installed) {
    ssize_t nwritten = 0, totwritten = 0;
    size_t objlen;
    size_t objmem;
    robj *o;

    // 如果指定的client的回覆緩衝區中還有資料,則返回真,表示可以寫socket
    while(clientHasPendingReplies(c)) {
        // 固定緩衝區傳送未完成
        if (c->bufpos > 0) {
            // 將緩衝區的資料寫到fd中
            nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
            // 寫失敗跳出迴圈
            if (nwritten <= 0) break;
            // 更新發送的資料計數器
            c->sentlen += nwritten;
            totwritten += nwritten;

            /* If the buffer was sent, set bufpos to zero to continue with
             * the remainder of the reply. */
            // 如果傳送的資料等於buf的偏移量,表示傳送完成
            if ((int)c->sentlen == c->bufpos) {
                // 則將其重置
                c->bufpos = 0;
                c->sentlen = 0;
            }
        // 固定緩衝區傳送完成,傳送回覆連結串列的內容
        } else {
            // 回覆連結串列的第一條回覆物件,和物件值的長度和所佔的記憶體
            o = listNodeValue(listFirst(c->reply));
            objlen = sdslen(o->ptr);
            objmem = getStringObjectSdsUsedMemory(o);

            // 跳過空物件,並刪除這個物件
            if (objlen == 0) {
                listDelNode(c->reply,listFirst(c->reply));
                c->reply_bytes -= objmem;
                continue;
            }

            // 將當前節點的值寫到fd中
            nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
            // 寫失敗跳出迴圈
            if (nwritten <= 0) break;
            // 更新發送的資料計數器
            c->sentlen += nwritten;
            totwritten += nwritten;

            /* If we fully sent the object on head go to the next one */
            // 傳送完成,則刪除該節點,重置傳送的資料長度,更新回覆連結串列的總位元組數
            if (c->sentlen == objlen) {
                listDelNode(c->reply,listFirst(c->reply));
                c->sentlen = 0;
                c->reply_bytes -= objmem;
            }
        }
        // 更新寫到網路的位元組數
        server.stat_net_output_bytes += totwritten;
        // 如果這次寫的總量大於NET_MAX_WRITES_PER_EVENT的限制,則會中斷本次的寫操作,將處理時間讓給其他的client,以免一個非常的回覆獨佔伺服器,剩餘的資料下次繼續在寫
        // 但是,如果當伺服器的記憶體數已經超過maxmemory,即使超過最大寫NET_MAX_WRITES_PER_EVENT的限制,也會繼續執行寫入操作,是為了儘快寫入給客戶端
        if (totwritten > NET_MAX_WRITES_PER_EVENT &&
            (server.maxmemory == 0 ||
             zmalloc_used_memory() < server.maxmemory)) break;
    }
    // 處理寫入失敗
    if (nwritten == -1) {
        if (errno == EAGAIN) {
            nwritten = 0;
        } else {
            serverLog(LL_VERBOSE,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return C_ERR;
        }
    }
    // 寫入成功
    if (totwritten > 0) {
        // 如果不是主節點伺服器,則更新最近和伺服器互動的時間
        if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
    }
    // 如果指定的client的回覆緩衝區中已經沒有資料,傳送完成
    if (!clientHasPendingReplies(c)) {
        c->sentlen = 0;
        // 刪除當前client的可讀事件的監聽
        if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);

        /* Close connection after entire reply has been sent. */
        // 如果指定了寫入按成之後立即關閉的標誌,則釋放client
        if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
            freeClient(c);
            return C_ERR;
        }
    }
    return C_OK;
}

這個函式實際上是對write()函式的封裝,將靜態回覆緩衝區buf或回覆連結串列reply中的資料迴圈寫到檔案描述符fd中。如果寫完了,則將當前客戶端的AE_WRITABLE事件刪除。

至此,命令回覆就執行完畢。

3.3 伺服器連線應答函式

我們在上面的分析中,將檔案事件的兩種處理程式,命令接受和命令回覆分別分析了,那麼就乾脆將剩下的伺服器連線應答函式的原始碼也列出來,可以根據Redis 事件處理實現原始碼剖析來一起學習。

連線應答函式分兩種,分別是本地和TCP連線,但是都是對accept()函式的封裝。

#define MAX_ACCEPTS_PER_CALL 1000
// TCP連線處理程式,建立一個client的連線狀態
static void acceptCommonHandler(int fd, int flags, char *ip) {
    client *c;
    // 建立一個新的client
    if ((c = createClient(fd)) == NULL) {
        serverLog(LL_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno),fd);
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
    // 如果新的client超過server規定的maxclients的限制,那麼想新client的fd寫入錯誤資訊,關閉該client
    // 先建立client,在進行數量檢查,是因為更好的寫入錯誤資訊
    if (listLength(server.clients) > server.maxclients) {
        char *err = "-ERR max number of clients reached\r\n";

        /* That's a best effort error message, don't check write errors */
        if (write(c->fd,err,strlen(err)) == -1) {
            /* Nothing to do, Just to avoid the warning... */
        }
        // 更新拒接連線的個數
        server.stat_rejected_conn++;
        freeClient(c);
        return;
    }
    // 如果伺服器正在以保護模式執行(預設),且沒有設定密碼,也沒有繫結指定的介面,我們就不接受非迴環介面的請求。相反,如果需要,我們會嘗試解釋使用者如何解決問題
    if (server.protected_mode &&
        server.bindaddr_count == 0 &&
        server.requirepass == NULL &&
        !(flags & CLIENT_UNIX_SOCKET) &&
        ip != NULL)
    {
        if (strcmp(ip,"127.0.0.1") && strcmp(ip,"::1")) {
            char *err =
                "-DENIED Redis is running in protected mode because protected "
                //太長省略。。。
                "the server to start accepting connections from the outside.\r\n";
            if (write(c->fd,err,strlen(err)) == -1) {
                /* Nothing to do, Just to avoid the warning... */
            }
            // 更新拒接連線的個數
            server.stat_rejected_conn++;
            freeClient(c);
            return;
        }
    }

    // 更新連線的數量
    server.stat_numconnections++;
    // 更新client狀態的標誌
    c->flags |= flags;
}

// 建立一個TCP的連線處理程式
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL; //最大一個處理1000次連線
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while(max--) {
        // accept接受client的連線
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        // 列印連線的日誌
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        // 建立一個連線狀態的client
        acceptCommonHandler(cfd,0,cip);
    }
}

// 建立一個本地連線處理程式
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cfd, max = MAX_ACCEPTS_PER_CALL;
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while(max--) {
        // accept接受client的連線
        cfd = anetUnixAccept(server.neterr, fd);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted connection to %s", server.unixsocket);
        // 建立一個本地連線狀態的client
        acceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL);
    }
}

4. Redis通訊協議分析

4.1 協議的目標:

  • 易於實現
  • 可以高效地被計算機分析(parse)
  • 可以很容易地被人類讀懂

4.2 協議的一般形式

*<引數數量> CR LF
$<引數 1 的位元組數量> CR LF
<引數 1 的資料> CR LF
...
$<引數 N 的位元組數量> CR LF
<引數 N 的資料> CR LF
//命令本身會被當做一個引數來發送

之前在命令接收我們已經分析過協議了,這了就不在仔細分析了。

4.3 回覆的型別

Redis 命令會返回多種不同型別的回覆。

通過檢查伺服器發回資料的第一個位元組,可以確定這個回覆是什麼型別:

  • 狀態回覆(status reply)的第一個位元組是 "+"
  • 錯誤回覆(error reply)的第一個位元組是 "-"
  • 整數回覆(integer reply)的第一個位元組是 ":"
  • 批量回復(bulk reply)的第一個位元組是 "$"
  • 多條批量回復(multi bulk reply)的第一個位元組是 "*"

我們用Telnet連線伺服器,來看看這些回覆的型別:

➜  ~ telnet 127.0.0.1 6379
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
GET key                     //傳送 GET key 命令
$5                         //批量回復型別
value
EXISTS key                  //傳送 EXISTS key 命令
:1                          //整數回覆型別
SS                          //傳送 SS 命令
-ERR unknown command 'SS'   //錯誤回覆型別
SET key hello               //傳送 SET key hello 命令
+OK                         //狀態回覆型別
SMEMBERS set                //傳送 SMEMBERS set 命令
*2                          //多條批量回復型別
$2
m1
$2
m2

5. CLIENT 命令的實現

關於CLIENT的命令,Redis 3.2.8一共有6條,分別是:redis 網路連結庫的原始碼詳細註釋

CLIENT KILL [ip:port] [ID client-id] [TYPE normal|master|slave|pubsub] [ADDR ip:port] [SKIPME yes/no] 
CLIENT GETNAME
CLIENT LIST
CLIENT PAUSE timeout 
CLIENT REPLY ON|OFF|SKIP 
CLIENT SETNAME connection-name 

直接結合原始碼和操作檢視實現吧。CLIENT 命令的實現的原始碼如下:

// client 命令的實現
void clientCommand(client *c) {
    listNode *ln;
    listIter li;
    client *client;

    //  CLIENT LIST 的實現
    if (!strcasecmp(c->argv[1]->ptr,"list") && c->argc == 2) {
        /* CLIENT LIST */
        // 獲取所有的client資訊
        sds o = getAllClientsInfoString();
        // 新增到到輸入緩衝區中
        addReplyBulkCBuffer(c,o,sdslen(o));
        sdsfree(o);
    // CLIENT REPLY ON|OFF|SKIP 命令實現
    } else if (!strcasecmp(c->argv[1]->ptr,"reply") && c->argc == 3) {
        /* CLIENT REPLY ON|OFF|SKIP */
        // 如果是 ON
        if (!strcasecmp(c->argv[2]->ptr,"on")) {
            // 取消 off 和 skip 的標誌
            c->flags &= ~(CLIENT_REPLY_SKIP|CLIENT_REPLY_OFF);
            // 回覆 +OK
            addReply(c,shared.ok);
        // 如果是 OFF
        } else if (!strcasecmp(c->argv[2]->ptr,"off")) {
            // 開啟 OFF標誌
            c->flags |= CLIENT_REPLY_OFF;
        // 如果是 SKIP
        } else if (!strcasecmp(c->argv[2]->ptr,"skip")) {
            // 沒有設定 OFF 則設定 SKIP 標誌
            if (!(c->flags & CLIENT_REPLY_OFF))
                c->flags |= CLIENT_REPLY_SKIP_NEXT;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    //  CLIENT KILL [ip:port] [ID client-id] [TYPE normal | master | slave | pubsub] [ADDR ip:port] [SKIPME yes / no]
    } else if (!strcasecmp(c->argv[1]->ptr,"kill")) {
        /* CLIENT KILL <ip:port>
         * CLIENT KILL <option> [value] ... <option> [value] */
        char *addr = NULL;
        int type = -1;
        uint64_t id = 0;
        int skipme = 1;
        int killed = 0, close_this_client = 0;

        // CLIENT KILL addr:port只能通過地址殺死client,舊版本相容
        if (c->argc == 3) {
            /* Old style syntax: CLIENT KILL <addr> */
            addr = c->argv[2]->ptr;
            skipme = 0; /* With the old form, you can kill yourself. */
        // 新版本可以根據[ID client-id] [master|normal|slave|pubsub] [ADDR ip:port] [SKIPME yes/no]殺死client
        } else if (c->argc > 3) {
            int i = 2; /* Next option index. */

            /* New style syntax: parse options. */
            // 解析語法
            while(i < c->argc) {
                int moreargs = c->argc > i+1;

                // CLIENT KILL [ID client-id]
                if (!strcasecmp(c->argv[i]->ptr,"id") && moreargs) {
                    long long tmp;
                    // 獲取client的ID
                    if (getLongLongFromObjectOrReply(c,c->argv[i+1],&tmp,NULL)
                        != C_OK) return;
                    id = tmp;
                // CLIENT KILL TYPE type, 這裡的 type 可以是 [master|normal|slave|pubsub]
                } else if (!strcasecmp(c->argv[i]->ptr,"type") && moreargs) {
                    // 獲取client的型別,[master|normal|slave|pubsub]四種之一
                    type = getClientTypeByName(c->argv[i+1]->ptr);
                    if (type == -1) {
                        addReplyErrorFormat(c,"Unknown client type '%s'",
                            (char*) c->argv[i+1]->ptr);
                        return;
                    }
                // CLIENT KILL [ADDR ip:port]
                } else if (!strcasecmp(c->argv[i]->ptr,"addr") && moreargs) {
                    // 獲取ip:port
                    addr = c->argv[i+1]->ptr;
                // CLIENT KILL [SKIPME yes/no]
                } else if (!strcasecmp(c->argv[i]->ptr,"skipme") && morea