1. 程式人生 > >RTMPdump(libRTMP) 原始碼分析 8: 傳送訊息(Message)

RTMPdump(libRTMP) 原始碼分析 8: 傳送訊息(Message)

=====================================================

RTMPdump(libRTMP) 原始碼分析系列文章:

=====================================================

函式呼叫結構圖

RTMPDump (libRTMP)的整體的函式呼叫結構圖如下圖所示。


詳細分析

之前寫了一系列的文章介紹RTMPDump各種函式。比如怎麼建立網路連線(NetConnection),怎麼建立網路流(NetStream)之類的,唯獨沒有介紹這些傳送或接收的資料,在底層到底是怎麼實現的。本文就是要剖析一下其內部的實現。即這些訊息(Message)到底是怎麼傳送和接收的。

先來看看傳送訊息吧。

  • 傳送connect命令使用函式SendConnectPacket()
  • 傳送createstream命令使用RTMP_SendCreateStream()
  • 傳送realeaseStream命令使用SendReleaseStream()
  • 傳送publish命令使用SendPublish()
  • 傳送deleteStream的命令使用SendDeleteStream()
  • 傳送pause命令使用RTMP_SendPause()

不再一一例舉,發現函式命名有兩種規律:RTMP_Send***()或者Send***(),其中*號代表命令的名稱。

SendConnectPacket()這個命令是每次程式開始執行的時候傳送的第一個命令訊息,內容比較多,包含了很多AMF編碼的內容,在此不多做分析,貼上程式碼:

//傳送“connect”命令
static int
SendConnectPacket(RTMP *r, RTMPPacket *cp)
{
  RTMPPacket packet;
  char pbuf[4096], *pend = pbuf + sizeof(pbuf);
  char *enc;

  if (cp)
    return RTMP_SendPacket(r, cp, TRUE);

  packet.m_nChannel = 0x03;	/* control channel (invoke) */
  packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
  packet.m_packetType = 0x14;	/* INVOKE */
  packet.m_nTimeStamp = 0;
  packet.m_nInfoField2 = 0;
  packet.m_hasAbsTimestamp = 0;
  packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE;

  enc = packet.m_body;
  enc = AMF_EncodeString(enc, pend, &av_connect);
  enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes);
  *enc++ = AMF_OBJECT;

  enc = AMF_EncodeNamedString(enc, pend, &av_app, &r->Link.app);
  if (!enc)
    return FALSE;
  if (r->Link.protocol & RTMP_FEATURE_WRITE)
    {
      enc = AMF_EncodeNamedString(enc, pend, &av_type, &av_nonprivate);
      if (!enc)
	return FALSE;
    }
  if (r->Link.flashVer.av_len)
    {
      enc = AMF_EncodeNamedString(enc, pend, &av_flashVer, &r->Link.flashVer);
      if (!enc)
	return FALSE;
    }
  if (r->Link.swfUrl.av_len)
    {
      enc = AMF_EncodeNamedString(enc, pend, &av_swfUrl, &r->Link.swfUrl);
      if (!enc)
	return FALSE;
    }
  if (r->Link.tcUrl.av_len)
    {
      enc = AMF_EncodeNamedString(enc, pend, &av_tcUrl, &r->Link.tcUrl);
      if (!enc)
	return FALSE;
    }
  if (!(r->Link.protocol & RTMP_FEATURE_WRITE))
    {
      enc = AMF_EncodeNamedBoolean(enc, pend, &av_fpad, FALSE);
      if (!enc)
	return FALSE;
      enc = AMF_EncodeNamedNumber(enc, pend, &av_capabilities, 15.0);
      if (!enc)
	return FALSE;
      enc = AMF_EncodeNamedNumber(enc, pend, &av_audioCodecs, r->m_fAudioCodecs);
      if (!enc)
	return FALSE;
      enc = AMF_EncodeNamedNumber(enc, pend, &av_videoCodecs, r->m_fVideoCodecs);
      if (!enc)
	return FALSE;
      enc = AMF_EncodeNamedNumber(enc, pend, &av_videoFunction, 1.0);
      if (!enc)
	return FALSE;
      if (r->Link.pageUrl.av_len)
	{
	  enc = AMF_EncodeNamedString(enc, pend, &av_pageUrl, &r->Link.pageUrl);
	  if (!enc)
	    return FALSE;
	}
    }
  if (r->m_fEncoding != 0.0 || r->m_bSendEncoding)
    {	/* AMF0, AMF3 not fully supported yet */
      enc = AMF_EncodeNamedNumber(enc, pend, &av_objectEncoding, r->m_fEncoding);
      if (!enc)
	return FALSE;
    }
  if (enc + 3 >= pend)
    return FALSE;
  *enc++ = 0;
  *enc++ = 0;			/* end of object - 0x00 0x00 0x09 */
  *enc++ = AMF_OBJECT_END;

  /* add auth string */
  if (r->Link.auth.av_len)
    {
      enc = AMF_EncodeBoolean(enc, pend, r->Link.lFlags & RTMP_LF_AUTH);
      if (!enc)
	return FALSE;
      enc = AMF_EncodeString(enc, pend, &r->Link.auth);
      if (!enc)
	return FALSE;
    }
  if (r->Link.extras.o_num)
    {
      int i;
      for (i = 0; i < r->Link.extras.o_num; i++)
	{
	  enc = AMFProp_Encode(&r->Link.extras.o_props[i], enc, pend);
	  if (!enc)
	    return FALSE;
	}
    }
  packet.m_nBodySize = enc - packet.m_body;
  //----------------
  r->dlg->AppendMLInfo(20,1,"命令訊息","Connect");
  //-----------------------------
  return RTMP_SendPacket(r, &packet, TRUE);
}

