国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Python > 正文

Python利用multiprocessing實現最簡單的分布式作業調度系統實例

2020-01-04 16:23:27
字體:
來源:轉載
供稿:網友

介紹

Python的multiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分布到多臺機器上。一個服務進程可以作為調度者,將任務分布到其他多個機器的多個進程中,依靠網絡通信。想到這,就在想是不是可以使用此模塊來實現一個簡單的作業調度系統。在這之前,我們先來詳細了解下python中的多進程管理包multiprocessing。

multiprocessing.Process

multiprocessing包是Python中的多進程管理包。它與 threading.Thread類似,可以利用multiprocessing.Process對象來創建一個進程。該進程可以允許放在Python程序內部編寫的函數中。該Process對象與Thread對象的用法相同,擁有is_alive()、join([timeout])、run()、start()、terminate()等方法。屬性有:authkey、daemon(要通過start()設置)、exitcode(進程在運行時為None、如果為–N,表示被信號N結束)、name、pid。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類,用來同步進程,其用法也與threading包中的同名類一樣。multiprocessing的很大一部份與threading使用同一套API,只不過換到了多進程的情境。

這個模塊表示像線程一樣管理進程,這個是multiprocessing的核心,它與threading很相似,對多核CPU的利用率會比threading好的多。

看一下Process類的構造方法:

__init__(self, group=None, target=None, name=None, args=(), kwargs={})

參數說明:

  • group:進程所屬組。基本不用
  • target:表示調用對象。
  • args:表示調用對象的位置參數元組。
  • name:別名
  • kwargs:表示調用對象的字典。

創建進程的簡單實例:

#coding=utf-8import multiprocessingdef do(n) : #獲取當前線程的名字 name = multiprocessing.current_process().name print name,'starting' print "worker ", n return if __name__ == '__main__' : numList = [] for i in xrange(5) : p = multiprocessing.Process(target=do, args=(i,)) numList.append(p) p.start() p.join() print "Process end."

執行結果:

Process-1 startingworker 0Process end.Process-2 startingworker 1Process end.Process-3 startingworker 2Process end.Process-4 startingworker 3Process end.Process-5 startingworker 4Process end.

創建子進程時,只需要傳入一個執行函數和函數的參數,創建一個Process實例,并用其start()方法啟動,join()方法表示等待子進程結束以后再繼續往下運行,通常用于進程間的同步。

注意:

在Windows上要想使用進程模塊,就必須把有關進程的代碼寫在當前.py文件的if __name__ == ‘__main__' :語句的下面,才能正常使用Windows下的進程模塊。Unix/Linux下則不需要。

multiprocess.Pool

當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但如果是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,此時可以發揮進程池的功效。

Pool可以提供指定數量的進程供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那么該請求就會等待,直到池中有進程結束,才會創建新的進程來它。

apply_async和apply

函數原型:

apply_async(func[, args=()[, kwds={}[, callback=None]]])

二者都是向進程池中添加新的進程,不同的時,apply每次添加新的進程時,主進程和新的進程會并行執行,但是主進程會阻塞,直到新進程的函數執行結束。 這是很低效的,所以python3.x之后不再使用

apply_async和apply功能相同,但是主進程不會阻塞。

# -*- coding:utf-8 -*-import multiprocessingimport timedef func(msg): print "*msg: ", msg time.sleep(3) print "*end"if __name__ == "__main__": # 維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去 pool = multiprocessing.Pool(processes=3) for i in range(10): msg = "hello [{}]".format(i) # pool.apply(func, (msg,)) pool.apply_async(func, (msg,)) # 異步開啟進程, 非阻塞型, 能夠向池中添加進程而不等待其執行完畢就能再次執行循環 print "--" * 10 pool.close() # 關閉pool, 則不會有新的進程添加進去 pool.join() # 必須在join之前close, 然后join等待pool中所有的線程執行完畢 print "All process done."

運行結果:

"D:/Program Files/Anaconda2/python.exe" E:/pycharm/test/multiprocessing/v1.py--------------------*msg: hello [0]*msg: hello [1]*msg: hello [2]*end*msg: hello [3]*end*end*msg: hello [4]*msg: hello [5]*end*msg: hello [6]*end*end*msg: hello [7]*msg: hello [8]*end*msg: hello [9]*end*end*endAll process done.Process finished with exit code 0

獲得進程的執行結果

