1. 程式人生 > >gen_tcp呼叫程序收到{empty_out_q, Port}訊息奇怪行為分析

gen_tcp呼叫程序收到{empty_out_q, Port}訊息奇怪行為分析

如果超過5秒,都沒有收到{empty_out_q, Port}訊息,那麼就看下目前的暫存的資料的位元組數, 並且和最初的暫存的資料的位元組數比較,如果一直沒變的話,那麼說明由於種種原因,把這些資料傳送出去比較沒希望,那麼他就果斷的繼續下一步動作。如果位元組數在變少的話,那麼就繼續等。

現在問題來了。shutdown 在發現暫存的資料沒有希望發出去的時候,選擇不作為,那麼作為後遺症,caller仍被登記在通知名單裡面。過一段時間,要不資料真的被發出去了,要不就是發生以下事件之一: 1. socket發現讀錯誤。 2. socket發現寫錯誤。 3. 對端關閉。 4. 宿主程序退出,socket作為一個port被強行關閉。發生這些事件時都需要清空傳送緩衝區,同時會給caller傳送{empty_out_q, Port}訊息。

有圖有真相,我們讓程式碼說話。

參看程式碼: otp_src_R14B03/erts/emulator/drivers/common/inet_drv.c:

/*
 * (inet_drv.c, around line 7100)
 * Sends the message {empty_out_q, Port} to the subscribers recorded in
 * desc->empty_out_q_subs. Returns immediately, sending nothing, when
 * there are no subscribers.
 */
static void
send_empty_out_q_msgs(inet_descriptor* desc)
{
  ErlDrvTermData msg[6];
  int msg_len = 0;
 
  if(NO_SUBSCRIBERS(&desc->empty_out_q_subs))
    return;
 
  /* Build the driver term for the 2-tuple {empty_out_q, Port}. */
  msg_len = LOAD_ATOM(msg, msg_len, am_empty_out_q);
  msg_len = LOAD_PORT(msg, msg_len, desc->dport);
  msg_len = LOAD_TUPLE(msg, msg_len, 2);
 
  /* The three LOAD_* steps are expected to fill msg[] exactly. */
  ASSERT(msg_len == sizeof(msg)/sizeof(*msg));
 
  /* NOTE(review): the meaning of the third argument (1) is not visible
   * here -- presumably it asks for the subscriber list to be released
   * after delivery; confirm against send_to_subscribers(). */
  send_to_subscribers(desc->port,
                      &desc->empty_out_q_subs,
                      1,
                      msg,
                      msg_len);
}
 
//那麼誰會呼叫他呢: tcp_clear_output和tcp_inet_output.
 
/*
 * Excerpt of the socket output handler (parts elided with "...").
 * Only the connected-socket branch is shown: it keeps sending the
 * queued data and notifies the empty_out_q subscribers once
 * driver_peekq() reports nothing more to send.
 */
