1. 程式人生 > >SRS學習筆記10-SrsConnection及其子類分析

SRS學習筆記10-SrsConnection及其子類分析

when red ins parse discovery bsp for port std

SrsConnection類代表一個client的連接,其中封裝了一個st thread,用於在單獨的st thread裏處理該client的服務請求。SrsConnection對象在 int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd) 裏創建:
SrsConnection* conn = NULL;
    if (type == SrsListenerRtmpStream) {
        conn = new SrsRtmpConn(this, client_stfd);
    } else
if (type == SrsListenerHttpApi) { #ifdef SRS_AUTO_HTTP_API conn = new SrsHttpApi(this, client_stfd, http_api_mux); #else srs_warn("close http client for server not support http-api"); srs_close_stfd(client_stfd); return ret; #endif } else if (type == SrsListenerHttpStream) { #ifdef SRS_AUTO_HTTP_SERVER conn
= new SrsResponseOnlyHttpConn(this, client_stfd, http_server); #else srs_warn("close http client for server not support http-server"); srs_close_stfd(client_stfd); return ret; #endif } else { // TODO: FIXME: handler others } srs_assert(conn);

創建完成後調用 conn->start()啟動,此後這個client的請求都在這個st thread 裏完成

SrsConnection類

/**
 * The manager for connections.
 * SrsServer implements this interface in order to manage SrsConnection objects.
 */
class IConnectionManager
{
public:
    IConnectionManager();
    virtual ~IConnectionManager();
public:
    /**
     * Remove the specified connection.
     */
    virtual void remove(SrsConnection* c) = 0;
};

/**
 * The basic connection of SRS.
 * Every connection accepted from a listener must extend this base class;
 * the server adds the connection to its manager and deletes it on removal.
 */
class SrsConnection : public virtual ISrsOneCycleThreadHandler, public virtual IKbpsDelta
{
private:
    /**
     * Each connection starts a green thread; when the thread stops,
     * the connection is deleted by the server.
     * This is the thread that does the actual service work.
     */
    SrsOneCycleThread* pthread;
    /**
     * The id of the connection.
     */
    int id;
protected:
    /**
     * The manager object that manages this connection.
     */
    IConnectionManager* manager;
    /**
     * The underlying st fd handle.
     */
    st_netfd_t stfd;
    /**
     * The ip of the client.
     */
    std::string ip;
    /**
     * Whether the connection is disposed.
     * Once disposed, the connection should stop its cycle and clean itself up.
     */
    bool disposed;
    /**
     * Whether the connection is expired (application defined).
     * Once expired, the connection must never be served and must quit ASAP.
     */
    bool expired;
public:
    SrsConnection(IConnectionManager* cm, st_netfd_t c);
    virtual ~SrsConnection();
public:
    /**
     * Dispose the connection.
     */
    virtual void dispose();
    /**
     * Start the client green thread.
     * When the server gets a client from a listener:
     *      1. the server creates a concrete connection (for instance, an RTMP connection),
     *      2. then adds the connection to its connection manager,
     *      3. then starts the client thread by invoking this start().
     * When the client cycle thread stops it invokes on_thread_stop(), which uses the
     * server to remove the client via server->remove(this).
     */
    virtual int start();
// interface ISrsOneCycleThreadHandler
public:
    /**
     * The thread cycle function.
     * When serving the connection completes, the loop terminates, which terminates
     * the thread; the thread invokes on_thread_stop() once it has terminated.
     */
    virtual int cycle();
    /**
     * Invoked by the thread when its cycle has finished.
     * Removes this connection from the server; the server removes it from the
     * manager and then deletes the connection.
     */
    virtual void on_thread_stop();
public:
    /**
     * Get the srs id which identifies the client.
     */
    virtual int srs_id();
    /**
     * Mark the connection as expired.
     */
    virtual void expire();
protected:
    /**
     * The cycle implementation supplied by the concrete connection.
     * See SrsRtmpConn for a concrete handler.
     */
    virtual int do_cycle() = 0;
};
int SrsConnection::start()
{
  // 啟動SrsOneCycleThread類
return pthread->start(); } int SrsConnection::cycle() { int ret = ERROR_SUCCESS; //這是個全局的id生成器,也用來生成st thread的id _srs_context->generate_id(); id = _srs_context->get_id(); // 取得客戶端的ip地址 ip = srs_get_peer_ip(st_netfd_fileno(stfd)); // 調用子類如SrsRtmpConn的處理函數 ret = do_cycle(); // if socket io error, set to closed. if (srs_is_client_gracefully_close(ret)) { ret = ERROR_SOCKET_CLOSED; } // success. if (ret == ERROR_SUCCESS) { srs_trace("client finished."); } // client close peer. if (ret == ERROR_SOCKET_CLOSED) { srs_warn("client disconnect peer. ret=%d", ret); } return ERROR_SUCCESS; } void SrsConnection::on_thread_stop() { // TODO: FIXME: never remove itself, use isolate thread to do cleanup.
  // 用在SrsServer類的conns向量數組中刪除此對象
   manager->remove(this); } int SrsConnection::srs_id() { return id; } void SrsConnection::expire() { expired = true; }

SrsRtmpConn類

/**
 * Construct an RTMP connection for the accepted client fd.
 * Allocates the request/response objects and the protocol helpers, sets the
 * tuning members to their initial values, and subscribes to config reload.
 * @param svr the server; it is also passed to SrsConnection as the manager.
 * @param c   the st fd of the accepted client.
 */
SrsRtmpConn::SrsRtmpConn(SrsServer* svr, st_netfd_t c)
    : SrsConnection(svr, c)
{
    server = svr;
    req = new SrsRequest();
    res = new SrsResponse();
    // Implements the ISrsProtocolReaderWriter interface: st socket read/write,
    // read/write timeouts, and counters of bytes sent and received.
    skt = new SrsStSocket(c);
    // Provides the rtmp-command-protocol service, a high level protocol and
    // media stream oriented service, for example connecting to a vhost/app,
    // playing a stream, or getting video/audio data. Its counterpart is SrsRtmpClient.
    rtmp = new SrsRtmpServer(skt);
    refer = new SrsRefer();
    bandwidth = new SrsBandwidth();
    security = new SrsSecurity();
    duration = 0;
    kbps = new SrsKbps();
    // The same socket object is passed for both arguments of set_io().
    kbps->set_io(skt, skt);
    wakable = NULL;

    mw_sleep = SRS_PERF_MW_SLEEP;
    mw_enabled = false;
    realtime = SRS_PERF_MIN_LATENCY_ENABLED;
    send_min_interval = 0;
    tcp_nodelay = false;
    client_type = SrsRtmpConnUnknown;

    // Subscribe so that a configuration change (reload) is supported.
    _srs_config->subscribe(this);
}

構造函數初始化成員變量

// TODO: return detail message when error for client.
int SrsRtmpConn::do_cycle()
{
    int ret = ERROR_SUCCESS;
    
    srs_trace("RTMP client ip=%s", ip.c_str());

// 設置SrsRtmpServer裏面的底層數據通訊類SrsProtocol的讀寫超時 rtmp
->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US); rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US); // 握手 if ((ret = rtmp->handshake()) != ERROR_SUCCESS) { srs_error("rtmp handshake failed. ret=%d", ret); return ret; } srs_verbose("rtmp handshake success"); // 獲取連接的app if ((ret = rtmp->connect_app(req)) != ERROR_SUCCESS) { srs_error("rtmp connect vhost/app failed. ret=%d", ret); return ret; } srs_verbose("rtmp connect app success"); // set client ip to request. req->ip = ip; // 假設推流地址 rtmp://192.168.151.151/live/marstv // discovery vhost, resolve the vhost from config
  req->vhost 是192.168.151.151
