系統技術非業餘研究 » An In-Depth Dissection and Usage Guide for gen_tcp:send (Draft)

In most people's minds, gen_tcp:send is a humble function: you call it and the data goes merrily off to the other end. That is a big misconception, and the Erlang/OTP documentation is quite vague here. Yet this function is critical for most network programs, and using it correctly or not has a large impact on application performance. I have heard many people complain that Erlang performs poorly or behaves strangely; much of that comes from not understanding the system and misusing it. Below I will dissect it. The article is long and assumes the reader is familiar with Erlang and the underlying layers. Follow along.

This article is written against Erlang R13B04.

The following is excerpted from the gen_tcp documentation:

gen_tcp:send(Socket, Packet) -> ok | {error, Reason}
* Socket = socket()
* Packet = [char()] | binary()
* Reason = posix()
* Sends a packet on a socket.

There is no send call with timeout option, you use the send_timeout socket option if timeouts are desired. See the examples section.

Typical usage looks like this:

client(PortNo, Message) ->
    {ok, Sock} = gen_tcp:connect("localhost", PortNo, [{active, false},
                                                       {packet, 2}]),
    gen_tcp:send(Sock, Message),
    A = gen_tcp:recv(Sock, 0),
    gen_tcp:close(Sock),
    A.

Simple, right? At first glance, yes, but this is where the confusion begins.

Let's look at the source:

lib/kernel/src/gen_tcp.erl

send(S, Packet) when is_port(S) ->    % here we can see S is a port
    case inet_db:lookup_socket(S) of
        {ok, Mod} ->                  % Mod may be inet_tcp.erl or inet6_tcp.erl
            Mod:send(S, Packet);
        Error ->
            Error
    end.

lib/kernel/src/inet_tcp.erl

send(Socket, Packet, Opts) -> prim_inet:send(Socket, Packet, Opts). % hand off to the prim_inet module
send(Socket, Packet) -> prim_inet:send(Socket, Packet, []).

erts/preloaded/src/prim_inet.erl

send(S, Data, OptList) when is_port(S), is_list(OptList) ->
    ?DBG_FORMAT("prim_inet:send(~p, ~p)~n", [S,Data]),
    try erlang:port_command(S, Data, OptList) of     % push down to the underlying port
        false -> % Port busy and nosuspend option passed
            ?DBG_FORMAT("prim_inet:send() -> {error,busy}~n", []),
            {error,busy};
        true -> % the port accepted the data
            receive
                {inet_reply,S,Status} ->  % block, waiting for the reply
                    ?DBG_FORMAT("prim_inet:send() -> ~p~n", [Status]),
                    Status
            end
    catch
        error:_Error ->
            ?DBG_FORMAT("prim_inet:send() -> {error,einval}~n", []),
            {error,einval}
    end.

send(S, Data) ->
    send(S, Data, []).

From these fragments we can see that when we call gen_tcp:send, the kernel application dispatches to the module matching the gen_tcp socket's type: either inet_tcp or inet6_tcp. That module delegates the send request to the prim_inet module. prim_inet checks that the socket is valid and, if so, calls erlang:port_command to push the data into the ERTS runtime.
This push has two possible outcomes:
1. Success: the process suspends itself, waiting for the runtime's reply.
2. Failure: the call returns immediately.
When does it fail?
1. The driver does not support soft_busy, but we passed the force flag.
2. The driver is already busy, but we did not allow the process to be suspended.
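Both failure modes can be exercised directly with erlang:port_command. A minimal sketch, assuming Sock is an already-connected gen_tcp socket owned by the calling process (the function name try_send is mine, not from OTP):

```erlang
%% Non-blocking send attempt: sketch, assuming Sock is a connected
%% gen_tcp socket owned by the calling process.
try_send(Sock, Data) ->
    case erlang:port_command(Sock, Data, [nosuspend]) of
        false ->
            %% the driver marked the port busy, and we chose not to suspend
            {error, busy};
        true ->
            %% the port took the data; the driver delivers
            %% {inet_reply, Sock, Status} when it is done
            receive
                {inet_reply, Sock, Status} -> Status
            end
    end.
```

Passing [force] instead would raise a notsup error on a driver without ERL_DRV_FLAG_SOFT_BUSY; as we will see, the tcp driver does support it.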

Let's first look at the relevant documentation and code:

erlang:port_command(Port, Data, OptionList) -> true|false

If the port command is aborted false is returned; otherwise, true is returned.

If the port is busy, the calling process will be suspended until the port is not busy anymore.

Currently the following Options are valid:

force
The calling process will not be suspended if the port is busy; instead, the port command is forced through. The call will fail with a notsup exception if the driver of the port does not support this. For more information see the ERL_DRV_FLAG_SOFT_BUSY driver flag.

nosuspend
The calling process will not be suspended if the port is busy; instead, the port command is aborted and false is returned.

For busy_port, see the documentation:

erlang:system_monitor(MonitorPid, [Option]) -> MonSettings
busy_port
If a process in the system gets suspended because it sends to a busy port, a message {monitor, SusPid, busy_port, Port} is sent to MonitorPid. SusPid is the pid that got suspended when sending to Port.
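To find out whether your senders are being suspended on busy ports, you can register such a monitor yourself. A minimal sketch (function names are mine):

```erlang
%% Watch for processes suspended on busy ports. The spawned process
%% registers itself as the system monitor and logs every suspension.
start_busy_port_monitor() ->
    spawn(fun() ->
              erlang:system_monitor(self(), [busy_port]),
              busy_port_loop()
          end).

busy_port_loop() ->
    receive
        {monitor, SusPid, busy_port, Port} ->
            io:format("~p suspended sending to ~p~n", [SusPid, Port]),
            busy_port_loop()
    end.
```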

erts/emulator/beam/erl_bif_port.c

BIF_RETTYPE port_command_2(BIF_ALIST_2)
{
    return do_port_command(BIF_P, BIF_ARG_1, BIF_ARG_2, NIL, 0);
}

BIF_RETTYPE port_command_3(BIF_ALIST_3)
{
    Eterm l = BIF_ARG_3;
    Uint32 flags = 0;
    while (is_list(l)) {
        Eterm* cons = list_val(l);
        Eterm car = CAR(cons);
        /* handle the force and nosuspend options */
        if (car == am_force) {
            flags |= ERTS_PORT_COMMAND_FLAG_FORCE;
        } else if (car == am_nosuspend) {
            flags |= ERTS_PORT_COMMAND_FLAG_NOSUSPEND;
        } else {
            BIF_ERROR(BIF_P, BADARG);
        }
        l = CDR(cons);
    }
    if(!is_nil(l)) {
        BIF_ERROR(BIF_P, BADARG);
    }
    return do_port_command(BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3, flags);
}

#define ERTS_PORT_COMMAND_FLAG_FORCE            (((Uint32) 1) << 0)
#define ERTS_PORT_COMMAND_FLAG_NOSUSPEND        (((Uint32) 1) << 1)

static BIF_RETTYPE do_port_command(Process *BIF_P,
                                   Eterm BIF_ARG_1,
                                   Eterm BIF_ARG_2,
                                   Eterm BIF_ARG_3,
                                   Uint32 flags)
{
    BIF_RETTYPE res;
    Port *p;

    /* Trace sched out before lock check wait */
    if (IS_TRACED_FL(BIF_P, F_TRACE_SCHED_PROCS)) {
        trace_virtual_sched(BIF_P, am_out);
    }

    if (erts_system_profile_flags.runnable_procs && erts_system_profile_flags.exclusive) {
        profile_runnable_proc(BIF_P, am_inactive);
    }

    p = id_or_name2port(BIF_P, BIF_ARG_1);
    if (!p) {
        if (IS_TRACED_FL(BIF_P, F_TRACE_SCHED_PROCS)) {
            trace_virtual_sched(BIF_P, am_in);
        }
        if (erts_system_profile_flags.runnable_procs && erts_system_profile_flags.exclusive) {
            profile_runnable_proc(BIF_P, am_active);
        }
        BIF_ERROR(BIF_P, BADARG);
    }

    /* Trace port in, id_or_name2port causes wait */

    if (IS_TRACED_FL(p, F_TRACE_SCHED_PORTS)) {
        trace_sched_ports_where(p, am_in, am_command);
    }
    if (erts_system_profile_flags.runnable_ports && !erts_port_is_scheduled(p)) {
        profile_runnable_port(p, am_active);
    }

    ERTS_BIF_PREP_RET(res, am_true);

    if ((flags & ERTS_PORT_COMMAND_FLAG_FORCE)
        && !(p->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY)) {
        ERTS_BIF_PREP_ERROR(res, BIF_P, EXC_NOTSUP);
    }
    else if (!(flags & ERTS_PORT_COMMAND_FLAG_FORCE)
             && p->status & ERTS_PORT_SFLG_PORT_BUSY) {
        if (flags & ERTS_PORT_COMMAND_FLAG_NOSUSPEND) {
            ERTS_BIF_PREP_RET(res, am_false);
        }
        else { /* suspend the caller and emit the busy_port monitor message */
            erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, p);
            if (erts_system_monitor_flags.busy_port) {
                monitor_generic(BIF_P, am_busy_port, p->id);
            }
            ERTS_BIF_PREP_YIELD3(res, bif_export[BIF_port_command_3], BIF_P,
                                 BIF_ARG_1, BIF_ARG_2, BIF_ARG_3);
        }
    } else {
        int wres;
        erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN);
        ERTS_SMP_CHK_NO_PROC_LOCKS;
        wres = erts_write_to_port(BIF_P->id, p, BIF_ARG_2);
        erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN);
        if (wres != 0) {
            ERTS_BIF_PREP_ERROR(res, BIF_P, BADARG);
        }
    }

    if (IS_TRACED_FL(p, F_TRACE_SCHED_PORTS)) {
        trace_sched_ports_where(p, am_out, am_command);
    }
    if (erts_system_profile_flags.runnable_ports && !erts_port_is_scheduled(p)) {
        profile_runnable_port(p, am_inactive);
    }

    erts_port_release(p);
    /* Trace sched in after port release */
    if (IS_TRACED_FL(BIF_P, F_TRACE_SCHED_PROCS)) {
        trace_virtual_sched(BIF_P, am_in);
    }
    if (erts_system_profile_flags.runnable_procs && erts_system_profile_flags.exclusive) {
        profile_runnable_proc(BIF_P, am_active);
    }

    if (ERTS_PROC_IS_EXITING(BIF_P)) {
        KILL_CATCHES(BIF_P);    /* Must exit */
        ERTS_BIF_PREP_ERROR(res, BIF_P, EXC_ERROR);
    }
    return res;
}

erts/emulator/drivers/common/inet_drv.c

static struct erl_drv_entry tcp_inet_driver_entry =
{
    tcp_inet_init,  /* inet_init will add this driver !! */
    tcp_inet_start,
    tcp_inet_stop,
    tcp_inet_command,
#ifdef __WIN32__
    tcp_inet_event,
    NULL,
#else
    tcp_inet_drv_input,
    tcp_inet_drv_output,
#endif
    "tcp_inet",
    NULL,
    NULL,
    tcp_inet_ctl,
    tcp_inet_timeout,
    tcp_inet_commandv,
    NULL,
    tcp_inet_flush,
    NULL,
    NULL,
    ERL_DRV_EXTENDED_MARKER,
    ERL_DRV_EXTENDED_MAJOR_VERSION,
    ERL_DRV_EXTENDED_MINOR_VERSION,
    ERL_DRV_FLAG_USE_PORT_LOCKING|ERL_DRV_FLAG_SOFT_BUSY,  /* our tcp driver supports soft_busy */
    NULL,
    tcp_inet_process_exit,
    inet_stop_select
};

At the level of the VM executing gen_tcp:send, the calling process can be suspended in the following cases:

1. The data was pushed into ERTS successfully and the caller is waiting for ERTS to report the result of the send. This is the common case.

2. The socket is busy and we did not set port_command's force flag.

3. The caller pushed a large amount of data and used up its time slice, so the runtime suspended it.

And the failure case: we set the nosuspend flag, but the socket is busy.

At this point the runtime goes on to call erts_write_to_port to hand the data down to the next layer.

The question now is how the data is organized and what the runtime does with it. Let's keep reading:

erts/emulator/beam/io.c

#define ERL_SMALL_IO_BIN_LIMIT (4*ERL_ONHEAP_BIN_LIMIT) /* #define ERL_ONHEAP_BIN_LIMIT 64 */
#define SMALL_WRITE_VEC  16

/* write data to a port */
int erts_write_to_port(Eterm caller_id, Port *p, Eterm list)
{
    char *buf;
    erts_driver_t *drv = p->drv_ptr;
    int size;
    int fpe_was_unmasked;

    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
    ERTS_SMP_CHK_NO_PROC_LOCKS;

    p->caller = caller_id;
    if (drv->outputv != NULL) {
        int vsize;
        int csize;
        int pvsize;
        int pcsize;
        int blimit;
        SysIOVec iv[SMALL_WRITE_VEC];   /* at most 16 segments */
        ErlDrvBinary* bv[SMALL_WRITE_VEC];
        SysIOVec* ivp;
        ErlDrvBinary**  bvp;
        ErlDrvBinary* cbin;
        ErlIOVec ev;

        if ((size = io_list_vec_len(list, &vsize, &csize,
                                    ERL_SMALL_IO_BIN_LIMIT,
                                    &pvsize, &pcsize)) < 0) {
            goto bad_value;
        }
        /* To pack or not to pack (small binaries) ...? */
        vsize++;
        if (vsize <= SMALL_WRITE_VEC) {
            /* Do NOT pack */
            blimit = 0;
        } else {
            /* Do pack */
            vsize = pvsize + 1;
            csize = pcsize;
            blimit = ERL_SMALL_IO_BIN_LIMIT;
        }
        /* Use vsize and csize from now on */
        if (vsize <= SMALL_WRITE_VEC) {
            ivp = iv;
            bvp = bv;
        } else {
            ivp = (SysIOVec *) erts_alloc(ERTS_ALC_T_TMP,
                                          vsize * sizeof(SysIOVec));
            bvp = (ErlDrvBinary**) erts_alloc(ERTS_ALC_T_TMP,
                                              vsize * sizeof(ErlDrvBinary*));
        }
        cbin = driver_alloc_binary(csize);
        if (!cbin)
            erts_alloc_enomem(ERTS_ALC_T_DRV_BINARY, ERTS_SIZEOF_Binary(csize));

        /* Element 0 is for driver usage to add header block */
        ivp[0].iov_base = NULL;
        ivp[0].iov_len = 0;
        bvp[0] = NULL;
        ev.vsize = io_list_to_vec(list, ivp+1, bvp+1, cbin, blimit);
        ev.vsize++;
#if 0
        /* This assertion may say something useful, but it can
           be falsified during the emulator test suites. */
        ASSERT((ev.vsize >= 0) && (ev.vsize == vsize));
#endif
        ev.size = size;  /* total size */
        ev.iov = ivp;
        ev.binv = bvp;
        fpe_was_unmasked = erts_block_fpe();
        (*drv->outputv)((ErlDrvData)p->drv_data, &ev);
        erts_unblock_fpe(fpe_was_unmasked);
        if (ivp != iv) {
            erts_free(ERTS_ALC_T_TMP, (void *) ivp);
        }
        if (bvp != bv) {
            erts_free(ERTS_ALC_T_TMP, (void *) bvp);
        }
        driver_free_binary(cbin);
    } else {
        int r;

        /* Try with an 8KB buffer first (will often be enough I guess). */
        size = 8*1024;
        /* See below why the extra byte is added. */
        buf = erts_alloc(ERTS_ALC_T_TMP, size+1);
        r = io_list_to_buf(list, buf, size);

        if (r >= 0) {
            size -= r;
            fpe_was_unmasked = erts_block_fpe();
            (*drv->output)((ErlDrvData)p->drv_data, buf, size);  /* calls the tcp output in inet_drv */
            erts_unblock_fpe(fpe_was_unmasked);
            erts_free(ERTS_ALC_T_TMP, buf);
        }
        else if (r == -2) {
            erts_free(ERTS_ALC_T_TMP, buf);
            goto bad_value;
        }
        else {
            ASSERT(r == -1); /* Overflow */
            erts_free(ERTS_ALC_T_TMP, buf);
            if ((size = io_list_len(list)) < 0) {
                goto bad_value;
            }

            /*
             * I know drivers that pad space with '\0' this is clearly
             * incorrect but I don't feel like fixing them now, insted
             * add ONE extra byte.
             */
            buf = erts_alloc(ERTS_ALC_T_TMP, size+1);
            r = io_list_to_buf(list, buf, size);
            fpe_was_unmasked = erts_block_fpe();
            (*drv->output)((ErlDrvData)p->drv_data, buf, size);
            erts_unblock_fpe(fpe_was_unmasked);
            erts_free(ERTS_ALC_T_TMP, buf);
        }
    }
    p->bytes_out += size;
    erts_smp_atomic_add(&erts_bytes_out, size);

#ifdef ERTS_SMP
    if (p->xports)
        erts_smp_xports_unlock(p);
    ASSERT(!p->xports);
#endif
    p->caller = NIL;
    return 0;

 bad_value:
    p->caller = NIL;
    {
        erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
        erts_dsprintf(dsbufp, "Bad value on output port '%s'\n", p->name);
        erts_send_error_to_logger_nogl(dsbufp);
        return 1;
    }
}

The data handed to gen_tcp:send is an iolist. Many people misunderstand this and go out of their way to flatten the iolist into a list or a binary first. Every freshly built binary or list has to be garbage collected after the send; done frequently, this costs the system a lot of performance.
The tcp driver supports scatter writes, ultimately issuing the writev system call, so we should take full advantage of this.
From the code above we can see the rule the I/O layer uses to fill the writev vector. For each element of the iolist:
1. an integer: copied;
2. a heap binary: copied;
3. a proc binary smaller than 64 bytes: copied; larger binaries go into the vector by reference, with no copy.
TCP is also a stream protocol, so when sending a message we usually need to prepend a header, for example a 4-byte length. Doing this by hand is quite inefficient. The tcp driver can prepend the message length automatically; see the documentation:

{packet, PacketType}(TCP/IP sockets)
Defines the type of packets to use for a socket. The following values are valid:

raw | 0
No packaging is done.

1 | 2 | 4
Packets consist of a header specifying the number of bytes in the packet, followed by that number of bytes. The length of header can be one, two, or four bytes; containing an unsigned integer in big-endian byte order. Each send operation will generate the header, and the header will be stripped off on each receive operation.

In current implementation the 4-byte header is limited to 2Gb.
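With {packet, 4} set on both ends, the driver prepends the 4-byte big-endian length on every send and strips it on every receive, so application code never touches the header. A sketch (function names are mine):

```erlang
%% Both sides agree on {packet, 4}; the length header is handled
%% entirely inside the driver.
connect_framed(Host, PortNo) ->
    gen_tcp:connect(Host, PortNo, [binary, {packet, 4}, {active, false}]).

send_framed(Sock, Payload) ->
    %% the driver emits <<Size:32/big, Payload/binary>> on the wire
    gen_tcp:send(Sock, Payload).
```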

At this point the data is packed and ready, and it now becomes the responsibility of the inet_drv driver.
Inside inet_drv, every socket has a message queue holding the messages pushed down from above, with a high and a low watermark. When the number of queued bytes exceeds the high watermark, inet_drv marks the socket busy; the busy flag is only cleared once the queued bytes drop below the low watermark.

This is undocumented; it is used like this:
inet:setopts(Socket, [{high_watermark, 131072}]).
inet:setopts(Socket, [{low_watermark, 65536}]).

erts/emulator/drivers/common/inet_drv.c

static void tcp_inet_drv_output(ErlDrvData data, ErlDrvEvent event)
{
    (void)tcp_inet_output((tcp_descriptor*)data, (HANDLE)event);
}

/* socket ready for ouput:
** 1. TCP_STATE_CONNECTING => non block connect ?
** 2. TCP_STATE_CONNECTED  => write output
*/
static int tcp_inet_output(tcp_descriptor* desc, HANDLE event)
{
    int ret = 0;
    ErlDrvPort ix = desc->inet.port;

    DEBUGF(("tcp_inet_output(%ld) {s=%d\r\n",
            (long)desc->inet.port, desc->inet.s));
    if (desc->inet.state == TCP_STATE_CONNECTING) {
        sock_select(INETP(desc),FD_CONNECT,0);

        driver_cancel_timer(ix);  /* posssibly cancel a timer */
#ifndef __WIN32__
        /*
         * XXX This is strange.  This *should* work on Windows NT too,
         * but doesn't.  An bug in Winsock 2.0 for Windows NT?
         *
         * See "Unix Netwok Programming", W.R.Stevens, p 412 for a
         * discussion about Unix portability and non blocking connect.
         */

#ifndef SO_ERROR
        {
            int sz = sizeof(desc->inet.remote);
            int code = sock_peer(desc->inet.s,
                                 (struct sockaddr*) &desc->inet.remote, &sz);

            if (code == SOCKET_ERROR) {
                desc->inet.state = TCP_STATE_BOUND;  /* restore state */
                ret =  async_error(INETP(desc), sock_errno());
                goto done;
            }
        }
#else
        {
            int error = 0;      /* Has to be initiated, we check it */
            unsigned int sz = sizeof(error); /* even if we get -1 */
            int code = sock_getopt(desc->inet.s, SOL_SOCKET, SO_ERROR,
                                   (void *)&error, &sz);

            if ((code < 0) || error) {
                desc->inet.state = TCP_STATE_BOUND;  /* restore state */
                ret = async_error(INETP(desc), error);
                goto done;
            }
        }
#endif /* SOCKOPT_CONNECT_STAT */
#endif /* !__WIN32__ */

        desc->inet.state = TCP_STATE_CONNECTED;
        if (desc->inet.active)
            sock_select(INETP(desc),(FD_READ|FD_CLOSE),1);
        async_ok(INETP(desc));
    }
    else if (IS_CONNECTED(INETP(desc))) {
        for (;;) {
            int vsize;
            int n;
            SysIOVec* iov;

            if ((iov = driver_peekq(ix, &vsize)) == NULL) {
                sock_select(INETP(desc), FD_WRITE, 0);
                send_empty_out_q_msgs(INETP(desc));
                goto done;
            }
            vsize = vsize > MAX_VSIZE ? MAX_VSIZE : vsize;
            DEBUGF(("tcp_inet_output(%ld): s=%d, About to send %d items\r\n",
                    (long)desc->inet.port, desc->inet.s, vsize));
            if (sock_sendv(desc->inet.s, iov, vsize, &n, 0)==SOCKET_ERROR) {
                if ((sock_errno() != ERRNO_BLOCK) && (sock_errno() != EINTR)) {
                    DEBUGF(("tcp_inet_output(%ld): sock_sendv(%d) errno = %d\r\n",
                            (long)desc->inet.port, vsize, sock_errno()));
                    ret =  tcp_send_error(desc, sock_errno());
                    goto done;
                }
#ifdef __WIN32__
                desc->inet.send_would_block = 1;
#endif
                goto done;
            }
            if (driver_deq(ix, n) <= desc->low) {
                if (IS_BUSY(INETP(desc))) {
                    desc->inet.caller = desc->inet.busy_caller;
                    desc->inet.state &= ~INET_F_BUSY;
                    set_busy_port(desc->inet.port, 0);
                    /* if we have a timer then cancel and send ok to client */
                    if (desc->busy_on_send) {
                        driver_cancel_timer(desc->inet.port);
                        desc->busy_on_send = 0;
                    }
                    inet_reply_ok(INETP(desc));
                }
            }
        }
    }
    else {
        sock_select(INETP(desc),FD_CONNECT,0);
        DEBUGF(("tcp_inet_output(%ld): bad state: %04x\r\n",
                (long)desc->inet.port, desc->inet.state));
    }
 done:
    DEBUGF(("tcp_inet_output(%ld) }\r\n", (long)desc->inet.port));
    return ret;
}

First, let's look at the documentation to see which settings affect how this data is sent.

inet:setopts(Socket, Options) -> ok | {error, posix()}

{delay_send, Boolean}
Normally, when an Erlang process sends to a socket, the driver will try to immediately send the data. If that fails, the driver will use any means available to queue up the message to be sent whenever the operating system says it can handle it. Setting {delay_send, true} will make all messages queue up. This makes the messages actually sent onto the network be larger but fewer. The option actually affects the scheduling of send requests versus Erlang processes instead of changing any real property of the socket. Needless to say it is an implementation specific option. Default is false.

Normally, when tcp_inet_output runs, it first picks the messages left unsent last time off the queue and tries to send them; whatever cannot go out is queued again together with the current message.

If the send succeeds, the driver checks the delay_send flag; if it is set, the message is queued directly so it can be merged into a larger block and sent together next time.

Whenever the queue is non-empty, the tcp driver registers the socket for the writable event and waits for the notification; at the appropriate time the port is scheduled to write again.
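Turning the option on is a one-liner. A sketch, assuming Sock is an existing connected socket:

```erlang
%% Trade a little latency for fewer, larger writev calls on this socket.
inet:setopts(Sock, [{delay_send, true}]).
```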

{sndbuf, Integer}
Gives the size of the send buffer to use for the socket.

This option sets the size of the socket's send buffer in the kernel protocol stack; the larger it is, the more readily the send system call can push data into the stack.

{send_timeout, Integer}

Only allowed for connection oriented sockets.

Specifies a longest time to wait for a send operation to be accepted by the underlying TCP stack. When the limit is exceeded, the send operation will return {error,timeout}. How much of a packet that actually got sent is unknown, why the socket should be closed whenever a timeout has occurred (see send_timeout_close). Default is infinity.

{send_timeout_close, Boolean}
Only allowed for connection oriented sockets.

Used together with send_timeout to specify whether the socket will be automatically closed when the send operation returns {error,timeout}. The recommended setting is true which will automatically close the socket. Default is false due to backward compatibility.
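Putting the two options together gives a send that cannot hang forever. A sketch, assuming Sock is an existing connected socket and a hypothetical 5-second budget:

```erlang
%% Give up on a send after 5 seconds and let the runtime close the
%% socket on timeout, as the documentation recommends.
set_send_guard(Sock) ->
    inet:setopts(Sock, [{send_timeout, 5000},
                        {send_timeout_close, true}]).
```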

At this point the data may sit partly in the driver's message queue and partly in the TCP stack's send buffer waiting for the NIC, while the socket may also be registered for the writable event.

Once the writable event fires, the runtime schedules the port owning that socket to continue writing.

When a message has been pushed into the protocol stack in full, the tcp driver sends the calling process an {inet_reply,S,Status} message with the result. At that moment the caller, and hence gen_tcp:send, returns, completing the whole journey.

A few key points here:

1. Ports are scheduled just as fairly as processes. Processes are scheduled in units of reductions; for ports, the bytes sent are converted into reductions. So a process sending a large volume of TCP data does not get to run continuously; the runtime forcibly pauses it so that other ports get a chance to run.

2. gen_tcp's send is synchronous: it blocks in receive {inet_reply,S,Status} -> Status end, which is a poor fit for workloads that send messages in volume.

A better approach is to perform gen_tcp's two steps by hand:
1. keep calling erlang:port_command(S, Data, OptList), preferably with the force flag;
2. passively collect the {inet_reply,S,Status} messages.

For concrete code, see the hotwheels or rabbitmq projects.
hotwheels/src/pubsun.erl

handle_cast({publish, Msg}, State) ->
    io:format("publish,info: ~p~n", [ets:info(State#state.subs)]),
    {A, B, C} = Start = now(),
    Msg1 = <<A:32, B:32, C:32, ?MESSAGE, Msg/binary>>,
    F = fun({_, _, Sock}, _) -> erlang:port_command(Sock, Msg1) end,
    erlang:process_flag(priority, high),
    ets:foldl(F, ok, State#state.subs),
    End = now(),
    erlang:process_flag(priority, normal),
    io:format("cost time: ~p~n", [timer:now_diff(End, Start) / 1000]),
    {noreply, State};

handle_info({inet_reply, _Sock, _Error}, State) ->
    io:format("inet reply error: ~p~n", [_Error]),
    %% there needs to be a reverse lookup from sock to pid
    {noreply, State};
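The same decoupled pattern can be written down in a few lines outside any gen_server. A sketch (names are mine, not from hotwheels or rabbitmq):

```erlang
%% Fire off the data without waiting for the driver's reply.
async_send(Sock, Data) ->
    true = erlang:port_command(Sock, Data),
    ok.

%% Later, harvest N outstanding {inet_reply, _, _} messages.
collect_replies(0) -> ok;
collect_replies(N) ->
    receive
        {inet_reply, _Sock, ok}    -> collect_replies(N - 1);
        {inet_reply, _Sock, Error} -> {error, Error}
    end.
```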


A corollary:
gen_tcp:send should, in theory, reach about 80% of the throughput a top-flight C programmer would achieve. If you measure below that figure, work through the steps above to track down the problem.

Reference articles:
http://mryufeng.javaeye.com/blog/475003
http://mryufeng.javaeye.com/blog/289058
http://mryufeng.javaeye.com/blog/288384
http://mryufeng.javaeye.com/blog/366761
http://avindev.javaeye.com/blog/76373
