1. 程式人生 > >LIVE555研究之五:RTPServer(二)

LIVE555研究之五:RTPServer(二)

tpch live555 循環調用 family 每一個 函數 計算 ack close

LIVE555研究之五:RTPServer(二)

接上文,main函數的幾行代碼創建了RTSPServer類的子類DynamicRTSPServer對象。RTPServer類是server類的基類。DynamicRTSPServer代表詳細的server子類。我們今天介紹的server程序就是基於該類實現的。

在創建DynamicRTSPServer時傳入了值為554的port號。這是由於RTSP默認port號為554,http默認使用80port是一樣的。

DynamicRTSPServer

繼承關系:

技術分享

Medium是非常多類的基類。內部定義了指向環境類的引用和一個char類型媒體名稱。並定義了依照媒體名稱,查找相應媒體的成員函數lookupByName

由於MediaSinkMediaSouceMediaSessionRTSPClient

RTPServer均繼承自該類。因此在Medium中定義了非常多推斷該類是哪個媒體類型的函數:

  virtual Boolean isSource() const;

  virtual Boolean isSink() const;

  virtual Boolean isRTCPInstance() const;

  virtual Boolean isRTSPClient() const;

  virtual Boolean isRTSPServer() const;

  virtual Boolean isMediaSession() const;

  virtual Boolean isServerMediaSession() const;

  virtual Boolean isDarwinInjector() const;

Medium中的實現均是返回false。在相應的子類中均會重定義相應函數。並返回true

TaskToken fNextTask用來保存延遲任務的ID

保存的任務ID用於被又一次調度。或者在該媒體對象被銷毀時從延遲隊列中取消調度。

RTPServer類是server類的基類。代表了server對象。在整個server執行期間,該對象一直存在。

定義了下面成員變量:

  HashTable* fServerMediaSessions; 

  HashTable* fClientConnections; 

  HashTable* fClientConnectionsForHTTPTunneling;   

  HashTable* fClientSessions; 

  HashTable* fPendingRegisterRequests;

 


從其成員變量能夠看到RTPServer中維護了ServerMediaSession對象、ClientConnectionClientSession對象的HashTable

ServerMediaSessionSession相應server端一個媒體文件。當client請求多個媒體文件時,RTPServer內會維護相應的多個ServerMediaSession對象。ServerMediaSession對象通過媒體文件名稱進行標識,如client請求a.264文件。則server就會在保存ServerMediaSessionHashTable中搜索相應文件名稱為a.264ServerMediaSession。如未找到,則說明還未為該媒體文件創建相應的ServerMediaSession。並創建一個新的ServerMediaSession與媒體文件名稱關聯後加入到HashTable

lookupServerMediaSession用於在map中搜索相應媒體文件名稱相應的ServerMediaSession

void addServerMediaSession(ServerMediaSession* serverMediaSession);

  virtual ServerMediaSession* lookupServerMediaSession(char const* streamName);

  void removeServerMediaSession(ServerMediaSession* serverMediaSession);

  void removeServerMediaSession(char const* streamName);


以上三個成員函數分別用來加入、查詢和刪除相應ServerMediaSession項。

removeServerMediaSession被調用後。在RTPServer中維護的fServerMediaSessionHashTable中。該ServerMediaSession會被刪除。可是相應的ServerMediaSession對象並不一定會被釋放。

由於此時其它client還有可能在使用該媒體文件。僅僅有當其它client都釋放了對該媒體文件的引用後,該對象才會被釋放。

closeAllClientSessionsForServerMediaSession用於刪除全部client對某一個媒體文件的引用。

deleteServerMediaSession在從fServerMediaSession中刪除相應項目時同一時候也會刪除全部client的引用,此後該對象的引用計數為0能夠被安全釋放。

removeServerMediaSession時會檢查引用計數,僅僅有當引用計數為0時該對象才會被釋放。

