1. 程式人生 > >Nginx-rtmp直播之業務流程分析--比較詳細

Nginx-rtmp直播之業務流程分析--比較詳細

1. 綜述

1.1 直播原理

使用 obs 向 nginx 推送一個直播流,該直播流經 nginx-rtmp 的 ngx_rtmp_live_module 模組轉發給 application live 應用,
然後使用 vlc 連線 live,播放該直播流。

1.2 nginx.conf

# 建立的子程序數
worker_processes  1;

error_log  stderr  debug;

daemon off;

master_process off;

events {
    worker_connections  1024;
}

rtmp {
    
    server {
        listen 1935; # rtmp傳輸埠

        chunk_size 4096; # 資料傳輸塊大小

        application live { # 直播配置
            live on;
        }

        # obs 將流推到該 push 應用,push 應用又將該流釋出到 live 應用
        application push {
            live on;
            push rtmp://192.168.1.82:1935/live; # 推流到上面的直播應用
        }
    }
}

1.3 obs 推流設定

  1. 點選 "+" 選擇一個媒體源,確定,然後設定該媒體源,如下圖:
    技術分享圖片

  2. 點選 "設定" 選擇 "流",設定推流地址,如下圖,確定後即可進行推流:
    技術分享圖片

1.4 使用 vlc 播放直播流

技術分享圖片

2. 原始碼分析:application push

首先開始分析從 obs 推送 rtmp 流到 nginx 伺服器的整個流程。

2.1 監聽連線

nginx 啟動後,就會一直在 ngx_process_events 函式中的 epoll_eait 處休眠,監聽客戶端的連線:

static ngx_int_t
ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
    ...
    
    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "epoll timer: %M", timer);

    /* nginx 最初執行時,timer 為 -1,即一直等待客戶端連線 */
    events = epoll_wait(ep, event_list, (int) nevents, timer);
    
    ...
    
    for (i = 0; i < events; i++) {
        c = event_list[i].data.ptr;
        
        instance = (uintptr_t) c & 1;
        c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);
        
        /* 獲取被監聽的讀事件 */
        rev = c->read;
        
        /* 獲取 epoll_wait 返回的事件標誌 */
        revents = event_list[i].events;
        
        ...
        
        /* 若是監聽的事件可讀,首次監聽即表示有新連線到來 */
        if ((revents & EPOLLIN) && rev->active) {
            ...
            
            rev->ready = 1;

            /* 若是開啟了負載均衡,則先將該事件新增到 ngx_posted_accept_events 
             * 延遲佇列中 */
            if (flags & NGX_POST_EVENTS) {
                queue = rev->accept ? &ngx_posted_accept_events
                                    : &ngx_posted_events;

                ngx_post_event(rev, queue);

            } else {
                /* 否則,直接呼叫該讀事件的回撥函式,若是新連線則
                 * 呼叫的是 ngx_event_accept 函式 */
                rev->handler(rev);
            }
        }
        
        ...
    }
    
    return NGX_OK;
}

ngx_event_accept 函式中主要也就是接受客戶端的連線,並呼叫該監聽埠對應的回撥函式:

void
ngx_event_accept(ngx_event_t *ev)
{
    ...
    
    do {
        ...
        
        s = accept(lc->fd, &sa.sockaddr, &socklen);
        ...
        
        /* 呼叫該監聽埠對應的回撥函式,對於 rtmp 模組,則固定為 ngx_rtmp_init_connection */
        ls->handler(c);
        
        ...
    } while (ev->available);
}

在 ngx_rtmp_init_connection 函式中先經過一系列的初始化後,開始接收與客戶端進行 rtmp 的 handshake 過程。

下面從 hanshake 到 hanshake 成功後接收到第一個 rtmp 包之間僅以圖片說明,就不再分析原始碼了。

2.2 handshake

2.2.1 hs_stage: SERVER_RECV_CHALLENGE(1)

該 hanshake 階段即為等待接收客戶端傳送的 C0 和 C1 階段。

receive: Handshake C0+C1 圖(1)

技術分享圖片
接收到客戶端傳送的 C0 和 C1 後,伺服器進入 NGX_RTMP_HANDSHAKE_SERVER_SEND_CHALLENGE(2)階段,即為
傳送S0 和 S1 階段。

2.2.2 hs_stage: SERVER_SEND_CHALLENGE(2) 和 SERVER_SEND_RESPONSE(3)

該 SERVER_SEND_CHALLENGE 階段即為等待接收客戶端傳送的 S0 和 S1 階段。但是實際上,伺服器在傳送完 S0 和
S1 後,進入到 SERVER_SEND_RESPONSE(3) 階段後又立刻傳送 S2,因此,在抓到的包如下:

send: Handshake S0+S1+S2 圖(2)

技術分享圖片

2.2.3 hs_stage: SERVER_RECV_RESPONSE(4)

該階段為等待接收客戶端傳送的 C2 階段。

receive:Handshake C2 圖(3)

技術分享圖片
至此,伺服器和客戶端的 rtmp handshake 過程完整,開始正常的資訊互動階段。

如下程式碼,接收到 C2 後,伺服器即進入迴圈處理客戶端的請求階段:ngx_rtmp_cycle

static void
ngx_rtmp_handshake_done(ngx_rtmp_session_t *s)
{
    ngx_rtmp_free_handshake_buffers(s);

    ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
            "handshake: done");

    if (ngx_rtmp_fire_event(s, NGX_RTMP_HANDSHAKE_DONE,
                NULL, NULL) != NGX_OK)
    {
        ngx_rtmp_finalize_session(s);
        return;
    }

    ngx_rtmp_cycle(s);
}

ngx_rtmp_cycle 函式中,重新設定了當前 rtmp 連線的讀、寫事件的回撥函式,當監聽到客戶端傳送的資料時,將呼叫
ngx_rtmp_recv 函式進行處理。

void
ngx_rtmp_cycle(ngx_rtmp_session_t *s)
{
    ngx_connection_t           *c;

    c = s->connection;
    c->read->handler  =  ngx_rtmp_recv;
    c->write->handler = ngx_rtmp_send;

    s->ping_evt.data = c;
    s->ping_evt.log = c->log;
    s->ping_evt.handler = ngx_rtmp_ping;
    ngx_rtmp_reset_ping(s);

    ngx_rtmp_recv(c->read);
}

在 ngx_rtmp_recv 函式中,會迴圈接收客戶端發來的 rtmp 包資料,接收到完整的一個 rtmp message 後,會根據該訊息
的 rtmp message type,呼叫相應的函式進行處理,如,若為 20,即為 amf0 型別的命令訊息,就會呼叫
ngx_rtmp_amf_message_handler 函式進行處理。

2.3 connect(‘push‘)

hanshake 成功後,接收到客戶端發來的第一個 rtmp 包為連線 nginx.conf 中 rtmp{} 下的 application push{}
應用,如下圖:

receive: connect(‘push‘) 圖(4)

技術分享圖片
從該圖可知,該訊息型別為 20,即為 AMF0 Command,因此會呼叫 ngx_rtmp_amf_message_handler 對該訊息進行解析,
然後對其中的命令 connect 呼叫預先設定好的 ngx_rtmp_cmd_connect_init 回撥函式。在 ngx_rtmp_cmd_connect_init
函式中,繼續解析該 connect 餘下的訊息後,開始 ngx_rtmp_connect 構件的 connect 函式連結串列,該連結串列中存放著各個
rtmp 模組對該 connect 命令所要做的操作(注:僅有部分 rtmp 模組會對該 connect 命令設定有回撥函式,並且就算
設定了回撥函式,也需要在配置檔案中啟用相應的模組才會真正執行該模組對 connect 的處理)。因此,對於 connect
命令,這裡僅會真正處理 ngx_rtmp_cmd_module 模組設定 ngx_rtmp_cmd_connect 回撥函式。

