
系統技術非業餘研究 » Analysis of the gen_tcp send buffer and watermarks

A while ago a reader asked a question online:

On the server side I set up the socket like this: gen_tcp:listen(8000, [{active, false}, {recbuf,1}, {buffer,1}]).
On the client side: gen_tcp:connect("localhost", 8000, [{active, false}, {high_watermark,2}, {low_watermark,1}, {sndbuf,1}, {buffer,1}]).
The client sends one byte per gen_tcp:send() call; the first 6 bytes return ok, and the 7th call blocks.
The server reads one byte per gen_tcp:recv(_, 0) call; after it has read three bytes, the client's 7th send returns.
As I understand it, the server should be able to take 2 bytes plus 1 byte in sndbuf, so the client should block on the 4th byte, but that is not what happens. Please explain.

This question is genuinely fairly involved: it touches on gen_tcp's send buffer, receive buffer, and watermarks. The receive-buffer side has already been covered quite thoroughly in two earlier posts, so today we focus on the send buffer and the watermarks.

Before we start, we need to be familiar with a few gen_tcp options (see the official documentation for more):

{delay_send, Boolean}
Normally, when an Erlang process sends to a socket, the driver will try to immediately send the data. If that fails, the driver will use any means available to queue up the message to be sent whenever the operating system says it can handle it. Setting {delay_send, true} will make all messages queue up. This makes the messages actually sent onto the network be larger but fewer. The option actually affects the scheduling of send requests versus Erlang processes instead of changing any real property of the socket. Needless to say it is an implementation specific option. Default is false.

{high_msgq_watermark, Size} (TCP/IP sockets)
The socket message queue will be set into a busy state when the amount of data queued on the message queue reaches this limit. Note that this limit only concerns data that have not yet reached the ERTS internal socket implementation. Default value used is 8 kB.

Senders of data to the socket will be suspended if either the socket message queue is busy, or the socket itself is busy.

For more information see the low_msgq_watermark, high_watermark, and low_watermark options.

Note that distribution sockets will disable the use of high_msgq_watermark and low_msgq_watermark, and will instead use the distribution buffer busy limit which is a similar feature.

{high_watermark, Size} (TCP/IP sockets)
The socket will be set into a busy state when the amount of data queued internally by the ERTS socket implementation reaches this limit. Default value used is 8 kB.

Senders of data to the socket will be suspended if either the socket message queue is busy, or the socket itself is busy.

For more information see the low_watermark, high_msgq_watermark, and low_msgq_watermark options.

{low_msgq_watermark, Size} (TCP/IP sockets)
If the socket message queue is in a busy state, the socket message queue will be set in a not busy state when the amount of data queued in the message queue falls below this limit. Note that this limit only concerns data that have not yet reached the ERTS internal socket implementation. Default value used is 4 kB.

Senders that have been suspended due to either a busy message queue or a busy socket, will be resumed when neither the socket message queue, nor the socket are busy.

For more information see the high_msgq_watermark, high_watermark, and low_watermark options.

Note that distribution sockets will disable the use of high_msgq_watermark and low_msgq_watermark, and will instead use the distribution buffer busy limit which is a similar feature.

{low_watermark, Size} (TCP/IP sockets)
If the socket is in a busy state, the socket will be set in a not busy state when the amount of data queued internally by the ERTS socket implementation falls below this limit. Default value used is 4 kB.

Senders that have been suspended due to either a busy message queue or a busy socket, will be resumed when neither the socket message queue, nor the socket are busy.

For more information see the high_watermark, high_msgq_watermark, and low_msgq_watermark options.

The two pairs of high/low watermarks among these options, together with delay_send, have a big impact on the send buffer.
The behaviour of gen_tcp:send was analysed fairly thoroughly in an earlier post; I suggest reading that article first as background.
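
As a quick illustration (a minimal sketch; the host, port and sizes below are made up and not taken from the question above), the watermarks can be set when the socket is created and adjusted later with inet:setopts/2:

{ok, Sock} = gen_tcp:connect("localhost", 8000,
                             [binary, {active, false},
                              {high_watermark, 128 * 1024},   % default is 8 kB
                              {low_watermark,  64 * 1024}]).  % default is 4 kB
ok = inet:setopts(Sock, [{high_watermark, 256 * 1024}]).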

We know that every Erlang process has a message queue; any other process that wants to talk to it sends it a message that carries the content of the communication. As soon as a message lands in a process's queue, the Erlang VM gets ready to schedule that process so it can run and handle the message. This mechanism is explained clearly in every introductory Erlang book. What about ports? In Erlang's early days, a port had the same standing, interface, and usage pattern as a process. As Erlang's unit of external I/O, a port also owns a message queue: when a process sends a message to a port, the port normally stores it in that queue and the VM then schedules the port. When the port gets to run, it drains the messages from its queue and pushes them out to the network or performs the corresponding I/O. Port scheduling works just like process scheduling and is equally strict about fairness.

Let's verify the message-sending interface for ports and processes. We know that the ! operator is syntactic sugar for erlang:send: both Port ! Msg and Pid ! Msg end up calling erlang:send. Later on (for reasons of their own) the Erlang designers also added the dedicated port_command family of functions for sending messages to ports.

Let's check the source: the chain is erlang:send -> BIF_RETTYPE send_3(BIF_ALIST_3) -> do_send, and the code lives in bif.c:

Sint
do_send(Process *p, Eterm to, Eterm msg, int suspend, Eterm *refp) {
...
    if (is_internal_pid(to)) {
	...

    } else if (is_external_pid(to)) {
    ...
    return remote_send(p, dep, to, to, msg, suspend);
    } else if (is_atom(to)) {
    ...
    } else if (is_external_port(to)
               && (external_port_dist_entry(to)
                   == erts_this_dist_entry)) {
        erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
        erts_dsprintf(dsbufp,
                      "Discarding message %T from %T to %T in an old "
                      "incarnation (%d) of this node (%d)\n",
                      msg,
                      p->common.id,
                      to,
                      external_port_creation(to),
                      erts_this_node->creation);
        erts_send_error_to_logger(p->group_leader, dsbufp);
        return 0;
    } else if (is_internal_port(to)) {
    ...
        pt = erts_port_lookup(portid, ERTS_PORT_SFLGS_INVALID_LOOKUP);
        ...
            switch (erts_port_command(p, ps_flags, pt, msg, refp)) {
            case ERTS_PORT_OP_CALLER_EXIT:
...
}

As you can see:
1. erlang:send accepts two kinds of destination: ports and processes.
2. A message sent to a port takes the same path as erts_port_command.

Take a sip of water, save your strength, and keep two points in mind: 1. a port has a message queue; 2. ports, too, are scheduled fairly.
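
A tiny sketch of that equivalence (nothing driver-specific assumed; the function name is made up for illustration): for the port owner, the bang form and the BIF end up on the same path.

send_both(Port, Data) ->
    true = erlang:port_command(Port, Data),    % the explicit BIF
    Port ! {self(), {command, Data}},          % the classic bang form (owner only)
    ok.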

With that background it is much easier to see what the watermarks in the options above are for. As with any message queue, to keep an imbalance between producer and consumer from letting the queue grow until it blows up the system, you normally set high and low watermarks to protect it. That is exactly what {high_watermark, Size} and {low_watermark, Size} do.

So how does a port protect itself? The answer:
When the amount of queued data reaches the high watermark, the port enters the busy state and the sending process is suspended; once the queue drains down to the low watermark, the busy state is cleared and the suspended sender is allowed to run again.
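
Being suspended indefinitely is usually unacceptable in practice, so the wait is normally bounded with the send_timeout socket option, which the tcp_sendv code further down honours via busy_on_send. A hedged sketch (endpoint and timeout values are arbitrary):

{ok, Sock} = gen_tcp:connect("localhost", 8000,
                             [binary, {active, false},
                              {send_timeout, 5000},
                              {send_timeout_close, true}]).
case gen_tcp:send(Sock, <<"payload">>) of
    ok               -> ok;
    {error, timeout} -> peer_too_slow      % the port stayed busy for 5 seconds
end.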

To back this up, see the port_command documentation:

port_command(Port, Data, OptionList) -> boolean()

Types:

Port = port() | atom()
Data = iodata()
Option = force | nosuspend
OptionList = [Option]
Sends data to a port. port_command(Port, Data, []) equals port_command(Port, Data).

If the port command is aborted false is returned; otherwise, true is returned.

If the port is busy, the calling process will be suspended until the port is not busy anymore.

Currently the following Options are valid:

force
The calling process will not be suspended if the port is busy; instead, the port command is forced through. The call will fail with a notsup exception if the driver of the port does not support this. For more information see the ERL_DRV_FLAG_SOFT_BUSY driver flag.
nosuspend
The calling process will not be suspended if the port is busy; instead, the port command is aborted and false is returned.
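
In other words, a caller that would rather drop or buffer data itself than be suspended can pass nosuspend. A sketch of what that looks like from the caller's side (the function name is made up):

try_send(Port, Data) ->
    case erlang:port_command(Port, Data, [nosuspend]) of
        true  -> ok;
        false -> {error, busy}    % the port (or its signal queue) was busy
    end.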

So how do we find out that a port has gone busy? This state is usually serious: the sending process is suspended, which introduces a lot of latency.

Fortunately Erlang has this covered; see erlang:system_monitor:

erlang:system_monitor(MonitorPid, Options) -> MonSettings

busy_port
If a process in the system gets suspended because it sends to a busy port, a message {monitor, SusPid, busy_port, Port} is sent to MonitorPid. SusPid is the pid that got suspended when sending to Port.

The system helpfully reports the process that hit busy_port, so we know which process got suspended at the high watermark and can later tune the watermarks to avoid it.
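
A minimal monitoring sketch (the logging call is just one possible reaction): one process registers for busy_port events and logs every sender that gets suspended.

start_busy_port_monitor() ->
    spawn(fun() ->
                  erlang:system_monitor(self(), [busy_port]),
                  busy_port_loop()
          end).

busy_port_loop() ->
    receive
        {monitor, SusPid, busy_port, Port} ->
            error_logger:warning_msg("~p suspended on busy port ~p~n",
                                     [SusPid, Port]),
            busy_port_loop()
    end.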

When a user calls gen_tcp:send to push data, the work is ultimately done by port_command, so let's look at how it operates:

/* Command should be of the form                                                                                          
**   {PID, close}                                                                                                         
**   {PID, {command, io-list}}                                                                                            
**   {PID, {connect, New_PID}}                                                                                            
*/
ErtsPortOpResult
erts_port_command(Process *c_p,
                  int flags,
                  Port *port,
                  Eterm command,
                  Eterm *refp)
{
...
   if (is_tuple_arity(command, 2)) {
        Eterm cntd;
        tp = tuple_val(command);
        cntd = tp[1];
        if (is_internal_pid(cntd)) {
            if (tp[2] == am_close) {
                if (!erts_port_synchronous_ops)
                    refp = NULL;
                flags &= ~ERTS_PORT_SIG_FLG_NOSUSPEND;
                return erts_port_exit(c_p, flags, port, cntd, am_normal, refp);
            } else if (is_tuple_arity(tp[2], 2)) {
                tp = tuple_val(tp[2]);
                if (tp[1] == am_command) {
                    if (!(flags & ERTS_PORT_SIG_FLG_NOSUSPEND)
                        && !erts_port_synchronous_ops)
     	            refp = NULL;
                    return erts_port_output(c_p, flags, port, cntd, tp[2], refp);
                }
                else if (tp[1] == am_connect) {
                    if (!erts_port_synchronous_ops)
                        refp = NULL;
                    flags &= ~ERTS_PORT_SIG_FLG_NOSUSPEND;
                    return erts_port_connect(c_p, flags, port, cntd, tp[2], refp);
                }
            }
        }
    }
    ...
}

ErtsPortOpResult
erts_port_output(Process *c_p,
                 int flags,
                 Port *prt,
                 Eterm from,
                 Eterm list,
                 Eterm *refp)
{
...
    try_call = (force_immediate_call /* crash dumping */
                || !(sched_flags & (invalid_flags
                                    | ERTS_PTS_FLGS_FORCE_SCHEDULE_OP)));

    if (drv->outputv) {
          try_call_state.pre_chk_sched_flags = 0; /* already checked */
            if (force_immediate_call)
                try_call_res = force_imm_drv_call(&try_call_state);
            else
                try_call_res = try_imm_drv_call(&try_call_state);
            switch (try_call_res) {
            case ERTS_TRY_IMM_DRV_CALL_OK:
                call_driver_outputv(flags & ERTS_PORT_SIG_FLG_BANG_OP,
                                    c_p ? c_p->common.id : ERTS_INVALID_PID,
                                    from,
                                    prt,
                                    drv,
                                    evp);
                if (force_immediate_call)
                    finalize_force_imm_drv_call(&try_call_state);
                else
                    finalize_imm_drv_call(&try_call_state);
                /* Fall through... */
            ...
            case ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS:
                sched_flags = try_call_state.sched_flags;
            case ERTS_TRY_IMM_DRV_CALL_BUSY_LOCK:
                /* Schedule outputv() call instead... */
                break;
...
}

static ERTS_INLINE void
call_driver_outputv(int bang_op,
                    Eterm caller,
                    Eterm from,
                    Port *prt,
                    erts_driver_t *drv,
                    ErlIOVec *evp)
{
    /*                                                                                                                    
     * if (bang_op)                                                                                                       
     *   we are part of a "Prt ! {From, {command, Data}}" operation                                                       
     * else                                                                                                               
     *   we are part of a call to port_command/[2,3]                                                                      
     * behave accordingly...                                                                                              
     */
    if (bang_op && from != ERTS_PORT_GET_CONNECTED(prt))
        send_badsig(prt);
    else {
...
        prt->caller = caller;
        (*drv->outputv)((ErlDrvData) prt->drv_data, evp);
        prt->caller = NIL;

        prt->bytes_out += size;
        erts_smp_atomic_add_nob(&erts_bytes_out, size);
    }
...
}

From the source we can see that when port_command finds a command message for the port, it calls erts_port_output, which then goes through some fairly involved logic to decide how to invoke call_driver_outputv.
That involved path is exactly where the msgq_watermark limits come into play; we will skip it for now and come back to it when we discuss msgq_watermark.

For now it is enough to know that gen_tcp:send eventually hands the data to the port driver's outputv callback.
On with the source:

static struct erl_drv_entry tcp_inet_driver_entry =
{
    tcp_inet_init,  /* inet_init will add this driver !! */
    tcp_inet_start,
    tcp_inet_stop,
    tcp_inet_command,
    tcp_inet_drv_input,
    tcp_inet_drv_output,
    "tcp_inet",
    NULL,
    NULL,
    tcp_inet_ctl,
    tcp_inet_timeout,
    tcp_inet_commandv,
...
}
static void tcp_inet_commandv(ErlDrvData e, ErlIOVec* ev)
{
    tcp_descriptor* desc = (tcp_descriptor*)e;
    desc->inet.caller = driver_caller(desc->inet.port);

    DEBUGF(("tcp_inet_commanv(%ld) {s=%d\r\n",
            (long)desc->inet.port, desc->inet.s));
    if (!IS_CONNECTED(INETP(desc))) {
        if (desc->tcp_add_flags & TCP_ADDF_DELAYED_CLOSE_SEND) {
            desc->tcp_add_flags &= ~TCP_ADDF_DELAYED_CLOSE_SEND;
            inet_reply_error_am(INETP(desc), am_closed);
        }
        else
            inet_reply_error(INETP(desc), ENOTCONN);
    }
    else if (tcp_sendv(desc, ev) == 0)
        inet_reply_ok(INETP(desc));
    DEBUGF(("tcp_inet_commandv(%ld) }\r\n", (long)desc->inet.port));
}

In the inet_drv (gen_tcp) case this means tcp_sendv is called to turn the message into network packets and send them out.

OK, sip of water, short break. Let's lay out the data path so far:
gen_tcp:send -> port_command -> erts_port_output -> call_driver_outputv -> tcp_inet_commandv -> tcp_sendv
Keep it firmly in mind.

Next, let's follow the source to see how the watermarks are implemented:

/* inet_drv.c */
#define INET_LOPT_TCP_HIWTRMRK     27  /* set local high watermark */
#define INET_LOPT_TCP_LOWTRMRK     28  /* set local low watermark */


#define INET_HIGH_WATERMARK (1024*8) /* 8k pending high => busy  */
#define INET_LOW_WATERMARK  (1024*4) /* 4k pending => allow more */

typedef struct {
...
    int   high;                 /* high watermark */
    int   low;                  /* low watermark */
...
} tcp_descriptor;

static ErlDrvData tcp_inet_start(ErlDrvPort port, char* args)
{
...
    desc->high = INET_HIGH_WATERMARK;
    desc->low  = INET_LOW_WATERMARK;
...
}

static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
{
...
        case INET_LOPT_TCP_HIWTRMRK:
            if (desc->stype == SOCK_STREAM) {
                tcp_descriptor* tdesc = (tcp_descriptor*) desc;
                if (ival < 0) ival = 0;
                if (tdesc->low > ival)
                    tdesc->low = ival;
                tdesc->high = ival;
            }
            continue;

        case INET_LOPT_TCP_LOWTRMRK:
            if (desc->stype == SOCK_STREAM) {
                tcp_descriptor* tdesc = (tcp_descriptor*) desc;
                if (ival < 0) ival = 0;
                if (tdesc->high < ival)
                    tdesc->high = ival;
                tdesc->low = ival;
            }
            continue;
...
}

/* Copy a descriptor, by creating a new port with same settings                                                           
 * as the descriptor desc.                                                                                                
 * return NULL on error (SYSTEM_LIMIT no ports avail)                                                                     
 */
static tcp_descriptor* tcp_inet_copy(tcp_descriptor* desc,SOCKET s,
                                     ErlDrvTermData owner, int* err)
{
...
    copy_desc->high          = desc->high;
    copy_desc->low           = desc->low;
...
}

static int tcp_sendv(tcp_descriptor* desc, ErlIOVec* ev)
{
...
if ((sz = driver_sizeq(ix)) > 0) {
        driver_enqv(ix, ev, 0);
        if (sz+ev->size >= desc->high) {
            DEBUGF(("tcp_sendv(%ld): s=%d, sender forced busy\r\n",
                    (long)desc->inet.port, desc->inet.s));
            desc->inet.state |= INET_F_BUSY;  /* mark for low-watermark */
            desc->inet.busy_caller = desc->inet.caller;
            set_busy_port(desc->inet.port, 1);
            if (desc->send_timeout != INET_INFINITY) {
                desc->busy_on_send = 1;
                driver_set_timer(desc->inet.port, desc->send_timeout);
            }
            return 1;
        }
...
}

/* socket ready for ouput:                                                                                                
** 1. INET_STATE_CONNECTING => non block connect ?                                                                        
** 2. INET_STATE_CONNECTED  => write output                                                                               
*/
static int tcp_inet_output(tcp_descriptor* desc, HANDLE event)
{
...
  	if (driver_deq(ix, n) <= desc->low) {
                if (IS_BUSY(INETP(desc))) {
                    desc->inet.caller = desc->inet.busy_caller;
                    desc->inet.state &= ~INET_F_BUSY;
                    set_busy_port(desc->inet.port, 0);
                    /* if we have a timer then cancel and send ok to client */
                    if (desc->busy_on_send) {
                        driver_cancel_timer(desc->inet.port);
                        desc->busy_on_send = 0;
                    }
                    inet_reply_ok(INETP(desc));
                }
            }
...
}

A few points fall out of the source:
1. Watermark settings are inherited (see the sketch below).
2. The default high/low watermarks are 8K/4K.
3. Once the high watermark is reached, the port goes busy.
4. When the queue drains below the low watermark, busy is cleared.
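
A sketch of point 1 (inheritance), assuming a peer connects so that accept returns: watermarks configured on the listen socket are copied by tcp_inet_copy into every accepted socket, so reading them back should show the same values. Port number and sizes are arbitrary.

{ok, LSock} = gen_tcp:listen(8000, [binary, {active, false},
                                    {high_watermark, 64 * 1024},
                                    {low_watermark,  32 * 1024}]).
{ok, Sock} = gen_tcp:accept(LSock).
{ok, Opts} = inet:getopts(Sock, [high_watermark, low_watermark]).
%% Opts is expected to be [{high_watermark,65536},{low_watermark,32768}]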

This matches what the documentation says about the watermarks. Next, a quick look at how delay_send is implemented, again straight from the source:

/* TCP additional flags */
#define TCP_ADDF_DELAY_SEND    1

static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
{
...
case INET_LOPT_TCP_DELAY_SEND:
            if (desc->stype == SOCK_STREAM) {
                tcp_descriptor* tdesc = (tcp_descriptor*) desc;
                if (ival)
                    tdesc->tcp_add_flags |= TCP_ADDF_DELAY_SEND;
                else
                    tdesc->tcp_add_flags &= ~TCP_ADDF_DELAY_SEND;
            }
            continue;
...
}

/*                                                                                                                        
** Send non-blocking vector data                                                                                          
*/
static int tcp_sendv(tcp_descriptor* desc, ErlIOVec* ev)
{
...
        if (INETP(desc)->is_ignored) {
            INETP(desc)->is_ignored |= INET_IGNORE_WRITE;
            n = 0;
        } else if (desc->tcp_add_flags & TCP_ADDF_DELAY_SEND) {
            n = 0;
        } else if (IS_SOCKET_ERROR(sock_sendv(desc->inet.s, ev->iov,
                                              vsize, &n, 0))) {
            if ((sock_errno() != ERRNO_BLOCK) && (sock_errno() != EINTR)) {
                int err = sock_errno();
                DEBUGF(("tcp_sendv(%ld): s=%d, "
                        "sock_sendv(size=2) errno = %d\r\n",
                        (long)desc->inet.port, desc->inet.s, err));
                return tcp_send_error(desc, err);
            }
     	    n = 0;
        }
       else {
            DEBUGF(("tcp_sendv(%ld): s=%d, only sent "
                    LLU"/%d of "LLU"/%d bytes/items\r\n",
                    (long)desc->inet.port, desc->inet.s,
                    (llu_t)n, vsize, (llu_t)ev->size, ev->vsize));
        }

        DEBUGF(("tcp_sendv(%ld): s=%d, Send failed, queuing\r\n",
                (long)desc->inet.port, desc->inet.s));
        driver_enqv(ix, ev, n);
        if (!INETP(desc)->is_ignored)
            sock_select(INETP(desc),(FD_WRITE|FD_CLOSE), 1);
...
}
static void tcp_inet_drv_output(ErlDrvData data, ErlDrvEvent event)
{
    (void)tcp_inet_output((tcp_descriptor*)data, (HANDLE)event);
}
/* socket ready for ouput:                                                                                                
** 1. INET_STATE_CONNECTING => non block connect ?                                                                        
** 2. INET_STATE_CONNECTED  => write output                                                                               
*/
static int tcp_inet_output(tcp_descriptor* desc, HANDLE event)
{
...
 else if (IS_CONNECTED(INETP(desc))) {
        for (;;) {
            int vsize;
            ssize_t n;
            SysIOVec* iov;

            if ((iov = driver_peekq(ix, &vsize)) == NULL) {
                sock_select(INETP(desc), FD_WRITE, 0);
                send_empty_out_q_msgs(INETP(desc));
                goto done;
            }
            vsize = vsize > MAX_VSIZE ? MAX_VSIZE : vsize;
            DEBUGF(("tcp_inet_output(%ld): s=%d, About to send %d items\r\n",
                    (long)desc->inet.port, desc->inet.s, vsize));
            if (IS_SOCKET_ERROR(sock_sendv(desc->inet.s, iov, vsize, &n, 0))) {
                if ((sock_errno() != ERRNO_BLOCK) && (sock_errno() != EINTR)) {
                    DEBUGF(("tcp_inet_output(%ld): sock_sendv(%d) errno = %d\r\n",
                            (long)desc->inet.port, vsize, sock_errno()));
                    ret =  tcp_send_error(desc, sock_errno());
                    goto done;
                }
            goto done;
            }
            if (driver_deq(ix, n) <= desc->low) {
                if (IS_BUSY(INETP(desc))) {
                    desc->inet.caller = desc->inet.busy_caller;
                    desc->inet.state &= ~INET_F_BUSY;
                    set_busy_port(desc->inet.port, 0);
                    /* if we have a timer then cancel and send ok to client */
                    if (desc->busy_on_send) {
                        driver_cancel_timer(desc->inet.port);
                        desc->busy_on_send = 0;
                    }
             inet_reply_ok(INETP(desc));
                }
            }
...
}

From the source we can see what tcp_sendv does before sending:
1. It checks whether the delay_send flag is set; if so, it does not even attempt a sock_sendv call.
2. Otherwise it calls sock_sendv to push data to the network; whatever could not be written is stored in the driver queue.
3. If the queue is non-empty, the epoll write event is registered for the socket.
4. Later, when epoll reports the socket writable, tcp_inet_drv_output is called.
5. tcp_inet_drv_output -> tcp_inet_output then pushes the queued data to the network via sock_sendv again.

The gap between steps 3 and 4 takes time; how long depends on when the epoll write event fires and when the port gets scheduled.

So, put simply: with delay_send the first phase makes no attempt to send; the data is pushed straight into the port's queue and sent out in one go when epoll later says the socket is writable.
The benefit is that gen_tcp:send returns immediately (a sock_send call typically costs a few tens of microseconds), which helps in situations that are sensitive to send-side latency.
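
Enabling it is just another socket option (the endpoint is made up for this sketch); gen_tcp:send/2 then returns as soon as the data is queued, and the actual write happens on the next writable event:

{ok, Sock} = gen_tcp:connect("localhost", 8000,
                             [binary, {active, false}, {delay_send, true}]).
ok = gen_tcp:send(Sock, <<"queued now, written on the next writable event">>).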

At this point we have a clear picture of how data flows through each link of the port.

To recap: when data passed to gen_tcp:send cannot go out on the network right away, it is held in the port's message queue; once the queue fills up (reaches the high watermark), the port goes busy and throttles the sender. When epoll detects that the socket is writable, the VM calls tcp_inet_output to drain the queue onto the network; as the queue shrinks below the low watermark, busy is cleared so the sender can push more data.

Another sip of water, and on to msgq_watermark. This is another big rabbit hole, so hang in there; it will be another long walk through the code.
First, some background.
From the R16B release notes, the major port-related change is:

— Latency of signals sent from processes to ports — Signals
from processes to ports were previously always delivered
immediately. This kept latency for such communication to a
minimum, but it could cause lock contention which was very
expensive for the system as a whole. In order to keep this
latency low also in the future, most signals from processes
to ports are by default still delivered immediately as long
as no conflicts occur. Such conflicts include not being able
to acquire the port lock, but also include other conflicts.
When a conflict occur, the signal will be scheduled for
delivery at a later time. A scheduled signal delivery may
cause a higher latency for this specific communication, but
improves the overall performance of the system since it
reduce lock contention between schedulers. The default
behavior of only scheduling delivery of these signals on
conflict can be changed by passing the +spp command line flag
to erl(1). The behavior can also be changed on port basis
using the parallelism option of the open_port/2 BIF.

In short, in the past a process sending data to a port would lock the port immediately, call call_driver_outputv to consume the data, and unlock when done. That gives the lowest latency for a single request, but when several processes send to the same port the lock contention gets high, which inevitably hurts port throughput. So the new VM introduced port parallelism: on a lock conflict the sender no longer sits and waits; instead the data is reference-counted and handed to the port scheduler, which picks a suitable moment, that is, when the port is not busy, to call call_driver_outputv and consume it. This greatly improves throughput.

Checking the code:

int erts_port_parallelism = 0;

void
erl_start(int argc, char **argv)
{
...
else if (has_prefix("pp", sub_param)) {
                arg = get_arg(sub_param+2, argv[i+1], &i);
                if (sys_strcmp(arg, "true") == 0)
                    erts_port_parallelism = 1;
                else if (sys_strcmp(arg, "false") == 0)
                    erts_port_parallelism = 0;

...
}

static Port *
open_port(Process* p, Eterm name, Eterm settings, int *err_typep, int *err_nump)
{
... 
opts.parallelism = erts_port_parallelism;
...
  } else if (option == am_parallelism) {
                    if (*tp == am_true)
                        opts.parallelism = 1;
                    else if (*tp == am_false)
                        opts.parallelism = 0;
                    else
...
}

From the source we learn:
1. For compatibility with older releases, parallel port sending is off by default, but it can be enabled globally with +spp.
2. It can also be enabled per port at open_port time with the {parallelism, true} option (a small sketch follows the documentation excerpt below).

{parallelism, Boolean}
Set scheduler hint for port parallelism. If set to true, the VM will schedule port tasks when it by this can improve the parallelism in the system. If set to false, the VM will try to perform port tasks immediately and by this improving the latency at the expense of parallelism. The default can be set on system startup by passing the +spp command line argument to erl(1).
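
So, per point 2 above, the two ways to opt in look roughly like this (the spawned command is an arbitrary example, not something gen_tcp does for you):

%% globally, for every port, at VM start:
%%   erl +spp true
%% or per port when it is opened:
Port = erlang:open_port({spawn, "cat"}, [binary, {parallelism, true}]).
true = erlang:port_command(Port, <<"hello">>).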

So what does enabling this option mean for gen_tcp (i.e. for the port)? The most obvious difference is that call_driver_outputv used to run strictly in turn: whoever grabbed the lock first ran first, and if driver_outputv could not consume the message it might push the data onto the port's message queue. As we analysed earlier, that queue is kept within bounds by the high/low watermarks. With parallelism, however, while the port is busy inside call_driver_outputv, other processes no longer wait: they reference-count the message, stash it away, ask the port scheduler to run it later, and return immediately.

Do you see the problem? Without any control, every process could pile up a lot of messages waiting for the port scheduler to run them later. So the port scheduler has to enforce watermarks on that data too, and that is how the msgq_watermark options come in.

A bit complicated? It gets more so. Once msgq_watermark is in the picture, these sending processes can be suspended, so how do they get woken up again? Let's answer that first, starting with the code:

%%erlang.erl
port_command(Port, Data) ->
    case case erts_internal:port_command(Port, Data, []) of
             Ref when erlang:is_reference(Ref) -> receive {Ref, Res} -> Res end;
             Res -> Res
         end of
        true -> true;
        Error -> erlang:error(Error, [Port, Data])
    end.
/*                                                                                                                        
 * erts_internal:port_command/3 is used by the                                                                            
 * erlang:port_command/2 and erlang:port_command/3                                                                        
 * BIFs.                                                                                                                  
 */

BIF_RETTYPE erts_internal_port_command_3(BIF_ALIST_3)
{
...
 prt = lookup_port(BIF_P, BIF_ARG_1);
    if (!prt)
        BIF_RET(am_badarg); 
...

 switch (erts_port_output(BIF_P, flags, prt, prt->common.id, BIF_ARG_2, &ref)) {
    case ERTS_PORT_OP_CALLER_EXIT:
    case ERTS_PORT_OP_BADARG:
    case ERTS_PORT_OP_DROPPED:            
        ERTS_BIF_PREP_RET(res, am_badarg);
        break;
    case ERTS_PORT_OP_BUSY:
        ASSERT(!(flags & ERTS_PORT_SIG_FLG_FORCE));
        if (flags & ERTS_PORT_SIG_FLG_NOSUSPEND)
            ERTS_BIF_PREP_RET(res, am_false);
        else {
            erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, prt);
            ERTS_BIF_PREP_YIELD3(res, bif_export[BIF_erts_internal_port_command_3],
                                 BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3);
        }
        break;
    case ERTS_PORT_OP_BUSY_SCHEDULED:
        ASSERT(!(flags & ERTS_PORT_SIG_FLG_FORCE));
        /* Fall through... */
    case ERTS_PORT_OP_SCHEDULED:
        ASSERT(is_internal_ref(ref));
        ERTS_BIF_PREP_RET(res, ref);
        break;
}

The call flow is erlang:port_command -> erts_internal:port_command -> erts_internal_port_command_3.
From the erts_internal_port_command_3 code we can see that when a send runs into ERTS_PORT_OP_BUSY, the calling process is suspended; when it is later woken up, a trap occurs and the Erlang VM re-invokes erts_internal_port_command_3 at a suitable time to finish what was left undone.

To recap: at the port_command level, under parallelism, if the underlying port goes busy the calling process goes through this suspend, wake up, re-call cycle.
The next question is: when does ERTS_PORT_OP_BUSY actually occur?

From the earlier analysis we know the path is port_command -> erts_port_output, so let's see under what conditions it returns ERTS_PORT_OP_BUSY:

ErtsPortOpResult
erts_port_output(Process *c_p,
                 int flags,
                 Port *prt,
                 Eterm from,
                 Eterm list,
                 Eterm *refp)
{
...
    /*                                                                                                                    
     * Assumes caller have checked that port is valid...                                                                  
     */

    sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
    if (sched_flags & (busy_flgs|ERTS_PTS_FLG_EXIT))
        return ((sched_flags & ERTS_PTS_FLG_EXIT)
                ? ERTS_PORT_OP_DROPPED
                : ERTS_PORT_OP_BUSY);
...
}
void
set_busy_port(ErlDrvPort dprt, int on)
{
...
    if (on) {
        flags = erts_smp_atomic32_read_bor_acqb(&prt->sched.flags,
                                                ERTS_PTS_FLG_BUSY_PORT);
        if (flags & ERTS_PTS_FLG_BUSY_PORT)
            return; /* Already busy */
...
}

The code makes it clear: once the high watermark is reached and set_busy_port is called, subsequent erts_port_output calls simply return ERTS_PORT_OP_BUSY. Plain and simple.

Another sip of water, and on we go; now it is time to verify this in the code.
Recall what we analysed earlier:
In parallelism mode, while the port is busy in call_driver_outputv, other processes do not wait; they reference-count the message, stash it away, ask the port scheduler to run it later, and return immediately.

ErtsPortOpResult
erts_port_output(Process *c_p,
                 int flags,
                 Port *prt,
                 Eterm from,
                 Eterm list,
                 Eterm *refp)
{
...
if (drv->outputv) {
...


}
...
        if (!try_call) {
            int i;
            /* Need to increase refc on all binaries */
            for (i = 1; i < evp->vsize; i++)
                if (bvp[i])
                    driver_binary_inc_refc(bvp[i]);
        }
        else {
            int i;
            ErlIOVec *new_evp;
            ErtsTryImmDrvCallResult try_call_res;
            ErtsTryImmDrvCallState try_call_state
                = ERTS_INIT_TRY_IMM_DRV_CALL_STATE(
                    c_p,
                    prt,
                    ERTS_PORT_SFLGS_INVALID_LOOKUP,
                    invalid_flags,
                    !refp,
                    am_command);


            try_call_state.pre_chk_sched_flags = 0; /* already checked */
            if (force_immediate_call)
                try_call_res = force_imm_drv_call(&try_call_state);
            else
                try_call_res = try_imm_drv_call(&try_call_state);
            switch (try_call_res) {
            case ERTS_TRY_IMM_DRV_CALL_OK:
                call_driver_outputv(flags & ERTS_PORT_SIG_FLG_BANG_OP,
                                    c_p ? c_p->common.id : ERTS_INVALID_PID,
                                    from,
                                    prt,
                                    drv,
                                    evp);
                if (force_immediate_call)
                    finalize_force_imm_drv_call(&try_call_state);
                else
                    finalize_imm_drv_call(&try_call_state);
     /* Fall through... */
            case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT:
.....
            case ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS:
                sched_flags = try_call_state.sched_flags;
            case ERTS_TRY_IMM_DRV_CALL_BUSY_LOCK:
                /* Schedule outputv() call instead... */
                break;
            }

        sigdp = erts_port_task_alloc_p2p_sig_data();
        sigdp->flags = ERTS_P2P_SIG_TYPE_OUTPUTV;
        sigdp->u.outputv.from = from;
        sigdp->u.outputv.evp = evp;
        sigdp->u.outputv.cbinp = cbin;
        port_sig_callback = port_sig_outputv;
    }
    res = erts_schedule_proc2port_signal(c_p,
                                         prt,
                                         c_p ? c_p->common.id : ERTS_INVALID_PID,
                                         refp,
                                         sigdp,
                                         task_flags,
                                         port_sig_callback);

    if (res != ERTS_PORT_OP_SCHEDULED) {
        if (drv->outputv)
            cleanup_scheduled_outputv(evp, cbin);
        else
            cleanup_scheduled_output(buf);
        return res;
    }
}
static int
port_sig_outputv(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp)
{
    Eterm reply;

    switch (op) {
       case ERTS_PROC2PORT_SIG_EXEC:
        /* Execution of a scheduled outputv() call */

        ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));

        if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP)
            reply = am_badarg;
        else {
            call_driver_outputv(sigdp->flags & ERTS_P2P_SIG_DATA_FLG_BANG_OP,
                                sigdp->caller,
                                sigdp->u.outputv.from,
                                prt,
                                prt->drv_ptr,
                                sigdp->u.outputv.evp);
            reply = am_true;
        }
        break;
...
}

ErtsPortOpResult
erts_schedule_proc2port_signal(Process *c_p,
                               Port *prt,
                               Eterm caller,
                               Eterm *refp,
                               ErtsProc2PortSigData *sigdp,
                               int task_flags,
                               ErtsProc2PortSigCallback callback)
{
...
 /* Schedule port close call for later execution... */
    sched_res = erts_port_task_schedule(prt->common.id,
                                        NULL,
                                        ERTS_PORT_TASK_PROC_SIG,
                                        sigdp,
                                        callback,
                                        task_flags);

...
}
/*                                                                                                                        
 * Schedule a task.                                                                                                       
 */
int
erts_port_task_schedule(Eterm id,
                        ErtsPortTaskHandle *pthp,
                        ErtsPortTaskType type,
                        ...)
{
...
    if (!enqueue_task(pp, ptp, sigdp, ns_pthlp, &act)) {
        reset_handle(ptp);
        if (ns_pthlp && !(act & ERTS_PTS_FLG_EXIT))
            goto abort_nosuspend;
        else
            goto fail;
    }
...
}

static ERTS_INLINE int
enqueue_task(Port *pp,
             ErtsPortTask *ptp,
             ErtsProc2PortSigData *sigdp,
             ErtsPortTaskHandleList *ns_pthlp,
             erts_aint32_t *flagsp)

{
...
        pp->sched.taskq.in.last = ptp;
        flags = enqueue_proc2port_data(pp, sigdp, flags);
        res = 1;
...
}

From the code above we know that when the port is not busy, call_driver_outputv is called directly to do the work; otherwise the message is deferred to the port scheduler. The call flow for that path is
erts_port_output -> erts_schedule_proc2port_signal -> erts_port_task_schedule -> enqueue_task -> enqueue_proc2port_data.

enqueue_proc2port_data, as its name suggests, stores the data that a process sends to the port into one of port_task's queues, so that is the natural place to enforce watermarks.
All right, time for the msgq_watermark machinery to come on stage.

/* erl_driver.h */
#define ERL_DRV_BUSY_MSGQ_LIM_MAX       (ERL_DRV_BUSY_MSGQ_DISABLED - 1)
#define ERL_DRV_BUSY_MSGQ_LIM_MIN       ((ErlDrvSizeT) 1)

/* inet_drv.c */
#define INET_HIGH_MSGQ_WATERMARK (1024*8) /* 8k pending high => busy  */
#define INET_LOW_MSGQ_WATERMARK  (1024*4) /* 4k pending => allow more */

static ErlDrvData tcp_inet_start(ErlDrvPort port, char* args)
{
...
    q_high = INET_HIGH_MSGQ_WATERMARK;
    q_low = INET_LOW_MSGQ_WATERMARK;
...
    if (q_high < ERL_DRV_BUSY_MSGQ_LIM_MIN)
        q_high = ERL_DRV_BUSY_MSGQ_LIM_MIN;
    else if (q_high > ERL_DRV_BUSY_MSGQ_LIM_MAX)
        q_high = ERL_DRV_BUSY_MSGQ_LIM_MAX;
    erl_drv_busy_msgq_limits(port, &q_low, &q_high);
...
}

/* erl_port_task.c */
/*                                                                                                                        
 * erl_drv_busy_msgq_limits() is called by drivers either reading or                                                      
 * writing the limits.                                                                                                    
 *                                                                                                                        
 * A limit of zero is interpreted as a read only request (using a                                                         
 * limit of zero would not be useful). Other values are interpreted                                                       
 * as a write-read request.                                                                                               
 */
void
erl_drv_busy_msgq_limits(ErlDrvPort dport, ErlDrvSizeT *lowp, ErlDrvSizeT *highp)
{
...
    Port *pp = erts_drvport2port(dport, NULL);
    ErtsPortTaskBusyPortQ *bpq = pp->sched.taskq.bpq;
...
 if (!low)
            low = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low);
        else {
            if (bpq->high < low)
                bpq->high = low;
            erts_smp_atomic_set_relb(&bpq->low, (erts_aint_t) low);
            written = 1;
        }

        if (!high)
            high = bpq->high;
        else {
            if (low > high) {
                low = high;
                erts_smp_atomic_set_relb(&bpq->low, (erts_aint_t) low);
            }
            bpq->high = high;
            written = 1;
        }

...
}

From the source we can see:
1. The MSGQ high/low watermarks also default to 8K/4K; the minimum is 1 and there is no upper cap.
2. What they bound is bpq->low and bpq->high of each port's busy-port task queue.

With this in hand it is easy to see how enqueue_proc2port_data enforces the high/low watermarks when data is enqueued; and there has to be a matching dequeued_proc2port_data that clears the busy state when data leaves the queue.
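
Both pairs of limits are exposed as ordinary inet options, so tuning them together on an existing socket Sock might look like this sketch (the values are arbitrary):

ok = inet:setopts(Sock, [{high_watermark,      128 * 1024},   % driver queue limit
                         {low_watermark,        64 * 1024},
                         {high_msgq_watermark, 128 * 1024},   % scheduler-side queue limit
                         {low_msgq_watermark,   64 * 1024}]).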

Continuing our tour of the code:

static ERTS_INLINE erts_aint32_t
enqueue_proc2port_data(Port *pp,
                       ErtsProc2PortSigData *sigdp,
                       erts_aint32_t flags)
{
    ErtsPortTaskBusyPortQ *bpq = pp->sched.taskq.bpq;
...
            if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q) && qsz > bpq->high) {
                flags = erts_smp_atomic32_read_bor_acqb(&pp->sched.flags,
                                                        ERTS_PTS_FLG_BUSY_PORT_Q);
                flags |= ERTS_PTS_FLG_BUSY_PORT_Q;
                qsz = (ErlDrvSizeT) erts_smp_atomic_read_acqb(&bpq->size);
                if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low)) {
                    flags = (erts_smp_atomic32_read_bor_relb(
                                 &pp->sched.flags,
                                 ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q));
                    flags |= ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q;
                }
            }
...
}

static ERTS_INLINE void
dequeued_proc2port_data(Port *pp, ErlDrvSizeT size)
{
    ErtsPortTaskBusyPortQ *bpq;
    erts_aint32_t flags;
    ErlDrvSizeT qsz;

    ASSERT(pp->sched.taskq.bpq);

    if (size == 0)
        return;

    bpq = pp->sched.taskq.bpq;

    qsz = (ErlDrvSizeT) erts_smp_atomic_add_read_acqb(&bpq->size,
                                                      (erts_aint_t) -size);
    ASSERT(qsz + size > qsz);
    flags = erts_smp_atomic32_read_nob(&pp->sched.flags);
    if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q))
        return;
    if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_acqb(&bpq->low))
        check_unset_busy_port_q(pp, flags, bpq);
}

/*                                                                                                                        
 * Busy port queue management                                                                                             
 */
static erts_aint32_t
check_unset_busy_port_q(Port *pp,
                        erts_aint32_t flags,
                        ErtsPortTaskBusyPortQ *bpq)
{
    ErlDrvSizeT qsize, low;
    int resume_procs = 0;

    ASSERT(bpq);
    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));

    erts_port_task_sched_lock(&pp->sched);
    qsize = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->size);
    low = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low);
    if (qsize < low) {
        erts_aint32_t mask = ~(ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q
                               | ERTS_PTS_FLG_BUSY_PORT_Q);
        flags = erts_smp_atomic32_read_band_relb(&pp->sched.flags, mask);
        if ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q)
            resume_procs = 1;
    }
    else if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q) {
        flags = erts_smp_atomic32_read_band_relb(&pp->sched.flags,
                                                 ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
        flags &= ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q;
    }
    erts_port_task_sched_unlock(&pp->sched);
    if (resume_procs)
        erts_port_resume_procs(pp);
    return flags;
}

The code says it plainly:
When enqueuing a task pushes the queue above the high watermark, the ERTS_PTS_FLG_BUSY_PORT_Q flag is set; when dequeuing finds the queue below the low watermark, check_unset_busy_port_q is called to wake up the suspended senders.

By now the enqueue side is basically clear, but when does dequeued_proc2port_data get called to take things off the queue?
The answer is erts_port_task_execute. Remember from the earlier analysis: these tasks are queued, and the port scheduler executes them at a suitable time.
Back to the code:

int
erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
{
...

    pp = pop_port(runq);
    if (!pp) {
        res = 0;
        goto done;
    }
...
while (1) {
        erts_aint32_t task_state;
        ErtsPortTask *ptp;

        ptp = select_task_for_exec(pp, &execq, &processing_busy_q);
        if (!ptp)
            break;
      ...
       switch (ptp->type) {
        case ERTS_PORT_TASK_PROC_SIG: {
            ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data;
            ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
            if (!pp->sched.taskq.bpq)
                reds += ptp->u.alive.td.psig.callback(pp,
                                                      state,
                                                      ERTS_PROC2PORT_SIG_EXEC,
                                                      sigdp);
            else {
                ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
                reds += ptp->u.alive.td.psig.callback(pp,
                                                      state,
                                                      ERTS_PROC2PORT_SIG_EXEC,
                                                      sigdp);
                dequeued_proc2port_data(pp, size);
            }
            break;
        }
...
}

When the task scheduler finds a task of type ERTS_PORT_TASK_PROC_SIG, it invokes the task's callback and then dequeues the task. Remember what that callback was? Right: port_sig_outputv, which does the real work of dragging the data onto the network.

It has been a winding mountain road, but we have finally strung the whole flow together. Complicated? Very. While I was down with a cold a while back, I lay in bed for several days before I had the ins and outs roughly figured out. Building high-performance servers is not easy; a round of applause for ourselves!

Summary: the official documentation on these watermarks is vague; the source code is the reliable reference. The essential difference between watermark and msgq_watermark is that the former bounds the port's own (driver) message queue, while the latter bounds the port scheduler's queue, i.e. the data that processes sending to a parallel port had to park with the port scheduler when it could not be delivered immediately; you can think of it as a watermark on in-flight data.
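
Back to the question that opened this post, here is a rough two-shell sketch to reproduce it (the option values are the ones from the question; the exact byte counts also depend on the kernel's minimum socket buffer sizes): the tiny sndbuf plus the 2-byte high_watermark are what eventually suspend the sender, and the server reading a few bytes is what drains the driver queue below the low watermark and lets the blocked send return.

%% shell 1 (server)
{ok, L} = gen_tcp:listen(8000, [{active, false}, {recbuf, 1}, {buffer, 1}]).
{ok, S} = gen_tcp:accept(L).

%% shell 2 (client)
{ok, C} = gen_tcp:connect("localhost", 8000,
                          [{active, false}, {sndbuf, 1}, {buffer, 1},
                           {high_watermark, 2}, {low_watermark, 1}]).
[gen_tcp:send(C, <<N>>) || N <- lists:seq(1, 6)].   % these return ok
gen_tcp:send(C, <<7>>).                             % this call blocks

%% back in shell 1: read a few bytes and the blocked send eventually returns
gen_tcp:recv(S, 1).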

Have fun!
