国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 學院 > 開發設計 > 正文

.NET下可復用的TCP通信層實現之TCP組件

2019-11-17 04:41:55
字體:
來源:轉載
供稿:網友

  2006年已經來臨,回首剛走過的2005,心中感慨萬千。在人生和生活的目標上,有了清楚明確的定位,終于知道了自己喜歡什么樣的生活,喜歡什么樣的生活方式;在技術上,成熟了不少,眼界也開闊的不少,從面向對象到組件、從.Net到J2EE、從微軟到開源,頗有收獲。
非凡值得一提的是,熟悉了Rod Johnson這個大牛人,也終于在自己的項目中正式使用SPRing.net框架來開發了,這確實是一個優秀的框架。而在已經到來的2006年,我有一個主要目標就是B/S應用開發,來填補自己在企業級開發上的另一半空白。

  以前就很想將自己在Tcp通信層的開發心得、經驗共享出來,但一直沒有實現,究其原因,還是自己太懶了。今天終于找到一個時機,寫下這篇文章,也算是對2005年的另一種形式的回憶吧。

  絕大多數C/S(包括多層)結構的系統中,終端與服務器的通信都是通過Tcp進行的(使用Udp的也有一些,但是其相對于Tcp簡單許多,所以不在這里的討論之列)。通常,這樣的C/S系統都需要處理極大的并發,也就是說隨時都可能有成千上萬個用戶在線,并且每分鐘都可能有數以百計的用戶上線/下線。由于每個用戶都與服務器存在著一個Tcp連接,如何治理所有這些連接,并使我們的Tcp通信層穩定高效地工作,是我開發的這個“TcpTcp通信層”設計實現的主要目標。

  自從2004年9月開始至今,我就一直負責某C/S系統的服務器端的架構設計,并負責整個通信層的實現,在探索的過程中,逐漸形成了一套可復用的“Tcp通信層框架”(“框架”這個詞真的蠻嚇人,呵呵),其位于EnterpriseServerBase類庫的EnterpriseServerBase.Network命名空間中。現將我在通信層這一塊的設計/開發經驗記錄于此,以便日后回顧。也期大家多多賜教。

  我期望的“Tcp通信層”并不只是能接受連接、治理連接、轉發用戶請求這么簡單,為了構建一個高度可復用的、靈活的、可接插的Tcp通信層,需要定義很多的規則、接口、契約,這需要做很多的工作。“Tcp通信層”決不僅僅只是Tcp協議通信,由于通信與消息聯系緊密,不可避免的需要將“通信的消息”納入到我們的分析中來,比如,基于Tcp傳輸的特性,我們可能需要對接收到的消息進行分裂、重組等(后文中會解釋為什么、以及如何做)。請答應我在這里澄清一下,假如只是解決“僅僅”的Tcp通信問題,我只需要介紹Tcp組件就可以了,但是假如要解決“整個Tcp通信層”的問題,并使之可高度復用,那就需要介紹很多額外的東西,比如,上面提到的“消息”,以及“消息”所涉及的通信協議。

  在我們應用的通信層中,存在以Tcp組件為核心的多個組件,這些組件相互協作,以構建/實現高度可復用的Tcp通信層。這些組件之間的關系簡單圖示如下:

.NET下可復用的TCP通信層實現之TCP組件(圖一)

  我先解釋一下上圖。當網絡(Tcp)組件從某個Tcp連接上接收到一個請求時,會將請求轉發給消息分派器,消息分派器通過IDataStreamHelper組件獲取請求消息的類型,然后根據此類型要求處理器工廠創建對應類型的請求處理器,請求處理器處理請求并返回結果。接下來再由網絡組件把結果返回給終端用戶。在消息分派器進行請求消息分派之前,可能涉及一系列的操作,像消息加密/解密、消息分裂/重組、消息驗證等。而且,根據不同的應用,可能有其它的消息轉換要求,而且這些操作可能是多樣化的,為了滿足這種多樣性和可接插性,這就需要消息分派器提供一個插入點,讓我們可以隨心所欲地插入自定義的對請求/回復消息的預處理和后處理。

  上圖中消息分派器中可接插的操作除了消息分裂器(使用實線框)是必須的,消息加密器和消息驗證器(使用虛線框)是可選的,應根據你應用的實際情況加以決定是否使用。關于這幾個典型的可接插的組件的功能作用會在后文中介紹。在繼續介紹Tcp組件的實現之前,有必要先提一下IDataStreamHelper接口的作用,IDataStreamHelper接口用于抽象我們實際的通信協議,并能從任何一請求/回復消息中提取關于本條消息的元數據,比如,消息的長度、類型等信息。具體的應用必須根據自己的消息協議來實現IDataStreamHelper接口。關于該接口的定義也在后文中給出。

  關于上圖,需要提醒的是,整個消息的流動是由Tcp組件驅動的!這篇文章以Tcp組件和消息分派器組件為索引來組織整個可復用的Tcp通信層的實現。首先,我們來深入到Tcp組件的具體實現中去。

  一.Tcp組件

  1.Tcp組件的主要職責

  Tcp組件的主要職責并不是在一個很短的時間內總結出來的,它是逐步完善的(至今可能還不夠全面)。為了使Tcp組件具有高度的可復用性,需要考慮很多的需求,而所有這些需求中具有共性的、占主導位置的需求就被納入到Tcp組件的職責中來了。這個職責的集合如下:

  (1) 治理所有的Tcp連接以及連接對應的上下文(Context)。

  (2) 當某用戶上線或下線時,能發出事件通知。

  (3) 當在線用戶(連接)的數量發生變化時,能發出事件通知。

  (4) 當用戶的請求得到回復時,發出事件通知。這一點對于記錄用戶請求和跟蹤用戶請求非常有用)

  (5) 能及時主動關閉指定連接。比如,當某一非法用戶登錄后,用戶驗證組件通知Tcp組件強行關閉該用戶對應的連接。

  (6) 除了能轉發用戶請求及對請求的應答(通過消息分派器)外,還能直接對指定的用戶發送數據。這也要求我們的Tcp連接是多線程安全的。

  (7) 提供繞開Tcp組件直接從Tcp連接同步接收數據的功能。比如,客戶端需要上傳一個Blob,我們可能希望直接從Tcp連接進行接收數據,這是有好處的,后面可以看到。

  這里列出的是Tcp組件的主要職責,還有很多細節性的沒有羅列出來,假如一個Tcp組件解決了上述所有問題,對我來說,應該就是一個很好用、很適用的Tcp組件了。

  2.Tcp組件接口定義

  相信很多朋友和我一樣,剛接觸Tcp服務端開發的時候,通常是當一個Tcp連接建立的時候,就分配一個線程在該連接上監聽請求消息,這種方式的缺點有很多,最主要的缺點是效率低、治理復雜。

  我的最初的Tcp組件是C++版本的,那時很有幸接觸到了windows平臺上最高效的Tcp通信模型――完成端口模型,完全理解這個模型需要點時間,但是《Win32 多線程程序設計》(侯捷翻譯)和《windows網絡編程》這兩本書可以給你不少幫助。異步機制是完成端口的基礎,完成端口模型的本質思想是將"啟動異步操作的線程"和"提供服務的線程"(即工作者線程)拆伙。理解這一點很重要。在.Net中沒有對應的組件或類對應于完成端口模型,解決方案有兩個:一是通過P/Invoke來實現自己的完成端口組件,另一種方式是通過.Net的現有通信設施來模擬完成端口實現。

  本文給出第二種方案的實現說明,另外,我也給出通過“異步+線程池”的方式的Tcp組件實現,這種方式對于大并發量也可以很好的治理。也就是我,我的EnterpriseServerBase類庫中,有兩種不同方式的Tcp組件實現,一個是模擬完成端口模型,一個是“異步+線程池”方式。無論是哪種方式,它們都實現了相同的接口ITcp。ITcp這個接口涵蓋了上述的Tcp組件的所有職責,這個接口并不復雜,假如理解了,使用起來也非常簡單。我們來看看這個接口的定義:


