国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > .NET > 正文

.NET可復(fù)用TCP通信層之消息分派器組件

2024-07-10 13:04:47
字體:
供稿:網(wǎng)友


  上一篇主要講到了tcp通信層中的核心組件――tcp組件的實現(xiàn),tcp組件是整個通信層的消息驅(qū)動源,甚至,可以將tcp組件看作是我們整個服務(wù)器系統(tǒng)的消息驅(qū)動源,消息處理過程從這里引發(fā)。類似的消息驅(qū)動源還有發(fā)布的webservice接口、remoting接口等。今天我們需要關(guān)注的是tcp通信層中的“中央”組件――消息分派器組件itcpreqstreamdispatcher,大家已經(jīng)從前文的組件關(guān)系圖中看到了消息分派器的大致位置和作用了,它是tcp通信組件和消息處理器之間的“橋梁”。我們再對前文描述的通信層組件之間關(guān)系的一段話回顧一下:

  “當(dāng)網(wǎng)絡(luò)(tcp)組件從某個tcp連接上接收到一個請求時,會將請求轉(zhuǎn)發(fā)給消息分派器,消息分派器通過idatastreamhelper組件獲取請求消息的類型,然后根據(jù)此類型要求處理器工廠創(chuàng)建對應(yīng)類型的請求處理器,請求處理器處理請求并返回結(jié)果。接下來再由網(wǎng)絡(luò)組件把結(jié)果返回給終端用戶。在消息分派器進行請求消息分派之前,可能涉及一系列的操作,像消息加密/解密、消息分裂/重組、消息驗證等。”

  上面的描述中已經(jīng)體現(xiàn)出了消息分派器的主要職責(zé),在理解了消息分派器職責(zé)的基礎(chǔ)上,我們可以進一步來看看消息分派器的定義和實現(xiàn)了。 
  二.消息分派器組件
  1.消息分派器組件接口的定義

  消息分派器的接口很簡單:

    public interface itcpreqstreamdispatcher : ireqeststreamdispatcher
    {       
        arraylist dealrequestmessage(requestdata requestdata ,out byte[] leftdata ,ref requestvalidation validation) ;//同步回復(fù)
        bool      dealrequestmessage(requestdata requestdata , networkstream userstream ,out byte[] leftdata) ; //異步回復(fù)       
    }

  這個接口只有兩個方法,第二個方法用于異步發(fā)送回復(fù)(即繞開tcp組件發(fā)送回復(fù)),該方法的核心部分可以由第一個方法實現(xiàn),我們把注意力放在第一個方法上,而tcp組件與消息分派器進行交互的也正是第一個方法。我先解釋一下這個方法的幾個參數(shù)的含義:

  requestdata是對請求消息的封裝:

    //從網(wǎng)絡(luò)接收到的原始數(shù)據(jù)的封裝
    public class requestdata
    {
        public int  connectid = 0 ;
        public bool isfirstmsg = false ; //標志是否為連接建立后的第一條消息
        public byte[] buff     = null ; //接收數(shù)據(jù)緩沖區(qū) ,可能其頭部包含上次未處理完的數(shù)據(jù)
        public int validcount  = 0 ; //緩沖區(qū)中有效字節(jié)的個數(shù) >= 本次接收的字節(jié)數(shù)       
    }

  前面已經(jīng)提到過,connectid用于標志每一個tcp連接,isfirstmsg用于表明是否為tcp連接建立后的第一個消息,因為我們可能需要對第一個消息進行額外的驗證,比如,果第一個消息不是登錄請求,就關(guān)閉該tcp連接。

  第二個參數(shù)leftdata,表示requestdata.buff中的數(shù)據(jù)經(jīng)過消息分裂器分裂之后余下的數(shù)據(jù)(一條非完整的消息),這些數(shù)據(jù)被tcp組件用來放在下一次收到的數(shù)據(jù)的頭部進行消息重組。

  第三個參數(shù)validation,是個ref參數(shù),用于通知tcp組件對消息驗證的結(jié)果,如果驗證失敗,tcp組件將關(guān)閉對應(yīng)的tcp連接。

  該方法的返回值是回復(fù)的集合,每一個回復(fù)對應(yīng)一個請求,而requestdata.buff中的數(shù)據(jù)可能分裂成多個請求。另外要注意,有些請求可能是沒有回復(fù)消息的。

  在我們的tcp組件的兩種實現(xiàn)中,都可以看到類似下面的與消息分派器交互的語句:

                //處理請求   
                byte[] leftdata = null ;               
                arraylist repondlist = this.messagedispatcher.dealrequestmessage(key.requestdata  ,out leftdata , ref key.validation) ;
                if(this.validaterequest)
                {
                    if(key.validation.gotocloseconnection)
                    {
                        this.disposeoneconnection(streamhashcode ,key.validation.cause) ;
                    }
                }
 
  2.消息分派器組件基本元素的實現(xiàn)

  正如在實現(xiàn)tcp組件之前需要構(gòu)建一些基本元素,在實現(xiàn)消息分派器之前也是如此,用于支持消息分派器實現(xiàn)的基本元素包括:idatastreamhelper、消息分裂器、消息處理器工廠、itcpstreamdispatcherhook等。

  (1)idatastreamhelper消息分裂器

  idatastreamhelper,前文中已經(jīng)提到,idatastreamhelper用于從請求/回復(fù)消息中提取消息的“元數(shù)據(jù)”,并提供一些輔助方法,每個特定的應(yīng)用,它們對idatastreamhelper的實現(xiàn)可能是不一樣的。idatastreamhelper接口定義如下:

     /// <summary>
    /// idatastreamhelper 通信協(xié)議的面向流輔助設(shè)施。
    /// </summary>
    public interface idatastreamhelper :istringencoder
    {
        int maxrecievebuffsize{get ;} //接收緩沖區(qū)的大小
        int messageheaderlength{get ;} //消息頭的長度
        int offsetoflengthfield{get ;} //表示消息長度的字段在消息頭中的偏移
        idatastreamheader parsemessageheader(byte[] data ,int offset) ; //解析消息頭
        lengthtypeinheader lengthtypeinheader{get ;}
        byte[] getrespondwhenfailure(byte[] reqdata ,servicefailuretype failtype) ;    //根據(jù)服務(wù)失敗類型獲取失敗回復(fù)消息
        byte[] getrespondwhenfailure(byte[] reqdata ,string errormsg) ;           
    }
    /// <summary>
    /// stringencoder 限定字符串編碼格式
    /// </summary>
    public interface istringencoder
    {
        string getstrfromstream(byte[] stream ,int offset ,int len) ;
        byte[] getbytesfromstr(string ss) ;
    }
    /// <summary>
    /// servicefailuretype 服務(wù)失敗類型
    /// </summary>
    public enum servicefailuretype
    {
        invalidmessge ,parsefailure ,handlefailure ,servicestopped ,serviceisnotexit ,serverisbusy
    }

  idatastreamheader即是我們所說的消息的“元數(shù)據(jù)”,如其名所示,它也是消息的“消息頭”。請讓我補充說明一下,依照我的經(jīng)驗,消息由消息頭header和消息主體body組成,消息頭用于存放消息的“元數(shù)據(jù)”等信息,而消息主體用于存放與特定請求相關(guān)的數(shù)據(jù)。消息頭的長度固定,比如都是64字節(jié)或都是128字節(jié)。請求消息和回復(fù)消息公用相同格式的消息頭。我們來看看消息頭接口idatastreamheader的定義:

    public interface idatastreamheader
    {
        int messagelength    {get ;set ;} //本消息長度
        int typekey            {get ;set ;} //請求的目錄類型
        int servicekey        {get ;set ;} //請求類型
        int serviceitemindex{get ;set ;} //請求細分索引
        int randomnum        {get ;set ;} //用于將回復(fù)與請求一一對應(yīng)起來       
        int result            {get ;set ;} //服務(wù)結(jié)果   
   
        string userid        {get ;set ;} //發(fā)出請求的用戶編號

        byte[] todatastream() ;              //將消息頭轉(zhuǎn)化為流,流的長度位消息頭的長度
        void   todatastream(byte[] buff ,int offset);   
    }

  需要解釋一下typekey、servicekey、serviceitemindex,我們實際上將服務(wù)類型分為三級,可以舉個不太恰當(dāng)?shù)睦幼尨蠹矣袀€感性的認識。比如,生活中的衣、食、住、行可以作為不同的typekey,而“衣”中的春裝、冬裝可作為servicekey,而“春裝”中的t恤、夾克可作為serviceitemindex。對于服務(wù)的類型,你可以根據(jù)自己的意愿分成任意層級,但據(jù)我的經(jīng)驗,通常情況下,三層已經(jīng)夠用了。

  (2)消息分裂器

  前面已經(jīng)多次提到消息分裂器messagesplitter,它用于將接收緩沖區(qū)中的數(shù)據(jù)分裂成一個個完整的消息,并且把余下的非完整數(shù)據(jù)返回,其接口定義如下:

