国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Python > 正文

在Python程序中實現分布式進程的教程

2019-11-25 17:37:07
字體:
來源:轉載
供稿:網友

在Thread和Process中,應當優(yōu)選Process,因為Process更穩(wěn)定,而且,Process可以分布到多臺機器上,而Thread最多只能分布到同一臺機器的多個CPU上。

Python的multiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分布到多臺機器上。一個服務進程可以作為調度者,將任務分布到其他多個進程中,依靠網絡通信。由于managers模塊封裝很好,不必了解網絡通信的細節(jié),就可以很容易地編寫分布式多進程程序。

舉個例子:如果我們已經有一個通過Queue通信的多進程程序在同一臺機器上運行,現在,由于處理任務的進程任務繁重,希望把發(fā)送任務的進程和處理任務的進程分布到兩臺機器上。怎么用分布式進程實現?

原有的Queue可以繼續(xù)使用,但是,通過managers模塊把Queue通過網絡暴露出去,就可以讓其他機器的進程訪問Queue了。

我們先看服務進程,服務進程負責啟動Queue,把Queue注冊到網絡上,然后往Queue里面寫入任務:

# taskmanager.pyimport random, time, Queuefrom multiprocessing.managers import BaseManager# 發(fā)送任務的隊列:task_queue = Queue.Queue()# 接收結果的隊列:result_queue = Queue.Queue()# 從BaseManager繼承的QueueManager:class QueueManager(BaseManager):  pass# 把兩個Queue都注冊到網絡上, callable參數關聯了Queue對象:QueueManager.register('get_task_queue', callable=lambda: task_queue)QueueManager.register('get_result_queue', callable=lambda: result_queue)# 綁定端口5000, 設置驗證碼'abc':manager = QueueManager(address=('', 5000), authkey='abc')# 啟動Queue:manager.start()# 獲得通過網絡訪問的Queue對象:task = manager.get_task_queue()result = manager.get_result_queue()# 放幾個任務進去:for i in range(10):  n = random.randint(0, 10000)  print('Put task %d...' % n)  task.put(n)# 從result隊列讀取結果:print('Try get results...')for i in range(10):  r = result.get(timeout=10)  print('Result: %s' % r)# 關閉:manager.shutdown()

請注意,當我們在一臺機器上寫多進程程序時,創(chuàng)建的Queue可以直接拿來用,但是,在分布式多進程環(huán)境下,添加任務到Queue不可以直接對原始的task_queue進行操作,那樣就繞過了QueueManager的封裝,必須通過manager.get_task_queue()獲得的Queue接口添加。

然后,在另一臺機器上啟動任務進程(本機上啟動也可以):

# taskworker.pyimport time, sys, Queuefrom multiprocessing.managers import BaseManager# 創(chuàng)建類似的QueueManager:class QueueManager(BaseManager):  pass# 由于這個QueueManager只從網絡上獲取Queue,所以注冊時只提供名字:QueueManager.register('get_task_queue')QueueManager.register('get_result_queue')# 連接到服務器,也就是運行taskmanager.py的機器:server_addr = '127.0.0.1'print('Connect to server %s...' % server_addr)# 端口和驗證碼注意保持與taskmanager.py設置的完全一致:m = QueueManager(address=(server_addr, 5000), authkey='abc')# 從網絡連接:m.connect()# 獲取Queue的對象:task = m.get_task_queue()result = m.get_result_queue()# 從task隊列取任務,并把結果寫入result隊列:for i in range(10):  try:    n = task.get(timeout=1)    print('run task %d * %d...' % (n, n))    r = '%d * %d = %d' % (n, n, n*n)    time.sleep(1)    result.put(r)  except Queue.Empty:    print('task queue is empty.')# 處理結束:print('worker exit.')

任務進程要通過網絡連接到服務進程,所以要指定服務進程的IP。

現在,可以試試分布式進程的工作效果了。先啟動taskmanager.py服務進程:

$ python taskmanager.py Put task 3411...Put task 1605...Put task 1398...Put task 4729...Put task 5300...Put task 7471...Put task 68...Put task 4219...Put task 339...Put task 7866...Try get results...

taskmanager進程發(fā)送完任務后,開始等待result隊列的結果。現在啟動taskworker.py進程:

$ python taskworker.py 127.0.0.1Connect to server 127.0.0.1...run task 3411 * 3411...run task 1605 * 1605...run task 1398 * 1398...run task 4729 * 4729...run task 5300 * 5300...run task 7471 * 7471...run task 68 * 68...run task 4219 * 4219...run task 339 * 339...run task 7866 * 7866...worker exit.

taskworker進程結束,在taskmanager進程中會繼續(xù)打印出結果:

Result: 3411 * 3411 = 11634921Result: 1605 * 1605 = 2576025Result: 1398 * 1398 = 1954404Result: 4729 * 4729 = 22363441Result: 5300 * 5300 = 28090000Result: 7471 * 7471 = 55815841Result: 68 * 68 = 4624Result: 4219 * 4219 = 17799961Result: 339 * 339 = 114921Result: 7866 * 7866 = 61873956

這個簡單的Manager/Worker模型有什么用?其實這就是一個簡單但真正的分布式計算,把代碼稍加改造,啟動多個worker,就可以把任務分布到幾臺甚至幾十臺機器上,比如把計算n*n的代碼換成發(fā)送郵件,就實現了郵件隊列的異步發(fā)送。

Queue對象存儲在哪?注意到taskworker.py中根本沒有創(chuàng)建Queue的代碼,所以,Queue對象存儲在taskmanager.py進程中:

2015428155738775.png (594×366)

而Queue之所以能通過網絡訪問,就是通過QueueManager實現的。由于QueueManager管理的不止一個Queue,所以,要給每個Queue的網絡調用接口起個名字,比如get_task_queue。

authkey有什么用?這是為了保證兩臺機器正常通信,不被其他機器惡意干擾。如果taskworker.py的authkey和taskmanager.py的authkey不一致,肯定連接不上。
小結

Python的分布式進程接口簡單,封裝良好,適合需要把繁重任務分布到多臺機器的環(huán)境下。

注意Queue的作用是用來傳遞任務和接收結果,每個任務的描述數據量要盡量小。比如發(fā)送一個處理日志文件的任務,就不要發(fā)送幾百兆的日志文件本身,而是發(fā)送日志文件存放的完整路徑,由Worker進程再去共享的磁盤上讀取文件。

發(fā)表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發(fā)表
主站蜘蛛池模板: 宜都市| 永清县| 曲麻莱县| 金湖县| 湖北省| 宁都县| 浮梁县| 赤水市| 龙岩市| 普宁市| 怀安县| 九江县| 邵武市| 湄潭县| 宽甸| 榆中县| 恩施市| 红安县| 鹤峰县| 晋州市| 永安市| 蓬安县| 尖扎县| 铅山县| 咸阳市| 林西县| 上虞市| 海原县| 德州市| 南投市| 徐水县| 广河县| 海口市| 五台县| 合作市| 石楼县| 长垣县| 长垣县| 怀化市| 普兰店市| 康定县|