public interface ITcp :INet ,ITcpEventList ,ITcpClientsController
{
 int ConnectionCount{get ;} //當前連接的數量
}
  這個接口繼續了另外三個接口,INet ,ITcpEventList ,ITcpClientsController。INet接口是為了統一基于Tcp和Udp的通信組件而抽象出來的,它包含了以下內容:

public interface INet
{
 void InitializeAll(IReqestStreamDispatcher i_dispatcher ,int port , bool userValidated) ;
 void InitializeAll() ;
 void UnitializeAll() ;

 NetAddinType GetProtocalType() ; //Udp, Tcp
 event CallBackDynamicMessage DynamicMsgArrived ;
 //通常是通信插件中一些與服務和用戶無關的動態信息,如監聽線程重啟等
 void Start() ;
 void Stop() ;

 IReqestStreamDispatcher Dispatcher{set;} //支持依靠注入
 int Port{get ;set ;}
 bool UserValidated{set ;}
}

public enum NetAddinType
{
 Tcp ,Udp
}

public delegate void CallBackDynamicMessage(string msg) ;
  IReqestStreamDispatcher就是我們上述圖中的消息分派器,這是Tcp通信層中的中心,它的重要性已從前面的關系圖中可見一斑了。IReqestStreamDispatcher需要在初始化的時候提供,或者通過Dispatcher屬性通過IOC容器進行設值法注入。UserValidated屬性用于決定當用戶的第一個請求不是登錄請求時,是否立即關閉Tcp連接。其它的屬性已經加上了注釋,非常輕易理解。

  ITcpEventList接口說明了Tcp組件應當發布的事件,主要對應于前述Tcp組件職責的(2)(3)(4)點。其定義如下:

public interface ITcpEventList
{
 event CallBackForTcpUser2 SomeOneConnected ; //上線
 event CallBackForTcpUser1 SomeOneDisConnected ; //掉線
 event CallBackForTcpCount ConnectionCountChanged ;//在線人數變化
 event CallBackForTcpMonitor ServiceCommitted ;//用戶請求的服務的回復信息
 event CallBackForTcpUser UserAction ;
}
  每一個在線用戶都對應著一個Tcp連接,我們使用tcp連接的Hashcode作為ConnectID來標志每一個連接。UserAction將用戶與服務器的交互分為三類:登錄、退出和標準功能訪問,如以下枚舉所示。

public enum TcpUserAction
{
 Logon , Exit , Functionaccess , //標準的功能訪問
}
  最后一個接口ITcpClientsController,主要用來完成上述Tcp組件職責的(5)(6)(7)三點。定義如下:

/// <summary>
/// ITcpController 用于服務器主動控制TCP客戶的連接
/// </summary>
public interface ITcpClientsController
{
 //同步接收消息
 bool SynRecieveFrom(int ConnectID ,byte[] buffer, int offset, int size ,out int readCount) ;

 //主動給某個客戶同步發信息
 void SendData(int ConnectID ,byte[] data) ;
 void SendData(int ConnectID, byte[] data ,int offset ,int size) ;

 //主動關閉連接
 void DispoSEOneConnection(int connectID ,DisconnectedCause cause) ;
}
  這個接口中的方法的含義是一目了然的。

  上述的幾個接口已經完整的覆蓋了前述的Tcp組件的所有職責,在了解了這些接口定義的基礎上,大家已經能夠使用EnterpriseServerBase類庫中的Tcp組件了。假如想復用的不僅僅是Tcp組件,而是整個Tcp通信層,你就需要關注后面的內容。不管怎樣,為了文章的完整性,我在這里先給出前面提到的Tcp組件的兩種實現。

  3.Tcp組件基本元素實現

  在實現Tcp組件之前,有一些基本元素需要先建立起來,比如安全的網絡流、Tcp監聽器、用戶連接上下文、上下文治理者等。(1)安全的網絡流SafeNetworkStream

  前面已經提到過,為了能在Tcp組件外部 對指定的連接發送數據,必須保證我們的Tcp連接是線程安全的,而System.Net.Sockets.NetworkStream是非線程安全的,我們必須自己對其進行封裝,以保證這一點。System.Net.Sockets.NetworkStream的線程安全的封裝就是EnterpriseServerBase.Network.SafeNetworkStream類,它繼續了ISafeNetworkStream接口:


/// <summary>
/// ISafeNetworkStream 線程安全的網絡流 。
/// 注重:假如調用的異步的begin方法,就一定要調用對應的End方法,否則鎖將得不到釋放。
/// 作者:朱偉 sky.zhuwei@163.com
/// </summary>
public interface ISafeNetworkStream :ITcpSender ,ITcpReciever
{
 void Flush();
 void Close() ;
}

//用于在TCP連接上發送數據,支持同步和異步
public interface ITcpSender
{
 void Write(byte[] buffer ,int offset ,int size) ;
 IAsyncResult BeginWrite(byte[] buffer, int offset, int size, AsyncCallback callback, object state );
 void EndWrite(IAsyncResult asyncResult );
}

//用于在TCP連接上接收數據,支持同步和異步
public interface ITcpReciever
{
 int Read (byte[] buffer ,int offset ,int size) ;
 IAsyncResult BeginRead( byte[] buffer, int offset, int size, AsyncCallback callback, object state );
 int EndRead(IAsyncResult asyncResult );
}
  該接口幾乎與System.Net.Sockets.NetworkStream提供的方法一樣,只不過它們是線程安全的。這樣,針對同一個SafeNetworkStream,我們就可以在不同的線程中同時在其上進行數據接收/發送(主要是發送)了。

  (2)Tcp監聽器EnterpriseServerBase.Network.XTcpListener

  不可否認,System.Net.Sockets.TcpListener只是提供了一些最低階的工作,為了將監聽線程、端口、監聽事件整合起來,我引入了EnterpriseServerBase.Network.XTcpListener類,它可以啟動和停止,并且當有Tcp連接建立的時候,會觸發事件。XTcpListener實現了IXTcpListener接口,其定義如下:

public interface IXTcpListener
{
 void Start() ; //開始或啟動監聽線程
 void Stop() ; //暫停,但不退出監聽線程

 void ExitListenThread() ;//退出監聽線程

 event CBackUserLogon TcpConnectionEstablished ; //新的Tcp連接成功建立
 event CallBackDynamicMsg DynamicMsgArrived ;
}
  XTcpListener可以在不同的Tcp組件中復用,這是一種更細粒度的復用。

  (3)用戶連接上下文ContextKey

  ContextKey用于將所有的與一個用戶Tcp連接相關的信息(比如接收緩沖區、連接的狀態――空閑還是忙碌、等)封裝起來,并且還能保存該用戶的請求中上次未處理完的數據,將其放于接收緩沖區的頭部,并與后面接收到的數據進行重組。說到這里,你可能不太明白,我需要解釋一下。Tcp協議可以保證我們發出的消息完整的、有序的、正確的到達目的地,但是它不能保證,我們一次發送的數據對方也能一次接收完全。比如,我們發送了一個100Bytes的數據,對方可能要接收兩次才能完全,先收到60Bytes,再收到40Bytes,這表明我們可能會收到“半條”消息。還有一種情況,你連續發了兩條100Bytes的消息,而對方可能一次就接收了160Bytes,所以需要對消息進行分裂,從中分裂出完整的消息然后進行處理。這,就是前面所說的需要對消息進行分裂、重組的原因。知道這點后,IContextKey接口應該比較輕易理解了,因為該接口的很多元素的存在都是為了輔助解決這個問題。IContextKey的定義如下:

public interface IContextKey
{
 NetStreamState StreamState{get ;set ;} //網絡流的當前狀態--空閑、忙碌
 ISafeNetworkStream NetStream{get ;set ;}

 byte[] Buffer{get ;set ;} //接收緩沖區
 int BytesRead{get ;set ;} //本次接收的字節數
 int PreLeftDataLen{get ;set ;}
 bool IsFirstMsg{get ;set ;} //是否為建立連接后的第一條消息

 int StartOffsetForRecieve{get ;}
 int MaxRecieveCapacity{get ;} //本次可以接收的最大字節數
 RequestData RequestData{get ;}

 void ResetBuffer(byte[] leftData) ;
 //leftData 表示上次沒有處理完的數據,需要與后面來的數據進行重組,然后再次處理
}
  對于消息的分裂和重組是由消息分裂器完成的,由于Tcp組件的實現不需要使用消息分裂器,所以消息分裂器的說明將在后面的消息分派器實現中講解。

  (4)上下文治理者ContextKeyManager

  ContextKeyManager用于治理所有的ContextKey,其實現的接口IContextKeyManager很輕易理解:


public interface IContextKeyManager
{
 void InsertContextKey(ContextKey context_key) ;
 void DisposeAllContextKey() ;
 bool IsAllStreamSafeToStop() ; //是否可以安全退出
 void RemoveContextKey(int streamHashCode) ;
 int ConnectionCount {get ;}
 ISafeNetworkStream GetNetStream(int streamHashCode) ;
 event CallBackCountChanged StreamCountChanged ;
}
  在上述四個基本元素的支持下,再來實現Tcp組件就方便了許多,無論是以何種方式(如完成端口模型、異步方式)實現Tcp組件,這些基本元素都是可以通用的,所以假如你要實現自己的Tcp組件,也可以考慮復用上述的一些基本元素。復用可以在不同的粒度進行,復用真是無處不在,呵呵。
  4.完成端口Tcp組件實現

  前面已經提到,完成端口模型本質思想是將"啟動異步操作的線程"和"提供服務的線程"(即工作者線程)拆伙。只要做到這一點,就模擬了完成端口。

  分析一下我們需要幾種類型的線程,首先我們需要一個線程來接收TCP連接請求,這就是所謂監聽線程,當成功的接收到一個連接后,就向連接發送一個異步接收數據的請求,由于是異步操作,所以會立即返回,然后再去接收新的連接請求,如此監聽線程就循環運作起來了(已經封裝成前述的XTcpListener組件了)。值得提出的是,在異步接收的回調函數中,應該對接收到的數據進行處理,完成端口模型所做的就是將接收到的數據放在了完成端口隊列中,注重,是一個隊列。第二種線程類型,就是工作者線程。工作者線程的個數有個經驗值是( Cpu個數×2 + 2),當然具體取多少,還要取決于你的應用的要求。工作者線程的任務就是不斷地從完成端口隊列中取出數據,并處理它,然后假如有回復,再將回復寫入對應的連接。

  讓我們來定義接口IRequestQueueManager,用于模擬完成端口的隊列,該隊列是線程安全的,用于將所有的請求進行排隊,然后由工作者線程來輪流處理這些請求。

