1. 程式人生 > >系統技術非業餘研究 » 未公開的gen_tcp:unrecv以及接收緩衝區行為分析

系統技術非業餘研究 » 未公開的gen_tcp:unrecv以及接收緩衝區行為分析

gen_tcp:unrecv是個未公開的函式,作用是往tcp的接收緩衝區裡面填入指定的資料。別看這小小的函式,用起來很舒服的。
我們先看下它的程式碼實現,Erlang程式碼部分:

%%gen_tcp.erl:L299
unrecv(S, Data) when is_port(S) ->
    case inet_db:lookup_socket(S) of
        {ok, Mod} ->
            Mod:unrecv(S, Data);
        Error ->
            Error
    end.
%%inet_tcp.erl:L58 
unrecv(Socket, Data) -> prim_inet:unrecv(Socket, Data).

%%prim_inet.erl:L983
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%                                              
%%                                                                                                                          
%% UNRECV(insock(), data) -> ok | {error, Reason}                                                                           
%%                                                                                                                          
%%                                                                                                                          
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%                                              
unrecv(S, Data) ->
    case ctl_cmd(S, ?TCP_REQ_UNRECV, Data) of
        {ok, _} -> ok;
        Error  -> Error
    end.

執行期c程式碼部分:

//inet_drv.c:L8123
/* TCP requests from Erlang */
static int tcp_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len,
                        char** rbuf, int rsize)
{
...
    case TCP_REQ_UNRECV: {
        DEBUGF(("tcp_inet_ctl(%ld): UNRECV\r\n", (long)desc->inet.port));
        if (!IS_CONNECTED(INETP(desc)))
            return ctl_error(ENOTCONN, rbuf, rsize);
        tcp_push_buffer(desc, buf, len);
        if (desc->inet.active)
            tcp_deliver(desc, 0);
        return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize);
    }
...
}

static int tcp_push_buffer(tcp_descriptor* desc, char* buf, int len)
{
    ErlDrvBinary* bin;

    if (desc->i_buf == NULL) {
        bin = alloc_buffer(len);
        sys_memcpy(bin->orig_bytes, buf, len);
        desc->i_buf = bin;
        desc->i_bufsz = len;
        desc->i_ptr_start = desc->i_buf->orig_bytes;
        desc->i_ptr = desc->i_ptr_start + len;
    }
    else {
        char* start =  desc->i_buf->orig_bytes;
        int sz_before = desc->i_ptr_start - start;
        int sz_filled = desc->i_ptr - desc->i_ptr_start;

        if (len <= sz_before) {
            sys_memcpy(desc->i_ptr_start - len, buf, len);
            desc->i_ptr_start -= len;
        }
        else {
            bin = alloc_buffer(desc->i_bufsz+len);
            sys_memcpy(bin->orig_bytes, buf, len);
            sys_memcpy(bin->orig_bytes+len, desc->i_ptr_start, sz_filled);
            free_buffer(desc->i_buf);
            desc->i_bufsz += len;
            desc->i_buf = bin;
            desc->i_ptr_start = bin->orig_bytes;
            desc->i_ptr = desc->i_ptr_start + sz_filled + len;
        }
    }
    desc->i_remain = 0;
    return 0;
}

實現上很簡單,就是透過tcp ctl命令往驅動接收緩衝區裡面填資料。
但是什麼是gen_tcp接收緩衝區, 它的大小是多大呢?
在回答這個問題之前,我們先看下inet:setopts文件,參見這裡

setopts(Socket, Options) -> ok | {error, posix()}

Types:

Socket = term()
Options = [{Opt, Val} | {raw, Protocol, Option, ValueBin}]
Protocol = integer()
OptionNum = integer()
ValueBin = binary()
Opt, Val — see below
Sets one or more options for a socket. The following options are available:

{active, true | false | once}
If the value is true, which is the default, everything received from the socket will be sent as messages to the receiving process. If the value is false (passive mode), the process must explicitly receive incoming data by calling gen_tcp:recv/2,3 or gen_udp:recv/2,3 (depending on the type of socket).

If the value is once ({active, once}), one data message from the socket will be sent to the process. To receive one more message, setopts/2 must be called again with the {active, once} option.