RTMP_SendCreateStream()命令相對而言比較簡單,程式碼如下:
//傳送“createstream”命令
int
RTMP_SendCreateStream(RTMP *r)
{
  RTMPPacket packet;
  char pbuf[256], *pend = pbuf + sizeof(pbuf);
  char *enc;

  packet.m_nChannel = 0x03;	/* control channel (invoke) */
  packet.m_headerType = RTMP_PACKET_SIZE_MEDIUM;
  packet.m_packetType = 0x14;	/* INVOKE */
  packet.m_nTimeStamp = 0;
  packet.m_nInfoField2 = 0;
  packet.m_hasAbsTimestamp = 0;
  packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE;

  enc = packet.m_body;
  enc = AMF_EncodeString(enc, pend, &av_createStream);
  enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes);
  *enc++ = AMF_NULL;		/* NULL */

  packet.m_nBodySize = enc - packet.m_body;
  //----------------
  r->dlg->AppendMLInfo(20,1,"命令訊息","CreateStream");
  //-----------------------------
  return RTMP_SendPacket(r, &packet, TRUE);
}

同樣,SendReleaseStream()內容也比較簡單,我對其中部分內容作了註釋:
//傳送RealeaseStream命令
static int
SendReleaseStream(RTMP *r)
{
  RTMPPacket packet;
  char pbuf[1024], *pend = pbuf + sizeof(pbuf);
  char *enc;

  packet.m_nChannel = 0x03;	/* control channel (invoke) */
  packet.m_headerType = RTMP_PACKET_SIZE_MEDIUM;
  packet.m_packetType = 0x14;	/* INVOKE */
  packet.m_nTimeStamp = 0;
  packet.m_nInfoField2 = 0;
  packet.m_hasAbsTimestamp = 0;
  packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE;

 enc = packet.m_body;
  //對“releaseStream”字串進行AMF編碼
  enc = AMF_EncodeString(enc, pend, &av_releaseStream);
  //對傳輸ID(0)進行AMF編碼?
  enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes);
  //命令物件
  *enc++ = AMF_NULL;
  //對播放路徑字串進行AMF編碼
  enc = AMF_EncodeString(enc, pend, &r->Link.playpath);
  if (!enc)
    return FALSE;

  packet.m_nBodySize = enc - packet.m_body;
  //----------------
  r->dlg->AppendMLInfo(20,1,"命令訊息","ReleaseStream");
  //-----------------------------
  return RTMP_SendPacket(r, &packet, FALSE);
}