2.3.1 ngx_rtmp_cmd_connect

static ngx_int_t
ngx_rtmp_cmd_connect(ngx_rtmp_session_t *s, ngx_rtmp_connect_t *v)
{
    ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                "rtmp cmd: connect");

    ngx_rtmp_core_srv_conf_t   *cscf;
    ngx_rtmp_core_app_conf_t  **cacfp;
    ngx_uint_t                  n;
    ngx_rtmp_header_t           h;
    u_char                     *p;

    static double               trans;
    static double               capabilities = NGX_RTMP_CAPABILITIES;
    static double               object_encoding = 0;

    /* 以下內容為伺服器將要對客戶端的 connect 命令返回的 amf 型別的響應 */
    static ngx_rtmp_amf_elt_t  out_obj[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_string("fmsVer"),
          NGX_RTMP_FMS_VERSION, 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("capabilities"),
          &capabilities, 0 },
    };

    static ngx_rtmp_amf_elt_t  out_inf[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_string("level"),
          "status", 0 },

        { NGX_RTMP_AMF_STRING,
          ngx_string("code"),
          "NetConnection.Connect.Success", 0 },

        { NGX_RTMP_AMF_STRING,
          ngx_string("description"),
          "Connection succeeded.", 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("objectEncoding"),
          &object_encoding, 0 }
    };

    static ngx_rtmp_amf_elt_t  out_elts[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_null_string,
          "_result", 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          &trans, 0 },

        { NGX_RTMP_AMF_OBJECT,
          ngx_null_string,
          out_obj, sizeof(out_obj) },

        { NGX_RTMP_AMF_OBJECT,
          ngx_null_string,
          out_inf, sizeof(out_inf) },
    };

    if (s->connected) {
        ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                "connect: duplicate connection");
        return NGX_ERROR;
    }

    cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);

    trans = v->trans;

    /* fill session parameters */
    s->connected = 1;

    ngx_memzero(&h, sizeof(h));
    h.csid = NGX_RTMP_CSID_AMF_INI;
    h.type = NGX_RTMP_MSG_AMF_CMD;


#define NGX_RTMP_SET_STRPAR(name)                                                 s->name.len = ngx_strlen(v->name);                                            s->name.data = ngx_palloc(s->connection->pool, s->name.len);                  ngx_memcpy(s->name.data, v->name, s->name.len)

    NGX_RTMP_SET_STRPAR(app);
    NGX_RTMP_SET_STRPAR(args);
    NGX_RTMP_SET_STRPAR(flashver);
    NGX_RTMP_SET_STRPAR(swf_url);
    NGX_RTMP_SET_STRPAR(tc_url);
    NGX_RTMP_SET_STRPAR(page_url);

#undef NGX_RTMP_SET_STRPAR

    p = ngx_strlchr(s->app.data, s->app.data + s->app.len, ‘?‘);
    if (p) {
        s->app.len = (p - s->app.data);
    }

    s->acodecs = (uint32_t) v->acodecs;
    s->vcodecs = (uint32_t) v->vcodecs;

    /* 找到客戶端 connect 的應用配置 */
    /* find application & set app_conf */
    cacfp = cscf->applications.elts;
    for(n = 0; n < cscf->applications.nelts; ++n, ++cacfp) {
        if ((*cacfp)->name.len == s->app.len &&
            ngx_strncmp((*cacfp)->name.data, s->app.data, s->app.len) == 0)
        {
            /* found app! */
            s->app_conf = (*cacfp)->app_conf;
            break;
        }
    }

    if (s->app_conf == NULL) {
        ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                      "connect: application not found: ‘%V‘", &s->app);
        return NGX_ERROR;
    }

    object_encoding = v->object_encoding;

           /* 傳送應答視窗大小:ack_size 給客戶端,該訊息是用來通知對方應答視窗的大小,
            * 傳送方在傳送了等於視窗大小的資料之後,等的愛接收對方的應答訊息(在接收  
            * 到應答訊息之前停止傳送資料)。接收當必須傳送應答訊息,在會話開始時,在  
            * 會話開始時,會從上一次傳送應答之後接收到了等於視窗大小的資料 */
    return ngx_rtmp_send_ack_size(s, cscf->ack_window) != NGX_OK ||
           /* 傳送 設定流頻寬訊息。傳送此訊息來說明對方的出口頻寬限制,接收方以此來限制  
            * 自己的出口頻寬,即限制未被應答的訊息資料大小。接收到此訊息的一方,如果  
            * 視窗大小與上一次傳送的不一致,應該回復應答視窗大小的訊息 */
           ngx_rtmp_send_bandwidth(s, cscf->ack_window,
                                   NGX_RTMP_LIMIT_DYNAMIC) != NGX_OK ||
           /* 傳送 設定塊訊息訊息,用來通知對方新的最大的塊大小。 */
           ngx_rtmp_send_chunk_size(s, cscf->chunk_size) != NGX_OK ||
           ngx_rtmp_send_amf(s, &h, out_elts,
                             sizeof(out_elts) / sizeof(out_elts[0]))
           != NGX_OK ? NGX_ERROR : NGX_OK;
}

send: ack_size 圖(5)

技術分享圖片

send: peer bandwidth 圖(6)

技術分享圖片

send:chunk_size 圖(7)

技術分享圖片

send:_result(‘NetConnection.Connect.Success‘) 圖(8)
技術分享圖片

2.4 releaseStream(‘test‘)

伺服器響應客戶端 connect 命令訊息後,客戶端接著傳送 releaseStream 命令訊息給伺服器,但是 nginx-rtmp 中沒有
任何一個 rtmp 模組對該命令設定有回撥函式,因此,不進行處理,接著等待接收下一個訊息。

receive: releaseStream(‘test‘) 圖(9)

技術分享圖片

2.5 createStream(‘‘)

接著伺服器接收到客戶端發來的 createStream 命令訊息。

receive: createStream(‘‘) 圖(10)

技術分享圖片
從以前的分析可知,此時,會呼叫 ngx_rtmp_cmd_create_stream_init 函式。

2.5.1 ngx_rtmp_cmd_create_stream_init

static ngx_int_t
ngx_rtmp_cmd_create_stream_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
                                ngx_chain_t *in)
{
    static ngx_rtmp_create_stream_t     v;

    static ngx_rtmp_amf_elt_t  in_elts[] = {

        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          &v.trans, sizeof(v.trans) },
    };

    /* 解析該 createStream 命令訊息,獲取 v.trans 值,從圖(10) 可知,為 4 */
    if (ngx_rtmp_receive_amf(s, in, in_elts,
                sizeof(in_elts) / sizeof(in_elts[0])))
    {
        return NGX_ERROR;
    }

    ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "createStream");

    return ngx_rtmp_create_stream(s, &v);
}

接著,從該函式中開始呼叫 ngx_rtmp_create_stream 構建的函式連結串列。這裡呼叫到的是 ngx_rtmp_cmd_create_stream
函式。

2.5.2 ngx_rtmp_cmd_create_stream

static ngx_int_t
ngx_rtmp_cmd_create_stream(ngx_rtmp_session_t *s, ngx_rtmp_create_stream_t *v)
{
    ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "rtmp cmd: create stream");

    /* support one message stream per connection */
    static double               stream;
    static double               trans;
    ngx_rtmp_header_t           h;

    static ngx_rtmp_amf_elt_t  out_elts[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_null_string,
          "_result", 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          &trans, 0 },

        { NGX_RTMP_AMF_NULL,
          ngx_null_string,
          NULL, 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          &stream, sizeof(stream) },
    };

    
    trans = v->trans;
    stream = NGX_RTMP_MSID;

    ngx_memzero(&h, sizeof(h));

    h.csid = NGX_RTMP_CSID_AMF_INI;
    h.type = NGX_RTMP_MSG_AMF_CMD;

    return ngx_rtmp_send_amf(s, &h, out_elts,
                             sizeof(out_elts) / sizeof(out_elts[0])) == NGX_OK ?
           NGX_DONE : NGX_ERROR;
}

