
系統技術非業餘研究 » Why a gen_tcp sending process gets suspended: analysis and countermeasures

Recently a reader asked me over Gmail about a gen_tcp sending process being suspended. The problem was described very well; see below:

The first question is about port_command and gen_tcp:send. Since our project went live I have hit problems twice in the tcp sending path, both related to port_command.

Early on the program performed poorly. I tried to analyze and optimize from every angle, partly by guesswork, and at one point I switched the server-wide broadcast path to port_command, following hotwheels' code and one of your related blog posts.

By your analysis, port_command should be more efficient than calling gen_tcp:send directly, and should not block. Yet this is exactly where I hit blocking, twice, at different stages of the project, described separately below.

Shortly after launch:

At the time, player processes sent to players with gen_tcp:send, while the broadcast process used port_command for efficiency. Once active players reached a certain number, new players could no longer enter the game. The cause turned out to be the global broadcast process getting stuck, visible from its message_queue_len. After changing it so the broadcast process sends a message to each player process, which then does the socket send itself, the problem disappeared.

More recently:

By then the player processes' tcp sends had also been switched to port_command and had run fine for a while. But on game servers with heavier traffic, once active players reached a certain number, message latency grew huge (5-6 seconds) and every operation lagged. During the incident the server's CPU, memory, and load were all normal, and working over ssh felt perfectly smooth. Other game servers on the same machine were also fine, yet the whole erlang node of the affected server entered a "sluggish" state: typing into its erlang shell had a very noticeable delay.

At first I never suspected port_command, so I hunted for causes everywhere and "optimized" code, quotes intended.

Finally, during one of these incidents, I switched the tcp sending code back to gen_tcp:send, hot-loaded the affected modules, and the server recovered immediately.

I have been puzzling over this ever since. My original code was:

tcp_send(Socket, Bin) ->
    try erlang:port_command(Socket, Bin, [force, nosuspend]) of
        false ->
            exit({game_tcp_send_error, busy});
        true ->
            true
    catch
        error:Error ->
            exit({game_tcp_send_error, {error, einval, Error}})
    end.

I hope you can help analyze what made the whole erlang node sluggish; I think the answer will help other erlang programmers too!

Regarding this problem I previously wrote an article that systematically describes gen_tcp's behaviour: gen_tcp:send的深度解刨和使用指南(初稿) (an in-depth dissection of gen_tcp:send and a usage guide, draft); see here.

Let's dig deeper into the problem. First, the code of gen_tcp:send:

%% gen_tcp.erl:L235
send(S, Packet) when is_port(S) ->
    case inet_db:lookup_socket(S) of
        {ok, Mod} ->
            Mod:send(S, Packet);
        Error ->
            Error
    end.

%% inet_tcp.erl:L50
%%                                                                                                                          
%% Send data on a socket                                                                                                    
%%                                                                                                                          
send(Socket, Packet, Opts) -> prim_inet:send(Socket, Packet, Opts).
send(Socket, Packet) -> prim_inet:send(Socket, Packet, []).

%%prim_inet.erl:L349
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%                                              
%%                                                                                                                          
%% SEND(insock(), Data) -> ok | {error, Reason}                                                                              
%%                                                                                                                          
%% send Data on the socket (io-list)                                                                                        
%%                                                                                                                          
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%                                              
%% This is a generic "port_command" interface used by TCP, UDP, SCTP, depending                                             
%% on the driver it is mapped to, and the "Data". It actually sends out data,--                                             
%% NOT delegating this task to any back-end.  For SCTP, this function MUST NOT                                              
%% be called directly -- use "sendmsg" instead:                                                                             
%%                                                                                                                          
send(S, Data, OptList) when is_port(S), is_list(OptList) ->
    ?DBG_FORMAT("prim_inet:send(~p, ~p)~n", [S,Data]),
    try erlang:port_command(S, Data, OptList) of
        false -> % Port busy and nosuspend option passed                                                                    
            ?DBG_FORMAT("prim_inet:send() -> {error,busy}~n", []),
            {error,busy};
        true ->
            receive
                {inet_reply,S,Status} ->
                    ?DBG_FORMAT("prim_inet:send() -> ~p~n", [Status]),
                    Status
            end
    catch
        error:_Error ->
            ?DBG_FORMAT("prim_inet:send() -> {error,einval}~n", []),
             {error,einval}
    end.

We can see that gen_tcp:send is two steps: 1. port_command submits the data; 2. wait for the {inet_reply,S,Status} reply. This is a classic blocking operation: while waiting, the process is scheduled out.
So with a large number of tcp connections all sending data, this is somewhat inefficient, and many systems therefore restructure it: submit the data in one place and collect the replies in one place.

A typical example is rabbitmq:

%%rabbit_writer.erl
...
handle_message({inet_reply, _, ok}, State) ->
    State;
handle_message({inet_reply, _, Status}, _State) ->
    exit({writer, send_failed, Status});
handle_message(shutdown, _State) ->
    exit(normal);
...
internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) ->
    true = port_cmd(Sock, assemble_frames(Channel, MethodRecord,
                                              Content, FrameMax)),
    ok.

port_cmd(Sock, Data) ->
    try rabbit_net:port_command(Sock, Data)
    catch error:Error -> exit({writer, send_failed, Error})
    end.

Its approach is to have one process do all the sending and collect all the replies. Under normal conditions this greatly reduces process-switching overhead and waiting time. But it also introduces a risk: if that port_command unexpectedly blocks, message sending for the whole system wedges, whereas with per-connection gen_tcp:send only individual processes would block.

Let's read the port_command documentation carefully:

port_command(Port, Data, OptionList) -> true|false

Types:

Port = port() | atom()
Data = iodata()
OptionList = [Option]
Option = force
Option = nosuspend
Sends data to a port. port_command(Port, Data, []) equals port_command(Port, Data).

If the port command is aborted false is returned; otherwise, true is returned.

If the port is busy, the calling process will be suspended until the port is not busy anymore.

Currently the following Options are valid:

force
The calling process will not be suspended if the port is busy; instead, the port command is forced through. The call will fail with a notsup exception if the driver of the port does not support this. For more information see the ERL_DRV_FLAG_SOFT_BUSY driver flag.
nosuspend
The calling process will not be suspended if the port is busy; instead, the port command is aborted and false is returned.
Note
More options may be added in the future.

Failures:

badarg
If Port is not an open port or the registered name of an open port.
badarg
If Data is not a valid io list.
badarg
If OptionList is not a valid option list.
notsup
If the force option has been passed, but the driver of the port does not allow forcing through a busy port.

So calling port_command can indeed get the caller suspended. Under what condition? For performance, inet keeps a send buffer in the gen_tcp driver port; by default, once the pending data exceeds that buffer's high watermark, the sender is suspended.
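To watch this from the Erlang side, here is a minimal probe (a sketch; it assumes a peer that stops reading, so the driver queue can actually fill up):

%% Keep pushing data with [nosuspend]: once the queued bytes cross the
%% high watermark the port is marked busy and port_command returns false
%% instead of suspending us. ({inet_reply, Socket, _} messages for the
%% successful writes pile up in our mailbox; a real caller would drain them.)
probe_busy(Socket, Bin) ->
    case erlang:port_command(Socket, Bin, [nosuspend]) of
        true  -> probe_busy(Socket, Bin);
        false -> busy
    end.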

But what exactly are the send buffer's high and low watermarks? Look at the code:

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%                                              
%%                                                                                                                          
%% SETOPT(insock(), Opt, Value) -> ok | {error, Reason}                                                                     
%% SETOPTS(insock(), [{Opt,Value}]) -> ok | {error, Reason}                                                                 
%%                                                                                                                          
%% set socket, ip and driver option                                                                                         
%%                                                                                                                          
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%                                              

setopt(S, Opt, Value) when is_port(S) -> 
    setopts(S, [{Opt,Value}]).

setopts(S, Opts) when is_port(S) ->
    case encode_opt_val(Opts) of
        {ok, Buf} ->
            case ctl_cmd(S, ?INET_REQ_SETOPTS, Buf) of
                {ok, _} -> ok;
                Error -> Error
            end;
        Error  -> Error
    end.

%% Encoding for setopts                                                                                                     
%%                                                                                                                          
%% encode opt/val REVERSED since options are stored in reverse order                                                        
%% i.e. the recent options first (we must process old -> new)                                                               
encode_opt_val(Opts) -> 
    try
        enc_opt_val(Opts, [])
    catch
        Reason -> {error,Reason}
    end.
...
enc_opt_val(Opts, Acc, Opt, Val) when is_atom(Opt) ->
    Type = type_opt(set, Opt),
    case type_value(set, Type, Val) of
        true -> 
            enc_opt_val(Opts, [enc_opt(Opt),enc_value(set, Type, Val)|Acc]);
        false -> {error,einval}
    end;
...
enc_opt(high_watermark)  -> ?INET_LOPT_TCP_HIWTRMRK;
enc_opt(low_watermark)   -> ?INET_LOPT_TCP_LOWTRMRK;

//inet_drv.c
#define INET_HIGH_WATERMARK (1024*8) /* 8k pending high => busy  */
#define INET_LOW_WATERMARK  (1024*4) /* 4k pending => allow more */

typedef struct {
    inet_descriptor inet;       /* common data structure (DON'T MOVE) */
    int   high;                 /* high watermark */
    int   low;                  /* low watermark */
    int   send_timeout;         /* timeout to use in send */
    int   send_timeout_close;   /* auto-close socket on send_timeout */
    int   busy_on_send;         /* busy on send with timeout! */
    int   i_bufsz;              /* current input buffer size (<= bufsz) */
    ErlDrvBinary* i_buf;        /* current binary buffer */
    char*         i_ptr;        /* current pos in buf */
    char*         i_ptr_start;  /* packet start pos in buf */
    int           i_remain;     /* remaining chars to read */
    int           tcp_add_flags;/* Additional TCP descriptor flags */
    int           http_state;   /* 0 = response|request  1=headers fields */
    inet_async_multi_op *multi_first;/* NULL == no multi-accept-queue, op is in ordinary queue */
    inet_async_multi_op *multi_last;
    MultiTimerData *mtd;        /* Timer structures for multiple accept */
} tcp_descriptor;

static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
{
...
        case INET_LOPT_TCP_HIWTRMRK:
            if (desc->stype == SOCK_STREAM) {
                tcp_descriptor* tdesc = (tcp_descriptor*) desc;
		if (ival < 0) ival = 0;
                if (tdesc->low > ival)
                    tdesc->low = ival;
                tdesc->high = ival;
            }
            continue;

        case INET_LOPT_TCP_LOWTRMRK:
            if (desc->stype == SOCK_STREAM) {
                tcp_descriptor* tdesc = (tcp_descriptor*) desc;
                if (ival < 0) ival = 0;
                if (tdesc->high < ival)
                    tdesc->high = ival;
                tdesc->low = ival;
            }
            continue;
...
}

So gen_tcp's default high and low watermarks are 8K and 4K respectively; for how to tune them see 節點間通訊的通道微調 (tuning the inter-node communication channel).
Let's verify that the watermarks exist:

$ erl
Erlang R14B04 (erts-5.8.5)  [smp:2:2] [rq:2] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.5  (abort with ^G)
1> {ok,Sock} = gen_tcp:connect("baidu.com", 80, [{active,false}]).
{ok,#Port<0.595>}
2> inet:getopts(Sock,[high_watermark, low_watermark]).
{ok,[{high_watermark,8192},{low_watermark,4096}]}
3> inet:setopts(Sock,[{high_watermark,131072},{low_watermark, 65536}]).
ok
4> inet:getopts(Sock,[high_watermark, low_watermark]).                 
{ok,[{high_watermark,131072},{low_watermark,65536}]}
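The watermarks do not have to be poked in afterwards, either: gen_tcp:connect and gen_tcp:listen accept the same inet options, so they can be set at socket creation (a sketch; if the runtime at hand rejects them in the option list, fall back to inet:setopts/2 right after connect):

{ok, S} = gen_tcp:connect("baidu.com", 80,
                          [{active, false},
                           {high_watermark, 131072},
                           {low_watermark,  65536}]).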

We successfully raised the watermarks to 128K/64K, which also confirms that they exist. So what happens when the pending data exceeds the high watermark?
Let's continue with the documentation and the code:

//inet_drv.c:L845
static struct erl_drv_entry tcp_inet_driver_entry =
{
    tcp_inet_init,  /* inet_init will add this driver !! */
    tcp_inet_start,
    tcp_inet_stop,
    tcp_inet_command,
#ifdef __WIN32__
    tcp_inet_event,
    NULL,
#else
    tcp_inet_drv_input,
    tcp_inet_drv_output,
#endif
    "tcp_inet",
    NULL,
    NULL,
    tcp_inet_ctl,
    tcp_inet_timeout,
    tcp_inet_commandv,
    NULL,
    tcp_inet_flush,
    NULL,
    NULL,
    ERL_DRV_EXTENDED_MARKER,
    ERL_DRV_EXTENDED_MAJOR_VERSION,
    ERL_DRV_EXTENDED_MINOR_VERSION,
    ERL_DRV_FLAG_USE_PORT_LOCKING|ERL_DRV_FLAG_SOFT_BUSY,
    NULL,
    tcp_inet_process_exit,
    inet_stop_select
};

So our tcp driver supports ERL_DRV_FLAG_SOFT_BUSY. What is ERL_DRV_FLAG_SOFT_BUSY?

The driver documentation says:

int driver_flags
This field is used to pass driver capability information to the runtime system. If the extended_marker field equals ERL_DRV_EXTENDED_MARKER, it should contain 0 or driver flags (ERL_DRV_FLAG_*) ored bitwise. Currently the following driver flags exist:

ERL_DRV_FLAG_USE_PORT_LOCKING
The runtime system will use port level locking on all ports executing this driver instead of driver level locking when the driver is run in a runtime system with SMP support. For more information see the erl_driver documentation.
ERL_DRV_FLAG_SOFT_BUSY
Marks that driver instances can handle being called in the output and/or outputv callbacks even though a driver instance has marked itself as busy (see set_busy_port()). Since erts version 5.7.4 this flag is required for drivers used by the Erlang distribution (the behaviour has always been required by drivers used by the distribution).

So how does port_command actually work?

BIF_RETTYPE port_command_2(BIF_ALIST_2)
{
    return do_port_command(BIF_P, BIF_ARG_1, BIF_ARG_2, NIL, 0);
}
//erl_bif_port.c:L120:
static BIF_RETTYPE do_port_command(Process *BIF_P,
                                   Eterm BIF_ARG_1,
                                   Eterm BIF_ARG_2,
                                   Eterm BIF_ARG_3,
                                   Uint32 flags)
{
...
    if ((flags & ERTS_PORT_COMMAND_FLAG_FORCE)
        && !(p->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY)) {
        ERTS_BIF_PREP_ERROR(res, BIF_P, EXC_NOTSUP);
    }
    else if (!(flags & ERTS_PORT_COMMAND_FLAG_FORCE)
             && p->status & ERTS_PORT_SFLG_PORT_BUSY) {
        if (flags & ERTS_PORT_COMMAND_FLAG_NOSUSPEND) {
            ERTS_BIF_PREP_RET(res, am_false);
        }
        else {
            erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, p);
            if (erts_system_monitor_flags.busy_port) {
                monitor_generic(BIF_P, am_busy_port, p->id);
            }
            ERTS_BIF_PREP_YIELD3(res, bif_export[BIF_port_command_3], BIF_P,
                                 BIF_ARG_1, BIF_ARG_2, BIF_ARG_3);
        }
    } else {
        int wres;
        erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN);
        ERTS_SMP_CHK_NO_PROC_LOCKS;
        wres = erts_write_to_port(BIF_P->id, p, BIF_ARG_2);
        erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN);
        if (wres != 0) {
            ERTS_BIF_PREP_ERROR(res, BIF_P, BADARG);
        }
    }

...
}

From this code we can see the following (a sketch of both call shapes follows the list):
1. If port_command passes the force flag but the driver does not support ERL_DRV_FLAG_SOFT_BUSY, the call fails with notsup. Our tcp driver does support ERL_DRV_FLAG_SOFT_BUSY, so with force the data goes into the buffer regardless.

2. If nosuspend is set and the port is already busy, the call returns false, meaning the send failed. Without nosuspend the sending process is suspended, and system_monitor is told that a process has been suspended on a busy port.
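Seen from Erlang, the two non-blocking shapes look like this (a sketch; Sock is a connected gen_tcp socket):

%% force: tcp_inet declares ERL_DRV_FLAG_SOFT_BUSY, so no notsup here;
%% the data is enqueued even while the port is busy.
true = erlang:port_command(Sock, Data, [force]),

%% nosuspend: a busy port makes the call back off and return false
%% instead of suspending the caller.
case erlang:port_command(Sock, Data, [nosuspend]) of
    true  -> ok;              %% enqueued; {inet_reply, Sock, Status} follows
    false -> {error, busy}
end.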

Via system_monitor we can watch for busy ports; see the documentation:

erlang:system_monitor(MonitorPid, [Option]) -> MonSettings

Types:

MonitorPid = pid()
Option = {long_gc, Time} | {large_heap, Size} | busy_port | busy_dist_port
Time = Size = int()
MonSettings = {OldMonitorPid, [Option]}
OldMonitorPid = pid()
Sets system performance monitoring options. MonitorPid is a local pid that will receive system monitor messages, and the second argument is a list of monitoring options:

{long_gc, Time}
If a garbage collection in the system takes at least Time wallclock milliseconds, a message {monitor, GcPid, long_gc, Info} is sent to MonitorPid. GcPid is the pid that was garbage collected and Info is a list of two-element tuples describing the result of the garbage collection. One of the tuples is {timeout, GcTime} where GcTime is the actual time for the garbage collection in milliseconds. The other tuples are tagged with heap_size, heap_block_size, stack_size, mbuf_size, old_heap_size, and old_heap_block_size. These tuples are explained in the documentation of the gc_start trace message (see erlang:trace/3). New tuples may be added, and the order of the tuples in the Info list may be changed at any time without prior notice.

{large_heap, Size}
If a garbage collection in the system results in the allocated size of a heap being at least Size words, a message {monitor, GcPid, large_heap, Info} is sent to MonitorPid. GcPid and Info are the same as for long_gc above, except that the tuple tagged with timeout is not present. Note: As of erts version 5.6 the monitor message is sent if the sum of the sizes of all memory blocks allocated for all heap generations is equal to or larger than Size. Previously the monitor message was sent if the memory block allocated for the youngest generation was equal to or larger than Size.

busy_port
If a process in the system gets suspended because it sends to a busy port, a message {monitor, SusPid, busy_port, Port} is sent to MonitorPid. SusPid is the pid that got suspended when sending to Port.

busy_dist_port
If a process in the system gets suspended because it sends to a process on a remote node whose inter-node communication was handled by a busy port, a message {monitor, SusPid, busy_dist_port, Port} is sent to MonitorPid. SusPid is the pid that got suspended when sending through the inter-node communication port Port.

Returns the previous system monitor settings just like erlang:system_monitor/0.

Note
If a monitoring process gets so large that it itself starts to cause system monitor messages when garbage collecting, the messages will enlarge the process’s message queue and probably make the problem worse.

Keep the monitoring process neat and do not set the system monitor limits too tight.

Failure: badarg if MonitorPid does not exist.
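Putting busy_port monitoring to work takes only a few lines (a sketch; the function name and log text are mine):

%% Log every process that gets suspended on a busy port.
start_busy_port_monitor() ->
    spawn(fun() ->
                  erlang:system_monitor(self(), [busy_port]),
                  busy_port_loop()
          end).

busy_port_loop() ->
    receive
        {monitor, SusPid, busy_port, Port} ->
            error_logger:warning_msg("~p suspended on busy port ~p~n",
                                     [SusPid, Port]),
            busy_port_loop()
    end.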

So how does the driver set busy_port when the pending data exceeds the high watermark?
The code:

static void tcp_inet_commandv(ErlDrvData e, ErlIOVec* ev)
{
    tcp_descriptor* desc = (tcp_descriptor*)e;
    desc->inet.caller = driver_caller(desc->inet.port);

    DEBUGF(("tcp_inet_commanv(%ld) {s=%d\r\n",
            (long)desc->inet.port, desc->inet.s));
    if (!IS_CONNECTED(INETP(desc))) {
        if (desc->tcp_add_flags & TCP_ADDF_DELAYED_CLOSE_SEND) {
            desc->tcp_add_flags &= ~TCP_ADDF_DELAYED_CLOSE_SEND;
            inet_reply_error_am(INETP(desc), am_closed);
	}
        else
            inet_reply_error(INETP(desc), ENOTCONN);
    }
    else if (tcp_sendv(desc, ev) == 0)
        inet_reply_ok(INETP(desc));
    DEBUGF(("tcp_inet_commandv(%ld) }\r\n", (long)desc->inet.port));
}

/*                                                                                                                          
** Send non-blocking vector data                                                                                            
*/
static int tcp_sendv(tcp_descriptor* desc, ErlIOVec* ev)
{
...
if ((sz = driver_sizeq(ix)) > 0) {
        driver_enqv(ix, ev, 0);
        if (sz+ev->size >= desc->high) {
            DEBUGF(("tcp_sendv(%ld): s=%d, sender forced busy\r\n",
                    (long)desc->inet.port, desc->inet.s));
            desc->inet.state |= INET_F_BUSY;  /* mark for low-watermark */
            desc->inet.busy_caller = desc->inet.caller;
            set_busy_port(desc->inet.port, 1);
            if (desc->send_timeout != INET_INFINITY) {
                desc->busy_on_send = 1;
                driver_set_timer(desc->inet.port, desc->send_timeout);
            }
            return 1;
        }
    }

...
}

// beam/io.c:L2352
void
set_busy_port(ErlDrvPort port_num, int on)
{
    ERTS_SMP_CHK_NO_PROC_LOCKS;

    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[port_num]));

    if (on) {
        erts_port_status_bor_set(&erts_port[port_num],
                                 ERTS_PORT_SFLG_PORT_BUSY);
    } else {
        ErtsProcList* plp = erts_port[port_num].suspended;
        erts_port_status_band_set(&erts_port[port_num],
                                  ~ERTS_PORT_SFLG_PORT_BUSY);
        erts_port[port_num].suspended = NULL;

        if (erts_port[port_num].dist_entry) {
            /*                                                                                                              
             * Processes suspended on distribution ports are                                                                
             * normally queued on the dist entry.                                                                           
             */
            erts_dist_port_not_busy(&erts_port[port_num]);
        }
        /*                                                                                                                  
         * Resume, in a round-robin fashion, all processes waiting on the port.                                             
         *                                                                                                                  
         * This version submitted by Tony Rogvall. The earlier version used                                                 
         * to resume the processes in order, which caused starvation of all but                                             
         * the first process.                                                                                               
         */

        if (plp) {
            /* First proc should be resumed last */
            if (plp->next) {
                erts_resume_processes(plp->next);
                plp->next = NULL;
            }
            erts_resume_processes(plp);
        }
    }
}

In other words, when the high watermark is crossed, only the busy_port flag gets set; the caller that crossed it is not suspended, but the next sender will be. If send_timeout is set, a timer is also started: should the data fail to go out within send_timeout, the send ends with a timeout error.
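In plain gen_tcp terms that looks like this (a sketch; the 5 second budget and handle_slow_peer/1 are mine): give the socket a bounded send time, so a peer that never drains surfaces as an explicit error instead of an indefinitely suspended sender.

ok = inet:setopts(Socket, [{send_timeout, 5000},
                           {send_timeout_close, true}]),
case gen_tcp:send(Socket, Bin) of
    ok               -> ok;
    {error, timeout} -> handle_slow_peer(Socket);  %% hypothetical handler
    {error, Reason}  -> {error, Reason}
end.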

So how does a suspended process get unsuspended and resume running? The code:

/* socket ready for ouput:                                                                                                  
** 1. TCP_STATE_CONNECTING => non block connect ?                                                                           
** 2. TCP_STATE_CONNECTED  => write output                                                                                  
*/
static int tcp_inet_output(tcp_descriptor* desc, HANDLE event)
{
...
 if (driver_deq(ix, n) <= desc->low) {
                if (IS_BUSY(INETP(desc))) {
                    desc->inet.caller = desc->inet.busy_caller;
                    desc->inet.state &= ~INET_F_BUSY;
                    set_busy_port(desc->inet.port, 0);
                    /* if we have a timer then cancel and send ok to client */
                    if (desc->busy_on_send) {
                        driver_cancel_timer(desc->inet.port);
                        desc->busy_on_send = 0;
                    }
                    inet_reply_ok(INETP(desc));
                }
            }

...
}

//erl_process.c:5038
/*                                                                                                                          
** Suspend a process                                                                                                        
** If we are to suspend on a port the busy_port is the thing                                                                
** otherwise busy_port is NIL                                                                                               
*/

void
erts_suspend(Process* process, ErtsProcLocks process_locks, Port *busy_port)
{
    ErtsRunQueue *rq;

    ERTS_SMP_LC_ASSERT(process_locks == erts_proc_lc_my_proc_locks(process));
    if (!(process_locks & ERTS_PROC_LOCK_STATUS))
        erts_smp_proc_lock(process, ERTS_PROC_LOCK_STATUS);

    rq = erts_get_runq_proc(process);

    erts_smp_runq_lock(rq);

    suspend_process(rq, process);

    erts_smp_runq_unlock(rq);

    if (busy_port)
	erts_wake_process_later(busy_port, process);

    if (!(process_locks & ERTS_PROC_LOCK_STATUS))
	erts_smp_proc_unlock(process, ERTS_PROC_LOCK_STATUS);

}


// io.c:474:
void
erts_wake_process_later(Port *prt, Process *process)
{
    ErtsProcList** p;
    ErtsProcList* new_p;

    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));

    if (prt->status & ERTS_PORT_SFLGS_DEAD)
        return;

    for (p = &(prt->suspended); *p != NULL; p = &((*p)->next))
	/* Empty loop body */;

    new_p = erts_proclist_create(process);
    new_p->next = NULL;
    *p = new_p;
}

So as data drains and the bytes left in the buffer fall below the low watermark, the busy_port flag is cleared and every process suspended on this port is woken up to continue.

There is one more situation in which a sender gets held up: ports are scheduled fairly too, to keep overly fast IO from starving other ports.

A port is scheduled just as fairly as a process. Processes are scheduled in units of reductions, and a port converts the bytes it sends into reductions. So a process sending large amounts of tcp data does not get to run uninterrupted: the runtime forcibly pauses it from time to time so other ports get a chance to run.

Let's look at the code:

//erl_port_task.c:L45
/*                                                                                                                          
 * Costs in reductions for some port operations.                                                                            
 */
#define ERTS_PORT_REDS_EXECUTE          0
#define ERTS_PORT_REDS_FREE             50
#define ERTS_PORT_REDS_TIMEOUT          200
#define ERTS_PORT_REDS_INPUT            200
#define ERTS_PORT_REDS_OUTPUT           200
#define ERTS_PORT_REDS_EVENT            200
#define ERTS_PORT_REDS_TERMINATE        100

/*                                                                                                                          
 * Run all scheduled tasks for the first port in run queue. If                                                              
 * new tasks appear while running reschedule port (free task is                                                             
 * an exception; it is always handled instantly).                                                                           
 *                                                                                                                          
 * erts_port_task_execute() is called by scheduler threads between                                                          
 * scheduleing of processes. Sched lock should be held by caller.                                                           
 */

int
erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
{
 ...
 case ERTS_PORT_TASK_TIMEOUT:
            reds += ERTS_PORT_REDS_TIMEOUT;
            if (!(pp->status & ERTS_PORT_SFLGS_DEAD))
                (*pp->drv_ptr->timeout)((ErlDrvData) pp->drv_data);
            break;
        case ERTS_PORT_TASK_INPUT:
	    reds += ERTS_PORT_REDS_INPUT;
            ASSERT((pp->status & ERTS_PORT_SFLGS_DEAD) == 0);
            /* NOTE some windows drivers use ->ready_input for input and output */
	    (*pp->drv_ptr->ready_input)((ErlDrvData) pp->drv_data, ptp->event);
            io_tasks_executed++;
            break;
        case ERTS_PORT_TASK_OUTPUT:
            reds += ERTS_PORT_REDS_OUTPUT;
            ASSERT((pp->status & ERTS_PORT_SFLGS_DEAD) == 0);
            (*pp->drv_ptr->ready_output)((ErlDrvData) pp->drv_data, ptp->event);
            io_tasks_executed++;
            break;
        case ERTS_PORT_TASK_EVENT:
            reds += ERTS_PORT_REDS_EVENT;
            ASSERT((pp->status & ERTS_PORT_SFLGS_DEAD) == 0);
            (*pp->drv_ptr->event)((ErlDrvData) pp->drv_data, ptp->event, ptp->event_data);
            io_tasks_executed++;
            break;
        case ERTS_PORT_TASK_DIST_CMD:
            reds += erts_dist_command(pp, CONTEXT_REDS-reds);
            break;

...
  ERTS_PORT_REDUCTIONS_EXECUTED(runq, reds);

    return res;

}

#define ERTS_PORT_REDUCTIONS_EXECUTED(RQ, REDS)                 \
do {                                                            \
    (RQ)->ports.info.reds += (REDS);                            \
    (RQ)->check_balance_reds -= (REDS);                         \
    (RQ)->wakeup_other_reds += (REDS);                          \
} while (0)

The code shows that a port's scheduling time slice is deducted from its host process's slice:

#define ERTS_PORT_REDS_INPUT 200
#define ERTS_PORT_REDS_OUTPUT 200

Each read or write event costs 200 reductions, while a process gets 2000 reductions per slice, so after roughly 10 outputs the process has to be rescheduled.
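For reference, that 2000-reduction budget is CONTEXT_REDS (a sketch of the relevant define; in the ERTS of this era it lives in erl_vm.h):

//erl_vm.h
#define CONTEXT_REDS 2000 /* reductions allowed per process schedule */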

With the analysis above we now know why a gen_tcp sending process gets suspended.

The countermeasure: if the process must not block, pass the force flag so the data is pushed into the buffer unconditionally, and set {send_timeout, Integer} on the socket.
If the socket cannot finish sending within that time, declare a send timeout outright; that avoids the other danger of force, namely the buffer swallowing unbounded amounts of memory.
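A sketch of that countermeasure (the function name is mine; it assumes the socket carries e.g. {send_timeout, 5000} and that the socket owner consumes inet_reply messages in its main loop, rabbit_writer-style):

%% Force-enqueue so the caller is never suspended on a busy port; the
%% send_timeout timer bounds how long the queued data may linger.
tcp_send_async(Socket, Bin) ->
    try erlang:port_command(Socket, Bin, [force]) of
        true -> ok
    catch
        error:_ -> exit({tcp_send_error, einval})
    end.

%% ...and in the owner's receive loop:
%%   {inet_reply, _Sock, ok}     -> fine, carry on;
%%   {inet_reply, _Sock, Status} -> treat as a send failure
%%                                  (e.g. {error, timeout}).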

As analyzed above, gen_tcp sends are billed against the host process's reductions, which can likewise get the host process scheduled out; when designing a system, avoid letting one process own too many ports.

The experiments will follow in a later post!

Have fun!