再來看一個SendPublish()函式,用於傳送“publish”命令
//傳送Publish命令
static int
SendPublish(RTMP *r)
{
  RTMPPacket packet;
  char pbuf[1024], *pend = pbuf + sizeof(pbuf);
  char *enc;
  //塊流ID為4
  packet.m_nChannel = 0x04;	/* source channel (invoke) */
  packet.m_headerType = RTMP_PACKET_SIZE_LARGE;
  //命令訊息,型別20
  packet.m_packetType = 0x14;	/* INVOKE */
  packet.m_nTimeStamp = 0;
  //流ID
  packet.m_nInfoField2 = r->m_stream_id;
  packet.m_hasAbsTimestamp = 0;
  packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE;
  //指向Chunk的負載
  enc = packet.m_body;
   //對“publish”字串進行AMF編碼
  enc = AMF_EncodeString(enc, pend, &av_publish);
  enc = AMF_EncodeNumber(enc, pend, ++r->m_numInvokes);
  //命令物件為空
  *enc++ = AMF_NULL;
  enc = AMF_EncodeString(enc, pend, &r->Link.playpath);
  if (!enc)
    return FALSE;

  /* FIXME: should we choose live based on Link.lFlags & RTMP_LF_LIVE? */
  enc = AMF_EncodeString(enc, pend, &av_live);
  if (!enc)
    return FALSE;

  packet.m_nBodySize = enc - packet.m_body;
  //----------------
  r->dlg->AppendMLInfo(20,1,"命令訊息","Pulish");
  //-----------------------------
  return RTMP_SendPacket(r, &packet, TRUE);
}

其他的命令不再一一例舉,總體的思路是宣告一個RTMPPacket型別的結構體,然後設定各種屬性值,最後交給RTMP_SendPacket()進行傳送。

RTMPPacket型別的結構體定義如下,一個RTMPPacket對應RTMP協議規範裡面的一個塊(Chunk)。

//Chunk資訊
  typedef struct RTMPPacket
  {
    uint8_t m_headerType;//ChunkMsgHeader的型別(4種)
    uint8_t m_packetType;//Message type ID(1-7協議控制;8,9音視訊;10以後為AMF編碼訊息)
    uint8_t m_hasAbsTimestamp;	/* Timestamp 是絕對值還是相對值? */
    int m_nChannel;			//塊流ID
    uint32_t m_nTimeStamp;	// Timestamp
    int32_t m_nInfoField2;	/* last 4 bytes in a long header,訊息流ID */
    uint32_t m_nBodySize;	//訊息長度
    uint32_t m_nBytesRead;
    RTMPChunk *m_chunk;
    char *m_body;
  } RTMPPacket;