SrsConfDirective* parsed_vhost = _srs_config->get_vhost(req->vhost);
// 沒有設置 192.168.151.151 這樣的vhost parsed_vhost是默認的vhost
if (parsed_vhost) { req->vhost = parsed_vhost->arg0();// req->vhost 是 __defaultVhost__ } srs_info("discovery app success. schema=%s, vhost=%s, port=%s, app=%s", req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str()); if (req->schema.empty() || req->vhost.empty() || req->port.empty() || req->app.empty()) { ret = ERROR_RTMP_REQ_TCURL; srs_error("discovery tcUrl failed. " "tcUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, ret=%d", req->tcUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str(), ret); return ret; } // check vhost

// 在 check_vhost裏面會調用 http_hooks_on_connect if ((ret = check_vhost()) != ERROR_SUCCESS) { srs_error("check vhost failed. ret=%d", ret); return ret; } srs_verbose("check vhost success."); srs_trace("connect app, " "tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, args=%s", req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str(), (req->args? "(obj)":"null")); // show client identity if(req->args) { std::string srs_version; std::string srs_server_ip; int srs_pid = 0; int srs_id = 0; SrsAmf0Any* prop = NULL; if ((prop = req->args->ensure_property_string("srs_version")) != NULL) { srs_version = prop->to_str(); } if ((prop = req->args->ensure_property_string("srs_server_ip")) != NULL) { srs_server_ip = prop->to_str(); } if ((prop = req->args->ensure_property_number("srs_pid")) != NULL) { srs_pid = (int)prop->to_number(); } if ((prop = req->args->ensure_property_number("srs_id")) != NULL) { srs_id = (int)prop->to_number(); } srs_info("edge-srs ip=%s, version=%s, pid=%d, id=%d", srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id); if (srs_pid > 0) { srs_trace("edge-srs ip=%s, version=%s, pid=%d, id=%d", srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id); } } ret = service_cycle(); http_hooks_on_close(); return ret; }

SRS學習筆記10-SrsConnection及其子類分析