“消息隊列”是在消息的傳輸過程中保存消息的容器。消息隊列管理器在將消息從它的源中繼到它的目標時充當中間人。隊列的主要目的是提供路由并保證消息的傳遞;如果發送消息時接收者不可用,消息隊列會保留消息,直到可以成功地傳遞它。相信對任何架構或應用來說,消息隊列都是一個至關重要的組件,下面是十個理由:
Python的消息隊列示例:
1.threading+Queue實現線程隊列
#!/usr/bin/env python import Queueimport threadingimport time queue = Queue.Queue() class ThreadNum(threading.Thread): """沒打印一個數字等待1秒,并發打印10個數字需要多少秒?""" def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue def run(self): whileTrue: #消費者端,從隊列中獲取num num = self.queue.get() print"i'm num %s"%(num) time.sleep(1) #在完成這項工作之后,使用 queue.task_done() 函數向任務已經完成的隊列發送一個信號 self.queue.task_done() start = time.time()def main(): #產生一個 threads pool, 并把消息傳遞給thread函數進行處理,這里開啟10個并發 for i in range(10): t = ThreadNum(queue) t.setDaemon(True) t.start() #往隊列中填錯數據 for num in range(10): queue.put(num) #wait on the queue until everything has been processed queue.join() main()print"Elapsed Time: %s" % (time.time() - start)
運行結果:
i'm num 0i'm num 1i'm num 2i'm num 3i'm num 4i'm num 5i'm num 6i'm num 7i'm num 8i'm num 9Elapsed Time: 1.01399993896
解讀:
具體工作步驟描述如下:
1,創建一個 Queue.Queue() 的實例,然后使用數據對它進行填充。
2,將經過填充數據的實例傳遞給線程類,后者是通過繼承 threading.Thread 的方式創建的。
3,生成守護線程池。
4,每次從隊列中取出一個項目,并使用該線程中的數據和 run 方法以執行相應的工作。
5,在完成這項工作之后,使用 queue.task_done() 函數向任務已經完成的隊列發送一個信號。
6,對隊列執行 join 操作,實際上意味著等到隊列為空,再退出主程序。
在使用這個模式時需要注意一點:通過將守護線程設置為 true,程序運行完自動退出。好處是在退出之前,可以對隊列執行 join 操作、或者等到隊列為空。
2.多個隊列
所謂多個隊列,一個隊列的輸出可以作為另一個隊列的輸入!
#!/usr/bin/env pythonimport Queueimport threadingimport time queue = Queue.Queue()out_queue = Queue.Queue() class ThreadNum(threading.Thread): """bkeep""" def __init__(self, queue, out_queue): threading.Thread.__init__(self) self.queue = queue self.out_queue = out_queue def run(self): whileTrue: #從隊列中取消息 num = self.queue.get() bkeep = num #將bkeep放入隊列中 self.out_queue.put(bkeep) #signals to queue job is done self.queue.task_done() class PrintLove(threading.Thread): """Threaded Url Grab""" def __init__(self, out_queue): threading.Thread.__init__(self) self.out_queue = out_queue def run(self): whileTrue: #從隊列中獲取消息并賦值給bkeep bkeep = self.out_queue.get() keke = "I love " + str(bkeep) print keke, print self.getName() time.sleep(1) #signals to queue job is done self.out_queue.task_done() start = time.time()def main(): #populate queue with data for num in range(10): queue.put(num) #spawn a pool of threads, and pass them queue instance for i in range(5): t = ThreadNum(queue, out_queue) t.setDaemon(True) t.start() for i in range(5): pl = PrintLove(out_queue) pl.setDaemon(True) pl.start() #wait on the queue until everything has been processed queue.join() out_queue.join() main()print"Elapsed Time: %s" % (time.time() - start)
運行結果:
I love 0 Thread-6I love 1 Thread-7I love 2 Thread-8I love 3 Thread-9I love 4 Thread-10I love 5 Thread-7I love 6 Thread-6I love 7 Thread-9I love 8 Thread-8I love 9 Thread-10Elapsed Time: 2.00300002098
解讀:
ThreadNum 類工作流程
定義隊列--->繼承threading---->初始化queue---->定義run函數--->get queue中的數據---->處理數據---->put數據到另外一個queue-->發信號告訴queue該條處理完畢
main函數工作流程:
--->往自定義queue中扔數據
--->for循環確定啟動的線程數---->實例化ThreadNum類---->啟動線程并設置守護
--->for循環確定啟動的線程數---->實例化PrintLove類--->啟動線程并設置為守護
--->等待queue中的消息處理完畢后執行join。即退出主程序。
了解了MQ的大概實現以后,我們來總結一下消息隊列的優點:
1. 解耦
在項目啟動之初來預測將來項目會碰到什么需求,是極其困難的。消息隊列在處理過程中間插入了一個隱含的、基于數據的接口層,兩邊的處理過程都要實現這一接口。這允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
2. 冗余
有時在處理數據的時候處理過程會失敗。除非數據被持久化,否則將永遠丟失。消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。在被許多消息隊列所采用的"插入-獲取-刪除"范式中,在把一個消息從隊列中刪除之前,需要你的處理過程明確的指出該消息已經被處理完畢,確保你的數據被安全的保存直到你使用完畢。
3. 擴展性
因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的;只要另外增加處理過程即可。不需要改變代碼、不需要調節參數。擴展就像調大電力按鈕一樣簡單。
4. 靈活性 & 峰值處理能力
當你的應用上了Hacker News的首頁,你將發現訪問流量攀升到一個不同尋常的水平。在訪問量劇增的情況下,你的應用仍然需要繼續發揮作用,但是這樣的突發流量并不常見;如果為 以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住增長的訪問壓力,而不是因為超出負荷的請求而完全崩潰。 請查看我們關于峰值處理能力的博客文章了解更多此方面的信息。
5. 可恢復性
當體系的一部分組件失效,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復后被處理。而這種允許重試或者延后處理請求的能力通常是造就一個略感不便的用戶和一個沮喪透頂的用戶之間的區別。
6. 送達保證
消息隊列提供的冗余機制保證了消息能被實際的處理,只要一個進程讀取了該隊列即可。在此基礎上,IronMQ提供了一個"只送達一次"保證。無論有多少進 程在從隊列中領取數據,每一個消息只能被處理一次。這之所以成為可能,是因為獲取一個消息只是"預定"了這個消息,暫時把它移出了隊列。除非客戶端明確的 表示已經處理完了這個消息,否則這個消息會被放回隊列中去,在一段可配置的時間之后可再次被處理。
7.排序保證
在許多情況下,數據處理的順序都很重要。消息隊列本來就是排序的,并且能保證數據會按照特定的順序來處理。IronMO保證消息漿糊通過FIFO(先進先出)的順序來處理,因此消息在隊列中的位置就是從隊列中檢索他們的位置。
8.緩沖
在任何重要的系統中,都會有需要不同的處理時間的元素。例如,加載一張圖片比應用過濾器花費更少的時間。消息隊列通過一個緩沖層來幫助任務最高效率的執行--寫入隊列的處理會盡可能的快速,而不受從隊列讀的預備處理的約束。該緩沖有助于控制和優化數據流經過系統的速度。
9. 理解數據流
在一個分布式系統里,要得到一個關于用戶操作會用多長時間及其原因的總體印象,是個巨大的挑戰。消息系列通過消息被處理的頻率,來方便的輔助確定那些表現不佳的處理過程或領域,這些地方的數據流都不夠優化。
10. 異步通信
很多時候,你不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許你把一個消息放入隊列,但并不立即處理它。你想向隊列中放入多少消息就放多少,然后在你樂意的時候再去處理它們。
|
新聞熱點
疑難解答
圖片精選