下面我們來看看RTMP_SendPacket()吧,各種的RTMPPacket(即各種Chunk)都需要用這個函式進行傳送。
//自己編一個數據報傳送出去!
//非常常用
int
RTMP_SendPacket(RTMP *r, RTMPPacket *packet, int queue)
{
  const RTMPPacket *prevPacket = r->m_vecChannelsOut[packet->m_nChannel];
  uint32_t last = 0;
  int nSize;
  int hSize, cSize;
  char *header, *hptr, *hend, hbuf[RTMP_MAX_HEADER_SIZE], c;
  uint32_t t;
  char *buffer, *tbuf = NULL, *toff = NULL;
  int nChunkSize;
  int tlen;
  //不是完整ChunkMsgHeader
  if (prevPacket && packet->m_headerType != RTMP_PACKET_SIZE_LARGE)
    {
      /* compress a bit by using the prev packet's attributes */
	//獲取ChunkMsgHeader的型別
	//前一個Chunk和這個Chunk對比
      if (prevPacket->m_nBodySize == packet->m_nBodySize
	  && prevPacket->m_packetType == packet->m_packetType
	  && packet->m_headerType == RTMP_PACKET_SIZE_MEDIUM)
	packet->m_headerType = RTMP_PACKET_SIZE_SMALL;


      if (prevPacket->m_nTimeStamp == packet->m_nTimeStamp
	  && packet->m_headerType == RTMP_PACKET_SIZE_SMALL)
	packet->m_headerType = RTMP_PACKET_SIZE_MINIMUM;
	  //上一個packet的TimeStamp
      last = prevPacket->m_nTimeStamp;
    }
  
  if (packet->m_headerType > 3)	/* sanity */
    {
      RTMP_Log(RTMP_LOGERROR, "sanity failed!! trying to send header of type: 0x%02x.",
	  (unsigned char)packet->m_headerType);
      return FALSE;
    }
  //chunk包頭大小;packetSize[] = { 12, 8, 4, 1 }
  nSize = packetSize[packet->m_headerType];
  hSize = nSize; cSize = 0;
  //相對的TimeStamp
  t = packet->m_nTimeStamp - last;

  if (packet->m_body)
    {
	//Header的Start
	//m_body是指向負載資料首地址的指標;“-”號用於指標前移
      header = packet->m_body - nSize;
	//Header的End
      hend = packet->m_body;
    }
  else
    {
      header = hbuf + 6;
      hend = hbuf + sizeof(hbuf);
    }
  //當ChunkStreamID大於319時
  if (packet->m_nChannel > 319)
	//ChunkBasicHeader是3個位元組
    cSize = 2;
  //當ChunkStreamID大於63時
  else if (packet->m_nChannel > 63)
	//ChunkBasicHeader是2個位元組
    cSize = 1;
  if (cSize)
    {
	//header指標指向ChunkMsgHeader
      header -= cSize;
	//hsize加上ChunkBasicHeader的長度
      hSize += cSize;
    }
  //相對TimeStamp大於0xffffff,此時需要使用ExtendTimeStamp
  if (nSize > 1 && t >= 0xffffff)
    {
      header -= 4;
      hSize += 4;
    }

  hptr = header;
  //把ChunkBasicHeader的Fmt型別左移6位
  c = packet->m_headerType << 6;
  switch (cSize)
    {
	//把ChunkBasicHeader的低6位設定成ChunkStreamID
    case 0:
      c |= packet->m_nChannel;
      break;
	//同理,但低6位設定成000000
    case 1:
      break;
	//同理,但低6位設定成000001
    case 2:
      c |= 1;
      break;
    }
  //可以拆分成兩句*hptr=c;hptr++,此時hptr指向第2個位元組
  *hptr++ = c;
  //CSize>0,即ChunkBasicHeader大於1位元組
  if (cSize)
    {
	//將要放到第2位元組的內容tmp
      int tmp = packet->m_nChannel - 64;
	//獲取低位儲存與第2位元組
      *hptr++ = tmp & 0xff;
	//ChunkBasicHeader是最大的3位元組時
      if (cSize == 2)
	//獲取高位儲存於最後1個位元組(注意:排序使用大端序列,和主機相反)
	*hptr++ = tmp >> 8;
    }
  //ChunkMsgHeader。注意一共有4種,包含的欄位數不同。
  //TimeStamp(3B)
  if (nSize > 1)
    {
	//相對TimeStamp和絕對TimeStamp?
      hptr = AMF_EncodeInt24(hptr, hend, t > 0xffffff ? 0xffffff : t);
    }
  //MessageLength+MessageTypeID(4B)
  if (nSize > 4)
    {
	//MessageLength
      hptr = AMF_EncodeInt24(hptr, hend, packet->m_nBodySize);
	//MessageTypeID
      *hptr++ = packet->m_packetType;
    }
  //MessageStreamID(4B)
  if (nSize > 8)
    hptr += EncodeInt32LE(hptr, packet->m_nInfoField2);
  
  //ExtendedTimeStamp
  if (nSize > 1 && t >= 0xffffff)
    hptr = AMF_EncodeInt32(hptr, hend, t);
  //負載長度,指向負載的指標
  nSize = packet->m_nBodySize;
  buffer = packet->m_body;
  //Chunk大小,預設128位元組
  nChunkSize = r->m_outChunkSize;

  RTMP_Log(RTMP_LOGDEBUG2, "%s: fd=%d, size=%d", __FUNCTION__, r->m_sb.sb_socket,
      nSize);
  /* send all chunks in one HTTP request */
  //使用HTTP
  if (r->Link.protocol & RTMP_FEATURE_HTTP)
    {
	//nSize:Message負載長度;nChunkSize:Chunk長度;
	//例nSize:307,nChunkSize:128;
	//可分為(307+128-1)/128=3個
	//為什麼+nChunkSize-1?因為除法會只取整數部分!
      int chunks = (nSize+nChunkSize-1) / nChunkSize;
	//Chunk個數超過一個
      if (chunks > 1)
        {
	//注意:CSize=1表示ChunkBasicHeader是2位元組
	//訊息分n塊後總的開銷:
	//n個ChunkBasicHeader,1個ChunkMsgHeader,1個Message負載
	//實際中只有第一個Chunk是完整的,剩下的只有ChunkBasicHeader
	  tlen = chunks * (cSize + 1) + nSize + hSize;
	//分配記憶體
	  tbuf = (char *) malloc(tlen);
	  if (!tbuf)
	    return FALSE;
	  toff = tbuf;
	}
	//訊息的負載+頭
    }
  while (nSize + hSize)
    {
      int wrote;
	  //訊息負載<Chunk大小(不用分塊)
      if (nSize < nChunkSize)
	//Chunk可能小於設定值
	nChunkSize = nSize;

      RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)header, hSize);
      RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)buffer, nChunkSize);
      if (tbuf)
        {
	//void *memcpy(void *dest, const void *src, int n);
	//由src指向地址為起始地址的連續n個位元組的資料複製到以dest指向地址為起始地址的空間內
	  memcpy(toff, header, nChunkSize + hSize);
	  toff += nChunkSize + hSize;
	}
      else
        {
	  wrote = WriteN(r, header, nChunkSize + hSize);
	  if (!wrote)
	    return FALSE;
	}
	  //訊息負載長度-Chunk負載長度
      nSize -= nChunkSize;
	  //Buffer指標前移1個Chunk負載長度
      buffer += nChunkSize;
      hSize = 0;
	  
	  //如果訊息沒有發完
      if (nSize > 0)
	{
	//ChunkBasicHeader
	  header = buffer - 1;
	  hSize = 1;
	  if (cSize)
	    {
	      header -= cSize;
	      hSize += cSize;
	    }
	  //ChunkBasicHeader第1個位元組
	  *header = (0xc0 | c);
	  //ChunkBasicHeader大於1位元組
	  if (cSize)
	    {
	      int tmp = packet->m_nChannel - 64;
	      header[1] = tmp & 0xff;
	      if (cSize == 2)
		header[2] = tmp >> 8;
	    }
	}
    }
  if (tbuf)
    {
	//
      int wrote = WriteN(r, tbuf, toff-tbuf);
      free(tbuf);
      tbuf = NULL;
      if (!wrote)
        return FALSE;
    }

  /* we invoked a remote method */
  if (packet->m_packetType == 0x14)
    {
      AVal method;
      char *ptr;
      ptr = packet->m_body + 1;
      AMF_DecodeString(ptr, &method);
      RTMP_Log(RTMP_LOGDEBUG, "Invoking %s", method.av_val);
      /* keep it in call queue till result arrives */
      if (queue) {
        int txn;
        ptr += 3 + method.av_len;
        txn = (int)AMF_DecodeNumber(ptr);
	AV_queue(&r->m_methodCalls, &r->m_numCalls, &method, txn);
      }
    }

  if (!r->m_vecChannelsOut[packet->m_nChannel])
    r->m_vecChannelsOut[packet->m_nChannel] = (RTMPPacket *) malloc(sizeof(RTMPPacket));
  memcpy(r->m_vecChannelsOut[packet->m_nChannel], packet, sizeof(RTMPPacket));
  return TRUE;
}

