1. 程式人生 > >SRS(simple-rtmp-server)流媒體伺服器原始碼分析--系統啟動

SRS(simple-rtmp-server)流媒體伺服器原始碼分析--系統啟動

SRS(simple-rtmp-server)流媒體伺服器原始碼分析--系統啟動

一、前言

       小卒最近看SRS原始碼,隨手寫下部落格,其一為了整理思路,其二也是為日後翻看方便。如果不足之處,請指教!

首先總結一下SRS原始碼的優點:

       1、輕量級,程式碼結構清楚,目前SRS3.0程式碼8萬行左右,但幾乎滿足直播業務的所有要求。

       2、SRS採用State Threads,支援高併發量,高效能。

       3、SRS支援rtmp和hls,滿足PC和移動直播要求。

       4、SRS支援叢集部署。小叢集Forward,大叢集edge。

程式碼分析可分為兩個階段:
       一:分析程式碼框架,理清楚組織流程
       二:分析程式碼細節,熟悉SRS工作原理

二、程式碼分析

相關SRS原始碼其他總結:

       SRS(simple-rtmp-server)流媒體伺服器原始碼分析--系統啟動


       SRS(simple-rtmp-server)流媒體伺服器原始碼分析--RTMP訊息play

       SRS(simple-rtmp-server)流媒體伺服器原始碼分析--RTMP資訊Publish

           SRS(simple-rtmp-server)流媒體伺服器原始碼分析--HLS切片

現階段,我主要以程式碼框架梳理為主。Srs原始碼框架如下圖:


       
         系統在啟動時,初始化相關類,監聽相關埠,若來一個訪問請求,則為該連結建立一個執行緒,專門處理與該連結的操作。          main函式在srs_main_server.cpp這個檔案中。在main函式中,啟動引數在這裡不做過多介紹。直接從run()-> run_master()看起。

   
  1. int run_master()
  2. {
  3. int ret = ERROR_SUCCESS;
  4. if ((ret = _srs_server->initialize_st()) != ERROR_SUCCESS) {
  5. return ret;
  6. }
  7. if ((ret = _srs_server->initialize_signal()) != ERROR_SUCCESS) {
  8. return ret;
  9. }
  10. //將pid程序號寫進檔案
  11. if ((ret = _srs_server->acquire_pid_file()) != ERROR_SUCCESS) {
  12. return ret;
  13. }
  14. //客戶端監聽
  15. if ((ret = _srs_server->listen()) != ERROR_SUCCESS) {
  16. return ret;
  17. }
  18. if ((ret = _srs_server->register_signal()) != ERROR_SUCCESS) {
  19. return ret;
  20. }
  21. if ((ret = _srs_server->http_handle()) != ERROR_SUCCESS) {
  22. return ret;
  23. }
  24. if ((ret = _srs_server->ingest()) != ERROR_SUCCESS) {
  25. return ret;
  26. }
  27. if ((ret = _srs_server->cycle()) != ERROR_SUCCESS) {
  28. return ret;
  29. }
  30. return 0;
  31. }

進入客戶監聽


   
  1. if ((ret = _srs_server->listen()) != ERROR_SUCCESS) {
  2. return ret;
  3. }
  監聽內容:  不同的連線請求,有不同的監聽

   
  1. int SrsServer::listen()
  2. {
  3. int ret = ERROR_SUCCESS;
  4. // 建立一個rtmp的Streamlistener
  5. if ((ret = listen_rtmp()) != ERROR_SUCCESS) {
  6. return ret;
  7. }
  8. if ((ret = listen_http_api()) != ERROR_SUCCESS) {
  9. return ret;
  10. }
  11. if ((ret = listen_http_stream()) != ERROR_SUCCESS) {
  12. return ret;
  13. }
  14. if ((ret = listen_stream_caster()) != ERROR_SUCCESS) {
  15. return ret;
  16. }
  17. return ret;
  18. }

