国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁(yè) > 學(xué)院 > 開發(fā)設(shè)計(jì) > 正文

SignalR循序漸進(jìn)(三)簡(jiǎn)易的集群通訊組件

2019-11-17 03:05:01
字體:
來(lái)源:轉(zhuǎn)載
供稿:網(wǎng)友

SignalR循序漸進(jìn)(三)簡(jiǎn)易的集群通訊組件

上一篇演示了泛型Hub的實(shí)現(xiàn),微軟于6月17日更新了SignalR 2.1.0,然后自帶了泛型Hub,于是就不需要自己去實(shí)現(xiàn)了…(微軟你為啥不早一個(gè)月自帶啊…)。不過(guò)沒(méi)關(guān)系,SignalR出彩之處不在泛型Hub,本篇為各位觀眾帶來(lái)了基于SignalR的簡(jiǎn)易集群通訊組件Demo,可用于分布式定時(shí)任務(wù)。

說(shuō)到集群,自然想到了NLB啊Cluster啊HPC啊等等。NLB受制于成員數(shù)量,Cluster用數(shù)量堆高可用性,HPC太復(fù)雜。本著SignalR的雙向異步通訊的特點(diǎn),其實(shí)是可以用來(lái)玩彈性計(jì)算的。初始狀態(tài)由一臺(tái)計(jì)算任務(wù)分發(fā)節(jié)點(diǎn),一臺(tái)監(jiān)控以及一臺(tái)計(jì)算節(jié)點(diǎn)構(gòu)成。隨著任務(wù)分發(fā)隊(duì)列中的任務(wù)數(shù)越來(lái)越多,一臺(tái)執(zhí)行節(jié)點(diǎn)無(wú)法及時(shí)消耗待執(zhí)行任務(wù),達(dá)到某個(gè)閾值的時(shí)候,動(dòng)態(tài)的加入一個(gè)計(jì)算節(jié)點(diǎn)來(lái)增加計(jì)算吞吐量。同樣的,當(dāng)隊(duì)列中的任務(wù)基本處于很低的數(shù)量的時(shí)候,自動(dòng)移除一個(gè)計(jì)算節(jié)點(diǎn)來(lái)減少資源消耗。當(dāng)然,如果是大型的計(jì)算量之下,分發(fā)節(jié)點(diǎn),隊(duì)列都應(yīng)該是集群的,還要考慮各種計(jì)算節(jié)點(diǎn)故障之類的問(wèn)題,這不在本篇考慮的范疇內(nèi),本篇以初始狀態(tài)模型來(lái)一步步實(shí)現(xiàn)簡(jiǎn)易集群通訊組件。

好,廢話不說(shuō)了,正篇開始。

任務(wù)分發(fā)節(jié)點(diǎn)

image

任務(wù)分發(fā)節(jié)點(diǎn)只有一個(gè)公開的行為,就是接受計(jì)算節(jié)點(diǎn)任務(wù)執(zhí)行完成的消息。

下面是實(shí)現(xiàn)。

/// <summary>    /// 集群交換器    /// </summary>    public class ClusterHub : Hub<IClusterClient>    {        /// <summary>        ///         /// </summary>        static ClusterHub()        {            aliveDictionary = new ConcurrentDictionary<string, Guid>();        }                /// <summary>        ///         /// </summary>        /// <param name="dispatcher"></param>        public ClusterHub(IDispatcher dispatcher)        {            this.dispatcher = dispatcher;            db = OdbFactory.Open(localDbFileName);        }        /// <summary>        /// 本地數(shù)據(jù)庫(kù)文件名        /// </summary>        const string localDbFileName = "ClusterStorage.dll";        /// <summary>        /// 監(jiān)視器連接Id        /// </summary>        static string monitorConnectionId;        /// <summary>        /// 調(diào)度器        /// </summary>        IDispatcher dispatcher;        /// <summary>        /// 在線詞典        /// </summary>        static ConcurrentDictionary<string, Guid> aliveDictionary;        /// <summary>        ///         /// </summary>        static IOdb db;        /// <summary>        /// 完成任務(wù)        /// </summary>        /// <param name="jobResult"></param>        public void Finished(Contracts.Messages.JobResultDto jobResult)        {            lock (db)            {                var members = db.AsQueryable<MemberDo>();                var member = members.SingleOrDefault(m => m.Id == Guid.Parse(jobResult.Id));                if (member != null)                {                    member.UpdateStatisticsInfo(jobResult.PRocessedTime);                    db.Store(member);                    if (!string.IsNullOrWhiteSpace(monitorConnectionId))                    {                        Clients.Client(monitorConnectionId).UpdateMemberStatisticsInfo(new Contracts.Messages.MemberStatisticsInfoDto() { Id = member.Id.ToString(), AverageProcessedTime = member.AverageProcessedTime });                    }                }            }            Clients.Caller.RunJob(dispatcher.GetJobId());        }        /// <summary>        /// 加入        /// </summary>        void Join()        {            object ip = string.Empty;            var isMonitor = Context.Request.QueryString["ClientRole"] == "Monitor";            Context.Request.Environment.TryGetValue("server.RemoteIpAddress", out ip);            lock (db)            {                var members = db.AsQueryable<MemberDo>();                var member = members.SingleOrDefault(m => m.Ip == ip.ToString() && m.IsMonitor == isMonitor);                if (member != null)                {                    member.MemberStatusType = MemberStatusTypeEnum.Connectioned;                }                else                {                    member = new MemberDo(ip.ToString(), isMonitor);                    if (isMonitor)                    {                        monitorConnectionId = Context.ConnectionId;                    }                }                db.Store(member);                aliveDictionary.TryAdd(Context.ConnectionId, member.Id);                if (!isMonitor)                {                    if (!string.IsNullOrWhiteSpace(monitorConnectionId))                    {                        Clients.Client(monitorConnectionId).MemberJoin(member.Id);                    }                    Clients.Caller.GetId(member.Id.ToString());                    Clients.Caller.RunJob(dispatcher.GetJobId());                }            }        }        /// <summary>        /// 離開        /// </summary>        void Leave()        {            var id = Guid.Empty;            aliveDictionary.TryRemove(Context.ConnectionId, out id);            lock (db)            {                var members = db.AsQueryable<MemberDo>();                var member = members.SingleOrDefault(m => m.Id == id);                if (member != null)                {                    member.MemberStatusType = MemberStatusTypeEnum.Disconnectioned;                    db.Store(member);                    if (member.IsMonitor)                    {                        monitorConnectionId = string.Empty;                    }                    else if (!string.IsNullOrWhiteSpace(monitorConnectionId))                    {                        Clients.Client(monitorConnectionId).MemberLeave(id);                    }                }            }        }        public override Task OnConnected()        {            Console.WriteLine(Context.ConnectionId+":Connected");            Join();            return base.OnConnected();        }        public override Task OnDisconnected()        {            Console.WriteLine(Context.ConnectionId + ":Disconnected");            Leave();            return base.OnDisconnected();        }        public override Task OnReconnected()        {            Console.WriteLine(Context.ConnectionId + ":Reconnected");            return base.OnReconnected();        }    }