該函式主要是傳送伺服器對 createStream 的響應。

send: _result()
技術分享圖片

2.6 publish(‘test‘)

接著,客戶端傳送 publish 給伺服器,用來發佈一個有名字的流到伺服器,其他客戶端可以使用此流名來播放流,接收
釋出的音訊,視訊,以及其他資料訊息。

receive:publish(‘test‘) 圖(11)

技術分享圖片
從圖中可知,publish type 為 ‘live‘,即伺服器不會儲存客戶端釋出的流到檔案中。

2.6.1 ngx_rtmp_cmd_publish_init

static ngx_int_t
ngx_rtmp_cmd_publish_init(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
        ngx_chain_t *in)
{
    static ngx_rtmp_publish_t       v;

    static ngx_rtmp_amf_elt_t      in_elts[] = {

        /* transaction is always 0 */
        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          NULL, 0 },

        { NGX_RTMP_AMF_NULL,
          ngx_null_string,
          NULL, 0 },

        { NGX_RTMP_AMF_STRING,
          ngx_null_string,
          &v.name, sizeof(v.name) },

        { NGX_RTMP_AMF_OPTIONAL | NGX_RTMP_AMF_STRING,
          ngx_null_string,
          &v.type, sizeof(v.type) },
    };

    ngx_memzero(&v, sizeof(v));

    /* 從 publish 命令訊息中獲取 in_elts 中指定的值 */
    if (ngx_rtmp_receive_amf(s, in, in_elts,
                             sizeof(in_elts) / sizeof(in_elts[0])))
    {
        return NGX_ERROR;
    }

    ngx_rtmp_cmd_fill_args(v.name, v.args);

    ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                  "publish: name=‘%s‘ args=‘%s‘ type=%s silent=%d",
                  v.name, v.args, v.type, v.silent);

    return ngx_rtmp_publish(s, &v);
}

接著,該函式開始呼叫 ngx_rtmp_publish 構建的函式連結串列。從 nginx-rtmp 的原始碼和 nginx.conf 的配置可知,主要呼叫
ngx_rtmp_relay_publish 和 ngx_rtmp_live_publish 兩個函式。

由 rtmp 模組的排序,首先呼叫 ngx_rtmp_relay_publish。

2.6.2 ngx_rtmp_relay_publish

static ngx_int_t
ngx_rtmp_relay_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
{
    ngx_rtmp_relay_app_conf_t      *racf;
    ngx_rtmp_relay_target_t        *target, **t;
    ngx_str_t                       name;
    size_t                          n;
    ngx_rtmp_relay_ctx_t           *ctx;

    if (s->auto_pushed) {
        goto next;
    }

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
    if (ctx && s->relay) {
        goto next;
    }

    racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
    if (racf == NULL || racf->pushes.nelts == 0) {
        goto next;
    }

    /* v->name 中儲存的是從客戶端傳送的 publish 命令訊息中提取出的要釋出的流名稱 */
    name.len = ngx_strlen(v->name);
    name.data = v->name;
    
    /* 從 pushes 陣列中取出首元素,遍歷該陣列 */
    t = racf->pushes.elts;
    for (n = 0; n < racf->pushes.nelts; ++n, ++t) {
        target = *t;

        /* 配置檔案中是否指定了要推流的名稱,若是,則檢測指定的流名字與當前接收到的publish 流名
         * 是否一致 */
        if (target->name.len && (name.len != target->name.len ||
            ngx_memcmp(name.data, target->name.data, name.len)))
        {
            continue;
        }

        
        if (ngx_rtmp_relay_push(s, &name, target) == NGX_OK) {
            continue;
        }

        ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
                "relay: push failed name=‘%V‘ app=‘%V‘ "
                "playpath=‘%V‘ url=‘%V‘",
                &name, &target->app, &target->play_path,
                &target->url.url);

        if (!ctx->push_evt.timer_set) {
            ngx_add_timer(&ctx->push_evt, racf->push_reconnect);
        }
    }

next:
    return next_publish(s, v);
}

2.6.3 ngx_rtmp_relay_push

ngx_int_t
ngx_rtmp_relay_push(ngx_rtmp_session_t *s, ngx_str_t *name,
        ngx_rtmp_relay_target_t *target)
{
    ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
            "relay: create push name=‘%V‘ app=‘%V‘ playpath=‘%V‘ url=‘%V‘",
            name, &target->app, &target->play_path, &target->url.url);

    return ngx_rtmp_relay_create(s, name, target,
            ngx_rtmp_relay_create_local_ctx,
            ngx_rtmp_relay_create_remote_ctx);
}

2.6.4 ngx_rtmp_relay_create

static ngx_int_t
ngx_rtmp_relay_create(ngx_rtmp_session_t *s, ngx_str_t *name,
        ngx_rtmp_relay_target_t *target,
        ngx_rtmp_relay_create_ctx_pt create_publish_ctx,
        ngx_rtmp_relay_create_ctx_pt create_play_ctx)
{
    ngx_rtmp_relay_app_conf_t      *racf;
    ngx_rtmp_relay_ctx_t           *publish_ctx, *play_ctx, **cctx;
    ngx_uint_t                      hash;


    racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
    if (racf == NULL) {
        return NGX_ERROR;
    }

    /* 該函式主要是建立一個新的連線,連線推流url中指定的地址,即將該地址作為上游伺服器的地址,
     * 向該上游伺服器發起連線 */
    play_ctx = create_play_ctx(s, name, target);
    if (play_ctx == NULL) {
        return NGX_ERROR;
    }
    
    hash = ngx_hash_key(name->data, name->len);
    cctx = &racf->ctx[hash % racf->nbuckets];
    for (; *cctx; cctx = &(*cctx)->next) {
        if ((*cctx)->name.len == name->len
            && !ngx_memcmp(name->data, (*cctx)->name.data,
                name->len))
        {
            break;
        }
    }

    if (*cctx) {
        play_ctx->publish = (*cctx)->publish;
        play_ctx->next = (*cctx)->play;
        (*cctx)->play = play_ctx;
        return NGX_OK;
    }

    /* 建立一個本地 ngx_rtmp_relay_ctx_t */
    publish_ctx = create_publish_ctx(s, name, target);
    if (publish_ctx == NULL) {
        ngx_rtmp_finalize_session(play_ctx->session);
        return NGX_ERROR;
    }

    publish_ctx->publish = publish_ctx;
    publish_ctx->play = play_ctx;
    play_ctx->publish = publish_ctx;
    *cctx = publish_ctx;

    return NGX_OK;
}

2.6.4.1 ngx_rtmp_relay_create_remote_ctx

static ngx_rtmp_relay_ctx_t *
ngx_rtmp_relay_create_remote_ctx(ngx_rtmp_session_t *s, ngx_str_t* name,
        ngx_rtmp_relay_target_t *target)
{
    ngx_rtmp_conf_ctx_t         cctx;

    cctx.app_conf = s->app_conf;
    cctx.srv_conf = s->srv_conf;
    cctx.main_conf = s->main_conf;

    return ngx_rtmp_relay_create_connection(&cctx, name, target);
}

2.6.4.2 ngx_rtmp_relay_create_connection