static int tcp_inet_output(tcp_descriptor* desc, HANDLE event)
{
...
else if (IS_CONNECTED(INETP(desc))) {
        for (;;) {
            int vsize;
            int n;
            SysIOVec* iov;
 
            /* driver_peekq() returned NULL -- presumably the driver queue
             * is empty: stop selecting for write and send
             * {empty_out_q, Port} to the subscribers. */
            if ((iov = driver_peekq(ix, &vsize)) == NULL) {
                sock_select(INETP(desc), FD_WRITE, 0);
                send_empty_out_q_msgs(INETP(desc));
                goto done;
            }
            /* Never hand more than MAX_VSIZE vector entries to one send. */
            vsize = vsize > MAX_VSIZE ? MAX_VSIZE : vsize;
            DEBUGF(("tcp_inet_output(%ld): s=%d, About to send %d items\r\n",
                    (long)desc->inet.port, desc->inet.s, vsize));
            if (IS_SOCKET_ERROR(sock_sendv(desc->inet.s, iov, vsize, &n, 0))) {
                /* Anything other than "would block" / EINTR is treated as a
                 * send error and handed to tcp_send_error(). */
                if ((sock_errno() != ERRNO_BLOCK) && (sock_errno() != EINTR)) {
                    DEBUGF(("tcp_inet_output(%ld): sock_sendv(%d) errno = %d\r\n",
                            (long)desc->inet.port, vsize, sock_errno()));
                    ret =  tcp_send_error(desc, sock_errno());
                    goto done;
                }
#ifdef __WIN32__
                /* Windows only: record that the send would have blocked. */
                desc->inet.send_would_block = 1;
#endif
                goto done;
...
}
 
//以及:
 
/*
 * Clear QUEUED output: dequeue everything currently held in the port's
 * driver queue (driver_sizeq() bytes) and then call
 * send_empty_out_q_msgs(), which sends {empty_out_q, Port} to any
 * subscribers. The queued data is dropped here, not transmitted.
 */
static void tcp_clear_output(tcp_descriptor* desc)
{
    ErlDrvPort ix  = desc->inet.port;
    int qsz = driver_sizeq(ix);
 
    driver_deq(ix, qsz);
    send_empty_out_q_msgs(INETP(desc));
}
 
//誰呼叫 tcp_clear_output呢?
//tcp_inet_flush,
//tcp_recv_closed,
//tcp_recv_error,
//tcp_send_error,
 
//特別是tcp_recv_closed函式
 
/*
 * The peer socket has closed, cleanup and send event.
 *
 * Three parts are visible:
 *   - if a send is blocked (IS_BUSY), the queued output is cleared and the
 *     blocked caller gets an error reply with reason 'closed';
 *   - passive sockets (!active) are not terminated; queued output is
 *     cleared only when desc->inet.exitf is set;
 *   - active sockets get a closed message and, when exitf is set, the
 *     port is exited via driver_exit().
 * Always returns -1.
 * NOTE(review): exitf presumably mirrors the {exit_on_close, Bool} socket
 * option -- confirm where the option is stored.
 */
static int tcp_recv_closed(tcp_descriptor* desc)
{
#ifdef DEBUG
    long port = (long) desc->inet.port; /* Used after driver_exit() */
#endif
    DEBUGF(("tcp_recv_closed(%ld): s=%d, in %s, line %d\r\n",
            port, desc->inet.s, __FILE__, __LINE__));
    if (IS_BUSY(INETP(desc))) {
        /* A send is blocked */
        desc->inet.caller = desc->inet.busy_caller;
        /* Drops the queued data; tcp_clear_output() also calls
         * send_empty_out_q_msgs(). */
        tcp_clear_output(desc);
        if (desc->busy_on_send) {
            driver_cancel_timer(desc->inet.port);
            desc->busy_on_send = 0;
            DEBUGF(("tcp_recv_closed(%ld): busy on send\r\n", port));
        }
        /* Leave the busy state and answer the blocked sender. */
        desc->inet.state &= ~INET_F_BUSY;
        set_busy_port(desc->inet.port, 0);
        inet_reply_error_am(INETP(desc), am_closed);
        DEBUGF(("tcp_recv_closed(%ld): busy reply 'closed'\r\n", port));
    }
    if (!desc->inet.active) {
        /* We must cancel any timer here ! */
        driver_cancel_timer(desc->inet.port);
        /* passive mode do not terminate port ! */
        tcp_clear_input(desc);
        if (desc->inet.exitf) {
            /* exitf set: discard pending output (which also calls
             * send_empty_out_q_msgs()) and close the descriptor. */
            tcp_clear_output(desc);
            desc_close(INETP(desc));
        } else {
            /* exitf not set: only the read side is closed. */
            desc_close_read(INETP(desc));
        }
        async_error_am_all(INETP(desc), am_closed);
        /* next time EXBADSEQ will be delivered  */
        DEBUGF(("tcp_recv_closed(%ld): passive reply all 'closed'\r\n", port));
    } else {
        /* Active socket: drop buffered input and send the closed message. */
        tcp_clear_input(desc);
        tcp_closed_message(desc);
        if (desc->inet.exitf) {
            driver_exit(desc->inet.port, 0);
        } else {
            desc_close_read(INETP(desc));
        }
        DEBUGF(("tcp_recv_closed(%ld): active close\r\n", port));
    }
    DEBUGF(("tcp_recv_closed(%ld): done\r\n", port));
    return -1;
}
/* 我們看到在:
1. 對端關閉的時候,
2. 而且我端是被動接收,
3. socket開啟這二個選項的時候: {exit_on_close, true}, {delay_send,true}
最容易發生上面的現象
*/

只有這種情況你的程序會收到該訊息。

從你的描述來看

“同一時刻會有多個客戶端來連線,連線後,接收客戶端請求後,再發送響應訊息,然後客戶端主動斷連。”.

基本上可以推斷是,你用shutdown(write 或者 read_write)後,你的客戶端由於某種原因斷鏈,你的傳送程序收到這樣的訊息。

我們來試驗下我們的猜想:

$ cat test.erl
-module(test).
-compile(export_all).
 
%% @doc Starts the test server on the default port 1234.
start() ->
    DefaultPort = 1234,
    start(DefaultPort).

%% @doc Registers the calling process under the module name, spawns a
%% linked acceptor listening on `Port', then blocks until any message
%% arrives and prints it. To stop the server from a shell: test ! stop.
start(Port) ->
    register(?MODULE, self()),
    Acceptor = fun() -> accept(listen(Port)) end,
    spawn_link(Acceptor),
    receive
        Msg ->
            io:format("~p~n", [Msg])
    end.
 
%% @doc Opens a passive, binary, raw-packet listening socket on `Port'
%% and returns it. Crashes with badmatch if the listen call fails.
listen(Port) ->
    ListenOpts = [
        {active, false},
        binary,
        {backlog, 256},
        {packet, raw},
        {reuseaddr, true}
    ],
    {ok, ListenSocket} = gen_tcp:listen(Port, ListenOpts),
    ListenSocket.
 
%% @doc Accept loop: waits for a connection on listening socket `S',
%% hands the result to on_accept/1 and then waits for the next one.
%% Never returns normally; an accept failure raises an error.
accept(S) ->
    on_accept(gen_tcp:accept(S)),
    accept(S).

%% Handles one result of gen_tcp:accept/1. On success the two options
%% under test (exit_on_close and delay_send) are enabled and an unlinked
%% worker running entry/1 is spawned for the connection; the code does
%% not hand socket ownership over to that worker. Anything else is
%% raised as an error.
on_accept({ok, Socket}) ->
    SockOpts = [{exit_on_close, true}, {delay_send, true}],
    inet:setopts(Socket, SockOpts),
    spawn_opt(?MODULE, entry, [Socket], []);
on_accept(Error) ->
    erlang:error(Error).
 
 
%% @doc Per-connection worker: runs the receive loop on socket `S', then
%% waits up to one second for a late message (printing it if one shows
%% up), says goodbye and returns ok.
entry(S) ->
    _LoopResult = loop(S),
    _LateMsg = check_empty_out_q_msg(1000),
    %% io:format/2 returns ok, which is also this function's result.
    io:format("bye socket ~p~n", [S]).
 
 
%% @doc Waits up to `Timeout' milliseconds for any message. A received
%% message is printed and returned; on timeout the atom `cont' is
%% returned instead.
check_empty_out_q_msg(Timeout) ->
    receive
        Msg ->
            io:format("bingo, got message ~p~n", [Msg]),
            Msg
    after Timeout ->
        cont
    end.
 
 
 
%% @doc Receive loop for one connection on socket `S'.
%%
%% Before every recv, any message already in the mailbox is printed by
%% check_empty_out_q_msg/1. Then:
%%   - data starting with "start": queue 1 MiB of output, pause one
%%     second, loop again;
%%   - any other data: print send_pend before and after
%%     gen_tcp:shutdown(S, write), loop again;
%%   - anything else (e.g. {error, closed}): print it and return it.
loop(S) ->
    check_empty_out_q_msg(0),

    case gen_tcp:recv(S, 0) of
        {ok, <<"start", _/binary>>} ->
            io:format("start to reproduce {empty_out_q, Port} message ~n", []),
            gen_tcp:send(S, lists:duplicate(1024*1024, "A")),
            io:format("sent 1M bytes ~n", []),

            io:format("sleep 1s ~n", []),
            %% Pure sleep. The previous `receive Any1 -> Any1 after ...'
            %% silently discarded any message arriving during the pause
            %% (possibly the very message this test is looking for) and
            %% ended the pause early. Messages now stay in the mailbox
            %% and are reported by check_empty_out_q_msg/1 on the next
            %% iteration.
            receive after 1000 -> ok end,

            loop(S);

        {ok, _Data} ->
            io:format("shutdown(write) ~n", []),
            {ok, [{send_pend, N}]} = inet:getstat(S, [send_pend]),
            gen_tcp:shutdown(S, write),
            {ok, [{send_pend, N1}]} = inet:getstat(S, [send_pend]),
            io:format("5s send_pend ~w/~w ~n", [N, N1]),
            loop(S);

        Error ->
            io:format("tcp ~p~n", [Error]),
            Error
    end.
 
$ cat client.erl
-module(client).
-export([start/0]).
 
%% @doc Client side of the reproduction: connects to localhost:1234,
%% sends "start", reads 1024 bytes of the reply, sends "bang", waits up
%% to ten seconds (any incoming message ends the wait early), shuts down
%% the write side of the socket and returns ok.
start() ->
    Say = fun(Text) -> io:format(Text, []) end,
    {ok, Socket} = gen_tcp:connect("localhost", 1234, [{active, false}]),

    gen_tcp:send(Socket, "start"),
    Say("send start~n"),

    %% Read only a small part of the reply; the rest stays unread.
    gen_tcp:recv(Socket, 1024),
    Say("drain 1024 bytes~n"),

    gen_tcp:send(Socket, "bang"),
    Say("send bang~n"),

    Say("sleep 10s~n"),
    receive
        Msg -> Msg
    after 10000 ->
        cont
    end,

    gen_tcp:shutdown(Socket, write),

    %% io:format/2 returns ok, which is also this function's result.
    Say("end~n").
 
$ erlc test.erl client.erl

在一個終端執行:
$ erl -noshell -s test
start to reproduce {empty_out_q, Port} message
sent 1M bytes
sleep 1s
shutdown(write)
5s send_pend 851968/851968
tcp {error,closed}
bingo, got message {empty_out_q,#Port<0.456>}
bye socket #Port<0.456>

在另外一個終端執行:
$ erl -noshell -s client
send start
drain 1024 bytes
send bang
sleep 10s
end
中間我們可以看到:
$ ss
State       Recv-Q Send-Q                                                 Local Address:Port                                                     Peer Address:Port  
ESTAB       0      130944                                                     127.0.0.1:search-agent                                                     127.0.0.1:43273  
...

傳送佇列大量的資料阻塞。

結果驗證了我們的猜想!這個故事告訴我們:原始碼是最權威的。

祝大家玩得開心!