1、首先分析RTMP連線 


   
  1. int SrsServer::listen_rtmp()
  2. {
  3. int ret = ERROR_SUCCESS;
  4. // stream service port.
  5. std::vector <std::string> ip_ports = _srs_config->get_listens();
  6. srs_assert((int)ip_ports.size() > 0);
  7. close_listeners(SrsListenerRtmpStream);
  8. for (int i = 0; i < (int)ip_ports.size(); i++) {
  9. SrsListener* listener = new SrsStreamListener( this, SrsListenerRtmpStream);
  10. listeners.push_back( listener);
  11. std::string ip;
  12. int port;
  13. srs_parse_endpoint( ip_ports[ i], ip, port);
  14. if (( ret = listener->listen(ip, port)) != ERROR_SUCCESS) {
  15. srs_error("RTMP stream listen at %s:%d failed. ret=%d", ip.c_str(), port, ret);
  16. return ret;
  17. }
  18. }
  19. return ret;
  20. }
         這裡是listen_rtmp()函式,你也可以去看看listen_http_api()函式、listen_http_stream()函式,其實結構都很相似,只是在建立SrsStreamListener物件時,傳入了不同的引數SrsListenerRtmpStream、SrsListenerHttpApi、SrsListenerHttpStream,代表了不同型別的監聽物件。

   
  1. // listen_rtmp 中listen監聽走這裡了。
  2. int SrsStreamListener::listen(string i, int p)
  3. {
  4. int ret = ERROR_SUCCESS;
  5. ip = i;
  6. port = p;
  7. srs_freep(listener);
  8. listener = new SrsTcpListener(this, ip, port);
  9. if ((ret = listener->listen()) != ERROR_SUCCESS) {
  10. srs_error("tcp listen failed. ret=%d", ret);
  11. return ret;
  12. }
  13. srs_info("listen thread current_cid=%d, "
  14. "listen at port=%d, type=%d, fd=%d started success, ep=%s:%d",
  15. _srs_context->get_id(), p, type, listener->fd(), i.c_str(), p);
  16. srs_trace("%s listen at tcp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());
  17. return ret;
  18. }
注意,這裡有大量純虛擬函式,不要走錯路了。進入TCP監聽程式碼

   
  1. // rtmp tcp監聽
  2. int SrsTcpListener::listen()
  3. {
  4. //C++ Socket程式設計
  5. int ret = ERROR_SUCCESS;
  6. // 1、建立套接字,流式Socket(SOCK_STREAM)
  7. if ((_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
  8. ret = ERROR_SOCKET_CREATE;
  9. srs_error("create linux socket error. port=%d, ret=%d", port, ret);
  10. return ret;
  11. }
  12. srs_verbose("create linux socket success. port=%d, fd=%d", port, _fd);
  13. int reuse_socket = 1;
  14. if (setsockopt(_fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket, sizeof(int)) == -1) {
  15. ret = ERROR_SOCKET_SETREUSE;
  16. srs_error("setsockopt reuse-addr error. port=%d, ret=%d", port, ret);
  17. return ret;
  18. }
  19. srs_verbose("setsockopt reuse-addr success. port=%d, fd=%d", port, _fd);
  20. sockaddr_in addr;
  21. addr.sin_family = AF_INET;
  22. addr.sin_port = htons(port);
  23. addr.sin_addr.s_addr = inet_addr(ip.c_str());
  24. // 2、繫結套接字到一個IP地址和一個埠上
  25. if (bind(_fd, (const sockaddr*)&addr, sizeof(sockaddr_in)) == -1) {
  26. ret = ERROR_SOCKET_BIND;
  27. srs_error("bind socket error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
  28. return ret;
  29. }
  30. srs_verbose("bind socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
  31. // 3、將套接字設定為監聽模式等待連線請求
  32. if (::listen(_fd, SERVER_LISTEN_BACKLOG) == -1) {
  33. ret = ERROR_SOCKET_LISTEN;
  34. srs_error("listen socket error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
  35. return ret;
  36. }
  37. srs_verbose("listen socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
  38. if ((_stfd = st_netfd_open_socket(_fd)) == NULL){
  39. ret = ERROR_ST_OPEN_SOCKET;
  40. srs_error("st_netfd_open_socket open socket failed. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
  41. return ret;
  42. }
  43. srs_verbose("st open socket success. ep=%s:%d, fd=%d", ip.c_str(), port, _fd);
  44. // 4、等到連線一個客戶之後,開啟一個新的執行緒
  45. if ((ret = pthread->start()) != ERROR_SUCCESS) {
  46. srs_error("st_thread_create listen thread error. ep=%s:%d, ret=%d", ip.c_str(), port, ret);
  47. return ret;
  48. }
  49. srs_verbose("create st listen thread success, ep=%s:%d", ip.c_str(), port);
  50. return ret;
  51. }
       此程式碼為C++ TCP  Socket程式碼,思路比較清晰,可以看到,每接受到一個rtmp訪問請求,建立一個”執行緒“,這裡暫時將其稱為執行緒,後面再做具體介紹。建立執行緒程式碼如下:

   
  1. int SrsReusableThread::start()
  2. {
  3. return pthread->start();
  4. }


   
  1. int SrsThread::start()
  2. {
  3. int ret = ERROR_SUCCESS;
  4. if(tid) {
  5. srs_info( "thread %s already running.", _name);
  6. return ret;
  7. }
  8. if((tid = st_thread_create(thread_fun, this, (_joinable? 1: 0), 0)) == NULL){
  9. ret = ERROR_ST_CREATE_CYCLE_THREAD;
  10. srs_error( "st_thread_create failed. ret=%d", ret);
  11. return ret;
  12. }
  13. disposed = false;
  14. // we set to loop to true for thread to run.
  15. loop = true;
  16. // wait for cid to ready, for parent thread to get the cid.
  17. while (_cid < 0) {
  18. st_usleep( 10 * 1000);
  19. }
  20. // now, cycle thread can run.
  21. can_run = true;
  22. return ret;
  23. }

       來到了st_thread_create,這裡要注意,這是SRS開源專案具有高併發,高效能的重要一步。這裡建立的是協程,不是執行緒。協程是有別於程序和執行緒的一種元件,具有程序的獨立性和執行緒的輕量級,聽說微信能夠支援8億使用者量,也是採用協程這種網路服務框架:http://www.infoq.com/cn/articles/CplusStyleCorourtine-At-Wechat。

從這裡可以看出,srs是一個單執行緒的伺服器,採用協程,主持高併發,高效能。

建立協程,協程函式為:thread_fun()


    
  1. // 每連連結一個使用者,建立一個協程程,該函式為協程函式
  2. void* SrsThread::thread_fun(void* arg)
  3. {
  4. SrsThread* obj = (SrsThread*)arg;
  5. srs_assert(obj);
  6. // 進入執行緒迴圈
  7. obj->thread_cycle();
  8. // for valgrind to detect.
  9. SrsThreadContext* ctx = dynamic_cast <SrsThreadContext*>(_srs_context);
  10. if (ctx) {
  11. ctx->clear_cid();
  12. }
  13. st_thread_exit(NULL);
  14. return NULL;
  15. }
此時,真正進入了協程迴圈處理

    
  1. void SrsThread::thread_cycle()
  2. {
  3. int ret = ERROR_SUCCESS;
  4. _srs_context->generate_id();
  5. srs_info("thread %s cycle start", _name);
  6. _cid = _srs_context->get_id();
  7. srs_assert(handler);
  8. handler->on_thread_start();
  9. // thread is running now.
  10. really_terminated = false;
  11. // wait for cid to ready, for parent thread to get the cid.
  12. while (!can_run && loop) {
  13. st_usleep(10 * 1000);
  14. }
  15. while (loop) {
  16. if ((ret = handler->on_before_cycle()) != ERROR_SUCCESS) {
  17. srs_warn("thread %s on before cycle failed, ignored and retry, ret=%d", _name, ret);
  18. goto failed;
  19. }
  20. srs_info("thread %s on before cycle success", _name); <