
The undocumented gen_tcp:unrecv and an analysis of receive buffer behavior

gen_tcp:unrecv is an undocumented function that pushes the given data into a TCP socket's receive buffer. Small as it is, it is very handy.

Let's first look at how it is implemented. The Erlang side:

%%gen_tcp.erl:L299
unrecv(S, Data) when is_port(S) ->
    case inet_db:lookup_socket(S) of
        {ok, Mod} ->
            Mod:unrecv(S, Data);
        Error ->
            Error
    end.
%%inet_tcp.erl:L58
unrecv(Socket, Data) -> prim_inet:unrecv(Socket, Data).
 
%%prim_inet.erl:L983
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%                                             
%%                                                                                                                         
%% UNRECV(insock(), data) -> ok | {error, Reason}                                                                          
%%                                                                                                                         
%%                                                                                                                         
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%                                             
unrecv(S, Data) ->
    case ctl_cmd(S, ?TCP_REQ_UNRECV, Data) of
        {ok, _} -> ok;
        Error  -> Error
    end.

The runtime C side:

//inet_drv.c:L8123
/* TCP requests from Erlang */
static int tcp_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len,
                        char** rbuf, int rsize)
{
...
    case TCP_REQ_UNRECV: {
        DEBUGF(("tcp_inet_ctl(%ld): UNRECV\r\n", (long)desc->inet.port));
        if (!IS_CONNECTED(INETP(desc)))
            return ctl_error(ENOTCONN, rbuf, rsize);
        tcp_push_buffer(desc, buf, len);
        if (desc->inet.active)
            tcp_deliver(desc, 0);
        return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize);
    }
...
}
 
static int tcp_push_buffer(tcp_descriptor* desc, char* buf, int len)
{
    ErlDrvBinary* bin;
 
    if (desc->i_buf == NULL) {
        bin = alloc_buffer(len);
        sys_memcpy(bin->orig_bytes, buf, len);
        desc->i_buf = bin;
        desc->i_bufsz = len;
        desc->i_ptr_start = desc->i_buf->orig_bytes;
        desc->i_ptr = desc->i_ptr_start + len;
    }
    else {
        char* start =  desc->i_buf->orig_bytes;
        int sz_before = desc->i_ptr_start - start;
        int sz_filled = desc->i_ptr - desc->i_ptr_start;
 
        if (len <= sz_before) {
            sys_memcpy(desc->i_ptr_start - len, buf, len);
            desc->i_ptr_start -= len;
        }
        else {
            bin = alloc_buffer(desc->i_bufsz+len);
            sys_memcpy(bin->orig_bytes, buf, len);
            sys_memcpy(bin->orig_bytes+len, desc->i_ptr_start, sz_filled);
            free_buffer(desc->i_buf);
            desc->i_bufsz += len;
            desc->i_buf = bin;
            desc->i_ptr_start = bin->orig_bytes;
            desc->i_ptr = desc->i_ptr_start + sz_filled + len;
        }
    }
    desc->i_remain = 0;
    return 0;
}

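The implementation is simple: a TCP ctl command (TCP_REQ_UNRECV) pushes the data into the driver's receive buffer. As tcp_push_buffer shows, the data goes in front of any bytes still waiting to be read (it is copied just below i_ptr_start, or a larger buffer is allocated when there is not enough room there), so it is the first thing the next read sees. The call fails with ENOTCONN on an unconnected socket, and on an active socket tcp_deliver is run immediately.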
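To make the "prepend" behavior concrete, here is a minimal sketch that models the three cases of tcp_push_buffer on a simplified buffer. The struct rbuf and the function rbuf_push_front are hypothetical names for illustration, not the driver's real types; plain malloc/free stand in for alloc_buffer/free_buffer.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical, simplified model of the driver's input buffer (not the real
 * tcp_descriptor). Unread bytes live in mem[start .. end). */
struct rbuf {
    char  *mem;    /* allocated storage (NULL when there is no buffer) */
    size_t cap;    /* allocated size, like i_bufsz */
    size_t start;  /* first unread byte, like i_ptr_start */
    size_t end;    /* one past the last unread byte, like i_ptr */
};

/* Put len bytes in front of the unread data, mirroring tcp_push_buffer. */
static int rbuf_push_front(struct rbuf *b, const char *data, size_t len)
{
    if (b->mem == NULL) {                  /* case 1: no buffer yet */
        b->mem = malloc(len ? len : 1);
        if (b->mem == NULL)
            return -1;
        memcpy(b->mem, data, len);
        b->cap = len;
        b->start = 0;
        b->end = len;
    } else if (len <= b->start) {          /* case 2: fits in consumed headroom */
        b->start -= len;
        memcpy(b->mem + b->start, data, len);
    } else {                               /* case 3: new buffer, data first */
        size_t filled = b->end - b->start;
        char *m = malloc(b->cap + len);
        if (m == NULL)
            return -1;
        memcpy(m, data, len);
        memcpy(m + len, b->mem + b->start, filled);
        free(b->mem);
        b->mem = m;
        b->cap += len;
        b->start = 0;
        b->end = len + filled;
    }
    return 0;
}
```

In Erlang terms this is why a gen_tcp:recv issued after gen_tcp:unrecv(S, Data) returns Data before anything that was already buffered.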

But what exactly is the gen_tcp receive buffer, and how big is it?

Before answering that, let's look at the inet:setopts documentation:

setopts(Socket, Options) -> ok | {error, posix()}

Types:

Socket = term()

Options = [{Opt, Val} | {raw, Protocol, Option, ValueBin}]

Protocol = integer()

OptionNum = integer()

ValueBin = binary()

Opt, Val — see below

Sets one or more options for a socket. The following options are available:

{active, true | false | once}

If the value is true, which is the default, everything received from the socket will be sent as messages to the receiving process. If the value is false (passive mode), the process must explicitly receive incoming data by calling gen_tcp:recv/2,3 or gen_udp:recv/2,3 (depending on the type of socket).

If the value is once ({active, once}), one data message from the socket will be sent to the process. To receive one more message, setopts/2 must be called again with the {active, once} option.

When using {active, once}, the socket changes behaviour automatically when data is received. This can sometimes be confusing in combination with connection oriented sockets (i.e. gen_tcp) as a socket with {active, false} behaviour reports closing differently than a socket with {active, true} behaviour. To make programming easier, a socket where the peer closed and this was detected while in {active, false} mode, will still generate the message {tcp_closed,Socket} when set to {active, once} or {active, true} mode. It is therefore safe to assume that the message {tcp_closed,Socket}, possibly followed by socket port termination (depending on the exit_on_close option) will eventually appear when a socket changes back and forth between {active, true} and {active, false} mode. However, when peer closing is detected is all up to the underlying TCP/IP stack and protocol.

Note that {active,true} mode provides no flow control; a fast sender could easily overflow the receiver with incoming messages. Use active mode only if your high-level protocol provides its own flow control (for instance, acknowledging received messages) or the amount of data exchanged is small. {active,false} mode or use of the {active, once} mode provides flow control; the other side will not be able to send faster than the receiver can read.

{packet, PacketType} (TCP/IP sockets)

Defines the type of packets to use for a socket. The following values are valid:

raw | 0

No packaging is done.

1 | 2 | 4

Packets consist of a header specifying the number of bytes in the packet, followed by that number of bytes. The length of header can be one, two, or four bytes; containing an unsigned integer in big-endian byte order. Each send operation will generate the header, and the header will be stripped off on each receive operation.

In current implementation the 4-byte header is limited to 2Gb.

asn1 | cdr | sunrm | fcgi | tpkt | line

These packet types only have effect on receiving. When sending a packet, it is the responsibility of the application to supply a correct header. On receiving, however, there will be one message sent to the controlling process for each complete packet received, and, similarly, each call to gen_tcp:recv/2,3 returns one complete packet. The header is not stripped off.

The meanings of the packet types are as follows:

asn1 – ASN.1 BER,

sunrm – Sun’s RPC encoding,

cdr – CORBA (GIOP 1.1),

fcgi – Fast CGI,

tpkt – TPKT format [RFC1006],

line – Line mode, a packet is a line terminated with newline, lines longer than the receive buffer are truncated.

http | http_bin

The Hypertext Transfer Protocol. The packets are returned with the format according to HttpPacket described in erlang:decode_packet/3. A socket in passive mode will return {ok, HttpPacket} from gen_tcp:recv while an active socket will send messages like {http, Socket, HttpPacket}.

Note that the packet type httph is not needed when reading from a socket.

{packet_size, Integer} (TCP/IP sockets)

Sets the max allowed length of the packet body. If the packet header indicates that the length of the packet is longer than the max allowed length, the packet is considered invalid. The same happens if the packet header is too big for the socket receive buffer.

{recbuf, Integer}

Gives the size of the receive buffer to use for the socket.

I've listed all the options that affect the receive buffer size. Now let's put them next to the inet_drv.c code:

#define INET_DEF_BUFFER     1460        /* default buffer size */
#define INET_MIN_BUFFER     1           /* internal min buffer */
 
#define TCP_MAX_PACKET_SIZE 0x4000000  /* 64 M */
 
 
/* LOPT is local options */
#define INET_LOPT_BUFFER      20  /* min buffer size hint */
 
typedef struct {
...
       unsigned int psize;         /* max packet size (TCP only?) */
...
       int   bufsz;                /* minimum buffer constraint */
...
} inet_descriptor;
 
typedef struct {
    inet_descriptor inet;       /* common data structure (DON'T MOVE) */
  ...
    int   i_bufsz;              /* current input buffer size (<= bufsz) */
    ErlDrvBinary* i_buf;        /* current binary buffer */
    char*         i_ptr;        /* current pos in buf */
    char*         i_ptr_start;  /* packet start pos in buf */
    int           i_remain;     /* remaining chars to read */
   ...
} tcp_descriptor;
 
 
static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
{
...
        case INET_LOPT_BUFFER:
            DEBUGF(("inet_set_opts(%ld): s=%d, BUFFER=%d\r\n",
                    (long)desc->port, desc->s, ival));
            if (ival < INET_MIN_BUFFER) ival = INET_MIN_BUFFER;
            desc->bufsz = ival;
            continue;
...
    if (type == SO_RCVBUF) {
            /* make sure we have desc->bufsz >= SO_RCVBUF */
            if (ival > desc->bufsz)
                desc->bufsz = ival;
        }
...
        case INET_LOPT_PACKET_SIZE:
            DEBUGF(("inet_set_opts(%ld): s=%d, PACKET_SIZE=%d\r\n",
                    (long)desc->port, desc->s, ival));
            desc->psize = (unsigned int)ival;
            continue;
...
}
 
static int inet_fill_opts(inet_descriptor* desc,
                          char* buf, int len, char** dest, int destlen)
{
...
case INET_OPT_RCVBUF:
            type = SO_RCVBUF;
            break;
...
}

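Reading the docs and the code together, we can see that the maximum size of a packet (an application-level message framed according to the packet option, not a TCP segment) is set by {packet_size, Integer}, and per TCP_MAX_PACKET_SIZE it cannot exceed 64 MB. Each packet carries a header, for example a 1-, 2- or 4-byte length field, as determined by {packet, PacketType}.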
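As an illustration of the framing described above, the sketch below decodes a {packet, 1|2|4} length header (unsigned, big-endian, as the docs state) and rejects bodies above packet_size or TCP_MAX_PACKET_SIZE. The function packet_body_len is hypothetical; it assumes, as the text reads the code, that both limits apply to the body length, and it assumes a packet_size of 0 means "no limit". It is not the driver's actual parsing routine.

```c
#include <stddef.h>

#define TCP_MAX_PACKET_SIZE 0x4000000  /* 64 MB, value taken from inet_drv.c */

/* Hypothetical helper: decode a 1/2/4-byte big-endian length header.
 * Returns the body length, -1 if the header is not complete yet,
 * or -2 if the header size is unsupported or the packet is too large.
 * Assumption: psize == 0 means no packet_size limit. */
static long packet_body_len(const unsigned char *p, size_t avail,
                            int hdr, unsigned long psize)
{
    unsigned long n = 0;

    if (hdr != 1 && hdr != 2 && hdr != 4)
        return -2;
    if (avail < (size_t)hdr)
        return -1;                      /* wait for more bytes */
    for (int i = 0; i < hdr; i++)
        n = (n << 8) | p[i];            /* unsigned, big-endian */
    if (n > TCP_MAX_PACKET_SIZE)
        return -2;                      /* hard driver limit */
    if (psize != 0 && n > psize)
        return -2;                      /* {packet_size, psize} exceeded */
    return (long)n;
}
```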

So what is the default size of the receive buffer?

There is the undocumented {buffer, Integer} option:

enc_opt(buffer)          -> ?INET_LOPT_BUFFER;

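and there is {recbuf, Integer}. The receive buffer size is the larger of the two, and by default it is 1460 bytes (INET_DEF_BUFFER), the typical payload of a single TCP segment on Ethernet.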
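Reduced to its essentials, the inet_set_opts code quoted earlier treats the two options differently: {buffer, N} overwrites bufsz (with a floor of INET_MIN_BUFFER), while {recbuf, N} only ever raises it. The sketch below restates that logic; apply_buffer_opt and apply_recbuf_opt are hypothetical names, not driver functions.

```c
#define INET_DEF_BUFFER 1460  /* default buffer size, from inet_drv.c */
#define INET_MIN_BUFFER 1     /* internal minimum, from inet_drv.c */

/* {buffer, ival}: replaces bufsz unconditionally, clamped to the minimum. */
static int apply_buffer_opt(int bufsz, int ival)
{
    (void)bufsz;  /* the previous value is ignored */
    return ival < INET_MIN_BUFFER ? INET_MIN_BUFFER : ival;
}

/* {recbuf, ival}: "make sure we have desc->bufsz >= SO_RCVBUF". */
static int apply_recbuf_opt(int bufsz, int ival)
{
    return ival > bufsz ? ival : bufsz;
}
```

One consequence visible in this model: "the larger of the two" holds when recbuf is applied after buffer, but setting a smaller {buffer, N} after {recbuf, M} leaves bufsz at N.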

How does the receive buffer's size change over time?

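As we know, gen_tcp has an active mode and a passive mode. In active mode the driver uses the packet type to assemble a complete packet body and sends it to the owner process. In this case, if the receive buffer is smaller than the packet, the buffer is grown to the packet's size and the driver simply waits until the whole body has arrived.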
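The growth step can be pictured with the sketch below: if the space from the first unread byte to the end of the buffer cannot hold the packet, a buffer of exactly the packet's size is allocated and the unread bytes are moved to its front. This is a simplified model under that assumption; struct rbuf and rbuf_reserve_packet are hypothetical and not the driver's code.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical model: unread bytes are mem[start .. end). */
struct rbuf {
    char  *mem;
    size_t cap, start, end;
};

/* Make room for a whole packet of packet_len bytes beginning at the first
 * unread byte; grow to exactly the packet size when it does not fit. */
static int rbuf_reserve_packet(struct rbuf *b, size_t packet_len)
{
    size_t filled = b->end - b->start;
    size_t need = packet_len > filled ? packet_len : filled;
    char *m;

    if (b->cap - b->start >= need)
        return 0;                                /* already fits */
    m = malloc(need);
    if (m == NULL)
        return -1;
    if (filled > 0)
        memcpy(m, b->mem + b->start, filled);   /* keep unread bytes */
    free(b->mem);
    b->mem = m;
    b->cap = need;
    b->start = 0;
    b->end = filled;
    return 0;
}
```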

So when does the driver shrink the buffer again?

/*                                                                                                                         
** Deliver all packets ready                                                                                               
** if len == 0 then check start with a check for ready packet                                                              
*/
static int tcp_deliver(tcp_descriptor* desc, int len)
{
...
 while (len > 0) {
        int code = 0;
 
        inet_input_count(INETP(desc), len);
 
        /* deliver binary? */
        if (len*4 >= desc->i_buf->orig_size*3) { /* >=75% */
            /* something after? */
            if (desc->i_ptr_start + len == desc->i_ptr) { /* no */
                code = tcp_reply_binary_data(desc, desc->i_buf,
                                             (desc->i_ptr_start -
                                              desc->i_buf->orig_bytes),
                                             len);
                tcp_clear_input(desc);
            }
            else { /* move trail to beginning of a new buffer */
                ErlDrvBinary* bin;
                char* ptr_end = desc->i_ptr_start + len;
                int sz = desc->i_ptr - ptr_end;
 
                bin = alloc_buffer(desc->i_bufsz);
                memcpy(bin->orig_bytes, ptr_end, sz);
 
                code = tcp_reply_binary_data(desc, desc->i_buf,
                                             (desc->i_ptr_start-
                                              desc->i_buf->orig_bytes),
                                             len);
                free_buffer(desc->i_buf);
                desc->i_buf = bin;
                desc->i_ptr_start = desc->i_buf->orig_bytes;
                desc->i_ptr = desc->i_ptr_start + sz;
                desc->i_remain = 0;
            }
        }
        else {
            code = tcp_reply_data(desc, desc->i_ptr_start, len);
            /* XXX The buffer gets thrown away on error  (code < 0)    */
            /* Windows needs workaround for this in tcp_inet_event...  */
            desc->i_ptr_start += len;
            if (desc->i_ptr_start == desc->i_ptr)
                tcp_clear_input(desc);
            else
                desc->i_remain = 0;
 
        }

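When a received packet is delivered, the driver lets go of its current buffer if the packet is at least 75% of the buffer's allocated size (the binary is handed to the owner process, and any trailing bytes are copied into a fresh buffer of i_bufsz bytes), or if the packet was the only data left in the buffer (tcp_clear_input). Smaller packets are copied out and the buffer is kept.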
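The branch structure of tcp_deliver above boils down to a small decision table. The sketch restates it; the enum, classify_delivery and drops_current_buffer are hypothetical names, and the 75% test uses the same integer arithmetic as the driver (len*4 >= orig_size*3).

```c
#include <stddef.h>

/* Hypothetical labels for the three branches of tcp_deliver. */
enum deliver_action {
    HAND_OFF_WHOLE,      /* >=75%, nothing after it: pass binary on, clear input */
    HAND_OFF_MOVE_TRAIL, /* >=75%, data after it: copy trail into a new buffer */
    COPY_OUT             /* <75%: copy the packet out, keep the buffer */
};

static enum deliver_action classify_delivery(size_t len, size_t orig_size,
                                             size_t bytes_after)
{
    if (len * 4 >= orig_size * 3)               /* same test as the driver */
        return bytes_after == 0 ? HAND_OFF_WHOLE : HAND_OFF_MOVE_TRAIL;
    return COPY_OUT;
}

/* Does the driver stop holding its current buffer after this delivery? */
static int drops_current_buffer(size_t len, size_t orig_size, size_t bytes_after)
{
    switch (classify_delivery(len, orig_size, bytes_after)) {
    case HAND_OFF_WHOLE:
    case HAND_OFF_MOVE_TRAIL:
        return 1;                  /* handed off, or replaced by a new buffer */
    case COPY_OUT:
    default:
        return bytes_after == 0;   /* tcp_clear_input once the buffer is empty */
    }
}
```

With the default 1460-byte buffer, for example, any packet of 1095 bytes or more takes one of the hand-off branches.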

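In passive mode the caller specifies how much data to receive; 0 means "whatever is available". In that case the buffer size is the larger of the packet length and the requested length.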

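From this analysis we can see that the memory taken by the receive buffer depends on several factors; if a program needs precise control over its memory use, it has to tune the options above.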

Next, let's look at what unrecv is good for. As a reference, first take misultin, a small Erlang web server.

This project cleverly combines packet types with active modes to reuse the protocol parsing already built into ERTS. Here is a quick look:

Grepping the code shows:

$ grep -rin setopts .
./misultin_socket.erl:106:    inet:setopts(Sock, [{active, once}]),
./misultin_socket.erl:130:    inet:setopts(Sock, [{active, once}]),
./misultin_socket.erl:194:      inet:setopts(Sock, [{packet, raw}, {active, false}]),
./misultin_socket.erl:202:    inet:setopts(Sock, [{packet, http}]),
$ grep -rin gen_tcp:recv  .
./misultin_socket.erl:195:      case gen_tcp:recv(Sock, Len, RecvTimeout) of

Its implementation is well worth studying!

Let me give an example of how to use unrecv.

Suppose a message consists of a bracketed header followed by one or more newline-terminated lines, like this:

[demo]
line1
line2
line3

Since we do not know the exact length of the message in advance, we set {packet, raw}, {active, false}.

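We first read a chunk and scan it for the '[' and ']' to find out how large the header is. The remaining lines we do not need to parse ourselves, because inet_drv supports line-delimited packets.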

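However, the chunk we read ahead may extend past the header: some of the newline-delimited data has already been taken out of the socket, so switching to {packet, line} at this point would not see it. We simply hand that data back with unrecv, and everything falls into place.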
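The parsing half of that idea can be sketched as follows: given the bytes read so far in raw mode, find where the "[...]" header line ends. Everything from that offset on, including a possibly incomplete last line, is what would be returned with gen_tcp:unrecv/2 before switching the socket to {packet, line} via inet:setopts/2. The function header_end is hypothetical, and the exact framing rules (header starts at byte 0 and ends with "]\n") are assumptions for this example.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical parser for the example framing "[name]\n" + lines.
 * Returns the offset just past "]\n", 0 if more bytes are needed,
 * or -1 if the data does not match the assumed framing. */
static long header_end(const char *buf, size_t n)
{
    const char *close;
    size_t after;

    if (n == 0)
        return 0;                       /* nothing read yet */
    if (buf[0] != '[')
        return -1;                      /* not our framing */
    close = memchr(buf, ']', n);
    if (close == NULL)
        return 0;                       /* header still incomplete */
    after = (size_t)(close - buf) + 1;  /* bytes up to and including ']' */
    if (after == n)
        return 0;                       /* the '\n' has not arrived yet */
    if (buf[after] != '\n')
        return -1;
    return (long)(after + 1);
}
```

For a read-ahead chunk of "[demo]\nline1\nli", the header ends at offset 7 and the leftover "line1\nli" is what goes back through unrecv; in line mode the driver then waits for the rest of "li...".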

Summary: we analyzed how the gen_tcp receive buffer works and which factors determine its size, and along the way showed what unrecv is useful for.

For more on gen_tcp and its send buffer, see my separate post on that topic!

Have fun!