1. 程式人生 > >gen_tcp傳送程序被掛起起因分析及對策

gen_tcp傳送程序被掛起起因分析及對策

最近有同學在gmail上問關於gen_tcp傳送程序被掛起的問題,問題描述的非常好,見底下:

第一個問題是關於port_command和gen_tcp:send的。從專案上線至今,我在tcp傳送的地方遇到過兩次問題,都跟port_command有關係。 起初程式的效能不好,我從各方面嘗試分析和優化,還有部分是靠猜測,當初把全服廣播訊息的地方,換成了port_command,當時參考了hotwheels的程式碼和您的一遍相關博文。 根據您的分析,port_command應該比直接用gen_tcp:send高效的,並且沒有阻塞。但是我卻在這個地方遇到了阻塞,具體表現如下(兩次,分別出現在專案不同階段,下面分別描述)
專案上線初期: 當時玩家程序給玩家發訊息用的是gen_tcp:send,廣播程序為了高效率用了port_command。當活躍玩家到了一定數量以後,玩家無法進入遊戲,分析原因,是全域性傳送廣播訊息的程序堵住了,從message_queue_len可以看出來,改為廣播程序給玩家程序發訊息再讓玩家程序給玩家自己發訊息後,狀況排除。 最近一段時間: 這時候玩家程序的tcp傳送資料,已經被我替換成了port_command並運行了一段時間都沒問題。但是一些流量比較大的遊戲服,活躍玩家到了一定數量以後,訊息延遲很大(5-6秒),做任何操作都卡,在出現狀況期間,伺服器CPU、記憶體、負載各項指標並未異常,ssh連到伺服器操作也很正常,沒有任何卡頓現象。同伺服器的其它遊戲服也都正常,但是出問題的遊戲服的整個erlang節點都進入一個“很卡”的狀態,體現在我進入erlang shell中進行操作時,輸入文字延遲很大。
起初我沒懷疑過port_command有問題,所以我到處找原因和“優化”程式碼,這個優化是加了引號的。 但是最後,在一次伺服器同樣出現狀況很卡的時候,我把tcp傳送資料的程式碼改回了gen_tcp:send,並熱更新了相關模組,伺服器立即恢復正常。 我一直對上面的情況百思不得其解,我之前寫的程式碼如下:
tcp_send (Socket, Bin) ->
try erlang:port_command(Socket, Bin, [force, nosuspend]) of
false ->
exit({game_tcp_send_error, busy});
true ->
true
catch
error : Error ->
exit({game_tcp_send_error, {error, einval, Error}})
end.
希望您能幫忙分析下是什麼原因導致整個erlang節點都卡的,我想這對其他的erlang程式設計師也會有幫助! 關於這個問題我之前寫了篇文章,系統的介紹了gen_tcp的行為,gen_tcp:send的深度解刨和使用指南(初稿)見 這裡 
gen_tcp.erl:L235
send(S, Packet) when is_port(S) ->
case inet_db:lookup_socket(S) of
{ok, Mod} ->
Mod:send(S, Packet);
Error ->
Error
end.
我們就這個問題再深入的分析下,首先看gen_tcp:send的程式碼:
%% inet_tcp.erl:L50
%%                                                                                                                         
%% Send data on a socket                                                                                                   
%%                                                                                                                         
send(Socket, Packet, Opts) -> prim_inet:send(Socket, Packet, Opts).
send(Socket, Packet) -> prim_inet:send(Socket, Packet, []).
 
%%prim_inet.erl:L349
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%                                             
%%                                                                                                                         
%% SEND(insock(), Data) -> ok | {error, Reason}                                                                             
%%                                                                                                                         
%% send Data on the socket (io-list)                                                                                       
%%                                                                                                                         
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%                                             
%% This is a generic "port_command" interface used by TCP, UDP, SCTP, depending                                            
%% on the driver it is mapped to, and the "Data". It actually sends out data,--                                            
%% NOT delegating this task to any back-end.  For SCTP, this function MUST NOT                                             
%% be called directly -- use "sendmsg" instead:                                                                            
%%                                                                                                                         
send(S, Data, OptList) when is_port(S), is_list(OptList) ->
    ?DBG_FORMAT("prim_inet:send(~p, ~p)~n", [S,Data]),
    try erlang:port_command(S, Data, OptList) of
        false -> % Port busy and nosuspend option passed                                                                   
            ?DBG_FORMAT("prim_inet:send() -> {error,busy}~n", []),
            {error,busy};
        true ->
            receive
                {inet_reply,S,Status} ->
                    ?DBG_FORMAT("prim_inet:send() -> ~p~n", [Status]),
                    Status
            end
    catch
        error:_Error ->
            ?DBG_FORMAT("prim_inet:send() -> {error,einval}~n", []),
             {error,einval}
    end.
