gen_tcp呼叫程序收到{empty_out_q, Port}訊息奇怪行為分析
阿新 • • 發佈:2018-12-31
如果超過5秒,都沒有收到{empty_out_q, Port}訊息,那麼就看下目前的暫存的資料的位元組數, 並且和最初的暫存的資料的位元組數比較,如果一直沒變的話,那麼說明由於種種原因,把這些資料傳送出去比較沒希望,那麼他就果斷的繼續下一步動作。如果位元組數在變少的話,那麼就繼續等。
現在問題來了。shutdown 在發現暫存的資料沒有希望發出去的時候,選擇不作為,那麼作為後遺症,caller被登記在通知名單裡面。過一段時間,要不資料真的被髮出去了,要不就是發生以下事件: 1. socket發現讀錯誤。2. socket發現寫錯誤。 3. 對端關閉。 4. 宿主程序退出,socket作為一個port被強行關閉,這時候需要清空傳送緩衝區,這時候會同時給caller傳送{empty_out_q, Port}訊息。
有圖有真相,我們讓程式碼說話。
參看程式碼: otp_src_R14B03/erts/emulator/drivers/common/inet_drv.c:
// L7100 這個函式負責給呼叫者傳送訊息empty_out_q static void send_empty_out_q_msgs(inet_descriptor* desc) { ErlDrvTermData msg[6]; int msg_len = 0; if(NO_SUBSCRIBERS(&desc->empty_out_q_subs)) return; msg_len = LOAD_ATOM(msg, msg_len, am_empty_out_q); msg_len = LOAD_PORT(msg, msg_len, desc->dport); msg_len = LOAD_TUPLE(msg, msg_len, 2); ASSERT(msg_len == sizeof(msg)/sizeof(*msg)); send_to_subscribers(desc->port, &desc->empty_out_q_subs, 1, msg, msg_len); } //那麼誰會呼叫他呢: tcp_clear_output和tcp_inet_output. static int tcp_inet_output(tcp_descriptor* desc, HANDLE event) { ... else if (IS_CONNECTED(INETP(desc))) { for (;;) { int vsize; int n; SysIOVec* iov; if ((iov = driver_peekq(ix, &vsize)) == NULL) { sock_select(INETP(desc), FD_WRITE, 0); send_empty_out_q_msgs(INETP(desc)); goto done; } vsize = vsize > MAX_VSIZE ? MAX_VSIZE : vsize; DEBUGF(("tcp_inet_output(%ld): s=%d, About to send %d items\r\n", (long)desc->inet.port, desc->inet.s, vsize)); if (IS_SOCKET_ERROR(sock_sendv(desc->inet.s, iov, vsize, &n, 0))) { if ((sock_errno() != ERRNO_BLOCK) && (sock_errno() != EINTR)) { DEBUGF(("tcp_inet_output(%ld): sock_sendv(%d) errno = %d\r\n", (long)desc->inet.port, vsize, sock_errno())); ret = tcp_send_error(desc, sock_errno()); goto done; } #ifdef __WIN32__ desc->inet.send_would_block = 1; #endif goto done; ... } //以及: /* clear QUEUED output */ static void tcp_clear_output(tcp_descriptor* desc) { ErlDrvPort ix = desc->inet.port; int qsz = driver_sizeq(ix); driver_deq(ix, qsz); send_empty_out_q_msgs(INETP(desc)); } //誰呼叫 tcp_clear_output呢? //tcp_inet_flush, //tcp_recv_closed, //tcp_recv_error, //tcp_send_error, //特別是tcp_recv_closed函式 /* The peer socket has closed, cleanup and send event */ static int tcp_recv_closed(tcp_descriptor* desc) { #ifdef DEBUG long port = (long) desc->inet.port; /* Used after driver_exit() */ #endif DEBUGF(("tcp_recv_closed(%ld): s=%d, in %s, line %d\r\n", port, desc->inet.s, __FILE__, __LINE__)); if (IS_BUSY(INETP(desc))) { /* A send is blocked */ desc->inet.caller = desc->inet.busy_caller; tcp_clear_output(desc); if (desc->busy_on_send) { driver_cancel_timer(desc->inet.port); desc->busy_on_send = 0; DEBUGF(("tcp_recv_closed(%ld): busy on send\r\n", port)); } desc->inet.state &= ~INET_F_BUSY; set_busy_port(desc->inet.port, 0); inet_reply_error_am(INETP(desc), am_closed); DEBUGF(("tcp_recv_closed(%ld): busy reply 'closed'\r\n", port)); } if (!desc->inet.active) { /* We must cancel any timer here ! */ driver_cancel_timer(desc->inet.port); /* passive mode do not terminate port ! */ tcp_clear_input(desc); if (desc->inet.exitf) { tcp_clear_output(desc); desc_close(INETP(desc)); } else { desc_close_read(INETP(desc)); } async_error_am_all(INETP(desc), am_closed); /* next time EXBADSEQ will be delivered */ DEBUGF(("tcp_recv_closed(%ld): passive reply all 'closed'\r\n", port)); } else { tcp_clear_input(desc); tcp_closed_message(desc); if (desc->inet.exitf) { driver_exit(desc->inet.port, 0); } else { desc_close_read(INETP(desc)); } DEBUGF(("tcp_recv_closed(%ld): active close\r\n", port)); } DEBUGF(("tcp_recv_closed(%ld): done\r\n", port)); return -1; } /* 我們看到在: 1. 對端關閉的時候, 2. 而且我端是被動接收, 3. socket開啟這二個選項的時候最容易{exit_on_close, true}, {delay_send,true} 最容易發生上面的現象 */
只有這種情況你的程序會收到該訊息。
從你的描述來看
“同一時刻會有多個客戶端來連線,連線後,接收客戶端請求後,再發送響應訊息,然後客戶端主動斷連。”.
基本上可以推斷是,你用shutdown(write 或者 read_write)後,你的客戶端由於某種原因斷鏈,你的傳送程序收到這樣的訊息。
我們來試驗下我們的猜想:
$ cat test.erl -module(test). -compile(export_all). start() -> start(1234). start(Port) -> register(?MODULE, self()), spawn_link(fun ()-> S= listen(Port), accept(S) end), receive Any -> io:format("~p~n", [Any]) end. %% to stop: test!stop. listen(Port) -> Opts = [{active, false}, binary, {backlog, 256}, {packet, raw}, {reuseaddr, true}], {ok, S} = gen_tcp:listen(Port, Opts), S. accept(S) -> case gen_tcp:accept(S) of {ok, Socket} -> inet:setopts(Socket, [{exit_on_close, true}, {delay_send,true}]), spawn_opt(?MODULE, entry, [Socket], []); Error -> erlang:error(Error) end, accept(S). entry(S)-> loop(S), check_empty_out_q_msg(1000), io:format("bye socket ~p~n",[S]), ok. check_empty_out_q_msg(Timeout)-> receive Any -> io:format("bingo, got message ~p~n", [Any]), Any after Timeout -> cont end. loop(S) -> check_empty_out_q_msg(0), case gen_tcp:recv(S, 0) of {ok, <<"start", _/binary>>}-> io:format("start to reproduce {empty_out_q, Port} message ~n",[]), gen_tcp:send(S, lists:duplicate(1024*1024, "A")), io:format("sent 1M bytes ~n",[]), io:format("sleep 1s ~n",[]), receive Any1 -> Any1 after 1000 -> cont end, loop(S); {ok, _Data} -> io:format("shutdown(write) ~n",[]), {ok, [{send_pend, N}]}=inet:getstat(S, [send_pend]), gen_tcp:shutdown(S, write), {ok, [{send_pend, N1}]}=inet:getstat(S, [send_pend]), io:format("5s send_pend ~w/~w ~n",[N,N1]), loop(S); Error -> io:format("tcp ~p~n", [Error]), Error end. $ cat client.erl -module(client). -export([start/0]). start()-> {ok,Sock} = gen_tcp:connect("localhost", 1234, [{active,false}]), gen_tcp:send(Sock, "start"), io:format("send start~n",[]), gen_tcp:recv(Sock,1024), io:format("drain 1024 bytes~n",[]), gen_tcp:send(Sock, "bang"), io:format("send bang~n",[]), io:format("sleep 10s~n",[]), receive Any -> Any after 10000 -> cont end, gen_tcp:shutdown(Sock, write), io:format("end~n",[]), ok. $ erlc test.erl client.erl
$ erl -noshell -s test start to reproduce {empty_out_q, Port} message sent 1M bytes sleep 1s shutdown(write) 5s send_pend 851968/851968 tcp {error,closed} bingo, got message {empty_out_q,#Port<0.456>} bye socket #Port<0.456>在另外一個終端執行:
$ erl -noshell -s client send start drain 1024 bytes send bang sleep 10s end中間我們可以看到:
$ ss State Recv-Q Send-Q Local Address:Port Peer Address:Port ESTAB 0 130944 127.0.0.1:search-agent 127.0.0.1:43273 ...
傳送佇列大量的資料阻塞。
結果驗證了我們的猜想!這個故事告訴我們:原始碼是最權威的。
祝大家玩得開心!