FastSocket這個東西上次我已經說過,它使用簡單,功能強大,擴展靈活,目前在新浪的生產環境中已經被廣泛使用,所以它的性能,安全等各方面我們絕對可以信賴,今天我們來說一個話題,和上一講有關,這次我們制作一個基于FastSocket的傳輸協議,它的意義重大,當FastSocket提供的協議不能滿足項目要求時,我們就必須硬著頭皮去自己寫了,還好,FastSocket為我們鋪好了路,我們只要按著這條路走下去,就可以了。
首先,如果要想擴展一個自己的協議,要對client和server端分別進行開發,下面我們來看一下client的開發。
/// <summary>
/// Asynchronous binary protocol (client side).
/// Wire format:
/// [Message Length(int32)][SeqID(int32)][ProjectID(int16)][Cmd Length(int16)][VersonNumber Length(int16)][Cmd + VersonNumber + Body Buffer]
/// Message Length counts every byte that follows the length field itself,
/// so a well-formed frame occupies Message Length + 4 bytes.
/// Original author's note: the TableName and VersonNumber parameters are 40 characters long
/// and are left-padded with spaces when shorter (this parser does not enforce that).
/// </summary>
public sealed class DSSBinaryProtocol : IProtocol<DSSBinaryResponse>
{
    /// <summary>
    /// Size in bytes of the leading Message Length field.
    /// </summary>
    private const int LengthPrefixSize = 4;

    /// <summary>
    /// Size in bytes of the fixed header that follows the length field:
    /// SeqID (4) + ProjectID (2) + Cmd Length (2) + VersonNumber Length (2).
    /// </summary>
    private const int FixedHeaderSize = 10;

    #region IProtocol Members
    /// <summary>
    /// Tries to parse one complete response frame from the start of <paramref name="buffer"/>.
    /// </summary>
    /// <param name="connection">The connection the bytes were received on (not used by the parser).</param>
    /// <param name="buffer">The bytes received so far.</param>
    /// <param name="readlength">
    /// Number of bytes consumed by the returned response, or 0 when the buffer
    /// does not yet hold a complete frame.
    /// </param>
    /// <returns>The parsed response, or <c>null</c> when more data is required.</returns>
    /// <exception cref="BadProtocolException">bad async binary protocl</exception>
    public DSSBinaryResponse FindResponse(IConnection connection, ArraySegment<byte> buffer, out int readlength)
    {
        readlength = 0;

        // Not even the length prefix has arrived yet.
        if (buffer.Count < LengthPrefixSize)
        {
            return null;
        }

        // Read the message length.
        var messageLength = NetworkBitConverter.ToInt32(buffer.Array, buffer.Offset);

        // A frame can never be shorter than its fixed header; the upper bound
        // keeps "messageLength + LengthPrefixSize" from overflowing.
        if (messageLength < FixedHeaderSize || messageLength > int.MaxValue - LengthPrefixSize)
        {
            throw new BadProtocolException("bad async binary protocl");
        }

        var frameLength = messageLength + LengthPrefixSize;
        if (buffer.Count < frameLength)
        {
            // Wait for the rest of the frame.
            return null;
        }

        var headerOffset = buffer.Offset + LengthPrefixSize;
        var seqID = NetworkBitConverter.ToInt32(buffer.Array, headerOffset);
        var projectID = NetworkBitConverter.ToInt16(buffer.Array, headerOffset + 4);
        int flagLength = NetworkBitConverter.ToInt16(buffer.Array, headerOffset + 6);
        int versonLength = NetworkBitConverter.ToInt16(buffer.Array, headerOffset + 8);

        // Both lengths come from the wire: reject values that are negative or
        // that would make the strings extend past the end of this frame.
        var dataLength = messageLength - FixedHeaderSize - flagLength - versonLength;
        if (flagLength < 0 || versonLength < 0 || dataLength < 0)
        {
            throw new BadProtocolException("bad async binary protocl");
        }

        var bodyOffset = headerOffset + FixedHeaderSize;
        var strName = Encoding.UTF8.GetString(buffer.Array, bodyOffset, flagLength);
        var versonNumber = Encoding.UTF8.GetString(buffer.Array, bodyOffset + flagLength, versonLength);

        // The payload is optional; an empty payload is reported as null.
        byte[] data = null;
        if (dataLength > 0)
        {
            data = new byte[dataLength];
            Buffer.BlockCopy(buffer.Array, bodyOffset + flagLength + versonLength, data, 0, dataLength);
        }

        readlength = frameLength;
        return new DSSBinaryResponse(seqID, projectID, strName, versonNumber, data);
    }
    #endregion
}
DSSBinaryResponse