我們可以看到gen_tcp:send分為二個步驟 1. port_command提交資料 2. 等待{inet_reply,S,Status}迴應。這是一個典型的阻塞操作,在等待的時候,程序被調出。 所以如果系統中有大量的tcp連結要傳送資料,這種方式有點低效。 所以很多系統把這個動作改成集中提交資料,集中等待迴應。 典型的例子見rabbitmq:
%%rabbit_writer.erl
...
handle_message({inet_reply, _, ok}, State) ->
    State;
handle_message({inet_reply, _, Status}, _State) ->
    exit({writer, send_failed, Status});
handle_message(shutdown, _State) ->
    exit(normal);
...
internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) ->
    true = port_cmd(Sock, assemble_frames(Channel, MethodRecord,
                                              Content, FrameMax)),
    ok.
 
port_cmd(Sock, Data) ->
    try rabbit_net:port_command(Sock, Data)
    catch error:Error -> exit({writer, send_failed, Error})
    end.
它的做法是用一個程序集中來發送資料,集中接收回應。在正常情況下,這種處理會大大提高程序切換的開銷,減少等待時間。但是也會帶來問題,我們看到port_command這個操作如果出現意外,被阻塞了,那麼這個系統的訊息傳送會被卡死。而之前由每個處理程序去gen_tcp:send只會阻塞個別程序。 我們仔細看下port_command的文件 port_command(Port, Data, OptionList) -> true|false Types: Port = port() | atom() Data = iodata() OptionList = [Option] Option = force Option = nosuspend Sends data to a port. port_command(Port, Data, []) equals port_command(Port, Data). If the port command is aborted false is returned; otherwise, true is returned. If the port is busy, the calling process will be suspended until the port is not busy anymore. Currently the following Options are valid: force The calling process will not be suspended if the port is busy; instead, the port command is forced through. The call will fail with a notsup exception if the driver of the port does not support this. For more information see the ERL_DRV_FLAG_SOFT_BUSY driver flag. nosuspend The calling process will not be suspended if the port is busy; instead, the port command is aborted and false is returned. Note More options may be added in the future. Failures: badarg If Port is not an open port or the registered name of an open port. badarg If Data is not a valid io list. badarg If OptionList is not a valid option list. notsup If the force option has been passed, but the driver of the port does not allow forcing through a busy port. 呼叫port_command是可能引起經常被suspend的,什麼條件呢? 出於效能的考慮, inet會在gen_tcp驅動port中起用一個傳送快取區,當我們的資料超過了緩衝區的高水位線預設情況就會被掛起。 那什麼是傳送緩衝區高低水位線呢?我們看程式碼:
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%                                             
%%                                                                                                                         
%% SETOPT(insock(), Opt, Value) -> ok | {error, Reason}                                                                    
%% SETOPTS(insock(), [{Opt,Value}]) -> ok | {error, Reason}                                                                
%%                                                                                                                         
%% set socket, ip and driver option                                                                                        
%%                                                                                                                         
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%                                             
 
setopt(S, Opt, Value) when is_port(S) ->
    setopts(S, [{Opt,Value}]).
 
setopts(S, Opts) when is_port(S) ->
    case encode_opt_val(Opts) of
        {ok, Buf} ->
            case ctl_cmd(S, ?INET_REQ_SETOPTS, Buf) of
                {ok, _} -> ok;
                Error -> Error
            end;
        Error  -> Error
    end.
 
%% Encoding for setopts                                                                                                    
%%                                                                                                                         
%% encode opt/val REVERSED since options are stored in reverse order                                                       
%% i.e. the recent options first (we must process old -> new)                                                              
encode_opt_val(Opts) ->
    try
        enc_opt_val(Opts, [])
    catch
        Reason -> {error,Reason}
    end.
