1. 程式人生 > >系統技術非業餘研究 » gen_tcp接收緩衝區易混淆概念糾正

系統技術非業餘研究 » gen_tcp接收緩衝區易混淆概念糾正

Erlang的每個TCP網路連結是由相應的gen_tcp物件來表示的,說白了就是個port, 實現Erlang網路相關的邏輯,其實現程式碼位於erts/emulator/drivers/common/inet_drv.c

參照inet:setopts文件,它有三個buffer相關的選項,非常讓人費解:

{buffer, Size}
Determines the size of the user-level software buffer used by the driver. Not to be confused with sndbuf and recbuf options which correspond to the kernel socket buffers. It is recommended to have val(buffer) >= max(val(sndbuf),val(recbuf)). In fact, the val(buffer) is automatically set to the above maximum when sndbuf or recbuf values are set.

{recbuf, Size}
Gives the size of the receive buffer to use for the socket.

{sndbuf, Size}
Gives the size of the send buffer to use for the socket.

其中sndbuf, recbuf選項比較好理解, 就是設定gen_tcp所擁有的socket控制代碼的核心的傳送和接收緩衝區,從程式碼可以驗證:

/* inet_drv.c */
#define INET_OPT_SNDBUF     6   /* set send buffer size */
#define INET_OPT_RCVBUF     7   /* set receive buffer size */
static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
{
...
        case INET_OPT_SNDBUF:    type = SO_SNDBUF;
            DEBUGF(("inet_set_opts(%ld): s=%d, SO_SNDBUF=%d\r\n",
                    (long)desc->port, desc->s, ival));
            break;
        case INET_OPT_RCVBUF:    type = SO_RCVBUF;
            DEBUGF(("inet_set_opts(%ld): s=%d, SO_RCVBUF=%d\r\n",
                    (long)desc->port, desc->s, ival));
            break;
...
        res = sock_setopt           (desc->s, proto, type, arg_ptr, arg_sz);
...
}

那buffer是什麼呢,他們三者之間的關係? 從文件的描述來看:
It is recommended to have val(buffer) >= max(val(sndbuf),val(recbuf)). In fact, the val(buffer) is automatically set to the above maximum when sndbuf or recbuf values are set.

再對照原始碼:

/* inet_drv.c */

#define INET_DEF_BUFFER     1460        /* default buffer size */
#define INET_MIN_BUFFER     1           /* internal min buffer */
#define INET_LOPT_BUFFER      20  /* min buffer size hint */

static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
{
...
       case INET_LOPT_BUFFER:
            DEBUGF(("inet_set_opts(%ld): s=%d, BUFFER=%d\r\n",
                    (long)desc->port, desc->s, ival));
            if (ival < INET_MIN_BUFFER) ival = INET_MIN_BUFFER;
            desc->bufsz = ival;
            continue;

        DEBUGF(("inet_set_opts(%ld): s=%d returned %d\r\n",
                (long)desc->port, desc->s, res));
        if (type == SO_RCVBUF) {
            /* make sure we have desc->bufsz >= SO_RCVBUF */
            if (ival > desc->bufsz)
                desc->bufsz = ival;
        }
...
}

/* Allocate descriptor */
static ErlDrvData inet_start(ErlDrvPort port, int size, int protocol)
{
...
    desc->bufsz = INET_DEF_BUFFER;
...
}

我們從原始碼看到在實現上inet:setopts的二點要素:
1. make sure we have desc->bufsz >= SO_RCVBUF
2. desc->bufsz min buffer size hint
3. 接收緩衝區預設長度 1460, 剛好是一個mtu長度
4. 最小的緩衝區大小為1
5. bufsz不繼承

但是關係還是沒搞明白。

好吧,通讀inet_drv原始碼,我們可以看出gen_tcp接收包的流程:

1. 當socket上面有資料的時候,epoll會通知到port,最終導致tcp_inet_drv_input被呼叫。
2. tcp_inet_drv_input 發現如果連線已建立,就會呼叫tcp_recv來處理網路封包。
3. tcp_recv在呼叫sock_recv真正準備接收資料包前:
a. 如果發現接收緩衝區是空的話,會分配一個緩衝區。如果包大小已知,緩衝區大小就是包大小,否則的話為desc->bufsz。
b. 如果緩衝區非空,這時候看看是否已經收了一個以上完整的包,如果是就通過tcp_deliver往上層投遞包,投遞後如果緩衝區裡面除了完整包以外,沒有其他資料的話,就會呼叫tcp_clear_input把輸入緩衝區釋放掉。
c. 如果緩衝區非空,而且緩衝區的大小無法容納包的話,就會呼叫tcp_expand_buffer來把緩衝區擴大到包大小。
4. 接收好網路封包後,根據packet型別進行進一步整理,投遞給上層,同時釋放接收快取區。