/// <summary>
/// Response of the socket protocol used by the data synchronization system (DSS),
/// referred to as the DSSBinary protocol.
/// [Message Length(int32)][SeqID(int32)][ProjectID(int16)][Cmd Length(int16)][VersonNumber Length(int16)][Cmd + VersonNumber + Body Buffer]
/// </summary>
public class DSSBinaryResponse : IResponse
{
    /// <summary>
    /// The table object to operate on, transmitted as a byte array.
    /// </summary>
    public readonly byte[] Buffer;

    /// <summary>
    /// Creates a response from the fields parsed out of one frame.
    /// </summary>
    /// <param name="seqID">Serial (sequence) ID.</param>
    /// <param name="projectID">Project type number.</param>
    /// <param name="flag">Command name.</param>
    /// <param name="versonNumber">Version number of this transmission.</param>
    /// <param name="buffer">Payload bytes; stored as given.</param>
    public DSSBinaryResponse(int seqID, short projectID, string flag, string versonNumber, byte[] buffer)
    {
        SeqID = seqID;
        ProjectID = projectID;
        Flag = flag;
        VersonNumber = versonNumber;
        Buffer = buffer;
    }

    /// <summary>
    /// Serial (sequence) ID.
    /// </summary>
    public int SeqID { get; private set; }

    /// <summary>
    /// Project type number.
    /// </summary>
    public short ProjectID { get; set; }

    /// <summary>
    /// Command name.
    /// </summary>
    public string Flag { get; private set; }

    /// <summary>
    /// Version number of this transmission, unique across all clients
    /// [project name (4 bytes) + guid (36 bytes)].
    /// </summary>
    public string VersonNumber { get; private set; }
}

/// <summary> /// 異步socket客戶端 /// </summary> public class DSSBinarySocketClient : PooledSocketClient<DSSBinaryResponse> { #region Constructors /// <summary> /// new /// </summary> public DSSBinarySocketClient() : base(new DSSBinaryProtocol()) { } /// <summary> /// new /// </summary> /// <param name="socketBufferSize"></param> /// <param name="messageBufferSize"></param> public DSSBinarySocketClient(int socketBufferSize, int messageBufferSize) : base(new DSSBinaryProtocol(), socketBufferSize, messageBufferSize, 3000, 3000) { } /// <summary> /// new /// </summary> /// <param name="socketBufferSize"></param> /// <param name="messageBufferSize"></param> /// <param name="millisecondsSendTimeout"></param> /// <param name="millisecondsReceiveTimeout"></param> public DSSBinarySocketClient(int socketBufferSize, int messageBufferSize, int millisecondsSendTimeout, int millisecondsReceiveTimeout) : base(new DSSBinaryProtocol(), socketBufferSize, messageBufferSize, millisecondsSendTimeout, millisecondsReceiveTimeout) { } #endregion #region Public Methods public Task<TResult> Send<TResult>(string cmdName, short projectID, string versonNumber, byte[] payload, Func<DSSBinaryResponse, TResult> funcResultFactory, object asyncState = null) { return this.Send(null, cmdName, projectID, versonNumber, payload, funcResultFactory, asyncState); } public Task<TResult> Send<TResult>(byte[] consistentKey, string cmdName, short projectID, string versonNumber, byte[] payload, Func<DSSBinaryResponse, TResult> funcResultFactory, object asyncState = null) { if (string.IsNullOrEmpty(cmdName)) throw new ArgumentNullException("cmdName"); if (funcResultFactory == null) throw new ArgumentNullException("funcResultFactory"); var seqID = base.NextRequestSeqID(); var cmdLength = cmdName.Length; var versonNumberLength = versonNumber.Length; var messageLength = (payload == null ? 0 : payload.Length) + cmdLength + versonNumberLength + 10; var sendBuffer = new byte[messageLength + 4];
新聞熱點
疑難解答