...
enc_opt_val(Opts, Acc, Opt, Val) when is_atom(Opt) ->
    Type = type_opt(set, Opt),
    case type_value(set, Type, Val) of
        true ->
            enc_opt_val(Opts, [enc_opt(Opt),enc_value(set, Type, Val)|Acc]);
        false -> {error,einval}
    end;
...
enc_opt(high_watermark)  -> ?INET_LOPT_TCP_HIWTRMRK;
enc_opt(low_watermark)   -> ?INET_LOPT_TCP_LOWTRMRK;
#define INET_HIGH_WATERMARK (1024*8) /* 8k pending high => busy  */
#define INET_LOW_WATERMARK  (1024*4) /* 4k pending => allow more */
 
typedef struct {
    inet_descriptor inet;       /* common data structure (DON'T MOVE) */
    int   high;                 /* high watermark */
    int   low;                  /* low watermark */
    int   send_timeout;         /* timeout to use in send */
    int   send_timeout_close;   /* auto-close socket on send_timeout */
    int   busy_on_send;         /* busy on send with timeout! */
    int   i_bufsz;              /* current input buffer size (<= bufsz) */
    ErlDrvBinary* i_buf;        /* current binary buffer */
    char*         i_ptr;        /* current pos in buf */
    char*         i_ptr_start;  /* packet start pos in buf */
    int           i_remain;     /* remaining chars to read */
    int           tcp_add_flags;/* Additional TCP descriptor flags */
    int           http_state;   /* 0 = response|request  1=headers fields */
    inet_async_multi_op *multi_first;/* NULL == no multi-accept-queue, op is in ordinary queue */
    inet_async_multi_op *multi_last;
    MultiTimerData *mtd;        /* Timer structures for multiple accept */
} tcp_descriptor;
 
static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
{
...
        case INET_LOPT_TCP_HIWTRMRK:
            if (desc->stype == SOCK_STREAM) {
                tcp_descriptor* tdesc = (tcp_descriptor*) desc;
        if (ival < 0) ival = 0;
                if (tdesc->low > ival)
                    tdesc->low = ival;
                tdesc->high = ival;
            }
            continue;
 
        case INET_LOPT_TCP_LOWTRMRK:
            if (desc->stype == SOCK_STREAM) {
                tcp_descriptor* tdesc = (tcp_descriptor*) desc;
                if (ival < 0) ival = 0;
                if (tdesc->high < ival)
                    tdesc->high = ival;
                tdesc->low = ival;
            }
            continue;
...
}
gen_tcp的預設高低水位線分別為8K/4K, 如何微調參見 節點間通訊的通道微調 我們來驗證下水位線的存在:
$ erl
Erlang R14B04 (erts-5.8.5) 1 [smp:2:2] [rq:2] [async-threads:0] [hipe] [kernel-poll:false]
 