if (serverMediaSession->referenceCount() == 0) //僅僅有當引入計數為0時才會被釋放
{
    Medium::close(serverMediaSession);
} 
else 
{
  serverMediaSession->deleteWhenUnreferenced() = True;
}

ClientConnection對象

ClientConnection對象定義在RTPServer內部,為其內部類。

主要用於和client的通信。當有新的client連接到server時,會新建ClientConnection對象。其內部定義了發送、接收socket以及發送和接收緩沖區,並對client的命令進行處理和回應。

void handleRequestBytes(int newBytesRead);

用於處理client命令,在對RTSP命令進行分析後。提取出各種信息。然後進行分流處理。

對於OPTIONSDESCRIBE、命令不支持、命令有誤等其它錯誤命令的響應會直接在ClientConnection中進行處理。

而對於SETUPPLAYPAUSETERARDOWN等命令會傳遞到ClientSession中進行處理。

下面為分流代碼:

 else if (strcmp(cmdName, "TEARDOWN") == 0
               || strcmp(cmdName, "PLAY") == 0
               || strcmp(cmdName, "PAUSE") == 0
               || strcmp(cmdName, "GET_PARAMETER") == 0
               || strcmp(cmdName, "SET_PARAMETER") == 0)
         {
              if (clientSession != NULL) {

                clientSession->handleCmd_withinSession(this, cmdName, urlPreSuffix, urlSuffix, (char const*)fRequestBuffer);

              } else
              {
                handleCmd_sessionNotFound();
              }

ClientSession對象會在client請求SETUP命令時在ClientConnection中創建。並分配一個ClientSessionID

對於SETUP之前和對一些出錯處理命令會在ClientConnection中進行響應。

ClientConnection維護了RTPServer的指針,能夠在新建ClientSession對象後將其增加到RTPServer維護的fClientSessions中。

ClientSession中定義的成員:

RTSPServer& fOurServer;

 u_int32_t fOurSessionId;

ServerMediaSession* fOurServerMediaSession;

 

ClientSession也維護了對RTPServer的引用。同一時候也保存了指向ServerMediaSession的指針。在對SETUP的響應中。有這樣一句話:

  if (fOurServerMediaSession == NULL) 
{
       // We‘re accessing the "ServerMediaSession" for the first time.
       fOurServerMediaSession = sms;
       fOurServerMediaSession->incrementReferenceCount();
 } 
else if (sms != fOurServerMediaSession) 
{
       // The client asked for a stream that‘s different from the one originally requested for this stream id.  Bad request:
       ourClientConnection->handleCmd_bad();
       break;
}

由此我們知道依照眼下的實現。每一個clientSession僅僅能相應一個ServerMediaSession。即每一個client僅僅能請求一個媒體文件,不能同一時候請求兩個媒體文件。假設須要同一時候支持多個媒體文件,就須要在ClientSession中維護一個ServerMediaSession集合。

ClientSessionnoteLiveness用於client保活。

其內部實現例如以下:
void RTSPServer::RTSPClientSession::noteLiveness()

{
   if (fOurServer.fReclamationTestSeconds > 0) 
    {
     envir().taskScheduler()
       .rescheduleDelayedTask(fLivenessCheckTask,
                          fOurServer.fReclamationTestSeconds*1000000,
                          (TaskFunc*)livenessTimeoutTask, this);
    }
}

上述代碼向調度器請求又一次調度一個延遲任務,在fReclamationTestSeconds後會調用livenessTimeoutTask

事實上現非常easy只刪除自身。

void RTSPServer::RTSPClientSession
   ::livenessTimeoutTask(RTSPClientSession* clientSession) 
{
   delete clientSession;
}

當server收到相應client的RR包時會調用noteLiveness,又一次計時。

fReclamationTestSecondsRTPServer構造時傳入,默覺得65s。表示如65s內未收到clientRTCP包即覺得client已斷開。

假設在fReclamationTestSeconds的時間內再次調用noteLiveness,則該延遲任務會被設置成新的時間。原來的調度不再起作用。

  struct streamState 
  {

     ServerMediaSubsession* subsession;
      void* streamToken;
  } * fStreamStates;

fStreamStates指向一個動態分配的數組。fNumStreamStates表示該數組包括的元素個數。

ServerMediaSession代表一個track(媒體流)。streamTokenvoid*類型的指針,但它指向StreamState類的對象。StreamState對象代表一個真正流動起來的數據流。這個流從XXXXFileSouce流向RTPSink

能夠看到一個ServerMediaSubSession相應一個StreamState

ServerMediaSubSession相應一個靜態的流。能夠被多個client重用。

如:多個client可能會請求同一個媒體文件裏的trackStreamState代表一個動態的流。

ServerMediaSession

ServerMediaSession代表server端一個媒體文件。

其成員例如以下:

ServerMediaSubsession* fSubsessionsHead;

  ServerMediaSubsession* fSubsessionsTail;

  unsigned fSubsessionCounter; 
  char* fStreamName;
  char* fInfoSDPString;
  char* fDescriptionSDPString;
  char* fMiscSDPLines;
  struct timeval fCreationTime;
  unsigned fReferenceCount;
  Boolean fDeleteWhenUnreferenced;

能夠看到其主要成員為fSubsessionsHeadfSubsessionsTail。代表該媒體文件裏的多個媒體流track

fStreamName為該媒體文件名稱。

fDescritionSDPString代表SDP字符串。用於在client發送DESCRIBE命令時返回給client。

fReferenceCount為引用計數。

當將fDeleteWhenUnreferenced設置為true。且引用計數為0時。ServerMediaSession會被釋放。

該值在構造函數中默認賦值為false。即全部ServerMediaSession即使不存在被client引用時,也不會被釋放。對於長時間執行的server程序將會出現內存消耗耗盡的情況。

解決方式就是在構造時將fDeleteWhenUnreferenced的默認值賦值為true

其它成員函數是用來操縱MediaSubSession

MediaSubSession

假設一個媒體文件裏既包括音頻流又包括視頻流。我們稱這個媒體文件裏包括兩個track。每一個track相應一個ServerMediaSubsession

ServerMediaSession* fParentSession;
 netAddressBits fServerAddressForSDP;
 portNumBits fPortNumForSDP;
private:
  ServerMediaSubsession* fNext;
  unsigned fTrackNumber; // within an enclosing ServerMediaSession
  char const* fTrackId; 

fParentSession指向該MediaSubSession所屬的ServerMediaSession。

fNext指向下一個同屬於一個ServerMediaSession的ServerMediaSubsession。假設只包括一個媒體流。則fNext指針為NULL。

fTrackNumber為track號。

在client發送DESCRIBE命令時,server端會為每一個媒體流分配一個TrackID。

fTrackId 為字符串指針,該字符串由track和fTrackNumber拼接而成。如track1、track2。

ServerMediaSubsession中只定義了空的接口,詳細實現均放在其子類。

OnDemandServerMediaSubsession

HashTable* fDestinationsHashTable; 存儲sessionID和Destinations的映射。

Destinations為目的地址。

每一個ClientSession在HashTable中都有與自己相應的項。

Destinations能夠維護一對RTP和RTCP的port和地址信息。

StreamState

前面說過StreamState代表一個真正流動的流。如今讓我們看下StreamState的到底實現了什麽功能。

  OnDemandServerMediaSubsession& fMaster;
  Boolean fAreCurrentlyPlaying;
  unsigned fReferenceCount;
  Port fServerRTPPort, fServerRTCPPort;
  RTPSink* fRTPSink;
  BasicUDPSink* fUDPSink;
  float fStreamDuration;
  unsigned fTotalBW;
  RTCPInstance* fRTCPInstance;
  FramedSource* fMediaSource;
  float fStartNPT;
  Groupsock* fRTPgs;
  Groupsock* fRTCPgs;

fMaster為對OnDemandServerMediaSubsession或其子類的引用。

fReferenceCount為引用計數。

fServerRTPPort為RTPport

fServerRTCPPort為RTCPport

fRTPSink抽象Sink類。

fMediaSource為Souce基類。

能夠看到StreamState既維護了Sink。又維護了Souce。事實上在StreamState

GroupSock主要用於處理組播。但也能夠處理單播。

Groupsock* fRTPgs和 Groupsock* fRTCPgs為RTP和RTCP的地址。用於向RTP和RTCPport發送數據。

RTCPInstance

RTCPInsance是對RTCP通信的封裝。

RTCP的功能是統計包的收發,為流量統計提供根據。因為其封裝的比較完整。因此RTCPInstance與其它類間的關系不是那麽緊密。

RTCPInstanceRTPInterface提供支持。所以它既支持RTP over UDP,又支持RTP over TCP

  void setByeHandler(TaskFunc* handlerTask, void* clientData,
            Boolean handleActiveParticipantsOnly = True);
  void setSRHandler(TaskFunc* handlerTask, void* clientData);
  void setRRHandler(TaskFunc* handlerTask, void* clientData);
  void setSpecificRRHandler(netAddressBits fromAddress, Port fromPort,
               TaskFunc* handlerTask, void* clientData);

以上四個成員函數均是用來設置回調函數。在滿足一定條件時該回調被調用。

setByeHandler用於設置在client結束與server的RTCP通信時的回調。

setSRHandler用於設置在收到client的SR包時的回調。在收到SR包時該回調被調用。

setRRHandler用於設置在收到client的RR包時的回調。在收到RR包時該回調被調用。

setSpecificRRHandler該成員函數與SetRRHandler的差別在於。它能夠設置針對某一client的RR包的回調。

RTPClientSession就是調用此回調。為指定client註冊noteClientLiveness。用於檢測client保活。如在一定時間內收不到RR包時即覺得client已經斷開了連接。

此時將會刪除相應的clientSession對象。這裏提供了一種監視client執行狀態的好方法。

每一個MediaSubSession相應一個StreamState對象。它們被保存在在ServerMediaClient中被StreamState數組中。

在收到client的PLAYM命令後。ServerMediaClient的響應函數內會為每一個StreamState調用play

// Now, start streaming:

  for (i = 0; i < fNumStreamStates; ++i)

 {
    if (subsession == NULL /* means: aggregated operation */
       || subsession == fStreamStates[i].subsession) 
     {
      unsigned short rtpSeqNum = 0;
      unsigned rtpTimestamp = 0;
      if (fStreamStates[i].subsession == NULL) continue;
      fStreamStates[i].subsession->startStream(fOurSessionId,
                         fStreamStates[i].streamToken,
                         (TaskFunc*)noteClientLiveness, this,
                         rtpSeqNum, rtpTimestamp,                         

      //略去部分代碼

     }
 }

RTSPClientSession的handleCmd_SETUP中會依據ServerMediaSubSession的個數創建streamStates數組。

if (fStreamStates == NULL) 
{
      // 計算ServerMediaSubSession個數
       ServerMediaSubsessionIterator iter(*fOurServerMediaSession);
      for (fNumStreamStates = 0; iter.next() != NULL; ++fNumStreamStates) {}      
      fStreamStates = new struct streamState[fNumStreamStates];      
      iter.reset();
      ServerMediaSubsession* subsession; 
      //將ServerMediaSubSession與streamStates通過fStreamStates數組進行關聯
       for (unsigned i = 0; i < fNumStreamStates; ++i)
     {
        subsession = iter.next();
        fStreamStates[i].subsession = subsession;
        fStreamStates[i].streamToken = NULL;      
     }
}

上述代碼中與ServerMediaSubSession 關聯的streamToken被賦值為NULL

並會在後面的getStreamParameters中被賦值,最後一個參數為指針的引用。用於在getStreamParameters中改動該指針。

subsession->getStreamParameters(fOurSessionId, ourClientConnection->fClientAddr.sin_addr.s_addr,
                  clientRTPPort, clientRTCPPort,
                  tcpSocketNum, rtpChannelId, rtcpChannelId,
                  destinationAddress, destinationTTL, fIsMulticast,
                  serverRTPPort, serverRTCPPort,
                  fStreamStates[streamNum].streamToken); 

getStreamParameters在OnDemandServerMediaSubsession又一次定義,能夠看到創建StreamStates對象的代碼:

// Set up the state of the stream.  The stream will get started later:
    streamToken = fLastStreamToken
      = new StreamState(*this, serverRTPPort, serverRTCPPort, rtpSink, udpSink,
           streamBitrate, mediaSource,
           rtpGroupsock, rtcpGroupsock);


能夠看到StreamStates關聯了Sink和Souce。

之所以要在OnDemandServerMediaSubsession又一次定義的getStreamParameters中分配StreamStates對象,是由於它定義了新的創建詳細MediaSouce和MediaSink的虛函數。

virtual FramedSource* createNewStreamSource(unsigned clientSessionId,
                       unsigned& estBitrate) = 0;
     // "estBitrate" is the stream‘s estimated bitrate, in kbps
   virtual RTPSink* createNewRTPSink(Groupsock* rtpGroupsock,
                  unsigned char rtpPayloadTypeIfDynamic,
                  FramedSource* inputSource) = 0;
 

StreamStates關聯的MediaSouce和MediaSink均是詳細的子類。

若媒體文件為H264碼流,則相應的Souce為H264VideoStreamFramer。相應的Sink為H264VideoRTPSink。

RTSPClientSession的handleCmd_PLAY中為每一個MediaSubSession循環調用startStream。並傳入與MediaSubSession關聯的StramStates對象指針:

for (i = 0; i < fNumStreamStates; ++i) 
{
    if (subsession == NULL /* means: aggregated operation */
      || subsession == fStreamStates[i].subsession) 
   {
      unsigned short rtpSeqNum = 0;
      unsigned rtpTimestamp = 0;
      if (fStreamStates[i].subsession == NULL) continue;
      fStreamStates[i].subsession->startStream(fOurSessionId,
                         fStreamStates[i].streamToken,
                        (TaskFunc*)noteClientLiveness, this,
                         rtpSeqNum, rtpTimestamp,
                         RTSPServer::RTSPClientConnection::handleAlternativeRequestByte, ourClientConnection);
     }

}

startStram內部調用了StreamStates的startPlaying:

void OnDemandServerMediaSubsession::startStream(unsigned clientSessionId,
                     void* streamToken,
                     TaskFunc* rtcpRRHandler,
                     void* rtcpRRHandlerClientData,
                     unsigned short& rtpSeqNum,
                     unsigned& rtpTimestamp,
                     ServerRequestAlternativeByteHandler* serverRequestAlternativeByteHandler,
                      void* serverRequestAlternativeByteHandlerClientData) {
  StreamState* streamState = (StreamState*)streamToken;
  Destinations* destinations
    = (Destinations*)(fDestinationsHashTable->Lookup((char const*)clientSessionId));
  if (streamState != NULL)
  {
    streamState->startPlaying(destinations,
                 rtcpRRHandler, rtcpRRHandlerClientData,
                    serverRequestAlternativeByteHandler, serverRequestAlternativeByteHandlerClientData);
    RTPSink* rtpSink = streamState->rtpSink(); // alias
    if (rtpSink != NULL)
    {
      rtpSeqNum = rtpSink->currentSeqNo();

      rtpTimestamp = rtpSink->presetNextTimestamp();
    }
  }
}

streamStates的startPlaying內部則創建了RTCPInstance對象並調用了RTPSink的startPlaying函數:

fRTPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);

第一個參數即為詳細的MediaSouce子類。

StartPlaying之後,Sink會調用Souce的getNextFrame獲得一幀數據。

上面介紹的各種類是支撐LIVE555的各種基礎設施。對於各種碼流都是通用的。

2014.8.28於浙江杭州

LIVE555研究之五:RTPServer(二)