C#實現(xiàn)異步消息隊列
消息隊列
消息隊列(英語:Messagequeue)是一種進程間通信或同一進程的不同線程間的通信方式,軟件的貯列用來處理一系列的輸入,通常是來自使用者。消息隊列提供了異步的通信協(xié)議,每一個貯列中的紀錄包含詳細說明的資料,包含發(fā)生的時間,輸入裝置的種類,以及特定的輸入?yún)?shù),也就是說:消息的發(fā)送者和接收者不需要同時與消息隊列互交。消息會保存在隊列中,直到接收者取回它。
簡單的說隊列就是貯存了我們需要處理的Command但是并不是及時的拿到其處理結果;
實現(xiàn)
實際上,消息隊列常常保存在鏈表結構中。擁有權限的進程可以向消息隊列中寫入或讀取消息。
目前,有很多消息隊列有很多開源的實現(xiàn),包括JBossMessaging、JORAM、ApacheActiveMQ、SunOpenMessageQueue、ApacheQpid和HTTPSQS。
優(yōu)點,缺點
消息隊列本身是異步的,它允許接收者在消息發(fā)送很長時間后再取回消息,這和大多數(shù)通信協(xié)議是不同的。例如WWW中使用的HTTP協(xié)議是同步的,因為客戶端在發(fā)出請求后必須等待服務器回應。然而,很多情況下我們需要異步的通信協(xié)議。比如,一個進程通知另一個進程發(fā)生了一個事件,但不需要等待回應。但消息隊列的異步特點,也造成了一個缺點,就是接收者必須輪詢消息隊列,才能收到最近的消息。
和信號相比,消息隊列能夠傳遞更多的信息。與管道相比,消息隊列提供了有格式的數(shù)據(jù),這可以減少開發(fā)人員的工作量。但消息隊列仍然有大小限制。
讀取隊列消息
主要有兩種(1)服務端的推;(2)客戶端的拉;
拉:主要是客戶端定時輪詢拿走消息處理;
推:通過事件訂閱方式主動通知訂閱者進行處理;
消息的貯存
簡單的是通過內(nèi)存鏈表實現(xiàn)貯存;也可以借助DB,比如Redis;還可以持久到本地文件中;
如何保證異步處理的一致性
盡管隊列主要目的是實現(xiàn)消息貯存,同時將調(diào)用與實現(xiàn)異步化。但是如果想達到處理消息一致性,好的方式是區(qū)別業(yè)務處理順序,比如操作主從DB,主負責寫,從負責讀,我們沒有機會在寫之后立馬從讀數(shù)據(jù)庫拿到你想要的結果;同時我們需要借助中間狀態(tài),當多個中間狀態(tài)同時符合調(diào)用結果才到到業(yè)務時間被處理,否則將“異常消息”持久化,待下次操作;
上代碼
建立消息對立核心隊列
{
publicdelegatevoidMessageQueueEventNotifyHandler(Message.BaseMessagemessage);
publicclassMessageQueue:Queue<BaseMessage>
{
publicstaticMessageQueueGlobalQueue=newMessageQueue();
} } 可依據(jù)具體業(yè)務進行個性化處理; 通過Proxy向隊列追加消息 publicclassOrderServiceProxy:IOrderService { publicvoidSubmit(Message.BaseMessagemessage) { MessageQueue.MessageQueue.GlobalQueue.Enqueue(message); } } 客戶端調(diào)用 OrderServiceorderService=neWorderService(); MessageQueue.MessageQueue.GlobalQueue.MessageNotifyEvent+=orderService.Submit; varorders=newList<Order>(){ newOrder(){OrderCode="P001"}, newOrder(){OrderCode="P002"}, newOrder(){OrderCode="B003"} }; OrderServiceProxyproxy=newOrderServiceProxy(); orders.ForEach(order=>proxy.Submit(newMessage.BaseMessage(){Body=order})); Console.ReadLine(); 這樣就滿足了事件的綁定與觸發(fā)個性化處理,同時達到了消息異步化的目的,希望更細致的拓展用到后期的項目中。
新聞熱點
疑難解答