Eshell V5.8.5  (abort with ^G)
1> {ok,Sock} = gen_tcp:connect("baidu.com", 80, [{active,false}]).
{ok,#Port<0.595>}
2> inet:getopts(Sock,[high_watermark, low_watermark]).
{ok,[{high_watermark,8192},{low_watermark,4096}]}
3> inet:setopts(Sock,[{high_watermark,131072},{low_watermark, 65536}]).
ok
4> inet:getopts(Sock,[high_watermark, low_watermark]).                
{ok,[{high_watermark,131072},{low_watermark,65536}]}
我們成功的把水位先提高到了128K/64K,同時也驗證了它的存在。那麼如果資料超出水位線會發生什麼事情呢? 我們繼續看文件和程式碼:
//inet_drv.c:L845
static struct erl_drv_entry tcp_inet_driver_entry =
{
    tcp_inet_init,  /* inet_init will add this driver !! */
    tcp_inet_start,
    tcp_inet_stop,
    tcp_inet_command,
#ifdef __WIN32__
    tcp_inet_event,
    NULL,
#else
    tcp_inet_drv_input,
    tcp_inet_drv_output,
#endif
    "tcp_inet",
    NULL,
    NULL,
    tcp_inet_ctl,
    tcp_inet_timeout,
    tcp_inet_commandv,
    NULL,
    tcp_inet_flush,
    NULL,
    NULL,
    ERL_DRV_EXTENDED_MARKER,
    ERL_DRV_EXTENDED_MAJOR_VERSION,
    ERL_DRV_EXTENDED_MINOR_VERSION,
    ERL_DRV_FLAG_USE_PORT_LOCKING|ERL_DRV_FLAG_SOFT_BUSY,
    NULL,
    tcp_inet_process_exit,
    inet_stop_select
};
我們的tcp驅動是支援ERL_DRV_FLAG_SOFT_BUSY的,那麼什麼是ERL_DRV_FLAG_SOFT_BUSY呢? 文件在這裡 int driver_flags This field is used to pass driver capability information to the runtime system. If the extended_marker field equals ERL_DRV_EXTENDED_MARKER, it should contain 0 or driver flags (ERL_DRV_FLAG_*) ored bitwise. Currently the following driver flags exist: ERL_DRV_FLAG_USE_PORT_LOCKING The runtime system will use port level locking on all ports executing this driver instead of driver level locking when the driver is run in a runtime system with SMP support. For more information see the erl_driver documentation. ERL_DRV_FLAG_SOFT_BUSY Marks that driver instances can handle being called in the output and/or outputv callbacks even though a driver instance has marked itself as busy (see set_busy_port()). Since erts version 5.7.4 this flag is required for drivers used by the Erlang distribution (the behaviour has always been required by drivers used by the distribution). 那麼是port_command如何運作的呢?
BIF_RETTYPE port_command_2(BIF_ALIST_2)
{
    return do_port_command(BIF_P, BIF_ARG_1, BIF_ARG_2, NIL, 0);
}
//erl_bif_port.c:L120:
static BIF_RETTYPE do_port_command(Process *BIF_P,
                                   Eterm BIF_ARG_1,
                                   Eterm BIF_ARG_2,
                                   Eterm BIF_ARG_3,
                                   Uint32 flags)
{
...
    if ((flags & ERTS_PORT_COMMAND_FLAG_FORCE)
        && !(p->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY)) {
        ERTS_BIF_PREP_ERROR(res, BIF_P, EXC_NOTSUP);
    }
    else if (!(flags & ERTS_PORT_COMMAND_FLAG_FORCE)
             && p->status & ERTS_PORT_SFLG_PORT_BUSY) {
        if (flags & ERTS_PORT_COMMAND_FLAG_NOSUSPEND) {
            ERTS_BIF_PREP_RET(res, am_false);
        }
        else {
            erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, p);
            if (erts_system_monitor_flags.busy_port) {
                monitor_generic(BIF_P, am_busy_port, p->id);
            }
            ERTS_BIF_PREP_YIELD3(res, bif_export[BIF_port_command_3], BIF_P,
                                 BIF_ARG_1, BIF_ARG_2, BIF_ARG_3);
        }
    } else {
        int wres;
        erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN);
        ERTS_SMP_CHK_NO_PROC_LOCKS;
        wres = erts_write_to_port(BIF_P->id, p, BIF_ARG_2);
        erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN);
        if (wres != 0) {
            ERTS_BIF_PREP_ERROR(res, BIF_P, BADARG);
        }
    }
 
