證據(jù)一:向所有客戶端發(fā)出數(shù)據(jù):
[cpp] view plain copyBoolean RTPInterface::sendPacket(unsigned char* packet, unsigned packetSize) { Boolean success = True; // we'll return False instead if any of the sends fail // Normal case: Send as a UDP packet: if (!fGS->output(envir(), fGS->ttl(), packet, packetSize)) success = False; // Also, send over each of our TCP sockets: for (tcpStreamRecord* streams = fTCPStreams; streams != NULL; streams = streams->fNext) { if (!sendRTPOverTCP(packet, packetSize, streams->fStreamSocketNum, streams->fStreamChannelId)) { success = False; } } return success; } 很明顯啊,先發(fā)送udp數(shù)據(jù),一對多的問題在GroupSocket中解決。再發(fā)送tcp數(shù)據(jù),一對多的問題本地解決。證據(jù)二:從所有客戶端讀取數(shù)據(jù):我現(xiàn)在找不到直接的證據(jù),所以我就憶想一下吧:當udp端口或tcp端口收到數(shù)據(jù)時,分析后,是哪個客戶端的數(shù)據(jù)就發(fā)給對應這個客戶端的RTPSink或RTCPInstance。好像已經(jīng)把最開始的問題解答完了。下面讓我們來分析一下RTPInterface吧。[cpp] view plain copyvoid RTPInterface::setStreamSocket(int sockNum, unsigned char streamChannelId) { fGS->removeAllDestinations(); addStreamSocket(sockNum, streamChannelId); } void RTPInterface::addStreamSocket(int sockNum, unsigned char streamChannelId) { if (sockNum < 0) return; for (tcpStreamRecord* streams = fTCPStreams; streams != NULL; streams = streams->fNext) { if (streams->fStreamSocketNum == sockNum && streams->fStreamChannelId == streamChannelId) { return; // we already have it } } fTCPStreams = new tcpStreamRecord(sockNum, streamChannelId, fTCPStreams); } setStreamSocket()沒必要說了吧,看一下addStreamSocke()。從字面意思應能了解,添加一個流式Socket,也就是添加tcp socket了。循環(huán)中查找是否已經(jīng)存在了,最后如果不存在,就創(chuàng)建之,在tcpStreamRecord的構(gòu)造函數(shù)中己經(jīng)把自己加入了鏈表。對于參數(shù),sockNum很易理解,就是socket()返回的那個SOCKET型數(shù)據(jù)唄,streamChannelId是什么呢?我們不防再猜測一下(很奇怪,我每次都能猜對,嘿嘿...):rtp over tcp時,這個tcp連接是直接利用了RTSP所用的那個tcp連接,如果同時有很多rtp session,再加上rtsp session,大家都用這一個socket通信,怎么區(qū)分你的還是我的?我想這個channel id就是用于解決這個問題。給每個session分配一個唯一的id,在發(fā)送自己的包時為包再加上個頭部,頭部中需要有session的標記--也就是這個channel id,包的長度等等字段。這樣大家就可以穿一條褲子了,術(shù)語叫多路復用,但要注意只有tcp才進行多路復用,udp是不用的,因為udp是一個session對應一個socket(加上RTCP是兩個)。想像一下,服務(wù)端要從這個tcp socket讀寫數(shù)據(jù),必須把一個handler加入TaskScheduler中,這個handler在可讀數(shù)據(jù)時進行讀,在可寫數(shù)據(jù)時進行寫。在讀數(shù)據(jù)時,對讀出的數(shù)據(jù)進行分析,取得數(shù)據(jù)包的長度,以及其channel id,跟據(jù)channel id找到相應的處handler和對象,交給它們?nèi)ヌ幚碜约旱臄?shù)據(jù)。試想兩個建立在tcp上的rtp session,這個兩個tcp socket既擔負著rtsp通訊,又擔負著rtp通訊。如果這兩個rtp session共用一個stream,那么最終負責這兩個session通信的就只有一個RTPInterface,那么這個RTPInterface中的fTCPStreams這個鏈表中就會有兩項,分別對應這兩個session。tcpStreamRecord主要用于socket number與channel id的對應。這些tcpStreamRecord是通過addStreamSocket()添加的。處理數(shù)據(jù)的handler是通過startNetworkReading()添加的,看一下下:[cpp] view plain copyvoid RTPInterface::startNetworkReading(TaskScheduler::BackgroundHandlerPRoc* handlerProc) { // Normal case: Arrange to read UDP packets: envir().taskScheduler().turnOnBackgroundReadHandling(fGS->socketNum(),handlerProc, fOwner); // Also, receive RTP over TCP, on each of our TCP connections: fReadHandlerProc = handlerProc; for (tcpStreamRecord* streams = fTCPStreams; streams != NULL; streams = streams->fNext) { // Get a socket descriptor for "streams->fStreamSocketNum": SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), streams->fStreamSocketNum); // Tell it about our subChannel: socketDescriptor->registerRTPInterface(streams->fStreamChannelId, this); } } 用UDP時很簡單,直接把處理函數(shù)做為handler加入taskScheduler即可。而TCP時,需向所有的session的socket都注冊自己。可以想像,socketDescriptor代表一個tcp socket,并且它有一個鏈表之類的東西,其中保存了所有的對這個socket感興趣的RTPInterface,同時也記錄了RTPInterface對應的channal id。只有向socketDescriptor注冊了自己,socketDescriptor在讀取數(shù)據(jù)時,才能跟據(jù)分析出的channel id找到對應的RTPInterface,才能調(diào)用RTPInterface中的數(shù)據(jù)處理handler,當然,這個函數(shù)也不是RTPInteface自己的,而是從startNetworkReading()這個函數(shù)接收到的調(diào)用者的。上述主要講的是一個RTPInterface對應多個客戶端tcp socket的情形?,F(xiàn)在又發(fā)現(xiàn)一個問題:SocketDescriptor為什么需要對應多個RTPInterface呢?上面已經(jīng)講了,是為了多路復用,因為這個socket即負擔rtsp通信又負擔rtp通信還負擔RTCP通信。SocketDescriptor記錄多路復用數(shù)據(jù)(也就是RTPInterface與channel id)用了一個Hash table:HashTable* fSubChannelHashTable。SocketDescriptor讀數(shù)據(jù)使用函數(shù):static void tcpReadHandler(SocketDescriptor*, int mask)。證據(jù)如下:[cpp] view plain copyvoid SocketDescriptor::registerRTPInterface( unsigned char streamChannelId, RTPInterface* rtpInterface) { Boolean isFirstRegistration = fSubChannelHashTable->IsEmpty(); fSubChannelHashTable->Add((char const*) (long) streamChannelId, rtpInterface); if (isFirstRegistration) { // Arrange to handle reads on this TCP socket: TaskScheduler::BackgroundHandlerProc* handler = (TaskScheduler::BackgroundHandlerProc*) &tcpReadHandler; fEnv.taskScheduler().turnOnBackgroundReadHandling(fOurSocketNum, handler, this); } } 可見在注冊第一個多路復用對象時啟動reand handler??匆幌潞瘮?shù)主體:[cpp] view plain copyvoid SocketDescriptor::tcpReadHandler1(int mask) { // We expect the following data over the TCP channel: // optional RTSP command or response bytes (before the first '$' character) // a '$' character // a 1-byte channel id // a 2-byte packet size (in network byte order) // the packet data. // However, because the socket is being read asynchronously, this data might arrive in pieces. u_int8_t c; struct sockaddr_in fromAddress; if (fTCPReadingState != AWAITING_PACKET_DATA) { int result = readSocket(fEnv, fOurSocketNum, &c, 1, fromAddress); if (result != 1) { // error reading TCP socket, or no more data available if (result < 0) { // error fEnv.taskScheduler().turnOffBackgroundReadHandling( fOurSocketNum); // stops further calls to us } return; } } switch (fTCPReadingState) { case AWAITING_DOLLAR: { if (c == '$') { fTCPReadingState = AWAITING_STREAM_CHANNEL_ID; } else { // This character is part of a RTSP request or command, which is handled separately: if (fServerRequestAlternativeByteHandler != NULL) { (*fServerRequestAlternativeByteHandler)( fServerRequestAlternativeByteHandlerClientData, c); } } break; } case AWAITING_STREAM_CHANNEL_ID: { // The byte that we read is the stream channel id. if (lookupRTPInterface(c) != NULL) { // sanity check fStreamChannelId = c; fTCPReadingState = AWAITING_SIZE1; } else { // This wasn't a stream channel id that we expected. We're (somehow) in a strange state. Try to recover: fTCPReadingState = AWAITING_DOLLAR; } break; } case AWAITING_SIZE1: { // The byte that we read is the first (high) byte of the 16-bit RTP or RTCP packet 'size'. fSizeByte1 = c; fTCPReadingState = AWAITING_SIZE2; break; } case AWAITING_SIZE2: { // The byte that we read is the second (low) byte of the 16-bit RTP or RTCP packet 'size'. unsigned short size = (fSizeByte1 << 8) | c; // Record the information about the packet data that will be read next: RTPInterface* rtpInterface = lookupRTPInterface(fStreamChannelId); if (rtpInterface != NULL) { rtpInterface->fNextTCPReadSize = size; rtpInterface->fNextTCPReadStreamSocketNum = fOurSocketNum; rtpInterface->fNextTCPReadStreamChannelId = fStreamChannelId; } fTCPReadingState = AWAITING_PACKET_DATA; break; } case AWAITING_PACKET_DATA: { // Call the appropriate read handler to get the packet data from the TCP stream: RTPInterface* rtpInterface = lookupRTPInterface(fStreamChannelId); if (rtpInterface != NULL) { if (rtpInterface->fNextTCPReadSize == 0) { // We've already read all the data for this packet. fTCPReadingState = AWAITING_DOLLAR; break; } if (rtpInterface->fReadHandlerProc != NULL) { rtpInterface->fReadHandlerProc(rtpInterface->fOwner, mask); } } return; } } } 最開始的注釋中解釋了多路復用頭的格式。這一段引起了我的興趣:[cpp] view plain copycase AWAITING_DOLLAR: { if (c == $) { fTCPReadingState = AWAITING_STREAM_CHANNEL_ID; } else { // This character is part of a RTSP request or command, which is handled separately: if (fServerRequestAlternativeByteHandler != NULL) { (*fServerRequestAlternativeByteHandler)( fServerRequestAlternativeByteHandlerClientData, c); } } break; } 啊!原來ServerRequestAlternativeByteHandler是用于處理RTSP數(shù)據(jù)的。也就是從這個socket收到RTSP數(shù)據(jù)時,調(diào)用ServerRequestAlternativeByteHandler。如果收到RTP/RTCP數(shù)據(jù)時,先查看其channel id,跟據(jù)id找到RTPInterface(RTCP也是用了RTPIterface進行通信),設(shè)置RTPInterface中與讀緩沖有關(guān)的變量,然后當讀到包數(shù)據(jù)的開始位置時,調(diào)用rtpInterface中保存的數(shù)據(jù)處理handler。還記得吧,rtpInterface中的這個數(shù)據(jù)處理handler在UDP時也被使用,在這個函數(shù)中要做的是讀取一個包的數(shù)據(jù),然后處理這個包。而SocketDescriptor把讀取位置置于包數(shù)據(jù)開始的位置再交給數(shù)據(jù)處理handler,正好可以使用與UDP相同的數(shù)據(jù)處理handler!還有,socketDescriptor們并不屬于任何RTPInterface,而是單獨保存在一個Hash table中,這樣多個RTPInterface都可以注冊到一個socketDescriptor中,以實現(xiàn)多路復用??偨Y(jié)一下通過RTPInterface,live555不僅實現(xiàn)了rtp over udp,還實現(xiàn)了rtp over tcp,而且還實現(xiàn)了同時即有rtp over tcp,又有rtp over udp!最后,channel id是從哪里來的呢?是在RTSP請求中指定的。在哪個請求中呢?自己找去吧。新聞熱點
疑難解答