public interface IRequestQueueManager :IRequestPusher
{
 object Pop() ;//彈出隊列中的下一個請求
 void Clear() ;
 int Length {get ;} //隊列長度
}

public interface IRequestPusher
{
 void Push(object package) ; //向隊列中壓入一個請求
}
  在IRequestQueueManager的基礎上,可以將工作者線程和啟動異步操作的線程拆開了。由于工作者線程只與端口隊列相關,所以我決定將它們一起封裝起來--成為IIOCPManager,用于治理請求隊列和工作者線程。

/// <summary>
/// IIOCPManager 完成端口治理者,主要治理工作者線程和完成端口隊列。
/// </summary>
public interface IIOCPManager : IRequestPusher
{
 void Initialize(IOCPPackageHandler i_packageHandler ,int threadCount) ;
 void Start() ; //啟動工作者線程
 void Stop() ; //退出工作者線程

 int WorkThreadCount{get ;}

 event CallBackPackageHandled PackageHandled ;
}

//IOCPPackageHandler 用于處理從完成端口隊列中取出的package
public interface IOCPPackageHandler
{
 void HandlerPackage(object package) ; //一般以同步實現
}
  有了IRequestQueueManager和IIOCPManager的支持,實現基于完成端口模型的Tcp組件就非常簡單了。當然,你也可以單獨使用IIOCPManager。你只要提供一個監聽者線程接收連接,并將從連接接收到的數據通過IRequestPusher接口放入端口隊列就可以了。 當然,為了處理接收到的數據,我們需要提供一個實現了IOCPPackageHandler接口的對象給IOCPManager。值得提出的是,你可以在數據處理并發送了回復數據后,再次投遞一個異步接收請求,以保證能源源不斷的從對應的TCP連接接收數據。下面,我們來看基于完成端口模型的Tcp組件的完整實現。

  完成端口Tcp組件