/* clear CURRENT input buffer */
static void tcp_clear_input(tcp_descriptor* desc)
{
    if (desc->i_buf != NULL)
        free_buffer(desc->i_buf);
    desc->i_buf = NULL;
    desc->i_remain    = 0;
    desc->i_ptr       = NULL;
    desc->i_ptr_start = NULL;
    desc->i_bufsz     = 0;
}
/*                                                                                                                        
** Set new size on buffer, used when packet size is determined                                                            
** and the buffer is to small.                                                                                            
** buffer must have a size of at least len bytes (counting from ptr_start!)                                               
*/
static int tcp_expand_buffer(tcp_descriptor* desc, int len)
{
   ...
    if (desc->i_bufsz >= ulen) /* packet will fit */
        return 0;
    else if (desc->i_buf->orig_size >= ulen) { /* buffer is large enough */
        desc->i_bufsz = ulen;  /* set "virtual" size */
        return 0;
    }

    offs1 = desc->i_ptr_start - desc->i_buf->orig_bytes;
    offs2 = desc->i_ptr - desc->i_ptr_start;

    if ((bin = driver_realloc_binary(desc->i_buf, ulen)) == NULL)
        return -1;

    desc->i_buf = bin;
    desc->i_ptr_start = bin->orig_bytes + offs1;
    desc->i_ptr       = desc->i_ptr_start + offs2;
...
}
/*                                                                                                                        
** Deliver all packets ready                                                                                              
** if len == 0 then check start with a check for ready packet                                                             
*/
static int tcp_deliver(tcp_descriptor* desc, int len)
{
...
    while (len > 0) {
        int code;

        inet_input_count(INETP(desc), len);

        /* deliver binary? */
        if (len*4 >= desc->i_buf->orig_size*3) { /* >=75% */
            code = tcp_reply_binary_data(desc, desc->i_buf,
                                         (desc->i_ptr_start -
                                          desc->i_buf->orig_bytes),
                                         len);
            if (code < 0)
                return code;

            /* something after? */
            if (desc->i_ptr_start + len == desc->i_ptr) { /* no */
                tcp_clear_input(desc);
            }
            else { /* move trail to beginning of a new buffer */
                ErlDrvBinary* bin = alloc_buffer(desc->i_bufsz);
                char* ptr_end = desc->i_ptr_start + len;
                int sz = desc->i_ptr - ptr_end;

                memcpy(bin->orig_bytes, ptr_end, sz);
                free_buffer(desc->i_buf);
                desc->i_buf = bin;
                desc->i_ptr_start = desc->i_buf->orig_bytes;
                desc->i_ptr = desc->i_ptr_start + sz;
                desc->i_remain = 0;
...
}
}

除了上面的邏輯外,這裡需要強調幾點:
1. 預設情況下gen_tcp建立的時候,接收緩衝區是空的。
2. 接收完整的包投遞後,釋放接收緩衝區。
3. 接收緩衝區大小由包的大小決定,如果包未知,由desc->bufsz決定。
4. INET_LOPT_BUFFER僅僅影響接收緩衝區,傳送無需緩衝區,因為傳送的時候,sendv可以直接傳送佇列裡面的資料。
5. INET_LOPT_BUFFER只是給個緩衝區大小的hint, 而非強制。

分析到這裡為止,我們可以把這三個緩衝區的概念搞清楚了。接下來就是如何用好這些緩衝區的實踐了:

1. INET_LOPT_BUFFER由於指示的是inet_drv這個層面接收緩衝區的預設大小,所以這個緩衝區最好是比操作核心SO_RCVBUF指示的接收緩衝區要大。
2. INET_LOPT_BUFFER只是個hint, 在包大小未知的情況下,影響接收緩衝區的大小,而如果要接收的包大於接收緩衝區的時候,就要擴充套件緩衝區,通過realloc來實現的。所以通過統計包的平均大小,設定一個比較合理的hint, 減少expand緩衝區的發生。inets:getstat(Socket, [recv_avg]). 可以幫我們統計到平均包大小。

這裡還需要指出個問題,通過前面的分析,我們知道接收緩衝區不停的分配,釋放,這對記憶體分配器造成很大的壓力。 所以inet_drv實現了一套小型的記憶體分配池。為了減少衝突,每個CPU一個分配池. 每個池維護最近使用的buffer, 達到最快分配到buffer的目的。

參看程式碼如下:

static ErlDrvBinary* alloc_buffer(ErlDrvSizeT minsz)
{
    InetDrvBufStk *bs = get_bufstk();
    if (bs && bs->buf.pos > 0) {
        long size;
        ErlDrvBinary* buf = bs->buf.stk[--bs->buf.pos];
        size = buf->orig_size;
        bs->buf.mem_size -= size;

        if (size >= minsz)
            return buf;

        driver_free_binary(buf);
    }

    return driver_alloc_binary(minsz);
}

static void release_buffer(ErlDrvBinary* buf)
{
...
    bs = get_bufstk();
    if (!bs
        || (bs->buf.mem_size + size > BUFFER_STACK_MAX_MEM_SIZE)
        || (bs->buf.pos >= BUFFER_STACK_SIZE)) {
    free_binary:
        driver_free_binary(buf);
    }
    else {
        bs->buf.mem_size += size;
        bs->buf.stk[bs->buf.pos++] = buf;
    }
...
}

有了高速的記憶體分配器,gen_tcp的接收緩衝區的管理的代價就不算太大。gen_tcp這樣設計接收緩衝區的目的是為了能夠在大量網路連結的情況下,儘可能的節約記憶體,典型的用時間換空間的設計。

小結: 原始碼是最好的答案,文件不是。
祝玩得開心!

Post Footer automatically generated by wp-posturl plugin for wordpress.

相關推薦

系統技術業餘研究 » gen_tcp接收緩衝區混淆概念糾正

Erlang的每個TCP網路連結是由相應的gen_tcp物件來表示的,說白了就是個port, 實現Erlang網路相關的邏輯,其實現程式碼位於erts/emulator/drivers/common/inet_drv.c 參照inet:setopts文件,它有三個buffer相關的選項,非常讓人

系統技術業餘研究 » gen_tcp傳送緩衝區以及水位線問題分析

前段時間有同學在線上問了個問題: 伺服器端我是這樣設的:gen_tcp:listen(8000, [{active, false}, {recbuf,1}, {buffer,1}]). 客戶端是這樣設的:gen_tcp:connect(“localhost”, 8000, [{active, f

系統技術業餘研究 » gen_tcp呼叫程序收到{empty_out_q, Port}訊息奇怪行為分析

今天有同學在gmail裡面問了一個Erlang的問題,問題描述的非常好, 如下: 問題的背景是: 1、我開發了一個服務端程式,接收客戶端的連線。同一時刻會有多個客戶端來連線,連線後,接收客戶端請求後,再發送響應訊息,然後客戶端主動斷連。

系統技術業餘研究 » gen_tcp接受連結時enfile的問題分析及解決

最近我們為了安全方面的原因,在RDS伺服器上做了個代理程式把普通的MYSQL TCP連線變成了SSL連結,在測試的時候,皓庭同學發現Tsung發起了幾千個TCP連結後Erlang做的SSL PROXY老是報告gen_tcp:accept返回{error, enfile}錯誤。針對這個問題,我展開了

系統技術業餘研究 » gen_tcp:send的深度解刨和使用指南(初稿)

在大家的印象中, gen_tcp:send是個很樸素的函式, 一呼叫資料就喀嚓喀嚓到了對端. 這是個很大的誤解, Erlang的otp文件寫的很不清楚. 而且這個功能對於大部分的網路程式是至關重要的, 它的使用對否極大了影響了應用的效能. 我聽到很多同學在抱怨erlang的效能低或者出了很奇怪的問

系統技術業餘研究 » gen_tcp容易誤用的一點解釋

前天有同學在玩erlang gen_tcp的時候碰到了點小麻煩,描述如下: 比如說連線到baidu.com,發個http請求,然後馬上接收資料,發現接收出錯,wireshark抓包發現數據都有往返傳送,比較鬱悶。 我把問題演示下: $ erl Erlang R14B03 (erts-5.8

系統技術業餘研究 » gen_tcp傳送程序被掛起起因分析及對策

最近有同學在gmail上問關於gen_tcp傳送程序被掛起的問題,問題描述的非常好,見底下: 第一個問題是關於port_command和gen_tcp:send的。從專案上線至今,我在tcp傳送的地方遇到過兩次問題,都跟port_command有關係。 起初程式的效能不好,我從各方面嘗試分析和優化

系統技術業餘研究 » gen_tcp如何限制封包大小

我們在做tcp伺服器的時候,通常會從安全考慮,限制封包的大小,預防被無端攻擊或者避免極端的請求對業務造成損害。 我們的tcp伺服器通常是erlang做的,那麼就涉及到gen_tcp如何限制封包的大小. gen_tcp對封包的獲取有2種方式: 1. {active, false} 封包透過gen_

系統技術業餘研究 » gen_tcp連線半關閉問題

很久之前我發在javaeye論壇上,預防丟了抄過來: 原文:http://erlang.group.iteye.com/group/wiki/1422-gen_tcp-half-closed 當tcp對端呼叫shutdown(RD/WR) 時候, 宿主程序預設將收到{tcp_closed, Soc

系統技術業餘研究 » 未公開的gen_tcp:unrecv以及接收緩衝區行為分析

gen_tcp:unrecv是個未公開的函式,作用是往tcp的接收緩衝區裡面填入指定的資料。別看這小小的函式,用起來很舒服的。 我們先看下它的程式碼實現,Erlang程式碼部分: %%gen_tcp.erl:L299 unrecv(S, Data) when is_port(S) ->

系統技術業餘研究 » R14A新增新指令優化Ref訊息的接收

Erlang的慣用法之一就是在訊息匹配的時候,如果需要唯一性,通常會通過make_ref搞個唯一的Ref來作為訊息的一部分來匹配。這個慣用法用在gen_server:call或者demonitor這樣的使用頻度很高的函式裡面。由於erlang的訊息匹配是再訊息佇列裡面挨個遍歷來匹配,特別是訊息佇列

系統技術業餘研究 » Erlang gen_tcp相關問題彙編索引

gen_tcp是erlang做網路應用最核心的一個模組,實踐中使用起來會有很多問題,我把團隊和我自己過去碰到的問題彙編下,方便大家對症下藥. 以下是gen_tcp,tcp,port相關的博文: 待續,歡迎補充! 祝玩得開心! Post Footer automatically generate

系統技術業餘研究

ItPub寫的文章“2017 年度 DB-Engines 資料庫冠軍得主:PostgreSQL 封王!”, 點選 這裡 進一步閱讀 升的最快的幾個資料庫,我簡單的無責任點評: PG資料庫是很老的資料庫,不過這幾年冉冉升起,因為是學院派的,有很好的學術和智力的支援,一直以來在資料庫的體系結構,程式碼

系統技術業餘研究 » MySQL資料庫架構的演化觀察

MySQL資料庫架構的演化觀察 December 14th, 2017 Categories: 資料庫 Tags: mysql

系統技術業餘研究 » inet_dist_connect_options

Erlang 17.5版本引入了inet_dist_{listen,connect}_options,對於結點間的互聯socket可以有更精細的控制,RPC的時候效能可以微調: raimo/inet_tcp_dist-priority-option/OTP-12476: Document ke

系統技術業餘研究 » 推薦工作機會

最後更新時間:2014/11/28 請賜簡歷至:[email protected], 感謝您對加入我們公司有興趣,我們希望能早日和您共事。 以下幾個職位1年內有效,歡迎內部轉崗:
 資深資料工程師 公司:阿里(核心系統資料庫組) 工作地點:杭州(西溪園區) 崗位描述: 分析雲服務產生的海

系統技術業餘研究 » 新的工作和研究方向

和大家更新下: 做了將近8年資料庫後,我的工作和研究方向將會延伸到虛擬化和計算相關的雲服務,希望能夠和大家一起進步,Happy New Year! 預祝大家玩得開心! Post Footer automatically generated by wp-posturl plugin for w

系統技術業餘研究 » 叢集引入inet_dist_{listen,connect}_options更精細引數微調

Erlang 17.5版本引入了inet_dist_{listen,connect}_options,對於結點間的互聯socket可以有更精細的控制,RPC的時候效能可以微調: raimo/inet_tcp_dist-priority-option/OTP-12476: Document ke

系統技術業餘研究 » 2017升的最快的幾個資料庫無責任點評

ItPub寫的文章“2017 年度 DB-Engines 資料庫冠軍得主:PostgreSQL 封王!”, 點選 這裡 進一步閱讀 升的最快的幾個資料庫,我簡單的無責任點評: PG資料庫是很老的資料庫,不過這幾年冉冉升起,因為是學院派的,有很好的學術和智力的支援,一直以來在資料庫的體系結構,程式碼

系統技術業餘研究 » Erlang 17.5引入+hpds命令列控制程序預設字典大小

Erlang 17.5釋出引入控制程序預設字典大小的命令列引數: Erlang/OTP 17.5 has been released Written by Henrik, 01 Apr 2015 Some highlights of the release are: ERTS: Added co