這個函式乍一看好像非常複雜,其實不然,他只是按照RTMP規範將資料編碼成符合規範的塊(Chunk),規範可以參考相關的文件。

具體怎麼編碼成塊(Chunk)就不多分析了,在這裡需要注意一個函式:WriteN()。該函式完成了將資料傳送出去的功能。

來看一下WriteN()函式:

//傳送資料報的時候呼叫(連線,buffer,長度)
static int
WriteN(RTMP *r, const char *buffer, int n)
{
  const char *ptr = buffer;
#ifdef CRYPTO
  char *encrypted = 0;
  char buf[RTMP_BUFFER_CACHE_SIZE];

  if (r->Link.rc4keyOut)
    {
      if (n > sizeof(buf))
	encrypted = (char *)malloc(n);
      else
	encrypted = (char *)buf;
      ptr = encrypted;
      RC4_encrypt2((RC4_KEY *)r->Link.rc4keyOut, n, buffer, ptr);
    }
#endif

  while (n > 0)
    {
      int nBytes;
	  //因方式的不同而呼叫不同函式
	  //如果使用的是HTTP協議進行連線
      if (r->Link.protocol & RTMP_FEATURE_HTTP)
        nBytes = HTTP_Post(r, RTMPT_SEND, ptr, n);
      else
        nBytes = RTMPSockBuf_Send(&r->m_sb, ptr, n);
      /*RTMP_Log(RTMP_LOGDEBUG, "%s: %d\n", __FUNCTION__, nBytes); */
	  //成功傳送位元組數<0
      if (nBytes < 0)
	{
	  int sockerr = GetSockError();
	  RTMP_Log(RTMP_LOGERROR, "%s, RTMP send error %d (%d bytes)", __FUNCTION__,
	      sockerr, n);

	  if (sockerr == EINTR && !RTMP_ctrlC)
	    continue;

	  RTMP_Close(r);
	  n = 1;
	  break;
	}

      if (nBytes == 0)
	break;

      n -= nBytes;
      ptr += nBytes;
    }

#ifdef CRYPTO
  if (encrypted && encrypted != buf)
    free(encrypted);
#endif

  return n == 0;
}

