在Thread和Process中,應當優選Process,因為Process更穩定,而且,Process可以分布到多臺機器上,而Thread最多只能分布到同一臺機器的多個CPU上。
Python的multiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分布到多臺機器上。一個服務進程可以作為調度者,將任務分布到其他多個進程中,依靠網絡通信。由于managers模塊封裝很好,不必了解網絡通信的細節,就可以很容易地編寫分布式多進程程序。
舉個例子:如果我們已經有一個通過Queue通信的多進程程序在同一臺機器上運行,現在,由于處理任務的進程任務繁重,希望把發送任務的進程和處理任務的進程分布到兩臺機器上。怎么用分布式進程實現?
原有的Queue可以繼續使用,但是,通過managers模塊把Queue通過網絡暴露出去,就可以讓其他機器的進程訪問Queue了。
我們先看服務進程,服務進程負責啟動Queue,把Queue注冊到網絡上,然后往Queue里面寫入任務:
# task_master.py import random, time, queuefrom multiprocessing.managers import BaseManager# 發送任務的隊列:task_queue = queue.Queue()# 接收結果的隊列:result_queue = queue.Queue()# 從BaseManager繼承的QueueManager:class QueueManager(BaseManager): pass# 把兩個Queue都注冊到網絡上, callable參數關聯了Queue對象:QueueManager.register('get_task_queue', callable=lambda: task_queue)QueueManager.register('get_result_queue', callable=lambda: result_queue)# 綁定端口5000, 設置驗證碼'abc':manager = QueueManager(address=('', 5000), authkey=b'abc')# 啟動Queue:manager.start()# 獲得通過網絡訪問的Queue對象:task = manager.get_task_queue()result = manager.get_result_queue()# 放幾個任務進去:for i in range(10): n = random.randint(0, 10000) print('Put task %d...' % n) task.put(n)# 從result隊列讀取結果:print('Try get results...')for i in range(10): r = result.get(timeout=10) print('Result: %s' % r)# 關閉:manager.shutdown()請注意,當我們在一臺機器上寫多進程程序時,創建的Queue可以直接拿來用,但是,在分布式多進程環境下,添加任務到Queue不可以直接對原始的task_queue進行操作,那樣就繞過了QueueManager的封裝,必須通過manager.get_task_queue()獲得的Queue接口添加。
然后,在另一臺機器上啟動任務進程(本機上啟動也可以):
# task_master.pyimport random, time, queuefrom multiprocessing.managers import BaseManager# 發送任務的隊列:task_queue = queue.Queue()# 接收結果的隊列:result_queue = queue.Queue()# 從BaseManager繼承的QueueManager:class QueueManager(BaseManager): pass# 把兩個Queue都注冊到網絡上, callable參數關聯了Queue對象:QueueManager.register('get_task_queue', callable=lambda: task_queue)QueueManager.register('get_result_queue', callable=lambda: result_queue)# 綁定端口5000, 設置驗證碼'abc':manager = QueueManager(address=('', 5000), authkey=b'abc')# 啟動Queue:manager.start()# 獲得通過網絡訪問的Queue對象:task = manager.get_task_queue()result = manager.get_result_queue()# 放幾個任務進去:for i in range(10): n = random.randint(0, 10000) print('Put task %d...' % n) task.put(n)# 從result隊列讀取結果:print('Try get results...')for i in range(10): r = result.get(timeout=10) print('Result: %s' % r)# 關閉:manager.shutdown()任務進程要通過網絡連接到服務進程,所以要指定服務進程的IP。
現在,可以試試分布式進程的工作效果了。先啟動task_master.py服務進程:
$ python3 task_master.py Put task 3411...Put task 1605...Put task 1398...Put task 4729...Put task 5300...Put task 7471...Put task 68...Put task 4219...Put task 339...Put task 7866...Try get results...
task_master.py進程發送完任務后,開始等待result隊列的結果。現在啟動task_worker.py進程:
$ python3 task_worker.pyConnect to server 127.0.0.1...run task 3411 * 3411...run task 1605 * 1605...run task 1398 * 1398...run task 4729 * 4729...run task 5300 * 5300...run task 7471 * 7471...run task 68 * 68...run task 4219 * 4219...run task 339 * 339...run task 7866 * 7866...worker exit.
task_worker.py進程結束,在task_master.py進程中會繼續打印出結果:
Result: 3411 * 3411 = 11634921Result: 1605 * 1605 = 2576025Result: 1398 * 1398 = 1954404Result: 4729 * 4729 = 22363441Result: 5300 * 5300 = 28090000Result: 7471 * 7471 = 55815841Result: 68 * 68 = 4624Result: 4219 * 4219 = 17799961Result: 339 * 339 = 114921Result: 7866 * 7866 = 61873956
這個簡單的Master/Worker模型有什么用?其實這就是一個簡單但真正的分布式計算,把代碼稍加改造,啟動多個worker,就可以把任務分布到幾臺甚至幾十臺機器上,比如把計算n*n的代碼換成發送郵件,就實現了郵件隊列的異步發送。
而Queue之所以能通過網絡訪問,就是通過QueueManager實現的。由于QueueManager管理的不止一個Queue,所以,要給每個Queue的網絡調用接口起個名字,比如get_task_queue。
authkey有什么用?這是為了保證兩臺機器正常通信,不被其他機器惡意干擾。如果task_worker.py的authkey和task_master.py的authkey不一致,肯定連接不上。
Python的分布式進程接口簡單,封裝良好,適合需要把繁重任務分布到多臺機器的環境下。
注意Queue的作用是用來傳遞任務和接收結果,每個任務的描述數據量要盡量小。比如發送一個處理日志文件的任務,就不要發送幾百兆的日志文件本身,而是發送日志文件存放的完整路徑,由Worker進程再去共享的磁盤上讀取文件。
以上就是本篇文章所講述的所有內容,這篇文章主要介紹了python分布式進程的相關知識,希望你能借助資料從而理解上述所說的內容。希望我在這片文章所講述的內容能夠對你有所幫助,讓你學習python更加輕松。
新聞熱點
疑難解答