1/**//// <summary>
2 /// IocpTcp 完成端口Tcp組件。
3 /// </summary>
4 public class IocpTcp :ITcp ,IOCPPackageHandler
5 {
6 members#region members
7 private const int BufferSize = 1024 ;
8 private const int MaxWorkThreadNum = 50 ;
9
10 private IXTcpListener xtcpListener ;
11 private IIOCPManager iocpMgr = null ;
12 private ITcpReqStreamDispatcher messageDispatcher = null ;
13 private ContextKeyManager contextKeyMgr = new ContextKeyManager() ;
14 private bool stateIsStop = true ;
15 private bool validateRequest = false ;
16 private int curPort = 8888 ;
17 #endregion
18
19 public IocpTcp()
20 {
21
22 }
23 ITcp 成員#region ITcp 成員
24 public int ConnectionCount
25 {
26 get
27 {
28 return this.contextKeyMgr.ConnectionCount ;
29 }
30 }
31
32 #endregion
33
34 INet 成員#region INet 成員
35
36 InitializeAll ,UnitializeAll#region InitializeAll ,UnitializeAll
37 public void InitializeAll(IReqestStreamDispatcher i_dispatcher ,int port , bool userValidated)
38 {
39 this.messageDispatcher = i_dispatcher as ITcpReqStreamDispatcher;
40 if(this.messageDispatcher == null)
41 {
42 throw new Exception("Can't convert IReqestStreamDispatcher to ITcpReqStreamDispatcher in CompletePortManager.InitializeAll method ! ") ;
43 }
44
45 this.validateRequest = userValidated ;
46 this.curPort = port ;
47
48 this.InitializeAll() ;
49 }
50
51 public void InitializeAll()
52 {
53 this.xtcpListener = new XTcpListener(this.curPort) ;
54 this.xtcpListener.TcpConnectionEstablished += new CBackUserLogon(xtcpListener_TcpConnectionEstablished);
55 this.xtcpListener.DynamicMsgArrived += new CallBackDynamicMsg(this.PutoutDynamicMsg) ;
56 this.contextKeyMgr.StreamCountChanged += new CallBackCountChanged (this.OnStreamCountChanged) ;
57
58 this.iocpMgr = new IOCPManager() ;
59 this.iocpMgr.Initialize(this , IocpTcp.MaxWorkThreadNum) ;
60 }
61
62 public void UnitializeAll()
63 {
64 this.Stop() ;
65 this.xtcpListener.ExitListenThread() ;
66
67 //將事件容器清空==》防止外部框架再多次初始化的過程中將一個事件預定多次
68 this.ConnectionCountChanged = null ;
69 this.DynamicMsgArrived = null ;
70 this.ServiceCommitted = null ;
71 this.SomeOneConnected = null ;
72 this.SomeOneDisConnected = null ;
73 this.UserAction = null ;
74 }
75 #endregion
76
77 Start ,Stop#region Start ,Stop
78 public void Start()
79 {
80 try
81 {
82 if(this.stateIsStop)
83 {
84 this.stateIsStop = false ;
85 this.xtcpListener.Start() ;
86 this.iocpMgr.Start() ;
87 }
88 }
89 catch(Exception ee)
90 {
91 throw ee ;
92 }
93 }
94
95 public void Stop()
96 {
97 if(this.stateIsStop)
98 {
99 return ;
100 }
101
102 this.stateIsStop = true ;
103 this.xtcpListener.Stop() ;
104 this.iocpMgr.Stop() ;
105
106 //關閉所有連接
107 int count = 0 ;
108 while(! this.contextKeyMgr.IsAllStreamSafeToStop()) //等待所有流到達停止安全點
109 {
110 Thread.Sleep(200) ;
111 if(10 == count++)
112 {
113 break ;
114 }
115 }
116 this.contextKeyMgr.DisposeAllContextKey() ;
117 }
118 #endregion
119
120 public event EnterpriseServerBase.Network.CallBackDynamicMessage DynamicMsgArrived;
121
122 public NetAddinType GetProtocalType()
123 {
124 return NetAddinType.Tcp ;
125 }
126
127 #endregion
128
129 ITcpEventList 成員#region ITcpEventList 成員
130 public event EnterpriseServerBase.Network.CallBackForTcpUser2 SomeOneConnected;
131
132 public event EnterpriseServerBase.Network.CallBackForTcpMonitor ServiceCommitted;
133
134 public event EnterpriseServerBase.Network.CallBackForTcpCount ConnectionCountChanged;
135
136 public event EnterpriseServerBase.Network.CallBackForTcpUser1 SomeOneDisConnected;
137
138 public event EnterpriseServerBase.Network.CallBackForTcpUser UserAction;
139
140 #endregion
141
142 ITcpClientsController 成員#region ITcpClientsController 成員
143
144 public void SendData(int ConnectID, byte[] data)
145 {
146 this.SendData(ConnectID ,data ,0 ,data.Length) ;
147 }
148
149 public void SendData(int ConnectID, byte[] data ,int offset ,int size)
150 {
151 if((data == null) (data.Length == 0) (offset <0) (size <0) (offset+size > data.Length))
152 {
153 return ;
154 }
155
156 ISafeNetworkStream netStream = this.contextKeyMgr.GetNetStream(ConnectID) ;
157 if(netStream == null)
158 {
159 return ;
160 }
161
162 netStream.Write(data ,offset ,size) ;
163 }
164
165 public bool SynRecieveFrom(int ConnectID ,byte[] buffer, int offset, int size ,out int readCount)
166 {
167 readCount = 0 ;
168 ISafeNetworkStream netStream = this.contextKeyMgr.GetNetStream(ConnectID) ;
169 if(netStream == null)
170 {
171 return false ;
172 }
173
174 readCount = netStream.Read(buffer ,offset ,size) ;
175
176 return true ;
177 }
178
179 public void DisposeOneConnection(int connectID, EnterpriseServerBase.Network.DisconnectedCause cause)
180 {
181 this.DisposeOneConnection(connectID) ;
182
183 if(this.SomeOneDisConnected != null)
184 {
185 this.SomeOneDisConnected(connectID ,cause) ;
186 }
187
188 this.ActivateUserActionEvent(connectID ,TcpUserAction.Exit) ;
189 }
190
191 /**//// <summary>
192 /// DisposeOneConnection 主要由用戶治理模塊調用--當無法檢測到掉線情況時,該方法保證資源被釋放
193 /// </summary>
194 private void DisposeOneConnection(int connectID)
195 {
196 this.contextKeyMgr.RemoveContextKey(connectID) ;
197 }
198
199 #endregion
200
201 private#region private
202 BindRequestToQueue#region BindRequestToQueue
203 private void BindRequestToQueue(IAsyncResult ar)
204 {
205 try
206 {
207 ContextKey key = (ContextKey)ar.AsyncState ;
208 key.BytesRead = key.NetStream.EndRead(ar) ;
209 if(! this.CheckData(key))
210 {
211 return ;
212 }
213
214 this.iocpMgr.Push(key) ;
215 }
216 catch(Exception ee)
217 {
218 ee = ee ;
219 }
220 }
221
222 CheckData#region CheckData
223 private bool CheckData(ContextKey key)
224 {
225 int streamHashcode = key.NetStream.GetHashCode() ;
226 if(this.stateIsStop)
227 {
228 this.DisposeOneConnection(streamHashcode ,DisconnectedCause.ServerStopped) ;
229 return false;
230 }
231
232 if(key.BytesRead == 0) //表示客戶端掉線或非正常關閉連接
233 {
234 this.DisposeOneConnection(streamHashcode ,DisconnectedCause.LineOff) ;
235 return false ;
236 }
237
238 if(key.BytesRead == 8)//表示客戶端正常關閉連接
239 {
240 string ss = System.Text.Encoding.BigEndianUnicode.GetString(key.Buffer ,0 ,8) ;
241 this.DisposeOneConnection(streamHashcode ,DisconnectedCause.LineOff) ;
242 return false;
243 }
244
245 return true ;
246 }
247 #endregion
248 #endregion
249
250 xtcpListener_TcpConnectionEstablished#region xtcpListener_TcpConnectionEstablished
251 private void xtcpListener_TcpConnectionEstablished(NetworkStream stream)
252 {
253 ISafeNetworkStream safeStream = new SafeNetworkStream(stream) ;
254 ContextKey key = new ContextKey(safeStream ,IocpTcp.BufferSize) ;
255 key.ResetBuffer(null) ;
256 this.contextKeyMgr.InsertContextKey(key) ;
257 int connectID = key.NetStream.GetHashCode() ;
258 if(this.SomeOneConnected != null)
259 {
260 this.SomeOneConnected(connectID) ;
261 }
262
263 this.ActivateUserActionEvent(connectID ,TcpUserAction.Logon) ;
264
265 key.IsFirstMsg = true ;
266 this.RecieveDataFrom(key) ;
267 }
268 #endregion
269
270 ActivateUserActionEvent#region ActivateUserActionEvent
271 private void ActivateUserActionEvent(int ConnectID ,TcpUserAction action)
272 {
273 if(this.UserAction != null)
274 {
275 this.UserAction(ConnectID ,action) ;
276 }
277 }
278 #endregion
279
280 PutoutDynamicMsg#region PutoutDynamicMsg
281 private void PutoutDynamicMsg(string msg)
282 {
283 if(this.DynamicMsgArrived != null)
284 {
285 this.DynamicMsgArrived(msg) ;
286 }
287 }
288 #endregion
289
290 OnStreamCountChanged#region OnStreamCountChanged
291 private void OnStreamCountChanged(int count)
292 {
293 if(this.ConnectionCountChanged != null)
294 {
295 this.ConnectionCountChanged(count) ;
296 }
297 }
298 #endregion
299
300 RecieveDataFrom#region RecieveDataFrom
301 private void RecieveDataFrom(ContextKey key)
302 {
303 try
304 {
305 key.StreamState = NetStreamState.Reading ;
306 key.NetStream.BeginRead(key.Buffer ,key.StartOffsetForRecieve ,key.MaxRecieveCapacity ,new AsyncCallback(this.BindRequestToQueue) ,key) ;
307 }
308 catch(Exception ee)
309 {
310 ee = ee ;
311 }
312
313 }
314 #endregion
315 #endregion
316
317 IOCPPackageHandler 成員#region IOCPPackageHandler 成員
318
319 public void HandlerPackage(object package)
320 {
321 ContextKey key = package as ContextKey ;
322 if(key == null)
323 {
324 return ;
325 }
326
327 int streamHashCode = key.NetStream.GetHashCode() ; //是SafeNetworkStream的hashcode
328
329 //處理請求
330 try
331 {
332 byte[] leftData = null ;
333 ArrayList repondList = this.messageDispatcher.DealRequestMessage(key.RequestData ,out leftData , ref key.Validation) ;
334
335 if(this.validateRequest)
336 {
337 if(key.Validation.gotoCloseConnection)
338 {
339 this.DisposeOneConnection(streamHashCode ,key.Validation.cause) ;
340 return ;
341 }
342 }
343
344 key.StreamState = NetStreamState.Writing ;
345 if(repondList!= null && (repondList.Count != 0))
346 {
347 foreach(object obj in repondList)
348 {
349 byte[] respond_stream = (byte[])obj ;
350 key.NetStream.Write(respond_stream ,0 ,respond_stream.Length) ;
351 if(this.ServiceCommitted != null)
352 {
353 RespondInformation info = new RespondInformation() ;
354 info.ConnectID = streamHashCode ;
355 info.ServiceKey = this.messageDispatcher.GetServiceKey(respond_stream) ;
356 info.repondData = respond_stream ;
357 this.ServiceCommitted(info) ;
358 }
359 this.ActivateUserActionEvent(streamHashCode ,TcpUserAction.FunctionAccess) ;
360 }
361 }
362
363 if(key.IsFirstMsg)
364 {
365 if(repondList == null (repondList.Count == 0)) //表示第一條消息還未接收完全
366 {
367 key.IsFirstMsg = true ;
368 }
369 else
370 {
371 key.IsFirstMsg = false ;
372 }
373 }
374
375 key.StreamState = NetStreamState.Idle ;
376
377 key.ResetBuffer(leftData) ;
378
379 if(! this.stateIsStop)
380 {
381 //繼續接收請求
382 this.RecieveDataFrom(key) ;
383 }
384 else //停止服務
385 {
386 this.DisposeOneConnection(streamHashCode ,DisconnectedCause.ServerStopped) ;
387 }
388 }
389 catch(Exception ee)
390 {
391 if(ee is System.IO.IOException) //正在讀寫流的時候,連接斷開
392 {
393 this.DisposeOneConnection(streamHashCode ,DisconnectedCause.ServerStopped) ;
394 }
395
396 ee = ee ;
397 }
398 }
399
400 #endregion
401
402 INet 成員#region INet 成員
403
404 public IReqestStreamDispatcher Dispatcher
405 {
406 set
407 {
408 this.messageDispatcher = (ITcpReqStreamDispatcher)value ;
409 }
410 }
411
412 public int Port
413 {
414 set
415 {
416 this.curPort = value ;
417 }
418 get
419 {
420 return this.curPort ;
421 }
422 }
423
424 public bool UserValidated
425 {
426 set
427 {
428 this.validateRequest = value ;
429 }
430 }
431
432 #endregion
433 }


  5.異步Tcp組件實現

  這種方式的主要思想是:當一個新的Tcp連接建立時,就在該連接上發送一個異步接收的請求(BeginRead),并在異步回調中處理該請求,當請求處理完畢,再次發送異步接收請求,如此循環下去。異步接收啟用的是系統默認線程池中的線程,所以,在異步Tcp組件中不用顯式治理工作線程。異步Tcp組件的實現相對于完成端口模型而言簡單許多,也單純一些,不用治理請求隊列,不需使用工作者線程等等。但是,相比于完成端口模型,其也有明顯的缺陷:一個Tcp連接綁定到了一個線程,即使這個線程是后臺線程池中的。假如用戶數量巨大,這對性能是極其不利的;而完成端口模型,則可以限定工作者線程的個數,并且可以根據應用的類型進行靈活調節。

  異步Tcp組件實現源碼

  異步Tcp組件