When using {active, once}, the socket changes behaviour automatically when data is received. This can sometimes be confusing in combination with connection oriented sockets (i.e. gen_tcp) as a socket with {active, false} behaviour reports closing differently than a socket with {active, true} behaviour. To make programming easier, a socket where the peer closed and this was detected while in {active, false} mode, will still generate the message {tcp_closed,Socket} when set to {active, once} or {active, true} mode. It is therefore safe to assume that the message {tcp_closed,Socket}, possibly followed by socket port termination (depending on the exit_on_close option) will eventually appear when a socket changes back and forth between {active, true} and {active, false} mode. However, when peer closing is detected is all up to the underlying TCP/IP stack and protocol.

Note that {active,true} mode provides no flow control; a fast sender could easily overflow the receiver with incoming messages. Use active mode only if your high-level protocol provides its own flow control (for instance, acknowledging received messages) or the amount of data exchanged is small. {active,false} mode or use of the {active, once} mode provides flow control; the other side will not be able send faster than the receiver can read.
{packet, PacketType}(TCP/IP sockets)
Defines the type of packets to use for a socket. The following values are valid:

raw | 0
No packaging is done.

1 | 2 | 4
Packets consist of a header specifying the number of bytes in the packet, followed by that number of bytes. The length of header can be one, two, or four bytes; containing an unsigned integer in big-endian byte order. Each send operation will generate the header, and the header will be stripped off on each receive operation.

In current implementation the 4-byte header is limited to 2Gb.

asn1 | cdr | sunrm | fcgi | tpkt | line
These packet types only have effect on receiving. When sending a packet, it is the responsibility of the application to supply a correct header. On receiving, however, there will be one message sent to the controlling process for each complete packet received, and, similarly, each call to gen_tcp:recv/2,3 returns one complete packet. The header is not stripped off.

The meanings of the packet types are as follows:
asn1 – ASN.1 BER,
sunrm – Sun’s RPC encoding,
cdr – CORBA (GIOP 1.1),
fcgi – Fast CGI,
tpkt – TPKT format [RFC1006],
line – Line mode, a packet is a line terminated with newline, lines longer than the receive buffer are truncated.

http | http_bin
The Hypertext Transfer Protocol. The packets are returned with the format according to HttpPacket described in erlang:decode_packet/3. A socket in passive mode will return {ok, HttpPacket} from gen_tcp:recv while an active socket will send messages like {http, Socket, HttpPacket}.

Note that the packet type httph is not needed when reading from a socket.

{packet_size, Integer}(TCP/IP sockets)
Sets the max allowed length of the packet body. If the packet header indicates that the length of the packet is longer than the max allowed length, the packet is considered invalid. The same happens if the packet header is too big for the socket receive buffer.

{recbuf, Integer}
Gives the size of the receive buffer to use for the socket.

我把和接收緩衝區大小有關的引數都列出來了,結合inet_drv.c的程式碼:

#define INET_DEF_BUFFER     1460        /* default buffer size */
#define INET_MIN_BUFFER     1           /* internal min buffer */

#define TCP_MAX_PACKET_SIZE 0x4000000  /* 64 M */


/* LOPT is local options */
#define INET_LOPT_BUFFER      20  /* min buffer size hint */

typedef struct {
...
       unsigned int psize;         /* max packet size (TCP only?) */
...
       int   bufsz;                /* minimum buffer constraint */
...
} inet_descriptor;

typedef struct {
    inet_descriptor inet;       /* common data structure (DON'T MOVE) */
  ...
    int   i_bufsz;              /* current input buffer size (<= bufsz) */
    ErlDrvBinary* i_buf;        /* current binary buffer */
    char*         i_ptr;        /* current pos in buf */
    char*         i_ptr_start;  /* packet start pos in buf */
    int           i_remain;     /* remaining chars to read */
   ...
} tcp_descriptor;


static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
{
...
        case INET_LOPT_BUFFER:
            DEBUGF(("inet_set_opts(%ld): s=%d, BUFFER=%d\r\n",
                    (long)desc->port, desc->s, ival));
            if (ival < INET_MIN_BUFFER) ival = INET_MIN_BUFFER;
            desc->bufsz = ival;
            continue;
...
	if (type == SO_RCVBUF) {
            /* make sure we have desc->bufsz >= SO_RCVBUF */
            if (ival > desc->bufsz)
                desc->bufsz = ival;
        }
...
        case INET_LOPT_PACKET_SIZE:
            DEBUGF(("inet_set_opts(%ld): s=%d, PACKET_SIZE=%d\r\n",
                    (long)desc->port, desc->s, ival));
            desc->psize = (unsigned int)ival;
            continue;
...
}