static ngx_rtmp_relay_ctx_t *
ngx_rtmp_relay_create_connection(ngx_rtmp_conf_ctx_t *cctx, ngx_str_t* name,
        ngx_rtmp_relay_target_t *target)
{
    ngx_rtmp_relay_app_conf_t      *racf;
    ngx_rtmp_relay_ctx_t           *rctx;
    ngx_rtmp_addr_conf_t           *addr_conf;
    ngx_rtmp_conf_ctx_t            *addr_ctx;
    ngx_rtmp_session_t             *rs;
    ngx_peer_connection_t          *pc;
    ngx_connection_t               *c;
    ngx_addr_t                     *addr;
    ngx_pool_t                     *pool;
    ngx_int_t                       rc;
    ngx_str_t                       v, *uri;
    u_char                         *first, *last, *p;

    racf = ngx_rtmp_get_module_app_conf(cctx, ngx_rtmp_relay_module);

    ngx_log_debug0(NGX_LOG_DEBUG_RTMP, racf->log, 0,
                   "relay: create remote context");

    pool = NULL;
    /* 分配一個記憶體池 */
    pool = ngx_create_pool(4096, racf->log);
    if (pool == NULL) {
        return NULL;
    }

    /* 從記憶體池中為 ngx_rtmp_relay_ctx_t 結構體分配記憶體 */
    rctx = ngx_pcalloc(pool, sizeof(ngx_rtmp_relay_ctx_t));
    if (rctx == NULL) {
        goto clear;
    }

    /* 將釋出的流名拷貝到新建的 ngx_rtmp_relay_ctx_t 中的 name 成員 */
    if (name && ngx_rtmp_relay_copy_str(pool, &rctx->name, name) != NGX_OK) {
        goto clear;
    }

    /* 將配置檔案中配置的 push 推流地址,即 url 拷貝到新建的 ngx_rtmp_relay_ctx_t
     * 結構體的 url 成員中 */
    if (ngx_rtmp_relay_copy_str(pool, &rctx->url, &target->url.url) != NGX_OK) {
        goto clear;
    }

    /* target->tag 指向 ngx_rtmp_relay_module 結構體的首地址 */
    rctx->tag = target->tag;
    /* target->data 指向當前 data 所屬的 ngx_rtmp_relay_ctx_t 結構體的首地址 */
    rctx->data = target->data;

#define NGX_RTMP_RELAY_STR_COPY(to, from)                                         if (ngx_rtmp_relay_copy_str(pool, &rctx->to, &target->from) != NGX_OK) {          goto clear;                                                               }

    /* 將以下 target 中的值拷貝到新建的 ngx_rtmp_relay_ctx_t 結構體的相應成員中 */
    NGX_RTMP_RELAY_STR_COPY(app,        app);
    NGX_RTMP_RELAY_STR_COPY(tc_url,     tc_url);
    NGX_RTMP_RELAY_STR_COPY(page_url,   page_url);
    NGX_RTMP_RELAY_STR_COPY(swf_url,    swf_url);
    NGX_RTMP_RELAY_STR_COPY(flash_ver,  flash_ver);
    NGX_RTMP_RELAY_STR_COPY(play_path,  play_path);

    
    rctx->live  = target->live;
    rctx->start = target->start;
    rctx->stop  = target->stop;

#undef NGX_RTMP_RELAY_STR_COPY

    /* 若 app 的值未知 */
    if (rctx->app.len == 0 || rctx->play_path.len == 0) {
        /* 這裡是從推流地址中提取出 app 的值,下面分析以 "push rtmp:192.168.1.82:1935/live;"  
         * 為例,則提出的 live 將賦給 rctx->app */
        /* parse uri */
        uri = &target->url.uri;
        first = uri->data;
        last  = uri->data + uri->len;
        if (first != last && *first == ‘/‘) {
            ++first;
        }

        if (first != last) {

            /* deduce app */
            p = ngx_strlchr(first, last, ‘/‘);
            if (p == NULL) {
                p = last;
            }

            if (rctx->app.len == 0 && first != p) {
                /* 這裡 v.data 指向 "live" */
                v.data = first;
                v.len = p - first;
                /* 將 "live" 賦給 rctx->app */
                if (ngx_rtmp_relay_copy_str(pool, &rctx->app, &v) != NGX_OK) {
                    goto clear;
                }
            }

            /* deduce play_path */
            if (p != last) {
                ++p;
            }

            /* 若播放路徑為 NULL 且 p 不等於 last(注,這裡 p 不等於 last 意味著 
             * "push rtmp:192.168.1.82:1935/live;" 的 "live" 字串後面還有資料,
             * 但是,這裡沒有)*/
            if (rctx->play_path.len == 0 && p != last) {
                v.data = p;
                v.len = last - p;
                if (ngx_rtmp_relay_copy_str(pool, &rctx->play_path, &v)
                        != NGX_OK)
                {
                    goto clear;
                }
            }
        }
    }

    /* 從記憶體池中為主動連線結構體 ngx_peer_connection_t 分配記憶體 */
    pc = ngx_pcalloc(pool, sizeof(ngx_peer_connection_t));
    if (pc == NULL) {
        goto clear;
    }

    if (target->url.naddrs == 0) {
        ngx_log_error(NGX_LOG_ERR, racf->log, 0,
                      "relay: no address");
        goto clear;
    }

    /* get address */
    /* 獲取 推流地址 url 中指明的伺服器地址(即推流的目標地址)
     * 如"push rtmp:192.168.1.82:1935/live;" 中的 "192.168.1.82:1935" */
    addr = &target->url.addrs[target->counter % target->url.naddrs];
    target->counter++;

    /* copy log to keep shared log unchanged */
    rctx->log = *racf->log;
    pc->log = &rctx->log;
    /* 當使用長連線與上游伺服器通訊時,可通過該方法由連線池中獲取一個新連線 */
    pc->get = ngx_rtmp_relay_get_peer;
    /* 當使用長連線與上游伺服器通訊時,通過該方法將使用完畢的連線釋放給連線池 */
    pc->free = ngx_rtmp_relay_free_peer;
    /* 遠端伺服器的名稱,這裡其實就是 "192.168.1.82:1935" 該串字串 */
    pc->name = &addr->name;
    pc->socklen = addr->socklen;
    pc->sockaddr = (struct sockaddr *)ngx_palloc(pool, pc->socklen);
    if (pc->sockaddr == NULL) {
        goto clear;
    }
    /* 將 addr->sockaddr 中儲存的遠端伺服器的地址資訊拷貝到 pc->sockaddr 中 */
    ngx_memcpy(pc->sockaddr, addr->sockaddr, pc->socklen);

    /* 開始連線上游伺服器 */
    rc = ngx_event_connect_peer(pc);
    /* 由 ngx_event_connect_peer 原始碼可知,因為 socket 套接字被設定為非阻塞,
     * 因為首次 connect 必定失敗,因此該函式返回 NGX_AGAIN */
    if (rc != NGX_OK && rc != NGX_AGAIN ) {
        ngx_log_debug0(NGX_LOG_DEBUG_RTMP, racf->log, 0,
                "relay: connection failed");
        goto clear;
    }
    c = pc->connection;
    c->pool = pool;
    /* 推流 URL */
    c->addr_text = rctx->url;

    addr_conf = ngx_pcalloc(pool, sizeof(ngx_rtmp_addr_conf_t));
    if (addr_conf == NULL) {
        goto clear;
    }
    addr_ctx = ngx_pcalloc(pool, sizeof(ngx_rtmp_conf_ctx_t));
    if (addr_ctx == NULL) {
        goto clear;
    }
    addr_conf->ctx = addr_ctx;
    addr_ctx->main_conf = cctx->main_conf;
    addr_ctx->srv_conf  = cctx->srv_conf;
    ngx_str_set(&addr_conf->addr_text, "ngx-relay");

    /* 為該主動連線初始化一個會話 */
    rs = ngx_rtmp_init_session(c, addr_conf);
    if (rs == NULL) {
        /* no need to destroy pool */
        return NULL;
    }
    rs->app_conf = cctx->app_conf;
    /* 置該標誌位為 1 */
    rs->relay = 1;
    rctx->session = rs;
    ngx_rtmp_set_ctx(rs, rctx, ngx_rtmp_relay_module);
    ngx_str_set(&rs->flashver, "ngx-local-relay");

#if (NGX_STAT_STUB)
    (void) ngx_atomic_fetch_add(ngx_stat_active, 1);
#endif

    /* 此時作為客戶端,開始向上遊伺服器發說送 hanshake 包,即 C0 + C1 */
    ngx_rtmp_client_handshake(rs, 1);
    return rctx;

clear:
    if (pool) {
        ngx_destroy_pool(pool);
    }
    return NULL;
}

