為了進一步封裝(讓繼承類少寫一些代碼),搞出了一個虛函數continuePlaying()。讓我們來看一下:
[cpp] view plain copyBoolean MultiFramedRTPSink::continuePlaying() { // Send the first packet. // (This will also schedule any future sends.) buildAndSendPacket(True); return True; } MultiFramedRTPSink是與幀有關的類,其實它要求每次必須從source獲得一個幀的數據,所以才叫這個name。可以看到continuePlaying()完全被buildAndSendPacket()代替。看一下buildAndSendPacket():[cpp] view plain copyvoid MultiFramedRTPSink::buildAndSendPacket(Boolean isFirstPacket) { //此函數中主要是準備rtp包的頭,為一些需要跟據實際數據改變的字段留出位置。 fIsFirstPacket = isFirstPacket; // Set up the RTP header: unsigned rtpHdr = 0x80000000; // RTP version 2; marker ('M') bit not set (by default; it can be set later) rtpHdr |= (fRTPPayloadType << 16); rtpHdr |= fSeqNo; // sequence number fOutBuf->enqueueWord(rtpHdr);//向包中加入一個字 // Note where the RTP timestamp will go. // (We can't fill this in until we start packing payload frames.) fTimestampPosition = fOutBuf->curPacketSize(); fOutBuf->skipBytes(4); // leave a hole for the timestamp 在緩沖中空出時間戳的位置 fOutBuf->enqueueWord(SSRC()); // Allow for a special, payload-format-specific header following the // RTP header: fSpecialHeaderPosition = fOutBuf->curPacketSize(); fSpecialHeaderSize = specialHeaderSize(); fOutBuf->skipBytes(fSpecialHeaderSize); // Begin packing as many (complete) frames into the packet as we can: fTotalFrameSpecificHeaderSizes = 0; fNoFramesLeft = False; fNumFramesUsedSoFar = 0; // 一個包中已打入的幀數。 //頭準備好了,再打包幀數據 packFrame(); } 繼續看packFrame():[cpp] view plain copyvoid MultiFramedRTPSink::packFrame() { // First, see if we have an overflow frame that was too big for the last pkt if (fOutBuf->haveOverflowData()) { //如果有幀數據,則使用之。OverflowData是指上次打包時剩下的幀數據,因為一個包可能容納不了一個幀。 // Use this frame before reading a new one from the source unsigned frameSize = fOutBuf->overflowDataSize(); struct timeval PResentationTime = fOutBuf->overflowPresentationTime(); unsigned durationInMicroseconds =fOutBuf->overflowDurationInMicroseconds(); fOutBuf->uSEOverflowData(); afterGettingFrame1(frameSize, 0, presentationTime,durationInMicroseconds); } else { //一點幀數據都沒有,跟source要吧。 // Normal case: we need to read a new frame from the source if (fSource == NULL) return; //更新緩沖中的一些位置 fCurFrameSpecificHeaderPosition = fOutBuf->curPacketSize(); fCurFrameSpecificHeaderSize = frameSpecificHeaderSize(); fOutBuf->skipBytes(fCurFrameSpecificHeaderSize); fTotalFrameSpecificHeaderSizes += fCurFrameSpecificHeaderSize; //從source獲取下一幀 fSource->getNextFrame(fOutBuf->curPtr(),//新數據存放開始的位置 fOutBuf->totalBytesAvailable(),//緩沖中空余的空間大小 afterGettingFrame, //因為可能source中的讀數據函數會被放在任務調度中,所以把獲取幀后應調用的函數傳給source this, ourHandleClosure, //這個是source結束時(比如文件讀完了)要調用的函數。 this); } } 可以想像下面就是source從文件(或某個設備)中讀取一幀數據,讀完后返回給sink,當然不是從函數返回了,而是以調用afterGettingFrame這個回調函數的方式。所以下面看一下afterGettingFrame():[cpp] view plain copyvoid MultiFramedRTPSink::afterGettingFrame(void* clientData, unsigned numBytesRead, unsigned numTruncatedBytes, struct timeval presentationTime, unsigned durationInMicroseconds) { MultiFramedRTPSink* sink = (MultiFramedRTPSink*) clientData; sink->afterGettingFrame1(numBytesRead, numTruncatedBytes, presentationTime, durationInMicroseconds); } 沒什么可看的,只是過度為調用成員函數,所以afterGettingFrame1()才是重點:[cpp] view plain copyvoid MultiFramedRTPSink::afterGettingFrame1( unsigned frameSize, unsigned numTruncatedBytes, struct timeval presentationTime, unsigned durationInMicroseconds) { if (fIsFirstPacket) { // Record the fact that we're starting to play now: gettimeofday(&fNextSendTime, NULL); } //如果給予一幀的緩沖不夠大,就會發生截斷一幀數據的現象。但也只能提示一下用戶 if (numTruncatedBytes > 0) { unsigned const bufferSize = fOutBuf->totalBytesAvailable(); envir() << "MultiFramedRTPSink::afterGettingFrame1(): The input frame data was too large for our buffer size (" << bufferSize << "). " << numTruncatedBytes << " bytes of trailing data was dropped! Correct this by increasing /"OutPacketBuffer::maxSize/" to at least " << OutPacketBuffer::maxSize + numTruncatedBytes << ", *before* creating this 'RTPSink'. (Current value is " << OutPacketBuffer::maxSize << ".)/n"; } unsigned curFragmentationOffset = fCurFragmentationOffset; unsigned numFrameBytesToUse = frameSize; unsigned overflowBytes = 0; //如果包只已經打入幀數據了,并且不能再向這個包中加數據了,則把新獲得的幀數據保存下來。 // If we have already packed one or more frames into this packet, // check whether this new frame is eligible to be packed after them. // (This is independent of whether the packet has enough room for this // new frame; that check comes later.) if (fNumFramesUsedSoFar > 0) { //如果包中已有了一個幀,并且不允許再打入新的幀了,則只記錄下新的幀。 if ((fPreviousFrameEndedFragmentation && !allowOtherFramesAfterLastFragment()) || !frameCanAppearAfterPacketStart(fOutBuf->curPtr(), frameSize)) { // Save away this frame for next time: numFrameBytesToUse = 0; fOutBuf->setOverflowData(fOutBuf->curPacketSize(), frameSize, presentationTime, durationInMicroseconds); } } //表示當前打入的是否是上一個幀的最后一塊數據。 fPreviousFrameEndedFragmentation = False; //下面是計算獲取的幀中有多少數據可以打到當前包中,剩下的數據就作為overflow數據保存下來。 if (numFrameBytesToUse > 0) { // Check whether this frame overflows the packet if (fOutBuf->wouldOverflow(frameSize)) { // Don't use this frame now; instead, save it as overflow data, and // send it in the next packet instead. However, if the frame is too // big to fit in a packet by itself, then we need to fragment it (and // use some of it in this packet, if the payload format permits this.) if (isTooBigForAPacket(frameSize) && (fNumFramesUsedSoFar == 0 || allowFragmentationAfterStart())) { // We need to fragment this frame, and use some of it now: overflowBytes = computeOverflowForNewFrame(frameSize); numFrameBytesToUse -= overflowBytes; fCurFragmentationOffset += numFrameBytesToUse; } else { // We don't use any of this frame now: overflowBytes = frameSize; numFrameBytesToUse = 0; } fOutBuf->setOverflowData(fOutBuf->curPacketSize() + numFrameBytesToUse, overflowBytes, presentationTime, durationInMicroseconds); } else if (fCurFragmentationOffset > 0) { // This is the last fragment of a frame that was fragmented over // more than one packet. Do any special handling for this case: fCurFragmentationOffset = 0; fPreviousFrameEndedFragmentation = True; } } if (numFrameBytesToUse == 0 && frameSize > 0) { //如果包中有數據并且沒有新數據了,則發送之。(這種情況好像很難發生啊!) // Send our packet now, because we have filled it up: sendPacketIfNecessary(); } else { //需要向包中打入數據。 // Use this frame in our outgoing packet: unsigned char* frameStart = fOutBuf->curPtr(); fOutBuf->increment(numFrameBytesToUse); // do this now, in case "doSpecialFrameHandling()" calls "setFramePadding()" to append padding bytes // Here's where any payload format specific processing gets done: doSpecialFrameHandling(curFragmentationOffset, frameStart, numFrameBytesToUse, presentationTime, overflowBytes); ++fNumFramesUsedSoFar; // Update the time at which the next packet should be sent, based // on the duration of the frame that we just packed into it. // However, if this frame has overflow data remaining, then don't // count its duration yet. if (overflowBytes == 0) { fNextSendTime.tv_usec += durationInMicroseconds; fNextSendTime.tv_sec += fNextSendTime.tv_usec / 1000000; fNextSendTime.tv_usec %= 1000000; } //如果需要,就發出包,否則繼續打入數據。 // Send our packet now if (i) it's already at our preferred size, or // (ii) (heuristic) another frame of the same size as the one we just // read would overflow the packet, or // (iii) it contains the last fragment of a fragmented frame, and we // don't allow anything else to follow this or // (iv) one frame per packet is allowed: if (fOutBuf->isPreferredSize() || fOutBuf->wouldOverflow(numFrameBytesToUse) || (fPreviousFrameEndedFragmentation && !allowOtherFramesAfterLastFragment()) || !frameCanAppearAfterPacketStart( fOutBuf->curPtr() - frameSize, frameSize)) { // The packet is ready to be sent now sendPacketIfNecessary(); } else { // There's room for more frames; try getting another: packFrame(); } } } 看一下發送數據的函數:[cpp] view plain copyvoid MultiFramedRTPSink::sendPacketIfNecessary() { //發送包 if (fNumFramesUsedSoFar > 0) { // Send the packet: #ifdef TEST_LOSS if ((our_random()%10) != 0) // simulate 10% packet loss ##### #endif if (!fRTPInterface.sendPacket(fOutBuf->packet(),fOutBuf->curPacketSize())) { // if failure handler has been specified, call it if (fOnSendErrorFunc != NULL) (*fOnSendErrorFunc)(fOnSendErrorData); } ++fPacketCount; fTotalOctetCount += fOutBuf->curPacketSize(); fOctetCount += fOutBuf->curPacketSize() - rtpHeaderSize - fSpecialHeaderSize - fTotalFrameSpecificHeaderSizes; ++fSeqNo; // for next time } //如果還有剩余數據,則調整緩沖區 if (fOutBuf->haveOverflowData() && fOutBuf->totalBytesAvailable() > fOutBuf->totalBufferSize() / 2) { // Efficiency hack: Reset the packet start pointer to just in front of // the overflow data (allowing for the RTP header and special headers), // so that we probably don't have to "memmove()" the overflow data // into place when building the next packet: unsigned newPacketStart = fOutBuf->curPacketSize()- (rtpHeaderSize + fSpecialHeaderSize + frameSpecificHeaderSize()); fOutBuf->adjustPacketStart(newPacketStart); } else { // Normal case: Reset the packet start pointer back to the start: fOutBuf->resetPacketStart(); } fOutBuf->resetOffset(); fNumFramesUsedSoFar = 0; if (fNoFramesLeft) { //如果再沒有數據了,則結束之 // We're done: onSourceClosure(this); } else { //如果還有數據,則在下一次需要發送的時間再次打包發送。 // We have more frames left to send. Figure out when the next frame // is due to start playing, then make sure that we wait this long before // sending the next packet. struct timeval timeNow; gettimeofday(&timeNow, NULL); int secsDiff = fNextSendTime.tv_sec - timeNow.tv_sec; int64_t uSecondsToGo = secsDiff * 1000000 + (fNextSendTime.tv_usec - timeNow.tv_usec); if (uSecondsToGo < 0 || secsDiff < 0) { // sanity check: Make sure that the time-to-delay is non-negative: uSecondsToGo = 0; } // Delay this amount of time: nextTask() = envir().taskScheduler().scheduleDelayedTask(uSecondsToGo, (TaskFunc*) sendNext, this); } } 可以看到為了延遲包的發送,使用了delay task來執行下次打包發送任務。sendNext()中又調用了buildAndSendPacket()函數,呵呵,又是一個圈圈。
總結一下調用過程:

最后,再說明一下包緩沖區的使用:
MultiFramedRTPSink中的幀數據和包緩沖區共用一個,只是用一些額外的變量指明緩沖區中屬于包的部分以及屬于幀數據的部分(包以外的數據叫做overflow data)。它有時會把overflow data以mem move的方式移到包開始的位置,有時把包的開始位置直接設置到overflow data開始的地方。那么這個緩沖的大小是怎樣確定的呢?是跟據調用者指定的的一個最大的包的大小+60000算出的。這個地方把我搞胡涂了:如果一次從source獲取一個幀的話,那這個緩沖應設為不小于最大的一個幀的大小才是,為何是按包的大小設置呢?可以看到,當緩沖不夠時只是提示一下:
[cpp] view plain copyif (numTruncatedBytes > 0) { unsigned const bufferSize = fOutBuf->totalBytesAvailable(); envir() << "MultiFramedRTPSink::afterGettingFrame1(): The input frame data was too large for our buffer size (" << bufferSize << "). " << numTruncatedBytes << " bytes of trailing data was dropped! Correct this by increasing /"OutPacketBuffer::maxSize/" to at least " << OutPacketBuffer::maxSize + numTruncatedBytes << ", *before* creating this 'RTPSink'. (Current value is " << OutPacketBuffer::maxSize << ".)/n"; }
當然此時不會出錯,但有可能導致時間戳計算不準,或增加時間戳計算與source端處理的復雜性(因為一次取一幀時間戳是很好計算的)。新聞熱點
疑難解答