...
}
從程式碼我們可以看出: 1. 如果port_command設定了force標誌,但是驅動不支援ERL_DRV_FLAG_SOFT_BUSY, 要返回EXC_NOTSUP錯誤。 我們的驅動支援ERL_DRV_FLAG_SOFT_BUSY的,所以如果force的話,資料入緩衝區; 2. 如果設定了NOSUSPEND,但是port已經busy了,返回false,表明傳送失敗。否則的話就把傳送程序suspend,同時告訴system_monitor系統現在有port進入busy_port了。 透過system_monitor我們可以監控port的busy,參看:這裡 erlang:system_monitor(MonitorPid, [Option]) -> MonSettings Types: MonitorPid = pid() Option = {long_gc, Time} | {large_heap, Size} | busy_port | busy_dist_port Time = Size = int() MonSettings = {OldMonitorPid, [Option]} OldMonitorPid = pid() Sets system performance monitoring options. MonitorPid is a local pid that will receive system monitor messages, and the second argument is a list of monitoring options: {long_gc, Time} If a garbage collection in the system takes at least Time wallclock milliseconds, a message {monitor, GcPid, long_gc, Info} is sent to MonitorPid. GcPid is the pid that was garbage collected and Info is a list of two-element tuples describing the result of the garbage collection. One of the tuples is {timeout, GcTime} where GcTime is the actual time for the garbage collection in milliseconds. The other tuples are tagged with heap_size, heap_block_size, stack_size, mbuf_size, old_heap_size, and old_heap_block_size. These tuples are explained in the documentation of the gc_start trace message (see erlang:trace/3). New tuples may be added, and the order of the tuples in the Info list may be changed at any time without prior notice. {large_heap, Size} If a garbage collection in the system results in the allocated size of a heap being at least Size words, a message {monitor, GcPid, large_heap, Info} is sent to MonitorPid. GcPid and Info are the same as for long_gc above, except that the tuple tagged with timeout is not present. Note: As of erts version 5.6 the monitor message is sent if the sum of the sizes of all memory blocks allocated for all heap generations is equal to or larger than Size. Previously the monitor message was sent if the memory block allocated for the youngest generation was equal to or larger than Size. busy_port If a process in the system gets suspended because it sends to a busy port, a message {monitor, SusPid, busy_port, Port} is sent to MonitorPid. SusPid is the pid that got suspended when sending to Port. busy_dist_port If a process in the system gets suspended because it sends to a process on a remote node whose inter-node communication was handled by a busy port, a message {monitor, SusPid, busy_dist_port, Port} is sent to MonitorPid. SusPid is the pid that got suspended when sending through the inter-node communication port Port. Returns the previous system monitor settings just like erlang:system_monitor/0. Note If a monitoring process gets so large that it itself starts to cause system monitor messages when garbage collecting, the messages will enlarge the process’s message queue and probably make the problem worse. Keep the monitoring process neat and do not set the system monitor limits too tight. Failure: badarg if MonitorPid does not exist. 那麼發生的資料多於高水位線的時候要設定busy_port如何實現的呢? 看程式碼:
static void tcp_inet_commandv(ErlDrvData e, ErlIOVec* ev)
{
    tcp_descriptor* desc = (tcp_descriptor*)e;
    desc->inet.caller = driver_caller(desc->inet.port);
 
    DEBUGF(("tcp_inet_commanv(%ld) {s=%d\r\n",
            (long)desc->inet.port, desc->inet.s));
    if (!IS_CONNECTED(INETP(desc))) {
        if (desc->tcp_add_flags & TCP_ADDF_DELAYED_CLOSE_SEND) {
            desc->tcp_add_flags &= ~TCP_ADDF_DELAYED_CLOSE_SEND;
            inet_reply_error_am(INETP(desc), am_closed);
    }
        else
            inet_reply_error(INETP(desc), ENOTCONN);
    }
    else if (tcp_sendv(desc, ev) == 0)
        inet_reply_ok(INETP(desc));
    DEBUGF(("tcp_inet_commandv(%ld) }\r\n", (long)desc->inet.port));
}
 
/*                                                                                                                         
** Send non-blocking vector data                                                                                           
*/
static int tcp_sendv(tcp_descriptor* desc, ErlIOVec* ev)
{
...
if ((sz = driver_sizeq(ix)) > 0) {
        driver_enqv(ix, ev, 0);
        if (sz+ev->size >= desc->high) {
            DEBUGF(("tcp_sendv(%ld): s=%d, sender forced busy\r\n",
                    (long)desc->inet.port, desc->inet.s));
            desc->inet.state |= INET_F_BUSY;  /* mark for low-watermark */
            desc->inet.busy_caller = desc->inet.caller;
            set_busy_port(desc->inet.port, 1);
            if (desc->send_timeout != INET_INFINITY) {
                desc->busy_on_send = 1;
                driver_set_timer(desc->inet.port, desc->send_timeout);
            }
            return 1;
        }
    }
 
...
}
 