# -*- coding:utf-8 -*-import multiprocessingimport timedef func_with_return(msg): print "*msg: ", msg time.sleep(3) print "*end" return "{} return".format(msg)if __name__ == "__main__": # 維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去 pool = multiprocessing.Pool(processes=3) results = [] for i in range(10): msg = "hello [{}]".format(i) res = pool.apply_async(func_with_return, (msg,)) # 異步開啟進程, 非阻塞型, 能夠向池中添加進程而不等待其執行完畢就能再次執行循環 results.append(res) print "--" * 10 pool.close() # 關閉pool, 則不會有新的進程添加進去 pool.join() # 必須在join之前close, 然后join等待pool中所有的線程執行完畢 print "All process done." print "Return results: " for i in results: print i.get() # 獲得進程的執行結果

結果:

"D:/Program Files/Anaconda2/python.exe" E:/pycharm/test/multiprocessing/v1.py--------------------*msg: hello [0]*msg: hello [1]*msg: hello [2]*end*end*msg: hello [3]*msg: hello [4]*end*msg: hello [5]*end*end*msg: hello [6]*msg: hello [7]*end*msg: hello [8]*end*end*msg: hello [9]*end*endAll process done.Return results: hello [0] returnhello [1] returnhello [2] returnhello [3] returnhello [4] returnhello [5] returnhello [6] returnhello [7] returnhello [8] returnhello [9] returnProcess finished with exit code 0

map

函數原型:

map(func, iterable[, chunksize=None])

Pool類中的map方法,與內置的map函數用法行為基本一致,它會使進程阻塞直到返回結果。

注意,雖然第二個參數是一個迭代器,但在實際使用中,必須在整個隊列都就緒后,程序才會運行子進程。

# -*- coding:utf-8 -*-import multiprocessingimport timedef func_with_return(msg): print "*msg: ", msg time.sleep(3) print "*end" return "{} return".format(msg)if __name__ == "__main__": # 維持執行的進程總數為processes,當一個進程執行完畢后會添加新的進程進去 pool = multiprocessing.Pool(processes=3) results = [] msgs = [] for i in range(10): msg = "hello [{}]".format(i) msgs.append(msg) results = pool.map(func_with_return, msgs) print "--" * 10 pool.close() # 關閉pool, 則不會有新的進程添加進去 pool.join() # 必須在join之前close, 然后join等待pool中所有的線程執行完畢 print "All process done." print "Return results: " for i in results: print i # 獲得進程的執行結果

執行結果:

"D:/Program Files/Anaconda2/python.exe" E:/pycharm/test/multiprocessing/v2.py*msg: hello [0]*msg: hello [1]*msg: hello [2]*end*end*msg: hello [3]*msg: hello [4]*end*msg: hello [5]*end*end*msg: hello [6]*msg: hello [7]*end*msg: hello [8]*end*end*msg: hello [9]*end*end--------------------All process done.Return results: hello [0] returnhello [1] returnhello [2] returnhello [3] returnhello [4] returnhello [5] returnhello [6] returnhello [7] returnhello [8] returnhello [9] returnProcess finished with exit code 0

注意:執行結果中“—-”的位置,可以看到,map之后,主進程是阻塞的,等待map的結果返回

close()

關閉進程池(pool),使其不在接受新的任務。

terminate()

結束工作進程,不在處理未處理的任務。

join()

主進程阻塞等待子進程的退出,join方法必須在close或terminate之后使用。

進程間通信

多進程最麻煩的地方就是進程間通信,IPC比線程通信要難處理的多,所以留作單獨一篇來記錄

利用multiprocessing實現一個最簡單的分布式作業調度系統

Job

首先創建一個Job類,為了測試簡單,只包含一個job id屬性,將來可以封裝一些作業狀態,作業命令,執行用戶等屬性。

job.py

#!/usr/bin/env python# -*- coding: utf-8 -*-class Job: def __init__(self, job_id): self.job_id = job_id

Master

Master用來派發作業和顯示運行完成的作業信息

master.py

