上一篇主要講到了Tcp通信層中的核心組件――Tcp組件的實現,Tcp組件是整個通信層的消息驅動源,甚至,
可以將Tcp組件看作是我們整個服務器系統的消息驅動源,消息處理過程從這里引發。類似的消息驅動源還有發布的WebService接口、Remoting接口等。今天我們需要關注的是Tcp通信層中的“中心”組件――消息分派器組件ITcPReqStreamDispatcher,大家已經從前文的組件關系圖中看到了消息分派器的大致位置和作用了,它是Tcp通信組件和消息處理器之間的“橋梁”。我們再對前文描述的通信層組件之間關系的一段話回顧一下:
“當網絡(Tcp)組件從某個Tcp連接上接收到一個請求時,會將請求轉發給消息分派器,消息分派器通過IDataStreamHelper組件獲取請求消息的類型,然后根據此類型要求處理器工廠創建對應類型的請求處理器,請求處理器處理請求并返回結果。接下來再由網絡組件把結果返回給終端用戶。在消息分派器進行請求消息分派之前,可能涉及一系列的操作,像消息加密/解密、消息分裂/重組、消息驗證等。”
上面的描述中已經體現出了消息分派器的主要職責,在理解了消息分派器職責的基礎上,我們可以進一步來看看消息分派器的定義和實現了。 二.消息分派器組件
1.消息分派器組件接口的定義
消息分派器的接口很簡單:
Word-BREAK: break-all; PADDING-TOP: 4px; BORDER-BOTTOM: #cccccc 1px solid; FONT-FAMILY: Verdana; BACKGROUND-COLOR: #eeeeee"> public interface ITcpReqStreamDispatcher : IReqestStreamDispatcher
{
ArrayList DealRequestMessage(RequestData requestData ,out byte[] leftData ,ref RequestValidation validation) ;//同步回復
bool DealRequestMessage(RequestData requestData , NetworkStream userStream ,out byte[] leftData) ; //異步回復
}
這個接口只有兩個方法,第二個方法用于異步發送回復(即繞開Tcp組件發送回復),該方法的核心部分可以由第一個方法實現,我們把注重力放在第一個方法上,而Tcp組件與消息分派器進行交互的也正是第一個方法。我先解釋一下這個方法的幾個參數的含義:
RequestData是對請求消息的封裝:
//從網絡接收到的原始數據的封裝
public class RequestData
{
public int ConnectID = 0 ;
public bool IsFirstMsg = false ; //標志是否為連接建立后的第一條消息
public byte[] Buff = null ; //接收數據緩沖區 ,可能其頭部包含上次未處理完的數據
public int ValidCount = 0 ; //緩沖區中有效字節的個數 >= 本次接收的字節數
}
前面已經提到過,ConnectID用于標志每一個Tcp連接,IsFirstMsg用于表明是否為tcp連接建立后的第一個消息,因為我們可能需要對第一個消息進行額外的驗證,比如,果第一個消息不是登錄請求,就關閉該Tcp連接。
第二個參數leftData,表示RequestData.Buff中的數據經過消息分裂器分裂之后余下的數據(一條非完整的消息),這些數據被Tcp組件用來放在下一次收到的數據的頭部進行消息重組。
第三個參數validation,是個ref參數,用于通知Tcp組件對消息驗證的結果,假如驗證失敗,Tcp組件將關閉對應的Tcp連接。
該方法的返回值是回復的集合,每一個回復對應一個請求,而RequestData.Buff中的數據可能分裂成多個請求。另外要注重,有些請求可能是沒有回復消息的。
在我們的Tcp組件的兩種實現中,都可以看到類似下面的與消息分派器交互的語句:
//處理請求
byte[] leftData = null ;
ArrayList repondList = this.messageDispatcher.DealRequestMessage(key.RequestData ,out leftData , ref key.Validation) ;
if(this.validateRequest)
{
if(key.Validation.gotoCloseConnection)
{
this.Dispo
SEOneConnection(streamHashCode ,key.Validation.cause) ;
}
}
2.消息分派器組件基本元素的實現
正如在實現Tcp組件之前需要構建一些基本元素,在實現消息分派器之前也是如此,用于支持消息分派器實現的基本元素包括:IDataStreamHelper、消息分裂器、消息處理器工廠、ITcpStreamDispatcherHook等。
(1)IDataStreamHelper消息分裂器
IDataStreamHelper,前文中已經提到,IDataStreamHelper用于從請求/回復消息中提取消息的“元數據”,并提供一些輔助方法,每個特定的應用,它們對IDataStreamHelper的實現可能是不一樣的。IDataStreamHelper接口定義如下:
/// <summary>
/// IDataStreamHelper 通信協議的面向流輔助設施。
/// </summary>
public interface IDataStreamHelper :IStringEncoder
{
int MaxRecieveBuffSize{get ;} //接收緩沖區的大小
int MessageHeaderLength{get ;} //消息頭的長度
int OffsetOfLengthField{get ;} //表示消息長度的字段在消息頭中的偏移
IDataStreamHeader ParseMessageHeader(byte[] data ,int offset) ; //解析消息頭
LengthTypeInHeader LengthTypeInHeader{get ;}
byte[] GetRespondWhenFailure(byte[] reqData ,ServiceFailureType failType) ; //根據服務失敗類型獲取失敗回復消息
byte[] GetRespondWhenFailure(byte[] reqData ,string errorMsg) ;
}
/// <summary>
/// StringEncoder 限定字符串編碼格式
/// </summary>
public interface IStringEncoder
{
string GetStrFromStream(byte[] stream ,int offset ,int len) ;
byte[] GetBytesFromStr(string ss) ;
}
/// <summary>
/// ServiceFailureType 服務失敗類型
/// </summary>
public enum ServiceFailureType
{
InvalidMessge ,ParseFailure ,HandleFailure ,ServiceStopped ,ServiceIsNotExit ,ServerIsBusy
}
IDataStreamHeader即是我們所說的消息的“元數據”,如其名所示,它也是消息的“消息頭”。請讓我補充說明一下,依照我的經驗,消息由消息頭Header和消息主體Body組成,消息頭用于存放消息的“元數據”等信息,而消息主體用于存放與特定請求相關的數據。消息頭的長度固定,比如都是64字節或都是128字節。請求消息和回復消息公用相同格式的消息頭。我們來看看消息頭接口IDataStreamHeader的定義:
public interface IDataStreamHeader
{
int MessageLength {get ;set ;} //本消息長度
int TypeKey {get ;set ;} //請求的目錄類型
int ServiceKey {get ;set ;} //請求類型
int ServiceItemIndex{get ;set ;} //請求細分索引
int RandomNum {get ;set ;} //用于將回復與請求一一對應起來
int Result {get ;set ;} //服務結果
string UserID {get ;set ;} //發出請求的用戶編號
byte[] ToDataStream() ; //將消息頭轉化為流,流的長度位消息頭的長度
void ToDataStream(byte[] buff ,int offset);
}
需要解釋一下TypeKey、ServiceKey、ServiceItemIndex,我們實際上將服務類型分為三級,可以舉個不太恰當的例子讓大家有個感性的熟悉。比如,生活中的衣、食、住、行可以作為不同的TypeKey,而“衣”中的春裝、冬裝可作為ServiceKey,而“春裝”中的T恤、夾克可作為ServiceItemIndex。對于服務的類型,你可以根據自己的意愿分成任意層級,但據我的經驗,通常情況下,三層已經夠用了。 (2)消息分裂器
前面已經多次提到消息分裂器MessageSplitter,它用于將接收緩沖區中的數據分裂成一個個完整的消息,并且把余下的非完整數據返回,其接口定義如下:public interface IMessageSplitter
{
void Initialize(int maxBuffSize ,int headerLen ,int offSetLenField ,LengthTypeInHeader lenType) ;
ArrayList SplitRequestMsgs(byte[] buff ,int validCount , out byte[] leftData) ;//ArrayList 中每條記錄都是是byte[],表示一個完整的請求
}
//消息頭中的長度是body長度還是總長度
public enum LengthTypeInHeader
{
TotalLen ,BodyLen
}
其中,Initialize方法中的參數都可以由IDataStreamHeader提供。leftData是余下的非完整消息的數據。SplitRequestMsgs方法返回的集合中是一條條完整的請求消息。
(3)消息處理器工廠
消息處理器工廠根據消息的類型(TypeKey、ServiceKey)創建對應的消息處理器來出來該消息,其接口定義如下: public interface IRequestDealerFactory
{
IRequestDealer CreateDealer(int requestType ,int serverTypeKey) ;//serverTypeKey 比如城市代號
event CbackRequestRecieved RequestRecieved ;
}
CreateDealer方法返回的IRequestDealer就是消息處理器,每一個消息處理器用于處理某種特定類型(ServiceKey)的所有請求。通常,可以將消息處理器封裝成插件DLL,以實現功能服務的“熱插拔”。 (4)消息處理器
消息處理器IRequestDealer定義如下: public interface IRequestDealer
{
byte[] DealRequestMessage(RoundedRequestMsg reqMsg ) ;//同步回復
event CbackRequestRecieved RequestRecieved ;
}
public delegate void CbackRequestRecieved(RoundedRequestMsg roundedMsg) ;
/// <summary>
/// RoundedRequestMsg 對應于一條完整的請求
/// </summary>
public strUCt RoundedRequestMsg
{
public int ConnectID ; //請求所對應的Tcp連接
public byte[] Data ;
}
RoundedRequestMsg.Data是經消息分裂器分裂得到的一個完整的請求消息,一個字節不多、一個字節也不少。
(5)ITcpStreamDispatcherHook
ITcpStreamDispatcherHook是一個Hook,它為用戶提供了一個自定義的對請求/回復消息進行操作的插入點。ITcpStreamDispatcherHook由TcpStreamDispatcher使用,用于對請求消息和回復消息進行截獲,然后處理或轉換這些消息,比如常用的處理/轉換操作包括:加密/解密、消息驗證等等。ITcpStreamDispatcherHook定義如下: /// <summary>
/// ITcpStreamDispatcherHook 由TcpStreamDispatcher使用,用于對請求消息和回復消息進行截獲,然后處理轉換這些消息,
/// 比如加密/解密。
/// </summary>
public interface ITcpStreamDispatcherHook
{
//轉換消息
byte[] CaptureRequestMsg(byte[] roundedMsg) ;
byte[] CaptureRespondMsg(byte[] roundedMsg) ;
//驗證消息,以下驗證的消息是還沒有被捕捉的消息
bool VerifyFirstMsgOfUser(byte[] roundedMsg ,ref RequestValidation validation) ;
bool VerifyOtherMessage(byte[] roundedMsg ,ref RequestValidation validation) ;
}
關于這個接口中各方法的含義可以在消息分派器的實現中更好的領會! 3.消息分派器實現
在前述的基本元素的基礎上,實現消息分派器非常簡單,我們來看其核心方法DealRequestMessage的實現源碼: private IMessageSplitter curMsgSplitter = new MessageSpliter() ;
private IDataStreamHelper curMsgHelper ; //必須設置
private IRequestDealerFactory curDealerFactory ; //必須設置
private ITcpStreamDispatcherHook tcpStreamDispatcherHook ;
public ArrayList DealRequestMessage(RequestData requestData, out byte[] leftData, ref RequestValidation validation)
{
//消息分裂
ArrayList respondList = new ArrayList() ;
ArrayList reqList = this.curMsgSplitter.SplitRequestMsgs(requestData.Buff ,requestData.ValidCount ,out leftData) ;
if(reqList == null)
{
return respondList ;
}
bool verified = true ;
for(int i=0; i<reqList.Count ;i++)
{
byte[] theData = (byte[])reqList[i] ;
#region 驗證消息
if(requestData.IsFirstMsg && (i == 0))
{
verified = this.tcpStreamDispatcherHook.VerifyFirstMsgOfUser(theData ,ref validation) ;
}
else
{
verified = this.tcpStreamDispatcherHook.VerifyOtherMessage(theData ,ref validation ) ;
}
if(! verified)
{
if(validation.gotoCloseConnection)
{
return null ;
}
this.AddRespondToList(respondList ,this.curMsgHelper.GetRespondWhenFailure(theData ,ServiceFailureType.InvalidMessge)) ;
continue ;
}
#endregion
//接插,捕捉/轉換請求消息
byte[] reqData = this.tcpStreamDispatcherHook.CaptureRequestMsg(theData) ;
#region 處理消息
//處理消息
IDataStreamHeader header = this.curMsgHelper.ParseMessageHeader(reqData ,0);
IRequestDealer dealer = this.curDealerFactory.CreateDealer(header.ServiceKey ,header.TypeKey) ;
if(dealer == null)
{
this.AddRespondToList(respondList ,this.curMsgHelper.GetRespondWhenFailure(reqData ,ServiceFailureType.ServiceIsNotExit)) ;
continue ;
}
RoundedRequestMsg roundReqMsg = new RoundedRequestMsg();
roundReqMsg.ConnectID = requestData.ConnectID ;
roundReqMsg.Data = reqData ;
try
{
byte[] respondData = dealer.DealRequestMessage(roundReqMsg) ;
if(respondData != null)
{
this.AddRespondToList(respondList ,respondData) ;
}
}
catch(Exception ee)
{
this.AddRespondToList(respondList , this.curMsgHelper.GetRespondWhenFailure(reqData ,ee.Message)) ;
}
#endregion
}
return respondList;
}
//將回復消息加密后放入list
private void AddRespondToList(ArrayList list ,byte[] theRespondData)
{
//接插,捕捉/轉換回復消息
byte[] respondData = this.tcpStreamDispatcherHook.CaptureRespondMsg(theRespondData) ;
list.Add(respondData) ;
}
假如你是一直按順序讀下來的,理解上面的實現一定不成什么問題。到這里,Tcp通信層的所有重要的設施基本都已介紹完畢,最后,給出了提示,即,在你的應用中,如何使用這個可復用的Tcp通信層。步驟如下:
(1)實現IDataStreamHelper接口。
(2)實現IReqestStreamDispatcher接口,假如采用的是Tcp協議,則可直接使用參考實現TcpStreamDispatcher
(3)實現各種請求處理器,這些處理器實現IRequestDealer接口。
(4)實現IRequestDealerFactory接口。 接下來,還有什么?其實,還有很多,都可以提高到框架的層次,以便復用。比如,前面我們處理消息都是基于流(byte[])的形式,在此基礎上,我們可以更上一層,采用基于對象的形式――即,將請求消息和回復消息都封裝成類,這就涉及了流的解析(流=>對象)和對象序列化(消息對象=>流)問題;另外,我們甚至可以將Tcp用戶治理納入到框架的高度,以進行復用,比如,通常基于Tcp服務的系統都需要治理在線的Tcp用戶,并記錄Tcp用戶請求服務的具體信息、在線時間等,這些經過良好的分析概括都可以提高到復用的高度。以后有時間,我會將這樣的經驗和大家分享。
最后,把EnterpriseServerBase類庫中的Network命名空間中的源碼和大家共享,希望對大家有所幫助!(另,該命名空間中已經包含了上述的基于對象的消息和Tcp用戶治理的可復用組件)。