// beam/io.c:L2352
void
set_busy_port(ErlDrvPort port_num, int on)
{
    ERTS_SMP_CHK_NO_PROC_LOCKS;
 
    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[port_num]));
 
    if (on) {
        erts_port_status_bor_set(&erts_port[port_num],
                                 ERTS_PORT_SFLG_PORT_BUSY);
    } else {
        ErtsProcList* plp = erts_port[port_num].suspended;
        erts_port_status_band_set(&erts_port[port_num],
                                  ~ERTS_PORT_SFLG_PORT_BUSY);
        erts_port[port_num].suspended = NULL;
 
        if (erts_port[port_num].dist_entry) {
            /*                                                                                                             
             * Processes suspended on distribution ports are                                                               
             * normally queued on the dist entry.                                                                          
             */
            erts_dist_port_not_busy(&erts_port[port_num]);
        }
        /*                                                                                                                 
         * Resume, in a round-robin fashion, all processes waiting on the port.                                            
         *                                                                                                                 
         * This version submitted by Tony Rogvall. The earlier version used                                                
         * to resume the processes in order, which caused starvation of all but                                            
         * the first process.                                                                                              
         */
 
        if (plp) {
            /* First proc should be resumed last */
            if (plp->next) {
                erts_resume_processes(plp->next);
                plp->next = NULL;
            }
            erts_resume_processes(plp);
        }
    }
}
也就是說一旦超過了,只是設定下busy_port標誌,但是本次程序並沒有被掛起,下次傳送者才會被掛起。同時會開啟send_timeout定時器,如果資料在send_timeout時間內未傳送出去就會出現timeout錯誤。 那麼掛起的程序如何被解除掛起,繼續執行呢,看程式碼:
/* socket ready for ouput:                                                                                                 
** 1. TCP_STATE_CONNECTING => non block connect ?                                                                          
** 2. TCP_STATE_CONNECTED  => write output                                                                                 
*/
static int tcp_inet_output(tcp_descriptor* desc, HANDLE event)
{
...
 if (driver_deq(ix, n) <= desc->low) {
                if (IS_BUSY(INETP(desc))) {
                    desc->inet.caller = desc->inet.busy_caller;
                    desc->inet.state &= ~INET_F_BUSY;
                    set_busy_port(desc->inet.port, 0);
                    /* if we have a timer then cancel and send ok to client */
                    if (desc->busy_on_send) {
                        driver_cancel_timer(desc->inet.port);
                        desc->busy_on_send = 0;
                    }
                    inet_reply_ok(INETP(desc));
                }
            }
 
...
}
 
//erl_process.c:5038
/*                                                                                                                         
** Suspend a process                                                                                                       
** If we are to suspend on a port the busy_port is the thing                                                               
** otherwise busy_port is NIL                                                                                              
*/
 
void
erts_suspend(Process* process, ErtsProcLocks process_locks, Port *busy_port)
{
    ErtsRunQueue *rq;
 
    ERTS_SMP_LC_ASSERT(process_locks == erts_proc_lc_my_proc_locks(process));
    if (!(process_locks & ERTS_PROC_LOCK_STATUS))
        erts_smp_proc_lock(process, ERTS_PROC_LOCK_STATUS);
 
    rq = erts_get_runq_proc(process);
 
    erts_smp_runq_lock(rq);
 
    suspend_process(rq, process);
 
    erts_smp_runq_unlock(rq);
 
    if (busy_port)
    erts_wake_process_later(busy_port, process);
 
    if (!(process_locks & ERTS_PROC_LOCK_STATUS))
    erts_smp_proc_unlock(process, ERTS_PROC_LOCK_STATUS);
 
}
 
 
// io.c:474:
void
erts_wake_process_later(Port *prt, Process *process)
{
    ErtsProcList** p;
    ErtsProcList* new_p;
 
    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
 
    if (prt->status & ERTS_PORT_SFLGS_DEAD)
        return;
 
    for (p = &(prt->suspended); *p != NULL; p = &((*p)->next))
    /* Empty loop body */;
 
    new_p = erts_proclist_create(process);
    new_p->next = NULL;
    *p = new_p;
}
我們可以看到隨著資料被髮送出去,緩衝區裡面的資料如果少於低水位線,那麼就解除busy_port標誌,同時喚醒所有被掛起在這個port的程序,繼續執行。 還有一種情況port會被掛起,那就是port也是公平排程的,預防過快的IO把其他的port餓死了. port是和程序一樣公平排程的.  程序是按照reductions為單位排程的, port是把傳送的位元組數摺合成reductions.  所以如果一個程序傳送大量的tcp資料 那麼這個程序不是一直會得到執行的. 執行期會強制停止一段時間, 讓其他port有機會執行的. 我們看下程式碼:
//erl_port_task.c:L45
/*                                                                                                                         
 * Costs in reductions for some port operations.                                                                           
 */