#!/usr/bin/env python# -*- coding: utf-8 -*-from Queue import Queuefrom multiprocessing.managers import BaseManagerfrom job import Jobclass Master: def __init__(self): # 派發出去的作業隊列 self.dispatched_job_queue = Queue() # 完成的作業隊列 self.finished_job_queue = Queue() def get_dispatched_job_queue(self): return self.dispatched_job_queue def get_finished_job_queue(self): return self.finished_job_queue def start(self): # 把派發作業隊列和完成作業隊列注冊到網絡上 BaseManager.register('get_dispatched_job_queue', callable=self.get_dispatched_job_queue) BaseManager.register('get_finished_job_queue', callable=self.get_finished_job_queue) # 監聽端口和啟動服務 manager = BaseManager(address=('0.0.0.0', 8888), authkey='jobs') manager.start() # 使用上面注冊的方法獲取隊列 dispatched_jobs = manager.get_dispatched_job_queue() finished_jobs = manager.get_finished_job_queue() # 這里一次派發10個作業,等到10個作業都運行完后,繼續再派發10個作業 job_id = 0 while True:  for i in range(0, 10):  job_id = job_id + 1  job = Job(job_id)  print('Dispatch job: %s' % job.job_id)  dispatched_jobs.put(job)  while not dispatched_jobs.empty():  job = finished_jobs.get(60)  print('Finished Job: %s' % job.job_id) manager.shutdown()if __name__ == "__main__": master = Master() master.start()

Slave

Slave用來運行master派發的作業并將結果返回

slave.py

#!/usr/bin/env python# -*- coding: utf-8 -*-import timefrom Queue import Queuefrom multiprocessing.managers import BaseManagerfrom job import Jobclass Slave: def __init__(self): # 派發出去的作業隊列 self.dispatched_job_queue = Queue() # 完成的作業隊列 self.finished_job_queue = Queue() def start(self): # 把派發作業隊列和完成作業隊列注冊到網絡上 BaseManager.register('get_dispatched_job_queue') BaseManager.register('get_finished_job_queue') # 連接master server = '127.0.0.1' print('Connect to server %s...' % server) manager = BaseManager(address=(server, 8888), authkey='jobs') manager.connect() # 使用上面注冊的方法獲取隊列 dispatched_jobs = manager.get_dispatched_job_queue() finished_jobs = manager.get_finished_job_queue() # 運行作業并返回結果,這里只是模擬作業運行,所以返回的是接收到的作業 while True:  job = dispatched_jobs.get(timeout=1)  print('Run job: %s ' % job.job_id)  time.sleep(1)  finished_jobs.put(job)if __name__ == "__main__": slave = Slave() slave.start()

測試

分別打開三個linux終端,第一個終端運行master,第二個和第三個終端用了運行slave,運行結果如下

master

$ python master.py Dispatch job: 1Dispatch job: 2Dispatch job: 3Dispatch job: 4Dispatch job: 5Dispatch job: 6Dispatch job: 7Dispatch job: 8Dispatch job: 9Dispatch job: 10Finished Job: 1Finished Job: 2Finished Job: 3Finished Job: 4Finished Job: 5Finished Job: 6Finished Job: 7Finished Job: 8Finished Job: 9Dispatch job: 11Dispatch job: 12Dispatch job: 13Dispatch job: 14Dispatch job: 15Dispatch job: 16Dispatch job: 17Dispatch job: 18Dispatch job: 19Dispatch job: 20Finished Job: 10Finished Job: 11Finished Job: 12Finished Job: 13Finished Job: 14Finished Job: 15Finished Job: 16Finished Job: 17Finished Job: 18Dispatch job: 21Dispatch job: 22Dispatch job: 23Dispatch job: 24Dispatch job: 25Dispatch job: 26Dispatch job: 27Dispatch job: 28Dispatch job: 29Dispatch job: 30

slave1

$ python slave.py Connect to server 127.0.0.1...Run job: 1 Run job: 2 Run job: 3 Run job: 5 Run job: 7 Run job: 9 Run job: 11 Run job: 13 Run job: 15 Run job: 17 Run job: 19 Run job: 21 Run job: 23 

slave2

$ python slave.py Connect to server 127.0.0.1...Run job: 4 Run job: 6 Run job: 8 Run job: 10 Run job: 12 Run job: 14 Run job: 16 Run job: 18 Run job: 20 Run job: 22 Run job: 24 

總結

以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,如果有疑問大家可以留言交流,謝謝大家對VEVB武林網的支持。


注:相關教程知識閱讀請移步到python教程頻道。
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 青龙| 尼勒克县| 邵东县| 五莲县| 资源县| 南投市| 肥西县| 永安市| 孙吴县| 保定市| 山东省| 开化县| 仙游县| 宣汉县| 三原县| 松溪县| 彰武县| 宝坻区| 漠河县| 白山市| 鲜城| 波密县| 建德市| 太康县| 茌平县| 会东县| 京山县| 贵南县| 屏东市| 明光市| 霍山县| 曲阜市| 和静县| 巴彦县| 若羌县| 米脂县| 靖远县| 察哈| 新昌县| 揭东县| 牡丹江市|