接下來我們分析RTMP_SendPacket函數(shù)。我們先了解一下rtmp的消息格式chunk。
RTMP的head組成
RTMP的head在協(xié)議中的表現(xiàn)形式是chunk head,前面已經(jīng)說到一個Message + head可以分成一個和多個chunk,為了區(qū)分這些chunk,肯定是需要一個chunk head的,具體的實現(xiàn)就把Message head的信息和chunk head的信息合并在一起以chunk head的形式表現(xiàn)。 一個完整的chunk的組成如下圖所示
![]()
Chunk basic header: 該字段包含chunk的stream ID和 type 。chunk的Type決定了消息頭的編碼方式。該字段的長度完全依賴于stream ID,該字段是一個可變長的字段。


Chunk Msg Header:0, 3 ,7, 11 該字段包含了將要發(fā)送的消息的信息(或者是一部分,一個消息拆成多個chunk的情況下是一部分)該字段的長度由chunk basic header中的type決定。

Extend Timestamp: 0 ,4 bytes該字段發(fā)送的時候必須是正常的時間戳設(shè)置成0xffffff時,當(dāng)正常時間戳不為0xffffff時,該字段不發(fā)送。當(dāng)時間戳比0xffffff小該字段不發(fā)送,當(dāng)時間戳比0xffffff大時該字段必須發(fā)送,且正常時間戳設(shè)置成0xffffff。
Chunk Data實際數(shù)據(jù)(Payload),可以是信令,也可以是媒體數(shù)據(jù)。
總結(jié)如下圖所示:


6.1.2 塊消息頭有四種格式的塊消息ID,供塊流基本頭中的fmt 字段選擇。一個實現(xiàn)應(yīng)該使用最緊致的方式來表示塊消息頭。
6.1.2.1 類型00 類型的塊長度為11 字節(jié)。在一個塊流的開始和時間戳返回的時候必須有這種塊。

時間戳:3 字節(jié)對于0 類型的塊。消息的絕對時間戳在這里發(fā)送。如果時間戳大于或等于16777215(16 進(jìn)制0x00ffffff),該值必須為16777215,并且擴展時間戳必須出現(xiàn)。否則該值就是整個的時間戳。
6.1.2.2. 類型1類型1 的塊占7 個字節(jié)長。消息流ID 不包含在本塊中。塊的消息流ID 與先前的塊相同。具有可變大小消息的流,在第一個消息之后的每個消息的第一個塊應(yīng)該使用這個格式。

6.1.2.3. 類型2類型2 的塊占3 個字節(jié)。既不包含流ID 也不包含消息長度。本塊使用的流ID 和消息長度與先前的塊相同。具有固定大小消息的流,在第一個消息之后的每個消息的第一個塊應(yīng)該使用這個格式。
6.1.2.4 類型3類型3 的塊沒有頭。流ID,消息長度,時間戳都不出現(xiàn)。這種類型的塊使用與先前塊相同的數(shù)據(jù)。當(dāng)一個消息被分成多個塊,除了第一塊以外,所有的塊都應(yīng)使用這種類型。示例可參考6.2.2 節(jié)中的例2 。由相同大小,流ID,和時間間隔的流在類型2 的塊之后應(yīng)使用這種塊。示例可參考6.2.1 節(jié)中的例1 。如果第一個消息和第二個消息的時間增量與第一個消息的時間戳相同,那么0類型的塊之后必須是3 類型的塊而,不需要類型2 的塊來注冊時間增量。如果類型3 的塊在類型0 的塊之后,那么類型3 的時間戳增量與0 類型的塊的時間戳相同。
時間戳增量:3 字節(jié)對于類型1 的塊和類型2 的塊,本字段表示先前塊的時間戳與當(dāng)前塊的時間戳的差值。如果增量大于等于1677215(16 進(jìn)制0x00ffffff),這個值必須是16777215 ,并且擴展時間戳必須出現(xiàn)。否則這個值就是整個的增量。
消息長度:3 字節(jié)對于類型0 或類型1 的塊本字段表示消息的長度。注意,這個值通常與負(fù)載長度是不相同的。The chunk payload length is the maximum chunk size for all but the last chunk, and the remainder (which may be the entire length, for small messages) for the last chunk.
消息類型ID:1 字節(jié)對于0 類型和1 類型的塊,本字段發(fā)送消息類型。
消息流ID:4 字節(jié)對于0 類型的塊,本字段存儲消息流ID。通常,在一個塊流中的消息來自于同一個消息流。雖然,由于不同的消息可能復(fù)用到一個塊流中而使頭壓縮無法有效實施。但是,如果一個消息流關(guān)閉而另一個消息流才打開,那么通過發(fā)送一個新的0 類型的塊重復(fù)使用一個存在的塊流也不是不可以。
6.1.3. 擴展時間戳只有當(dāng)塊消息頭中的普通時間戳設(shè)置為0x00ffffff 時,本字段才被傳送。如果普通時間戳的值小于0x00ffffff,那么本字段一定不能出現(xiàn)。如果時間戳字段不出現(xiàn)本字段也一定不能出現(xiàn)。類型3 的塊一定不能含有本字段。本字段在塊消息頭之后,塊時間之前。