#define ERTS_PORT_REDS_EXECUTE          0
#define ERTS_PORT_REDS_FREE             50
#define ERTS_PORT_REDS_TIMEOUT          200
#define ERTS_PORT_REDS_INPUT            200
#define ERTS_PORT_REDS_OUTPUT           200
#define ERTS_PORT_REDS_EVENT            200
#define ERTS_PORT_REDS_TERMINATE        100
 
/*                                                                                                                         
 * Run all scheduled tasks for the first port in run queue. If                                                             
 * new tasks appear while running reschedule port (free task is                                                            
 * an exception; it is always handled instantly).                                                                          
 *                                                                                                                         
 * erts_port_task_execute() is called by scheduler threads between                                                         
 * scheduleing of processes. Sched lock should be held by caller.                                                          
 */
 
int
erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
{
 ...
 case ERTS_PORT_TASK_TIMEOUT:
            reds += ERTS_PORT_REDS_TIMEOUT;
            if (!(pp->status & ERTS_PORT_SFLGS_DEAD))
                (*pp->drv_ptr->timeout)((ErlDrvData) pp->drv_data);
            break;
        case ERTS_PORT_TASK_INPUT:
        reds += ERTS_PORT_REDS_INPUT;
            ASSERT((pp->status & ERTS_PORT_SFLGS_DEAD) == 0);
            /* NOTE some windows drivers use ->ready_input for input and output */
        (*pp->drv_ptr->ready_input)((ErlDrvData) pp->drv_data, ptp->event);
            io_tasks_executed++;
            break;
        case ERTS_PORT_TASK_OUTPUT:
            reds += ERTS_PORT_REDS_OUTPUT;
            ASSERT((pp->status & ERTS_PORT_SFLGS_DEAD) == 0);
            (*pp->drv_ptr->ready_output)((ErlDrvData) pp->drv_data, ptp->event);
            io_tasks_executed++;
            break;
        case ERTS_PORT_TASK_EVENT:
            reds += ERTS_PORT_REDS_EVENT;
            ASSERT((pp->status & ERTS_PORT_SFLGS_DEAD) == 0);
            (*pp->drv_ptr->event)((ErlDrvData) pp->drv_data, ptp->event, ptp->event_data);
            io_tasks_executed++;
            break;
        case ERTS_PORT_TASK_DIST_CMD:
            reds += erts_dist_command(pp, CONTEXT_REDS-reds);
            break;
 
...
  ERTS_PORT_REDUCTIONS_EXECUTED(runq, reds);
 
    return res;
 
}
 
#define ERTS_PORT_REDUCTIONS_EXECUTED(RQ, REDS)                 \
do {                                                            \
    (RQ)->ports.info.reds += (REDS);                            \
    (RQ)->check_balance_reds -= (REDS);                         \
    (RQ)->wakeup_other_reds += (REDS);                          \
} while (0)
從程式碼可以看出port的排程的時間片是從宿主的程序的時間片裡面扣的, #define ERTS_PORT_REDS_INPUT 200 #define ERTS_PORT_REDS_OUTPUT 200 每個讀寫佔用200個時間片,而每個程序初始分配2000個時間片,也就是說做10次輸出就要被排程了。 通過上面的分析我們知道被gen_tcp傳送程序被掛起的原因。 對策就是如果該程序不能阻塞,那麼就新增force標誌,強行往緩衝區加入資料,同時設定{send_timeout, Integer}。 如果該socket在指定的時間內無法把資料傳送完成,那麼就直接宣告socket傳送超時,避免了潛在的force加資料造成的緩衝區佔用大量記憶體而出現問題。 上面分析過,gen_tcp資料的傳送需要佔用宿主程序的reds,這也可能造成宿主程序被掛起,在設計的時候儘量避免一個程序擁有太多的port. 試驗過程稍後奉上! 祝大家玩得開心!