該函式中,RTMPSockBuf_Send()完成了資料傳送的功能,再來看看這個函式(函式呼叫真是好多啊。。。。)
//Socket傳送(指明套接字,buffer緩衝區,資料長度)
//返回所發資料量
int
RTMPSockBuf_Send(RTMPSockBuf *sb, const char *buf, int len)
{
  int rc;

#ifdef _DEBUG
  fwrite(buf, 1, len, netstackdump);
#endif

#if defined(CRYPTO) && !defined(NO_SSL)
  if (sb->sb_ssl)
    {
      rc = TLS_write((SSL *)sb->sb_ssl, buf, len);
    }
  else
#endif
    {
	//向一個已連線的套介面傳送資料。
	//int send( SOCKET s, const char * buf, int len, int flags);
	//s:一個用於標識已連線套介面的描述字。
	//buf:包含待發送資料的緩衝區。   
	//len:緩衝區中資料的長度。
	//flags:呼叫執行方式。
	//rc:所發資料量。
      rc = send(sb->sb_socket, buf, len, 0);
    }
  return rc;
}

int
RTMPSockBuf_Close(RTMPSockBuf *sb)
{
#if defined(CRYPTO) && !defined(NO_SSL)
  if (sb->sb_ssl)
    {
      TLS_shutdown((SSL *)sb->sb_ssl);
      TLS_close((SSL *)sb->sb_ssl);
      sb->sb_ssl = NULL;
    }
#endif
  return closesocket(sb->sb_socket);
}

到這個函式的時候,發現一層層的呼叫終於完成了,最後呼叫了系統Socket的send()函式完成了資料的傳送功能。