1/**//// <summary>
2 /// AsynTcp 異步Tcp組件。
3 /// </summary>
4 public class AsynTcp :ITcp
5 {
6 members#region members
7 private const int BufferSize = 1024 ;
8
9 private IXTcpListener xtcpListener = null ;
10 private ITcpReqStreamDispatcher messageDispatcher = null ;
11 private ContextKeyManager contextKeyMgr = new ContextKeyManager() ;
12 private bool stateIsStop = true ;
13 private bool validateRequest = false ;
14 private int curPort = 8888 ;
15 #endregion
16
17
18 public AsynTcp()
19 {
20
21 }
22
23 INet 成員#region INet 成員
24
25 public event CallBackDynamicMessage DynamicMsgArrived;
26
27 public NetAddinType GetProtocalType()
28 {
29
30 return NetAddinType.Tcp;
31 }
32
33 InitializeAll ,UnitializeAll#region InitializeAll ,UnitializeAll
34 public void InitializeAll(IReqestStreamDispatcher i_dispatcher, int port, bool userValidated)
35 {
36 this.messageDispatcher = i_dispatcher as ITcpReqStreamDispatcher;
37 if(this.messageDispatcher == null)
38 {
39 throw new Exception("Can't convert IReqestStreamDispatcher to ITcpReqStreamDispatcher in CompletePortManager.InitializeAll method ! ") ;
40 }
41
42 this.curPort = port ;
43 this.validateRequest = userValidated ;
44
45 this.InitializeAll() ;
46 }
47
48 public void InitializeAll()
49 {
50 this.xtcpListener = new XTcpListener(this.curPort) ;
51 this.xtcpListener.TcpConnectionEstablished += new CBackUserLogon(xtcpListener_TcpConnectionEstablished);
52 this.xtcpListener.DynamicMsgArrived += new CallBackDynamicMsg(this.PutoutDynamicMsg) ;
53 this.contextKeyMgr.StreamCountChanged += new CallBackCountChanged(this.OnStreamCountChanged) ;
54 }
55
56 public void UnitializeAll()
57 {
58 this.Stop() ;
59 this.xtcpListener.ExitListenThread() ;
60
61 //將事件容器清空==》防止外部框架再多次初始化的過程中將一個事件預定多次
62 this.ConnectionCountChanged = null ;
63 this.DynamicMsgArrived = null ;
64 this.ServiceCommitted = null ;
65 this.SomeOneConnected = null ;
66 this.SomeOneDisConnected = null ;
67 this.UserAction = null ;
68 }
69
70 #endregion
71
72 Start ,Stop#region Start ,Stop
73 public void Start()
74 {
75 if(this.stateIsStop)
76 {
77 this.xtcpListener.Start() ;
78 this.stateIsStop = false ;
79 }
80 }
81
82 public void Stop()
83 {
84 if(this.stateIsStop)
85 {
86 return ;
87 }
88
89 this.stateIsStop = true ;
90 this.xtcpListener.Stop() ;
91
92 //關閉所有連接
93 int count = 0 ;
94 while(! this.contextKeyMgr.IsAllStreamSafeToStop()) //等待所有流到達停止安全點
95 {
96 Thread.Sleep(200) ;
97 if(10 == count++)
98 {
99 break ;
100 }
101 }
102 this.contextKeyMgr.DisposeAllContextKey() ;
103 }
104 #endregion
105
106 #endregion
107
108 ITcpEventList 成員#region ITcpEventList 成員
109
110 public event EnterpriseServerBase.Network.CallBackForTcpUser2 SomeOneConnected;
111
112 public event EnterpriseServerBase.Network.CallBackForTcpMonitor ServiceCommitted;
113
114 public event EnterpriseServerBase.Network.CallBackForTcpCount ConnectionCountChanged;
115
116 public event EnterpriseServerBase.Network.CallBackForTcpUser1 SomeOneDisConnected;
117
118 public event EnterpriseServerBase.Network.CallBackForTcpUser UserAction;
119
120 #endregion
121
122 ITcpClientsController 成員#region ITcpClientsController 成員
123
124 public bool SynRecieveFrom(int ConnectID ,byte[] buffer, int offset, int size ,out int readCount)
125 {
126 readCount = 0 ;
127 ISafeNetworkStream netStream = this.contextKeyMgr.GetNetStream(ConnectID) ;
128 if(netStream == null)
129 {
130 return false ;
131 }
132
133 readCount = netStream.Read(buffer ,offset ,size) ;
134
135 return true ;
136 }
137
138 public void SendData(int ConnectID, byte[] data)
139 {
140 this.SendData(ConnectID ,data ,0 ,data.Length) ;
141 }
142
143 public void SendData(int ConnectID, byte[] data ,int offset ,int size)
144 {
145 if((data == null) (data.Length == 0) (offset <0) (size <0) (offset+size > data.Length))
146 {
147 return ;
148 }
149
150 ISafeNetworkStream netStream = this.contextKeyMgr.GetNetStream(ConnectID) ;
151 if(netStream == null)
152 {
153 return ;
154 }
155
156 netStream.Write(data ,offset ,size) ;
157 }
158
159 public void DisposeOneConnection(int connectID, DisconnectedCause cause)
160 {
161 this.DisposeOneConnection(connectID) ;
162
163 if(this.SomeOneDisConnected != null)
164 {
165 this.SomeOneDisConnected(connectID , cause) ;
166 }
167
168 this.ActivateUserActionEvent(connectID ,TcpUserAction.Exit) ;
169 }
170
171 #endregion
172
173 ITcp 成員#region ITcp 成員
174 public int ConnectionCount
175 {
176 get
177 {
178 return this.contextKeyMgr.ConnectionCount ;
179 }
180 }
181
182 #endregion
183
184 private#region private
185
186 ActivateUserActionEvent#region ActivateUserActionEvent
187 private void ActivateUserActionEvent(int ConnectID ,TcpUserAction action)
188 {
189 if(this.UserAction != null)
190 {
191 this.UserAction(ConnectID ,action) ;
192 }
193 }
194 #endregion
195
196 DisposeOneConnection#region DisposeOneConnection
197 /**//// <summary>
198 /// DisposeOneConnection 主要由用戶治理模塊調用--當無法檢測到掉線情況時,該方法保證資源被釋放
199 /// </summary>
200 private void DisposeOneConnection(int connectID)
201 {
202 this.contextKeyMgr.RemoveContextKey(connectID) ;
203 }
204 #endregion
205
206 xtcpListener_TcpConnectionEstablished#region xtcpListener_TcpConnectionEstablished
207 private void xtcpListener_TcpConnectionEstablished(NetworkStream stream)
208 {
209 ISafeNetworkStream safeStream = new SafeNetworkStream(stream) ;
210
211 ContextKey key = new ContextKey(safeStream ,AsynTcp.BufferSize) ;
212 key.ResetBuffer(null) ;
213 this.contextKeyMgr.InsertContextKey(key) ;
214 int connectID = key.NetStream.GetHashCode() ;
215
216 if(this.SomeOneConnected != null)
217 {
218 this.SomeOneConnected(connectID) ;
219 }
220 this.ActivateUserActionEvent(connectID ,TcpUserAction.Logon) ;
221
222 key.IsFirstMsg = true ;
223 this.RecieveDataFrom(key) ;
224 }
225 #endregion
226
227 PutoutDynamicMsg#region PutoutDynamicMsg
228 private void PutoutDynamicMsg(string msg)
229 {
230 if(this.DynamicMsgArrived != null)
231 {
232 this.DynamicMsgArrived(msg) ;
233 }
234 }
235 #endregion
236
237 OnStreamCountChanged#region OnStreamCountChanged
238 private void OnStreamCountChanged(int count)
239 {
240 if(this.ConnectionCountChanged != null)
241 {
242 this.ConnectionCountChanged(count) ;
243 }
244 }
245 #endregion
246
247 RecieveDataFrom#region RecieveDataFrom
248 private void RecieveDataFrom(ContextKey key)
249 {
250 key.StreamState = NetStreamState.Reading ;
251 key.NetStream.BeginRead(key.Buffer ,key.StartOffsetForRecieve ,key.MaxRecieveCapacity ,new AsyncCallback(this.ServeOverLap) ,key) ;
252
253 }
254 #endregion
255
256 ServeOverLap#region ServeOverLap
257 private void ServeOverLap(IAsyncResult ar)
258 {
259 ContextKey key = (ContextKey)ar.AsyncState ;
260 int streamHashCode = key.NetStream.GetHashCode() ; //是SafeNetworkStream的hashcode
261
262 try
263 {
264 key.BytesRead = key.NetStream.EndRead(ar) ;
265
266 if(! this.CheckData(key))
267 {
268 return ;
269 }
270
271 //處理請求
272 byte[] leftData = null ;
273 ArrayList repondList = this.messageDispatcher.DealRequestMessage(key.RequestData ,out leftData , ref key.Validation) ;
274
275 if(this.validateRequest)
276 {
277 if(key.Validation.gotoCloseConnection)
278 {
279 this.DisposeOneConnection(streamHashCode ,key.Validation.cause) ;
280 }
281 }
282
283 key.StreamState = NetStreamState.Writing ;
284 if(repondList!= null && (repondList.Count != 0))
285 {
286 foreach(object obj in repondList)
287 {
288 byte[] respond_stream = (byte[])obj ;
289 key.NetStream.Write(respond_stream ,0 ,respond_stream.Length) ;
290 if(this.ServiceCommitted != null)
291 {
292 RespondInformation info = new RespondInformation() ;
293 info.ConnectID = streamHashCode ;
294 info.ServiceKey = this.messageDispatcher.GetServiceKey(respond_stream) ;
295 info.repondData = respond_stream ;
296 this.ServiceCommitted(info) ;
297 }
298
299 this.ActivateUserActionEvent(streamHashCode ,TcpUserAction.FunctionAccess) ;
300 }
301 }
302
303 if(key.IsFirstMsg)
304 {
305 if(repondList == null (repondList.Count == 0)) //表示第一條消息還未接收完全
306 {
307 key.IsFirstMsg = true ;
308 }
309 else
310 {
311 key.IsFirstMsg = false ;
312 }
313 }
314
315 key.StreamState = NetStreamState.Idle ;
316
317 key.ResetBuffer(leftData) ;
318
319 if(! this.stateIsStop)
320 {
321 //繼續接收請求
322 this.RecieveDataFrom(key) ;
323 }
324 else //停止服務
325 {
326 this.DisposeOneConnection(streamHashCode ,DisconnectedCause.ServerStopped) ;
327 }
328 }
329 catch(Exception ee)
330 {
331 if(ee is System.IO.IOException) //正在讀寫流的時候,連接斷開
332 {
333 this.DisposeOneConnection(streamHashCode ,DisconnectedCause.ServerStopped) ;
334 }
335
336 ee = ee ;
337 }
338 }
339 #endregion
340
341 CheckData#region CheckData
342 private bool CheckData(ContextKey key)
343 {
344 int streamHashcode = key.NetStream.GetHashCode() ;
345 if(this.stateIsStop)
346 {
347 this.DisposeOneConnection(streamHashcode ,DisconnectedCause.ServerStopped) ;
348 return false;
349 }
350
351 if(key.BytesRead == 0) //表示客戶端掉線或非正常關閉連接
352 {
353 this.DisposeOneConnection(streamHashcode ,DisconnectedCause.LineOff) ;
354 return false ;
355 }
356
357 if(key.BytesRead == 8)//表示客戶端正常關閉連接
358 {
359 string ss = System.Text.Encoding.BigEndianUnicode.GetString(key.Buffer ,0 ,8) ;
360 this.DisposeOneConnection(streamHashcode ,DisconnectedCause.LineOff) ;
361 return false;
362 }
363
364 return true ;
365 }
366 #endregion
367 #endregion
368
369 INet 成員#region INet 成員
370
371 public IReqestStreamDispatcher Dispatcher
372 {
373 set
374 {
375 this.messageDispatcher = (ITcpReqStreamDispatcher)value ;
376 }
377 }
378
379 public int Port
380 {
381 set
382 {
383 this.curPort = value ;
384 }
385 get
386 {
387 return this.curPort ;
388 }
389 }
390
391 public bool UserValidated
392 {
393 set
394 {
395 this.validateRequest = value ;
396 }
397 }
398
399 #endregion
400 }
  今天介紹了Tcp通信層中的核心――Tcp組件,僅僅復用Tcp組件已經能為我們省去很多麻煩了,假如想進行更高層次的復用――整個Tcp通信層的復用,請關注本篇的續文。


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 深圳市| 新密市| 安阳县| 新宾| 大丰市| 巩留县| 固始县| 汉阴县| 汝南县| 乌鲁木齐县| 金华市| 衡水市| 长岛县| 九龙城区| 江油市| 库车县| 驻马店市| 体育| 甘南县| 视频| 河池市| 广州市| 中西区| 尚义县| 和林格尔县| 甘南县| 咸丰县| 抚顺县| 通州市| 龙口市| 鄂伦春自治旗| 庆阳市| 汶上县| 罗江县| 荥经县| 临澧县| 南召县| 安远县| 富锦市| 闸北区| 西平县|