代碼分析如下:
int RTMP_SendPacket(RTMP *r, RTMPPacket *packet, int queue){ const RTMPPacket *PRevPacket = r->m_vecChannelsOut[packet->m_nChannel]; uint32_t last = 0; int nSize; int hSize, cSize; char *header, *hptr, *hend, hbuf[RTMP_MAX_HEADER_SIZE], c; uint32_t t; char *buffer, *tbuf = NULL, *toff = NULL; int nChunkSize; int tlen; // 前一個packet存在且不是完整的ChunkMsgHeader,因此有可能需要調(diào)整塊消息頭的類型 //fmt字節(jié) /*case 0:chunk msg header 長度為11 * case 1:chunk msg header 長度為7 * case 2:chunk msg header 長度為3 * case 3:chunk msg header 長度為0 */ if (prevPacket && packet->m_headerType != RTMP_PACKET_SIZE_LARGE) { /* compress a bit by using the prev packet's attributes */ // 獲取ChunkMsgHeader類型,前一個Chunk與當(dāng)前Chunk比較 // 如果前后兩個塊的大小、包類型及塊頭類型都相同,則將塊頭類型fmt設(shè)為2, // 即可省略消息長度、消息類型id、消息流id // 可以參考官方協(xié)議:流的分塊 --- 6.1.2.3節(jié) if (prevPacket->m_nBodySize == packet->m_nBodySize&& prevPacket->m_packetType == packet->m_packetType && packet->m_headerType == RTMP_PACKET_SIZE_MEDIUM) packet->m_headerType = RTMP_PACKET_SIZE_SMALL; // 前后兩個塊的時間戳相同,且塊頭類型fmt為2,則相應(yīng)的時間戳也可省略,因此將塊頭類型置為3 // 可以參考官方協(xié)議:流的分塊 --- 6.1.2.4節(jié) if (prevPacket->m_nTimeStamp == packet->m_nTimeStamp && packet->m_headerType == RTMP_PACKET_SIZE_SMALL) packet->m_headerType = RTMP_PACKET_SIZE_MINIMUM; last = prevPacket->m_nTimeStamp;// 前一個包的時間戳 } // 塊頭類型fmt取值0、1、2、3,超過3就表示出錯(fmt占二個字節(jié)) if (packet->m_headerType > 3) /* sanity */ { RTMP_Log(RTMP_LOGERROR, "sanity failed!! trying to send header of type: 0x%02x.", (unsigned char)packet->m_headerType); return FALSE; } // 塊頭初始大小 = 基本頭(1字節(jié)) + 塊消息頭大小(11/7/3/0) = [12, 8, 4, 1] // 塊基本頭是1-3字節(jié),因此用變量cSize來表示剩下的0-2字節(jié) // nSize 表示塊頭初始大小, hSize表示塊頭大小 nSize = packetSize[packet->m_headerType]; hSize = nSize; cSize = 0; // 時間戳增量 t = packet->m_nTimeStamp - last; if (packet->m_body) { // m_body是指向負(fù)載數(shù)據(jù)首地址的指針;“-”號用于指針前移 header = packet->m_body - nSize; // 塊頭的首指針 hend = packet->m_body; // 塊頭的尾指針 } else { header = hbuf + 6; hend = hbuf + sizeof(hbuf); } if (packet->m_nChannel > 319)// 塊流id(cs id)大于319,則塊基本頭占3個字節(jié) cSize = 2; else if (packet->m_nChannel > 63)// 塊流id(cs id)在64與319之間,則塊基本頭占2個字節(jié) cSize = 1; // ChunkBasicHeader的長度比初始長度還要長 if (cSize) { header -= cSize;// header指向塊頭 hSize += cSize;// hSize加上ChunkBasicHeader的長度(比初始長度多出來的長度) } // nSize > 1表示塊消息頭至少有3個字節(jié),即存在timestamp字段 // 相對TimeStamp大于0xffffff,此時需要使用ExtendTimeStamp if (nSize > 1 && t >= 0xffffff) { header -= 4; hSize += 4; } hptr = header; c = packet->m_headerType << 6;// 把ChunkBasicHeader的Fmt類型左移6位 // 設(shè)置basic header的第一個字節(jié)值,前兩位為fmt. 可以參考官方協(xié)議:流的分塊 --- 6.1.1節(jié) switch (cSize) { case 0:// 把ChunkBasicHeader的低6位設(shè)置成ChunkStreamID( cs id ) c |= packet->m_nChannel; break; case 1:// 同理,但低6位設(shè)置成000000 break; case 2:// 同理,但低6位設(shè)置成000001 c |= 1; break; } *hptr++ = c;// 可以拆分成兩句*hptr=c; hptr++,此時hptr指向第2個字節(jié) // 設(shè)置basic header的第二(三)個字節(jié)值 if (cSize) { int tmp = packet->m_nChannel - 64;// 將要放到第2字節(jié)的內(nèi)容tmp *hptr++ = tmp & 0xff;// 獲取低位存儲與第2字節(jié) if (cSize == 2)// ChunkBasicHeader是最大的3字節(jié)時,獲取高位存儲于最后1個字節(jié)(注意:排序使用大端序列,和主機相反) *hptr++ = tmp >> 8; } if (nSize > 1)// ChunkMsgHeader長度為11、7、3, 都含有timestamp(3字節(jié)) { // 將時間戳(相對或絕對)轉(zhuǎn)化為3個字節(jié)存入hptr,如果時間戳超過0xffffff,則后面還要填入Extend Timestamp hptr = AMF_EncodeInt24(hptr, hend, t > 0xffffff ? 0xffffff : t); } if (nSize > 4)// ChunkMsgHeader長度為11、7,都含有 msg length + msg type id { // 將消息長度(msg length)轉(zhuǎn)化為3個字節(jié)存入hptr hptr = AMF_EncodeInt24(hptr, hend, packet->m_nBodySize); *hptr++ = packet->m_packetType; } // ChunkMsgHeader長度為11, 含有msg stream id( 小端) if (nSize > 8) hptr += EncodeInt32LE(hptr, packet->m_nInfoField2); if (nSize > 1 && t >= 0xffffff)// 如果時間戳大于0xffffff,則需要寫入Extend Timestamp hptr = AMF_EncodeInt32(hptr, hend, t); // 到此為止,已經(jīng)將塊頭填寫好了 // 此時nSize表示負(fù)載數(shù)據(jù)的長度, buffer是指向負(fù)載數(shù)據(jù)區(qū)的指針 nSize = packet->m_nBodySize; buffer = packet->m_body; nChunkSize = r->m_outChunkSize;//Chunk大小,默認(rèn)是128字節(jié) RTMP_Log(RTMP_LOGDEBUG2, "%s: fd=%d, size=%d", __FUNCTION__, r->m_sb.sb_socket, nSize); /* send all chunks in one HTTP request ,使用HTTP協(xié)議 */ if (r->Link.protocol & RTMP_FEATURE_HTTP) { // nSize:Message負(fù)載長度;nChunkSize:Chunk長度; // 例nSize:307,nChunkSize:128; // 可分為(307 + 128 - 1)/128 = 3個 // 為什么加 nChunkSize - 1?因為除法會只取整數(shù)部分! int chunks = (nSize + nChunkSize - 1) / nChunkSize; if (chunks > 1)// Chunk個數(shù)超過一個 { // 注意:ChunkBasicHeader的長度 = cSize + 1 // 消息分n塊后總的開銷: // n個ChunkBasicHeader,1個ChunkMsgHeader,1個Message負(fù)載 // 實際上只有第一個Chunk是完整的,剩下的只有ChunkBasicHeader tlen = chunks * (cSize + 1) + nSize + hSize; tbuf = malloc(tlen); if (!tbuf) return FALSE; toff = tbuf; } } // 消息的負(fù)載 + 頭 while (nSize + hSize) { int wrote; if (nSize < nChunkSize)// 消息負(fù)載大小 < Chunk大小(不用分塊) nChunkSize = nSize;// Chunk可能小于設(shè)定值 RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)header, hSize); RTMP_LogHexString(RTMP_LOGDEBUG2, (uint8_t *)buffer, nChunkSize); // 如果r->Link.protocol采用Http協(xié)議,則將RTMP包數(shù)據(jù)封裝成多個Chunk,然后一次性發(fā)送。 // 否則每封裝成一個塊,就立即發(fā)送出去 if (tbuf) { // 將從Chunk頭開始的nChunkSize + hSize個字節(jié)拷貝至toff中, // 這些拷貝的數(shù)據(jù)包括塊頭數(shù)據(jù)(hSize字節(jié))和nChunkSize個負(fù)載數(shù)據(jù) memcpy(toff, header, nChunkSize + hSize); toff += nChunkSize + hSize; } else// 負(fù)載數(shù)據(jù)長度不超過設(shè)定的塊大小,不需要分塊,因此tbuf為NULL;或者r->Link.protocol不采用Http { // 直接將負(fù)載數(shù)據(jù)和塊頭數(shù)據(jù)發(fā)送出去 wrote = WriteN(r, header, nChunkSize + hSize); if (!wrote) return FALSE; } nSize -= nChunkSize;// 消息負(fù)載長度 - Chunk負(fù)載長度 buffer += nChunkSize;// buffer指針前移1個Chunk負(fù)載長度 hSize = 0;// 重置塊頭大小為0,后續(xù)的塊只需要有基本頭(或加上擴展時間戳)即可 // 如果消息負(fù)載數(shù)據(jù)還沒有發(fā)完,準(zhǔn)備填充下一個塊的塊頭數(shù)據(jù) if (nSize > 0) { header = buffer - 1; hSize = 1; if (cSize) { header -= cSize; hSize += cSize; } *header = (0xc0 | c); if (cSize) { int tmp = packet->m_nChannel - 64; header[1] = tmp & 0xff; if (cSize == 2) header[2] = tmp >> 8; } } } if (tbuf) { int wrote = WriteN(r, tbuf, toff - tbuf); free(tbuf); tbuf = NULL; if (!wrote) return FALSE; } /* we invoked a remote method */ if (packet->m_packetType == 0x14) { AVal method; char *ptr; ptr = packet->m_body + 1; AMF_DecodeString(ptr, &method); RTMP_Log(RTMP_LOGDEBUG, "Invoking %s", method.av_val); /* keep it in call queue till result arrives */ if (queue) { int txn; ptr += 3 + method.av_len; txn = (int)AMF_DecodeNumber(ptr); AV_queue(&r->m_methodCalls, &r->m_numCalls, &method, txn); } } if (!r->m_vecChannelsOut[packet->m_nChannel]) r->m_vecChannelsOut[packet->m_nChannel] = malloc(sizeof(RTMPPacket)); memcpy(r->m_vecChannelsOut[packet->m_nChannel], packet, sizeof(RTMPPacket)); return TRUE;}
新聞熱點
疑難解答