public interface imessagesplitter
    {
        void initialize(int maxbuffsize ,int headerlen ,int offsetlenfield ,lengthtypeinheader lentype) ;
        arraylist splitrequestmsgs(byte[] buff ,int validcount , out byte[] leftdata) ;//arraylist 中每條記錄都是是byte[],表示一個完整的請求
    }
    //消息頭中的長度是body長度還是總長度
    public enum lengthtypeinheader
    {
        totallen ,bodylen
    }

  其中,initialize方法中的參數(shù)都可以由idatastreamheader提供。leftdata是余下的非完整消息的數(shù)據(jù)。splitrequestmsgs方法返回的集合中是一條條完整的請求消息。

  (3)消息處理器工廠

  消息處理器工廠根據(jù)消息的類型(typekey、servicekey)創(chuàng)建對應(yīng)的消息處理器來出來該消息,其接口定義如下:

    public interface irequestdealerfactory
    {
        irequestdealer createdealer(int requesttype ,int servertypekey)  ;//servertypekey 比如城市代號
        event cbackrequestrecieved requestrecieved ;
    }

  createdealer方法返回的irequestdealer就是消息處理器,每一個消息處理器用于處理某種特定類型(servicekey)的所有請求。通常,可以將消息處理器封裝成插件dll,以實現(xiàn)功能服務(wù)的“熱插拔”。

  (4)消息處理器

  消息處理器irequestdealer定義如下:

    public interface irequestdealer
    {       
        byte[]  dealrequestmessage(roundedrequestmsg reqmsg ) ;//同步回復(fù)

        event cbackrequestrecieved requestrecieved ;
    }
    public delegate void cbackrequestrecieved(roundedrequestmsg roundedmsg) ;
    /// <summary>
    /// roundedrequestmsg 對應(yīng)于一條完整的請求
    /// </summary>
    public struct roundedrequestmsg
    {
        public int connectid ; //請求所對應(yīng)的tcp連接
        public byte[] data ;
    }

  roundedrequestmsg.data是經(jīng)消息分裂器分裂得到的一個完整的請求消息,一個字節(jié)不多、一個字節(jié)也不少。

  (5)itcpstreamdispatcherhook

  itcpstreamdispatcherhook是一個hook,它為用戶提供了一個自定義的對請求/回復(fù)消息進行操作的插入點。itcpstreamdispatcherhook由tcpstreamdispatcher使用,用于對請求消息和回復(fù)消息進行截獲,然后處理或轉(zhuǎn)換這些消息,比如常用的處理/轉(zhuǎn)換操作包括:加密/解密、消息驗證等等。itcpstreamdispatcherhook定義如下:

    /// <summary>
    /// itcpstreamdispatcherhook 由tcpstreamdispatcher使用,用于對請求消息和回復(fù)消息進行截獲,然后處理轉(zhuǎn)換這些消息,
    /// 比如加密/解密。 
    /// </summary>
    public interface itcpstreamdispatcherhook
    {
        //轉(zhuǎn)換消息
        byte[] capturerequestmsg(byte[] roundedmsg) ;
        byte[] capturerespondmsg(byte[] roundedmsg) ;

        //驗證消息,以下驗證的消息是還沒有被捕獲的消息
        bool verifyfirstmsgofuser(byte[] roundedmsg ,ref requestvalidation validation) ;
        bool verifyothermessage(byte[]   roundedmsg ,ref requestvalidation validation) ;
    }

  關(guān)于這個接口中各方法的含義可以在消息分派器的實現(xiàn)中更好的領(lǐng)會!

  3.消息分派器實現(xiàn)

  在前述的基本元素的基礎(chǔ)上,實現(xiàn)消息分派器非常簡單,我們來看其核心方法dealrequestmessage的實現(xiàn)源碼:

      private imessagesplitter               curmsgsplitter = new messagespliter() ;
      private idatastreamhelper            curmsghelper ;  //必須設(shè)置
      private irequestdealerfactory       curdealerfactory ;  //必須設(shè)置
      private itcpstreamdispatcherhook tcpstreamdispatcherhook ;

       public arraylist dealrequestmessage(requestdata requestdata, out byte[] leftdata, ref requestvalidation validation)
        {
            //消息分裂
            arraylist respondlist = new arraylist() ;
            arraylist reqlist = this.curmsgsplitter.splitrequestmsgs(requestdata.buff ,requestdata.validcount ,out leftdata) ;
            if(reqlist == null)
            {
                return respondlist ;
            }               
            bool verified = true ;
            for(int i=0; i<reqlist.count ;i++)
            {       
                byte[] thedata = (byte[])reqlist[i] ;
                #region 驗證消息               
                if(requestdata.isfirstmsg && (i == 0))
                {                       
                    verified = this.tcpstreamdispatcherhook.verifyfirstmsgofuser(thedata ,ref validation) ;                   
                }
                else
                {                           
                    verified = this.tcpstreamdispatcherhook.verifyothermessage(thedata ,ref validation ) ;                   
                }

                if(! verified)
                {
                    if(validation.gotocloseconnection)
                    {
                        return null ;
                    }

                    this.addrespondtolist(respondlist ,this.curmsghelper.getrespondwhenfailure(thedata ,servicefailuretype.invalidmessge)) ;
                    continue ;
                }
                #endregion
               
                //接插,捕獲/轉(zhuǎn)換請求消息
                byte[] reqdata = this.tcpstreamdispatcherhook.capturerequestmsg(thedata) ;
                #region 處理消息
                //處理消息
                idatastreamheader header = this.curmsghelper.parsemessageheader(reqdata ,0);
                irequestdealer dealer = this.curdealerfactory.createdealer(header.servicekey ,header.typekey) ;
                if(dealer == null)
                {
                    this.addrespondtolist(respondlist ,this.curmsghelper.getrespondwhenfailure(reqdata ,servicefailuretype.serviceisnotexit)) ;
                    continue ;
                }
                roundedrequestmsg roundreqmsg = new roundedrequestmsg();
                roundreqmsg.connectid = requestdata.connectid ;
                roundreqmsg.data = reqdata ;   
                try
                {
                    byte[] responddata = dealer.dealrequestmessage(roundreqmsg) ;
                   
                    if(responddata != null)
                    {
                        this.addrespondtolist(respondlist ,responddata) ;
                    }
                }
                catch(exception ee)
                {                   
                    this.addrespondtolist(respondlist , this.curmsghelper.getrespondwhenfailure(reqdata ,ee.message)) ;
                }   
                #endregion
            }

            return respondlist;
        }
        //將回復(fù)消息加密后放入list
        private void addrespondtolist(arraylist list ,byte[] theresponddata)
        {
            //接插,捕獲/轉(zhuǎn)換回復(fù)消息
            byte[] responddata = this.tcpstreamdispatcherhook.capturerespondmsg(theresponddata) ;
            list.add(responddata) ;
        }

  如果你是一直按順序讀下來的,理解上面的實現(xiàn)一定不成什么問題。到這里,tcp通信層的所有重要的設(shè)施基本都已介紹完畢,最后,給出了提示,即,在你的應(yīng)用中,如何使用這個可復(fù)用的tcp通信層。步驟如下:

  (1)實現(xiàn)idatastreamhelper接口。

  (2)實現(xiàn)ireqeststreamdispatcher接口,如果采用的是tcp協(xié)議,則可直接使用參考實現(xiàn)tcpstreamdispatcher

  (3)實現(xiàn)各種請求處理器,這些處理器實現(xiàn)irequestdealer接口。

  (4)實現(xiàn)irequestdealerfactory接口。

  接下來,還有什么?其實,還有很多,都可以提高到框架的層次,以便復(fù)用。比如,前面我們處理消息都是基于流(byte[])的形式,在此基礎(chǔ)上,我們可以更上一層,采用基于對象的形式――即,將請求消息和回復(fù)消息都封裝成類,這就涉及了流的解析(流=>對象)和對象序列化(消息對象=>流)問題;另外,我們甚至可以將tcp用戶管理納入到框架的高度,以進行復(fù)用,比如,通常基于tcp服務(wù)的系統(tǒng)都需要管理在線的tcp用戶,并記錄tcp用戶請求服務(wù)的具體信息、在線時間等,這些經(jīng)過良好的分析概括都可以提高到復(fù)用的高度。以后有時間,我會將這樣的經(jīng)驗和大家分享。

  最后,把enterpriseserverbase類庫中的network命名空間中的源碼和大家共享,希望對大家有所幫助!(另,該命名空間中已經(jīng)包含了上述的基于對象的消息和tcp用戶管理的可復(fù)用組件)。

發(fā)表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發(fā)表
主站蜘蛛池模板: 错那县| 汉沽区| 临洮县| 昌宁县| 忻城县| 通山县| 买车| 彭泽县| 绥棱县| 哈巴河县| 漳浦县| 廉江市| 会泽县| 芒康县| 个旧市| 武威市| 清涧县| 桑日县| 夏津县| 江都市| 兴城市| 白银市| 虞城县| 佛教| 南充市| 全椒县| 延长县| 长宁区| 汉寿县| 厦门市| 西安市| 桂阳县| 县级市| 乌鲁木齐县| 靖边县| 安化县| 栾城县| 腾冲县| 五台县| 富宁县| 天津市|