2.6.4.3 ngx_event_connect_peer

ngx_int_t
ngx_event_connect_peer(ngx_peer_connection_t *pc)
{
    int                rc, type;
#if (NGX_HAVE_IP_BIND_ADDRESS_NO_PORT || NGX_LINUX)
    in_port_t          port;
#endif
    ngx_int_t          event;
    ngx_err_t          err;
    ngx_uint_t         level;
    ngx_socket_t       s;
    ngx_event_t       *rev, *wev;
    ngx_connection_t  *c;

    /* 該 get 方法其實沒有做任何處理 */
    rc = pc->get(pc, pc->data);
    if (rc != NGX_OK) {
        return rc;
    }

    type = (pc->type ? pc->type : SOCK_STREAM);

    /* 建立一個 socket 套接字 */
    s = ngx_socket(pc->sockaddr->sa_family, type, 0);

    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0, "%s socket %d",
                   (type == SOCK_STREAM) ? "stream" : "dgram", s);

    if (s == (ngx_socket_t) -1) {
        ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                      ngx_socket_n " failed");
        return NGX_ERROR;
    }
    
    /* 從連線池中獲取一個空閒連線 */
    c = ngx_get_connection(s, pc->log);

    if (c == NULL) {
        if (ngx_close_socket(s) == -1) {
            ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                          ngx_close_socket_n "failed");
        }

        return NGX_ERROR;
    }

    /* 當前 socket 的型別,是 STREAM 還是 DGRAM,這裡為 STREAM */
    c->type = type;

    /* 若設定了接收緩衝區的大小,從上面知沒有設定 */
    if (pc->rcvbuf) {
        if (setsockopt(s, SOL_SOCKET, SO_RCVBUF,
                       (const void *) &pc->rcvbuf, sizeof(int)) == -1)
        {
            ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                          "setsockopt(SO_RCVBUF) failed");
            goto failed;
        }
    }

    /* 將該 socket 套接字設定為非阻塞 */
    if (ngx_nonblocking(s) == -1) {
        ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                      ngx_nonblocking_n " failed");

        goto failed;
    }
    
    /* local 儲存的是本地地址資訊,則上面可知,沒有設定 */
    if (pc->local) {

#if (NGX_HAVE_TRANSPARENT_PROXY)
        if (pc->transparent) {
            if (ngx_event_connect_set_transparent(pc, s) != NGX_OK) {
                goto failed;
            }
        }
#endif

#if (NGX_HAVE_IP_BIND_ADDRESS_NO_PORT || NGX_LINUX)
        port = ngx_inet_get_port(pc->local->sockaddr);
#endif

#if (NGX_HAVE_IP_BIND_ADDRESS_NO_PORT)

        if (pc->sockaddr->sa_family != AF_UNIX && port == 0) {
            static int  bind_address_no_port = 1;

            if (bind_address_no_port) {
                if (setsockopt(s, IPPROTO_IP, IP_BIND_ADDRESS_NO_PORT,
                               (const void *) &bind_address_no_port,
                               sizeof(int)) == -1)
                {
                    err = ngx_socket_errno;

                    if (err != NGX_EOPNOTSUPP && err != NGX_ENOPROTOOPT) {
                        ngx_log_error(NGX_LOG_ALERT, pc->log, err,
                                      "setsockopt(IP_BIND_ADDRESS_NO_PORT) "
                                      "failed, ignored");

                    } else {
                        bind_address_no_port = 0;
                    }
                }
            }
        }

#endif

#if (NGX_LINUX)

        if (pc->type == SOCK_DGRAM && port != 0) {
            int  reuse_addr = 1;

            if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
                           (const void *) &reuse_addr, sizeof(int))
                 == -1)
            {
                ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                              "setsockopt(SO_REUSEADDR) failed");
                goto failed;
            }
        }

#endif

        if (bind(s, pc->local->sockaddr, pc->local->socklen) == -1) {
            ngx_log_error(NGX_LOG_CRIT, pc->log, ngx_socket_errno,
                          "bind(%V) failed", &pc->local->name);

            goto failed;
        }
    }

    if (type == SOCK_STREAM) {
        /* 設定當前連線的 IO 回撥函式 */
        c->recv = ngx_recv;
        c->send = ngx_send;
        c->recv_chain = ngx_recv_chain;
        c->send_chain = ngx_send_chain;

        /* 使用 sendfile */
        c->sendfile = 1;

        if (pc->sockaddr->sa_family == AF_UNIX) {
            c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED;
            c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED;

#if (NGX_SOLARIS)
            /* Solaris‘s sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */
            c->sendfile = 0;
#endif
        }

    } else { /* type == SOCK_DGRAM */
        c->recv = ngx_udp_recv;
        c->send = ngx_send;
        c->send_chain = ngx_udp_send_chain;
    }

    c->log_error = pc->log_error;

    /* 設定當前主動連線讀寫事件的回撥函式 */
    rev = c->read;
    wev = c->write;

    rev->log = pc->log;
    wev->log = pc->log;

    pc->connection = c;

    c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);

    /* 將該主動連線的讀寫事件新增到 epoll 等事件監控機制中 */
    if (ngx_add_conn) {
        if (ngx_add_conn(c) == NGX_ERROR) {
            goto failed;
        }
    }

    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, pc->log, 0,
                   "connect to %V, fd:%d #%uA", pc->name, s, c->number);

    /* 連線該上游伺服器,因為該 socket 套接字被設定為非阻塞,因此首次connect返回 -1,即失敗 */
    rc = connect(s, pc->sockaddr, pc->socklen);

    if (rc == -1) {
        err = ngx_socket_errno;


        if (err != NGX_EINPROGRESS
#if (NGX_WIN32)
            /* Winsock returns WSAEWOULDBLOCK (NGX_EAGAIN) */
            && err != NGX_EAGAIN
#endif
            )
        {
            if (err == NGX_ECONNREFUSED
#if (NGX_LINUX)
                /*
                 * Linux returns EAGAIN instead of ECONNREFUSED
                 * for unix sockets if listen queue is full
                 */
                || err == NGX_EAGAIN
#endif
                || err == NGX_ECONNRESET
                || err == NGX_ENETDOWN
                || err == NGX_ENETUNREACH
                || err == NGX_EHOSTDOWN
                || err == NGX_EHOSTUNREACH)
            {
                level = NGX_LOG_ERR;

            } else {
                level = NGX_LOG_CRIT;
            }

            ngx_log_error(level, c->log, err, "connect() to %V failed",
                          pc->name);

            ngx_close_connection(c);
            pc->connection = NULL;

            return NGX_DECLINED;
        }
    }

    /* 因此,從這裡返回 NGX_AGAIN */
    if (ngx_add_conn) {
        if (rc == -1) {

            /* NGX_EINPROGRESS */

            return NGX_AGAIN;
        }

        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connected");

        wev->ready = 1;

        return NGX_OK;
    }

    if (ngx_event_flags & NGX_USE_IOCP_EVENT) {

        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, ngx_socket_errno,
                       "connect(): %d", rc);

        if (ngx_blocking(s) == -1) {
            ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                          ngx_blocking_n " failed");
            goto failed;
        }

        /*
         * FreeBSD‘s aio allows to post an operation on non-connected socket.
         * NT does not support it.
         *
         * TODO: check in Win32, etc. As workaround we can use NGX_ONESHOT_EVENT
         */

        rev->ready = 1;
        wev->ready = 1;

        return NGX_OK;
    }

    if (ngx_event_flags & NGX_USE_CLEAR_EVENT) {

        /* kqueue */

        event = NGX_CLEAR_EVENT;

    } else {

        /* select, poll, /dev/poll */

        event = NGX_LEVEL_EVENT;
    }

    if (ngx_add_event(rev, NGX_READ_EVENT, event) != NGX_OK) {
        goto failed;
    }

    if (rc == -1) {

        /* NGX_EINPROGRESS */

        if (ngx_add_event(wev, NGX_WRITE_EVENT, event) != NGX_OK) {
            goto failed;
        }

        return NGX_AGAIN;
    }

    ngx_log_debug0(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connected");

    wev->ready = 1;

    return NGX_OK;

failed:

    ngx_close_connection(c);
    pc->connection = NULL;

    return NGX_ERROR;
}