static int inet_fill_opts(inet_descriptor* desc,
                          char* buf, int len, char** dest, int destlen)
{
...
case INET_OPT_RCVBUF:
            type = SO_RCVBUF;
            break;
...
}

從文件和程式碼結合中可以看出,一個TCP報文的最大大小由{packet_size, Integer}決定,最大不超過64M. 每個TCP報文由一定的頭,比如(1 | 2 | 4)位元組的報文長度,由{packet, PacketType}決定。

那麼接收緩衝區的預設大小多大呢?
未公開的{buffer,Integer}選項:

enc_opt(buffer)          -> ?INET_LOPT_BUFFER;

以及{recbuf, Integer},接收緩衝區的預設大小取他們中間最大的一個,在預設情況下是1460,一個TCP段的大小。

那麼接收快取區的大小會如何變化呢?
我們知道gen_tcp由主動和被動模式,在主動模式下報文會根據packettype來準備好報文體,扔給宿主程序,在這種情況下,如果接收緩衝區比報文的長度小的話,那麼就把緩衝區擴充套件到該報文的大小,然後等待所有的報文體都接收完畢就好。

那麼什麼時候,把緩衝區主動搞小呢?

/*                                                                                                                          
** Deliver all packets ready                                                                                                
** if len == 0 then check start with a check for ready packet                                                               
*/
static int tcp_deliver(tcp_descriptor* desc, int len)
{
...
 while (len > 0) {
        int code = 0;

        inet_input_count(INETP(desc), len);

        /* deliver binary? */
        if (len*4 >= desc->i_buf->orig_size*3) { /* >=75% */
            /* something after? */
            if (desc->i_ptr_start + len == desc->i_ptr) { /* no */
                code = tcp_reply_binary_data(desc, desc->i_buf,
                                             (desc->i_ptr_start -
                                              desc->i_buf->orig_bytes),
                                             len);
                tcp_clear_input(desc);
            }
     else { /* move trail to beginning of a new buffer */
                ErlDrvBinary* bin;
                char* ptr_end = desc->i_ptr_start + len;
                int sz = desc->i_ptr - ptr_end;

                bin = alloc_buffer(desc->i_bufsz);
                memcpy(bin->orig_bytes, ptr_end, sz);

                code = tcp_reply_binary_data(desc, desc->i_buf,
                                             (desc->i_ptr_start-
                                              desc->i_buf->orig_bytes),
                                             len);
                free_buffer(desc->i_buf);
                desc->i_buf = bin;
                desc->i_ptr_start = desc->i_buf->orig_bytes;
                desc->i_ptr = desc->i_ptr_start + sz;
                desc->i_remain = 0;
            }
        }
   else {
            code = tcp_reply_data(desc, desc->i_ptr_start, len);
            /* XXX The buffer gets thrown away on error  (code < 0)    */
            /* Windows needs workaround for this in tcp_inet_event...  */
            desc->i_ptr_start += len;
            if (desc->i_ptr_start == desc->i_ptr)
                tcp_clear_input(desc);
            else
                desc->i_remain = 0;

        }

當接收到的報文提交出去的時候,長度大於75%緩衝區的長度或者緩衝區剛好只有這個報文的時候,就把緩衝區釋放掉。

被動模式則由使用者指定要接收多少資料,如果指定0,則表明有多少要多少。那麼這種情況下,緩衝區的長度會取報文的長度和使用者要求的長度中的最大值。

根據分析,我們知道接收緩衝區佔用的記憶體比較複雜,我們如果在程式中要精確的控制記憶體,需要調整上面的引數。

接下來我們看下分析下unrecv的用途,首先我們參考下misultin小型的erlang web伺服器,專案在 這裡
這個專案就很靈活的使用了packet型別和active模式的結合來利用erts已有的協議分析,我簡單的演示如下:
grep下程式碼,可以看到:

$ grep -rin setopts .
./misultin_socket.erl:106:	inet:setopts(Sock, [{active, once}]),
./misultin_socket.erl:130:	inet:setopts(Sock, [{active, once}]),
./misultin_socket.erl:194:      inet:setopts(Sock, [{packet, raw}, {active, false}]),
./misultin_socket.erl:202:	inet:setopts(Sock, [{packet, http}]),
$ grep -rin gen_tcp:recv  .
./misultin_socket.erl:195:      case gen_tcp:recv(Sock, Len, RecvTimeout) of

各位可以參考下它的實現,很值得學習!

我還是舉個例子來演示unrecv的使用:
比如某個報文是用 [報文體]+ 回車行+,類似底下這樣的報文

[demo]
line1
line2
line3

由於事先不知道報文的準確長度,我們就設成{packet,raw}, {active, false},
先讀入一段報文進行分析,分析出報文的‘[’和‘]’,就知道報文的大小,剩下的報文因為inet_drv支援行分割的報文,我們就無需自己動手分析了。
但是我們預讀取的資料可能會超出報文的大小,部分回車行分割的資料已經被讀取出來了,利用{packet, line}來分析就不正確了。所以我們用unrecv把這段資料還回去,就可以了,方便之門就打開了。

總結:分析了gen_tcp接收緩衝區的工作原理以及影響大小的因素,還順便介紹了unrecv的用途。

gen_tcp以及傳送緩衝區相關的文章請參見這裡

祝玩得開心!

Post Footer automatically generated by wp-posturl plugin for wordpress.

相關推薦

系統技術業餘研究 » 公開gen_tcp:unrecv以及接收緩衝區行為分析

gen_tcp:unrecv是個未公開的函式,作用是往tcp的接收緩衝區裡面填入指定的資料。別看這小小的函式,用起來很舒服的。 我們先看下它的程式碼實現,Erlang程式碼部分: %%gen_tcp.erl:L299 unrecv(S, Data) when is_port(S) ->

系統技術業餘研究 » 公開的erlang ports trace

erlang的trace機制非常強大, 在dbg, ttb模組的配合下, 可以非常清楚的瞭解系統的運作, 排錯, 和調優. 但是有個對於做網路程式重要的功能被忽視了, 沒有寫到文件. 那就是trace ports訊息. 我們的gen_tcp,file都是port實現的, 那麼瞭解這麼模組的運作其實

系統技術業餘研究 » 公開的erlang:port_s/get_data

我們通常在使用port的時候, 需要把他同其他的上下文關聯起來, 以便在port給我們發生資料的時候, 我們能根據繫結的上下文, 知道如何處理資料. 有2種辦法: 1. 用ets來儲存{Port, Ctx},這個比較慢, 每次都要查表. 2. 用Port本身的空間來儲存Ctx. erlang:po

公開gen_tcp:unrecv以及接收緩衝區行為分析

gen_tcp:unrecv是個未公開的函式,作用是往tcp的接收緩衝區裡面填入指定的資料。別看這小小的函式,用起來很舒服的。 我們先看下它的程式碼實現,Erlang程式碼部分: %%gen_tcp.erl:L299 unrecv(S, Data) when is_port(S) ->

系統技術業餘研究 » Erlang程式碼反編譯以及檢視彙編碼

Erlang的程式碼是先翻譯成abstract_code,再到目的碼的,如果有符號資訊很容易恢復原始碼,通常我們部署系統的時候需要把符號資訊去掉,reltool就可以幹這個事情! 我們演示下: $ cat server.erl -module(server). -compile(export

系統技術業餘研究 » Flashcache使用的誤區以及解決方案

但是flashcache在使用中很多人會有個誤區,導致效能很低。首先我們看下flashcache的設計背景和適用場景: Introduction : ============ Flashcache is a write back block cache Linux kernel module.

系統技術業餘研究 » Erlang epmd的角色以及使用

很多同學誤會了epmd的作用,認為epmd就是erlang叢集的協議,我來澄清下: Epmd是Erlang Port Mapper Daemon的縮寫,在Erlang叢集中的作用相當於dns的作用,提供節點名稱到埠的查詢服務,epmd繫結在總所周知的4369埠上。 epmd文件:這裡 epmd協議

系統技術業餘研究 » erts_ use_sender_punish公開的特性

我們知道erlang的VM排程是根據reds的,每個程序初始的時候分配2000個reds, 一旦這個reds用完了,程序就被掛起,放到佇列去排隊,等待下一次排程。OTP R13B04下一個程序給另外一個程序傳送訊息,是需要扣除傳送者一定的reds, 這樣看起來更公平。因為古語說殺敵一千, 自損八百

系統技術業餘研究 » blktrace公開選項網路儲存擷取資料

我們透過blktrace來觀察io行為的時候,第一件事情需要選擇目標裝置,以便分析該裝置的io行為。具體使用可以參考我之前寫的幾篇:這裡 這裡 這裡 blktrace分為核心部分和應用部分,應用部分收到我們要捕捉的裝置名單,傳給核心。核心分佈在block層的各個tracepoint就會開始工作,把

系統技術業餘研究 » Erlang叢集公開特性:IP網段限制

Erlang叢集二個節點之間的通訊是通過一個tcp長連線進行的,而且是全聯通的,一旦cookie論證通過了,任何一個節點就獲得全叢集的訪問權,可以參考Erlang分佈的核心技術淺析 。erlang的這個授權模式特定搞的這麼簡單,但是在實際使用中還是有安全性的問題。我們退而求其次,來個IP網段限制,

系統技術業餘研究 » gen_tcp呼叫程序收到{empty_out_q, Port}訊息奇怪行為分析

今天有同學在gmail裡面問了一個Erlang的問題,問題描述的非常好, 如下: 問題的背景是: 1、我開發了一個服務端程式,接收客戶端的連線。同一時刻會有多個客戶端來連線,連線後,接收客戶端請求後,再發送響應訊息,然後客戶端主動斷連。

系統技術業餘研究 » gen_tcp接受連結時enfile的問題分析及解決

最近我們為了安全方面的原因,在RDS伺服器上做了個代理程式把普通的MYSQL TCP連線變成了SSL連結,在測試的時候,皓庭同學發現Tsung發起了幾千個TCP連結後Erlang做的SSL PROXY老是報告gen_tcp:accept返回{error, enfile}錯誤。針對這個問題,我展開了

系統技術業餘研究 » gen_tcp:send的深度解刨和使用指南(初稿)

在大家的印象中, gen_tcp:send是個很樸素的函式, 一呼叫資料就喀嚓喀嚓到了對端. 這是個很大的誤解, Erlang的otp文件寫的很不清楚. 而且這個功能對於大部分的網路程式是至關重要的, 它的使用對否極大了影響了應用的效能. 我聽到很多同學在抱怨erlang的效能低或者出了很奇怪的問

系統技術業餘研究 » gen_tcp容易誤用的一點解釋

前天有同學在玩erlang gen_tcp的時候碰到了點小麻煩,描述如下: 比如說連線到baidu.com,發個http請求,然後馬上接收資料,發現接收出錯,wireshark抓包發現數據都有往返傳送,比較鬱悶。 我把問題演示下: $ erl Erlang R14B03 (erts-5.8

系統技術業餘研究 » gen_tcp傳送程序被掛起起因分析及對策

最近有同學在gmail上問關於gen_tcp傳送程序被掛起的問題,問題描述的非常好,見底下: 第一個問題是關於port_command和gen_tcp:send的。從專案上線至今,我在tcp傳送的地方遇到過兩次問題,都跟port_command有關係。 起初程式的效能不好,我從各方面嘗試分析和優化

系統技術業餘研究 » gen_tcp接收緩衝區易混淆概念糾正

Erlang的每個TCP網路連結是由相應的gen_tcp物件來表示的,說白了就是個port, 實現Erlang網路相關的邏輯,其實現程式碼位於erts/emulator/drivers/common/inet_drv.c 參照inet:setopts文件,它有三個buffer相關的選項,非常讓人

系統技術業餘研究 » gen_tcp如何限制封包大小

我們在做tcp伺服器的時候,通常會從安全考慮,限制封包的大小,預防被無端攻擊或者避免極端的請求對業務造成損害。 我們的tcp伺服器通常是erlang做的,那麼就涉及到gen_tcp如何限制封包的大小. gen_tcp對封包的獲取有2種方式: 1. {active, false} 封包透過gen_

系統技術業餘研究 » Erlang gen_tcp相關問題彙編索引

gen_tcp是erlang做網路應用最核心的一個模組,實踐中使用起來會有很多問題,我把團隊和我自己過去碰到的問題彙編下,方便大家對症下藥. 以下是gen_tcp,tcp,port相關的博文: 待續,歡迎補充! 祝玩得開心! Post Footer automatically generate

系統技術業餘研究 » gen_tcp連線半關閉問題

很久之前我發在javaeye論壇上,預防丟了抄過來: 原文:http://erlang.group.iteye.com/group/wiki/1422-gen_tcp-half-closed 當tcp對端呼叫shutdown(RD/WR) 時候, 宿主程序預設將收到{tcp_closed, Soc

系統技術業餘研究 » gen_tcp傳送緩衝區以及水位線問題分析

前段時間有同學在線上問了個問題: 伺服器端我是這樣設的:gen_tcp:listen(8000, [{active, false}, {recbuf,1}, {buffer,1}]). 客戶端是這樣設的:gen_tcp:connect(“localhost”, 8000, [{active, f