ClusterHub承載著2種客戶端角色的交互,計(jì)算節(jié)點(diǎn)和監(jiān)控。

這邊采用了一個(gè)輕量級(jí)的基于C#開發(fā)的無(wú)引擎對(duì)象數(shù)據(jù)庫(kù)來(lái)存儲(chǔ)客戶端信息。

先說(shuō)重載的部分:

OnConnected - 當(dāng)有客戶端連接的時(shí)候,執(zhí)行Join方法。

OnDisconnected - 當(dāng)有客戶端離線的時(shí)候,執(zhí)行Leave方法。

然后是私有方法:

Join - 根據(jù)QueryString來(lái)區(qū)分客戶端類型是計(jì)算節(jié)點(diǎn)還是監(jiān)視器,如果是計(jì)算節(jié)點(diǎn),就直接通知監(jiān)視器有成員加入,然后通過(guò)IDispatcher來(lái)獲取任務(wù)Id,通知計(jì)算節(jié)點(diǎn)開始執(zhí)行任務(wù)。

Leave - 計(jì)算節(jié)點(diǎn)離線的時(shí)候通知監(jiān)視器。

公開方法:

Finished - 計(jì)算節(jié)點(diǎn)完成任務(wù)后就調(diào)用該方法,Hub將計(jì)算的一些統(tǒng)計(jì)信息更新到本地存儲(chǔ),同時(shí)通知監(jiān)視器更新計(jì)算結(jié)果。

私有變量:

IDispatcher– 任務(wù)調(diào)度器接口,由外部組件來(lái)負(fù)責(zé)具體的實(shí)現(xiàn)。

計(jì)算節(jié)點(diǎn)

image

計(jì)算節(jié)點(diǎn)有兩個(gè)行為:

GetId - 獲取節(jié)點(diǎn)身份。

RunJob - 執(zhí)行任務(wù)。

/// <summary>    /// 集群客戶端    /// </summary>    public class ClusterClient    {        /// <summary>        ///         /// </summary>        /// <param name="jobProvider"></param>        public ClusterClient(IJobProvider jobProvider)        {            this.jobProvider = jobProvider;            url = ConfigurationManager.AppSettings["HubAddress"];            var queryStrings = new Dictionary<string, string>();            queryStrings.Add("ClientRole", "Normal");            connection = new HubConnection(url, queryStrings);            hubProxy = connection.CreateHubProxy(typeof(IClusterHub).GetCustomAttributes(typeof(DescriptionAttribute), false).OfType<DescriptionAttribute>().First().Description);            InitClientEvents();            connection.Start().Wait();        }        string url;        HubConnection connection;        IHubProxy hubProxy;        IJobProvider jobProvider;        string id;        /// <summary>        ///         /// </summary>        void InitClientEvents()        {            hubProxy.On("GetId", (id) => GetId(id));            hubProxy.On("RunJob", (jobId) => Ru
發(fā)表評(píng)論 共有條評(píng)論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 文昌市| 武强县| 长沙县| 固始县| 嘉义县| 深州市| 新沂市| 甘谷县| 白玉县| 榆中县| 新化县| 贵州省| 若羌县| 台南县| 淅川县| 灵山县| 铁岭市| 富蕴县| 西贡区| 天津市| 东城区| 大方县| 东源县| 松桃| 翁牛特旗| 兴仁县| 新乡县| 宝坻区| 元江| 威远县| 永胜县| 蓝田县| 隆德县| 紫阳县| 武鸣县| 乌拉特后旗| 永川市| 商都县| 宁德市| 绩溪县| 溧水县|