2.6.4.4 ngx_rtmp_client_handshake

void
ngx_rtmp_client_handshake(ngx_rtmp_session_t *s, unsigned async)
{
    ngx_connection_t           *c;

    c = s->connection;
    /* 設定當前連線讀寫事件的回撥函式 */
    c->read->handler =  ngx_rtmp_handshake_recv;
    c->write->handler = ngx_rtmp_handshake_send;

    ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
            "handshake: start client handshake");

    /* 為該將要進行的 hanshake 過程分配資料快取,用於儲存接收/響應的 hanshake 包 */
    s->hs_buf = ngx_rtmp_alloc_handshake_buffer(s);
    /* 設定當前 hanshake 階段,即為 client send: C0 + C1 */
    s->hs_stage = NGX_RTMP_HANDSHAKE_CLIENT_SEND_CHALLENGE;

    /* 構建 C0 + C1 的 資料包 */
    if (ngx_rtmp_handshake_create_challenge(s,
                ngx_rtmp_client_version,
                &ngx_rtmp_client_partial_key) != NGX_OK)
    {
        ngx_rtmp_finalize_session(s);
        return;
    }

    /* 有前面的呼叫傳入的引數可知,該值為 1,即為非同步,因此這裡暫時不向上游伺服器傳送 handshake,
     * 而是將其寫事件新增到定時器和 epoll 中,等待下次迴圈監控到該寫事件可寫時才傳送 C0 + C1 */
    if (async) {
        /* 將該寫事件新增到定時器中,超時時間為 s->timeout */
        ngx_add_timer(c->write, s->timeout);
        /* 將該寫事件新增到 epoll 等事件監控機制中 */
        if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
            ngx_rtmp_finalize_session(s);
        }
        return;
    }

    ngx_rtmp_handshake_send(c->write);
}

2.6.4.5 ngx_rtmp_relay_create_local_ctx

static ngx_rtmp_relay_ctx_t *
ngx_rtmp_relay_create_local_ctx(ngx_rtmp_session_t *s, ngx_str_t *name,
        ngx_rtmp_relay_target_t *target)
{
    ngx_rtmp_relay_ctx_t           *ctx;

    ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                   "relay: create local context");

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
    if (ctx == NULL) {
        ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_relay_ctx_t));
        if (ctx == NULL) {
            return NULL;
        }
        ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_relay_module);
    }
    ctx->session = s;

    ctx->push_evt.data = s;
    ctx->push_evt.log = s->connection->log;
    /* 設定該 push_evt 事件的回撥函式 */
    ctx->push_evt.handler = ngx_rtmp_relay_push_reconnect;

    if (ctx->publish) {
        return NULL;
    }

    if (ngx_rtmp_relay_copy_str(s->connection->pool, &ctx->name, name)
            != NGX_OK)
    {
        return NULL;
    }

    return ctx;
}

從 ngx_rtmp_relay_create_local_ctx 函式返回後,就一直返回到 ngx_rtmp_relay_publish 函式中,接著執行 next_publish 的下
一個函式。這裡為 ngx_rtmp_live_publish。

2.6.5 ngx_rtmp_live_publish

static ngx_int_t
ngx_rtmp_live_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
{
    ngx_rtmp_live_app_conf_t       *lacf;
    ngx_rtmp_live_ctx_t            *ctx;

    lacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_live_module);

    if (lacf == NULL || !lacf->live) {
        goto next;
    }

    ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                   "live: publish: name=‘%s‘ type=‘%s‘",
                   v->name, v->type);

    /* join stream as publisher */

    /* 構建一個 ngx_rtmp_live_ctx_t 結構體作為釋出者 */
    ngx_rtmp_live_join(s, v->name, 1);

    /* 這裡獲取到的就是上面構建的 ngx_rtmp_live_ctx_t 結構體  */
    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_live_module);
    if (ctx == NULL || !ctx->publishing) {
        goto next;
    }

    ctx->silent = v->silent;

    if (!ctx->silent) {
        /* 對之前客戶端傳送的 publish 返回一個響應 */
        ngx_rtmp_send_status(s, "NetStream.Publish.Start",
                             "status", "Start publishing");
    }

next:
    return next_publish(s, v);
}

send: onStatus(‘NetStream.Publish.Start‘) 圖(12)

技術分享圖片

之後又回到 epoll_wait 處,等待監聽的事件觸發。接下來的分析先看 nginx 的一段列印:

ngx_process_cycle.c:ngx_single_process_cycle:307 worker cycle                                                     
ngx_epoll_module.c:ngx_epoll_process_events:798 epoll timer: 59761                                                
ngx_epoll_module.c:ngx_epoll_process_events:860 epoll: fd:9 ev:0004 d:088F6950                                    
ngx_event_timer.h:ngx_event_del_timer:36 event timer del: 9: 958705070                                         
ngx_send.c:ngx_unix_send:37 send: fd:9 1537 of 1537
ngx_epoll_module.c:ngx_epoll_del_event:686 epoll del event: fd:9 op:3 ev:00002001                              
ngx_rtmp_handshake.c:ngx_rtmp_handshake_send:544 handshake: stage 7                                            
ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:0                                                               
ngx_event_timer.h:ngx_event_add_timer:82 event timer add: 9: 60000:958705071                                   
ngx_epoll_module.c:ngx_epoll_process_events:860 epoll: fd:5 ev:0001 d:088F67E8                                    
ngx_event_accept.c:ngx_event_accept:58 accept on 0.0.0.0:1935, ready: 0                                           
ngx_alloc.c:ngx_memalign:66 posix_memalign: 08930870:4096 @16                                                     
ngx_event_accept.c:ngx_event_accept:293 *3 accept: 192.168.1.82:39334 fd:10                                       
ngx_rtmp_init.c:ngx_rtmp_init_connection:124 *3 client connected ‘192.168.1.82‘                                    
ngx_rtmp_handler.c:ngx_rtmp_set_chunk_size:823 setting chunk_size=128                                          
ngx_alloc.c:ngx_memalign:66 posix_memalign: 089318A0:4096 @16                                                  
ngx_rtmp_limit_module.c:ngx_rtmp_limit_connect:87 rtmp limit: connect                                          
ngx_rtmp_handshake.c:ngx_rtmp_handshake:589 handshake: start server handshake                                  
ngx_rtmp_handshake.c:ngx_rtmp_alloc_handshake_buffer:208 handshake: allocating buffer                          
ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:0                                                               
ngx_event_timer.h:ngx_event_add_timer:82 event timer add: 10: 60000:958705071                                  
ngx_epoll_module.c:ngx_epoll_add_event:625 epoll add event: fd:10 op:1 ev:80002001                             
ngx_event.c:ngx_process_events_and_timers:247 timer delta: 1                                                      
ngx_process_cycle.c:ngx_single_process_cycle:307 worker cycle                                                     
ngx_epoll_module.c:ngx_epoll_process_events:798 epoll timer: 59760                                                
ngx_epoll_module.c:ngx_epoll_process_events:860 epoll: fd:10 ev:0001 d:088F69C8                                   
ngx_event_timer.h:ngx_event_del_timer:36 event timer del: 10: 958705071                                        
ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:1                                                               
ngx_recv.c:ngx_unix_recv:72 recv: fd:10 1537 of 1537                                                           
ngx_epoll_module.c:ngx_epoll_del_event:686 epoll del event: fd:10 op:2 ev:00000000                             
ngx_rtmp_handshake.c:ngx_rtmp_handshake_recv:429 handshake: stage 2                                            
ngx_rtmp_handshake.c:ngx_rtmp_handshake_parse_challenge:303 handshake: peer version=14.13.0.12 epoch=958645070 
ngx_rtmp_handshake.c:ngx_rtmp_handshake_parse_challenge:320 handshake: digest found at pos=638                 
ngx_send.c:ngx_unix_send:37 send: fd:10 1537 of 1537                                                           
ngx_rtmp_handshake.c:ngx_rtmp_handshake_send:544 handshake: stage 3                                            
ngx_send.c:ngx_unix_send:37 send: fd:10 1536 of 1536                                                           
ngx_rtmp_handshake.c:ngx_rtmp_handshake_send:544 handshake: stage 4                                            
ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:1                                                               
ngx_recv.c:ngx_unix_recv:72 recv: fd:10 -1 of 1536                                                             
ngx_recv.c:ngx_unix_recv:150 recv() not ready (11: Resource temporarily unavailable)                           
ngx_event_timer.h:ngx_event_add_timer:82 event timer add: 10: 60000:958705071                                  
ngx_epoll_module.c:ngx_epoll_add_event:625 epoll add event: fd:10 op:1 ev:80002001                             
ngx_event.c:ngx_process_events_and_timers:247 timer delta: 0                                                      
ngx_process_cycle.c:ngx_single_process_cycle:307 worker cycle                                                     
ngx_epoll_module.c:ngx_epoll_process_events:798 epoll timer: 59760                                                
ngx_epoll_module.c:ngx_epoll_process_events:860 epoll: fd:9 ev:0001 d:088F6950                                    
ngx_event_timer.h:ngx_event_del_timer:36 event timer del: 9: 958705071                                         
ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:1                                                               
ngx_recv.c:ngx_unix_recv:72 recv: fd:9 1537 of 1537                                                            
ngx_epoll_module.c:ngx_epoll_del_event:686 epoll del event: fd:9 op:2 ev:00000000                              
ngx_rtmp_handshake.c:ngx_rtmp_handshake_recv:429 handshake: stage 8                                            
ngx_rtmp_handshake.c:ngx_rtmp_handshake_parse_challenge:303 handshake: peer version=13.10.14.13 epoch=958645071
ngx_rtmp_handshake.c:ngx_rtmp_handshake_parse_challenge:320 handshake: digest found at pos=557                 
ngx_recv.c:ngx_unix_recv:58 recv: eof:0, avail:1                                                               
ngx_recv.c:ngx_unix_recv:72 recv: fd:9 1536 of 1536                                                            
ngx_rtmp_handshake.c:ngx_rtmp_handshake_recv:429 handshake: stage 9                                            
ngx_send.c:ngx_unix_send:37 send: fd:9 1536 of 1536                                                            
ngx_rtmp_handshake.c:ngx_rtmp_handshake_send:544 handshake: stage 10                                           
ngx_rtmp_handshake.c:ngx_rtmp_handshake_done:362 handshake: done                                               
ngx_rtmp_relay_module.c:ngx_rtmp_relay_handshake_done:1319 rtmp relay module: handhshake done

首先 fd = 9 為連線上游伺服器(192.168.1.82:1935) 時建立的作為客戶端的 STREAM 型別的 socket 套接字,而 fd = 5 為 nginx
啟動時建立的 STREAM 型別的 socket 監聽套接字。因此,從列印中可以看出,上面的列印是這麼一個流程:

  1. epoll 監聽的 fd 為 9 的套接字可寫,因此呼叫該套接字上寫事件的回撥函式,從之前的原始碼可知,為
    ngx_rtmp_handshake_send 函式,該函式將已經準備好的 C0 和 C1 通過該寫事件對應的 send 函式,即
    ngx_unix_send 函式傳送給上游伺服器(192.168.1.82:1935);傳送完後進入 CLIENT_RECV_CHALLENGE(7) 階段,
    該階段為等待接收伺服器 S0 和 S1 的階段;
  2. epool 監控到伺服器 fd:5 有資料可讀,且為新連線,因此呼叫 ngx_event_accept 接收該客戶端(192.168.1.82:39334)的
    連線,接受連線後伺服器使用 fd:10 與客戶端進行互動,接著伺服器開始進入 handshake 階段;
  3. 下面就開始了伺服器 (192.168.1.82:1935, fd = 10) 和 客戶端(192.168.1.82:39334, fd = 9) 的 hanshake 過程,就不再詳
    述,和之前分析的 hanshake 一樣。

客戶端傳送 C2 後,會進入 NGX_RTMP_HANDSHAKE_CLIENT_DONE(10) 階段,接著會呼叫該函式 ngx_rtmp_handshake_done:

static void
ngx_rtmp_handshake_done(ngx_rtmp_session_t *s)
{
    ngx_rtmp_free_handshake_buffers(s);

    ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
            "handshake: done");

    if (ngx_rtmp_fire_event(s, NGX_RTMP_HANDSHAKE_DONE,
                NULL, NULL) != NGX_OK)
    {
        ngx_rtmp_finalize_session(s);
        return;
    }

    ngx_rtmp_cycle(s);
}

該函式接著會呼叫到 ngx_rtmp_relay_handshake_done 函式:

static ngx_int_t
ngx_rtmp_relay_handshake_done(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
        ngx_chain_t *in)
{
    ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "rtmp relay module: handhshake done");
    
    ngx_rtmp_relay_ctx_t   *ctx;

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
    if (ctx == NULL || !s->relay) {
        ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "rtmp relay module: return");
        return NGX_OK;
    }

    /* 主要是向伺服器傳送 connect 連線命令 */
    return ngx_rtmp_relay_send_connect(s);
}

2.7 客戶端(fd = 9)傳送:connect

客戶端(192.168.1.82:39334, fd = 9) hanshake 成功後會向伺服器傳送 connec 連線命令。

2.7.1 ngx_rtmp_relay_send_connect

static ngx_int_t
ngx_rtmp_relay_send_connect(ngx_rtmp_session_t *s)
{
    ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "rtmp relay module: send connect");

    static double               trans = NGX_RTMP_RELAY_CONNECT_TRANS;
    static double               acodecs = 3575;
    static double               vcodecs = 252;

    static ngx_rtmp_amf_elt_t   out_cmd[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_string("app"),
          NULL, 0 }, /* <-- fill */

        { NGX_RTMP_AMF_STRING,
          ngx_string("tcUrl"),
          NULL, 0 }, /* <-- fill */

        { NGX_RTMP_AMF_STRING,
          ngx_string("pageUrl"),
          NULL, 0 }, /* <-- fill */

        { NGX_RTMP_AMF_STRING,
          ngx_string("swfUrl"),
          NULL, 0 }, /* <-- fill */

        { NGX_RTMP_AMF_STRING,
          ngx_string("flashVer"),
          NULL, 0 }, /* <-- fill */

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("audioCodecs"),
          &acodecs, 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("videoCodecs"),
          &vcodecs, 0 }
    };

    static ngx_rtmp_amf_elt_t   out_elts[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_null_string,
          "connect", 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          &trans, 0 },

        { NGX_RTMP_AMF_OBJECT,
          ngx_null_string,
          out_cmd, sizeof(out_cmd) }
    };

    ngx_rtmp_core_app_conf_t   *cacf;
    ngx_rtmp_core_srv_conf_t   *cscf;
    ngx_rtmp_relay_ctx_t       *ctx;
    ngx_rtmp_header_t           h;
    size_t                      len, url_len;
    u_char                     *p, *url_end;


    cacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_core_module);
    cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
    if (cacf == NULL || ctx == NULL) {
        return NGX_ERROR;
    }

    /* app */
    if (ctx->app.len) {
        out_cmd[0].data = ctx->app.data;
        out_cmd[0].len  = ctx->app.len;
    } else {
        out_cmd[0].data = cacf->name.data;
        out_cmd[0].len  = cacf->name.len;
    }

    /* tcUrl */
    if (ctx->tc_url.len) {
        out_cmd[1].data = ctx->tc_url.data;
        out_cmd[1].len  = ctx->tc_url.len;
    } else {
        len = sizeof("rtmp://") - 1 + ctx->url.len +
            sizeof("/") - 1 + ctx->app.len;
        p = ngx_palloc(s->connection->pool, len);
        if (p == NULL) {
            return NGX_ERROR;
        }
        out_cmd[1].data = p;
        p = ngx_cpymem(p, "rtmp://", sizeof("rtmp://") - 1);

        url_len = ctx->url.len;
        url_end = ngx_strlchr(ctx->url.data, ctx->url.data + ctx->url.len, ‘/‘);
        if (url_end) {
            url_len = (size_t) (url_end - ctx->url.data);
        }

        p = ngx_cpymem(p, ctx->url.data, url_len);
        *p++ = ‘/‘;
        p = ngx_cpymem(p, ctx->app.data, ctx->app.len);
        out_cmd[1].len = p - (u_char *)out_cmd[1].data;
    }

    /* pageUrl */
    out_cmd[2].data = ctx->page_url.data;
    out_cmd[2].len  = ctx->page_url.len;

    /* swfUrl */
    out_cmd[3].data = ctx->swf_url.data;
    out_cmd[3].len  = ctx->swf_url.len;

    /* flashVer */
    if (ctx->flash_ver.len) {
        out_cmd[4].data = ctx->flash_ver.data;
        out_cmd[4].len  = ctx->flash_ver.len;
    } else {
        out_cmd[4].data = NGX_RTMP_RELAY_FLASHVER;
        out_cmd[4].len  = sizeof(NGX_RTMP_RELAY_FLASHVER) - 1;
    }

    ngx_memzero(&h, sizeof(h));
    h.csid = NGX_RTMP_RELAY_CSID_AMF_INI;
    h.type = NGX_RTMP_MSG_AMF_CMD;

    return ngx_rtmp_send_chunk_size(s, cscf->chunk_size) != NGX_OK
        || ngx_rtmp_send_ack_size(s, cscf->ack_window) != NGX_OK
        || ngx_rtmp_send_amf(s, &h, out_elts,
            sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK
        ? NGX_ERROR
        : NGX_OK;
}

傳送完這幾個 RTMP 包,後,又回到 epoll_wait 中進行監聽。

下面的分析區分一個伺服器,兩個客戶端:

  • 伺服器:192.168.1.82:1935
  • 客戶端:obs 推流
  • 客戶端:192.168.1.82:xxxx

2.8 伺服器 接收 客戶端 obs: amf_meta(18)

此時,監聽到 obs 客戶端傳送的型別為 amf_meta(18) 的 rtmp 訊息。

receive: @setDataFrame(meta_data 18) 圖(13)

技術分享圖片
對於 "@setDataFrame",僅有 ngx_rtmp_codec_module 模組對其設定了會調函式,為 ngx_rtmp_codec_meta_data 函式:

2.8.1 ngx_rtmp_codec_meta_data

static ngx_int_t
ngx_rtmp_codec_meta_data(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
        ngx_chain_t *in)
{
    ngx_rtmp_codec_app_conf_t      *cacf;
    ngx_rtmp_codec_ctx_t           *ctx;
    ngx_uint_t                      skip;

    static struct {
        double                      width;
        double                      height;
        double                      duration;
        double                      frame_rate;
        double                      video_data_rate;
        double                      video_codec_id_n;
        u_char                      video_codec_id_s[32];
        double                      audio_data_rate;
        double                      audio_codec_id_n;
        u_char                      audio_codec_id_s[32];
        u_char                      profile[32];
        u_char                      level[32];
    }                               v;

    static ngx_rtmp_amf_elt_t       in_video_codec_id[] = {

        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          &v.video_codec_id_n, 0 },

        { NGX_RTMP_AMF_STRING,
          ngx_null_string,
          &v.video_codec_id_s, sizeof(v.video_codec_id_s) },
    };

    static ngx_rtmp_amf_elt_t       in_audio_codec_id[] = {

        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          &v.audio_codec_id_n, 0 },

        { NGX_RTMP_AMF_STRING,
          ngx_null_string,
          &v.audio_codec_id_s, sizeof(v.audio_codec_id_s) },
    };

    static ngx_rtmp_amf_elt_t       in_inf[] = {

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("width"),
          &v.width, 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("height"),
          &v.height, 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("duration"),
          &v.duration, 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("framerate"),
          &v.frame_rate, 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("fps"),
          &v.frame_rate, 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("videodatarate"),
          &v.video_data_rate, 0 },

        { NGX_RTMP_AMF_VARIANT,
          ngx_string("videocodecid"),
          in_video_codec_id, sizeof(in_video_codec_id) },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("audiodatarate"),
          &v.audio_data_rate, 0 },

        { NGX_RTMP_AMF_VARIANT,
          ngx_string("audiocodecid"),
          in_audio_codec_id, sizeof(in_audio_codec_id) },

        { NGX_RTMP_AMF_STRING,
          ngx_string("profile"),
          &v.profile, sizeof(v.profile) },

        { NGX_RTMP_AMF_STRING,
          ngx_string("level"),
          &v.level, sizeof(v.level) },
    };

    static ngx_rtmp_amf_elt_t       in_elts[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_null_string,
          NULL, 0 },

        { NGX_RTMP_AMF_OBJECT,
          ngx_null_string,
          in_inf, sizeof(in_inf) },
    };

    cacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_codec_module);

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_codec_module);
    if (ctx == NULL) {
        ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_codec_ctx_t));
        ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_codec_module);
    }

    ngx_memzero(&v, sizeof(v));

    /* use -1 as a sign of unchanged data;
     * 0 is a valid value for uncompressed audio */
    v.audio_codec_id_n = -1;

    /* FFmpeg sends a string in front of actal metadata; ignore it */
    skip = !(in->buf->last > in->buf->pos
            && *in->buf->pos == NGX_RTMP_AMF_STRING);
    if (ngx_rtmp_receive_amf(s, in, in_elts + skip,
                sizeof(in_elts) / sizeof(in_elts[0]) - skip))
    {
        ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
                "codec: error parsing data frame");
        return NGX_OK;
    }

    ctx->width = (ngx_uint_t) v.width;
    ctx->height = (ngx_uint_t) v.height;
    ctx->duration = (ngx_uint_t) v.duration;
    ctx->frame_rate = (ngx_uint_t) v.frame_rate;
    ctx->video_data_rate = (ngx_uint_t) v.video_data_rate;
    ctx->video_codec_id = (ngx_uint_t) v.video_codec_id_n;
    ctx->audio_data_rate = (ngx_uint_t) v.audio_data_rate;
    ctx->audio_codec_id = (v.audio_codec_id_n == -1
            ? 0 : v.audio_codec_id_n == 0
            ? NGX_RTMP_AUDIO_UNCOMPRESSED : (ngx_uint_t) v.audio_codec_id_n);
    ngx_memcpy(ctx->profile, v.profile, sizeof(v.profile));
    ngx_memcpy(ctx->level, v.level, sizeof(v.level));

    ngx_log_debug8(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
            "codec: data frame: "