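multiprocessing manages processes much as threading manages threads; this is the core idea of the module. Its API closely resembles that of threading, but it makes far better use of multi-core CPUs, since in CPython separate processes are not bound by a single interpreter's global interpreter lock.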
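Introduction
Python's multiprocessing module supports more than multiple processes on one machine: its managers submodule can also spread processes across several machines. A server process acts as the scheduler and hands tasks to processes on other machines, communicating over the network.
This raised the question of whether the module could be used to build a simple job scheduling system.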
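As a rough illustration of the register/connect pattern that managers relies on, here is a minimal sketch. It is not the scheduler built below: to stay self-contained it serves the queue from a background thread in the same process instead of from a separate machine, and the names ServerManager, ClientManager, get_job_queue and the authkey b'demo' are made up for this example.

```python
import threading
from queue import Queue
from multiprocessing.managers import BaseManager

job_queue = Queue()  # the real queue, owned by the serving side


class ServerManager(BaseManager):
    """Serving side: registers a callable that returns the real queue."""


class ClientManager(BaseManager):
    """Client side: registers only the name, so calls go over the network."""


ServerManager.register('get_job_queue', callable=lambda: job_queue)
ClientManager.register('get_job_queue')

# Port 0 asks the OS for a free port; server.address holds the actual one.
server = ServerManager(address=('127.0.0.1', 0), authkey=b'demo').get_server()
threading.Thread(target=server.serve_forever, daemon=True).start()

# The client connects over TCP, exactly as a process on another machine would.
client = ClientManager(address=server.address, authkey=b'demo')
client.connect()
remote_queue = client.get_job_queue()  # a proxy object, not the queue itself
remote_queue.put('hello')              # forwarded to job_queue on the server

received = job_queue.get(timeout=5)
```

The client never touches job_queue directly; every call on remote_queue is sent to the server, which applies it to the real queue.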
Implementation
Job
First, create a Job class. To keep testing simple, it holds only a job id attribute.
job.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-


class Job:
    def __init__(self, job_id):
        self.job_id = job_id
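Objects passed through manager queues are pickled by default, so a Job has to survive a pickle round trip and its class must be importable on both ends. A small sketch of that round trip, with the Job class repeated here so the snippet runs on its own (the job id 7 is arbitrary):

```python
import pickle


class Job:
    def __init__(self, job_id):
        self.job_id = job_id


# Serialize a job to bytes, as happens when it is put on a manager queue,
# then rebuild it, as happens when the other side gets it.
payload = pickle.dumps(Job(7))
restored = pickle.loads(payload)
```

The restored object is a new Job instance carrying the same job_id, not the original object.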
Master
The master dispatches jobs and prints information about the jobs that have finished.
master.py
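#!/usr/bin/env python
# -*- coding: utf-8 -*-
from queue import Queue  # Python 3; the module is named Queue on Python 2
from multiprocessing.managers import BaseManager
from job import Job


class Master:
    def __init__(self):
        # Queue of dispatched jobs
        self.dispatched_job_queue = Queue()
        # Queue of finished jobs
        self.finished_job_queue = Queue()

    def get_dispatched_job_queue(self):
        return self.dispatched_job_queue

    def get_finished_job_queue(self):
        return self.finished_job_queue

    def start(self):
        # Register the dispatched and finished job queues on the network
        BaseManager.register('get_dispatched_job_queue', callable=self.get_dispatched_job_queue)
        BaseManager.register('get_finished_job_queue', callable=self.get_finished_job_queue)

        # Listen on the port and start the service. The authkey must be bytes
        # on Python 3. Registering bound methods like this relies on the fork
        # start method, as on Linux.
        manager = BaseManager(address=('0.0.0.0', 8888), authkey=b'jobs')
        manager.start()

        # Fetch the queues through the methods registered above
        dispatched_jobs = manager.get_dispatched_job_queue()
        finished_jobs = manager.get_finished_job_queue()

        # Dispatch 10 jobs at a time, then collect results before dispatching
        # the next 10. The inner loop stops as soon as the dispatched queue is
        # empty, so the last jobs of a batch may still be running.
        job_id = 0
        while True:
            for i in range(0, 10):
                job_id = job_id + 1
                job = Job(job_id)
                print('Dispatch job: %s' % job.job_id)
                dispatched_jobs.put(job)

            while not dispatched_jobs.empty():
                # Raises queue.Empty if no job finishes within 60 seconds
                job = finished_jobs.get(timeout=60)
                print('Finished Job: %s' % job.job_id)

        manager.shutdown()


if __name__ == "__main__":
    master = Master()
    master.start()
Slave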
The slave runs the jobs dispatched by the master and returns the results.
slave.py
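#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
from queue import Empty, Queue  # Python 3; the module is named Queue on Python 2
from multiprocessing.managers import BaseManager
from job import Job  # job.py must be importable so received jobs can be unpickled


class Slave:
    def __init__(self):
        # Queue of dispatched jobs
        self.dispatched_job_queue = Queue()
        # Queue of finished jobs
        self.finished_job_queue = Queue()

    def start(self):
        # Register the queue names only; the queues themselves live on the master
        BaseManager.register('get_dispatched_job_queue')
        BaseManager.register('get_finished_job_queue')

        # Connect to the master
        server = '127.0.0.1'
        print('Connect to server %s...' % server)
        manager = BaseManager(address=(server, 8888), authkey=b'jobs')
        manager.connect()

        # Fetch the queues through the methods registered above
        dispatched_jobs = manager.get_dispatched_job_queue()
        finished_jobs = manager.get_finished_job_queue()

        # Run jobs and return the results. This only simulates running a job,
        # so the result returned is the job that was received.
        while True:
            try:
                job = dispatched_jobs.get(timeout=1)
            except Empty:
                # No job arrived within one second; keep waiting
                continue
            print('Run job: %s ' % job.job_id)
            time.sleep(1)
            finished_jobs.put(job)


if __name__ == "__main__":
    slave = Slave()
    slave.start()
Testing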
Open three Linux terminals. Run the master in the first and a slave in each of the second and third. The output is as follows.
master
$ python master.py
Dispatch job: 1
Dispatch job: 2
Dispatch job: 3
Dispatch job: 4
Dispatch job: 5
Dispatch job: 6
Dispatch job: 7
Dispatch job: 8
Dispatch job: 9
Dispatch job: 10
Finished Job: 1
Finished Job: 2
Finished Job: 3
Finished Job: 4
Finished Job: 5
Finished Job: 6
Finished Job: 7
Finished Job: 8
Finished Job: 9
Dispatch job: 11
Dispatch job: 12
Dispatch job: 13
Dispatch job: 14
Dispatch job: 15
Dispatch job: 16
Dispatch job: 17
Dispatch job: 18
Dispatch job: 19
Dispatch job: 20
Finished Job: 10
Finished Job: 11
Finished Job: 12
Finished Job: 13
Finished Job: 14
Finished Job: 15
Finished Job: 16
Finished Job: 17
Finished Job: 18
Dispatch job: 21
Dispatch job: 22
Dispatch job: 23
Dispatch job: 24
Dispatch job: 25
Dispatch job: 26
Dispatch job: 27
Dispatch job: 28
Dispatch job: 29
Dispatch job: 30
slave1
$ python slave.py
Connect to server 127.0.0.1...
Run job: 1
Run job: 2
Run job: 3
Run job: 5
Run job: 7
Run job: 9
Run job: 11
Run job: 13
Run job: 15
Run job: 17
Run job: 19
Run job: 21
Run job: 23
slave2
$ python slave.py
Connect to server 127.0.0.1...
Run job: 4
Run job: 6
Run job: 8
Run job: 10
Run job: 12
Run job: 14
Run job: 16
Run job: 18
Run job: 20
Run job: 22
Run job: 24
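In the master output, "Finished Job: 10" appears only after jobs 11 to 20 have been dispatched, because the master moves on as soon as the dispatched queue is empty. If a strict barrier between batches is wanted, one option is to count results instead. The sketch below shows the idea with local queue.Queue objects and threads standing in for the manager queues and the slaves; run_batch and worker are hypothetical names, and plain integers stand in for Job objects.

```python
import threading
from queue import Queue


def worker(dispatched, finished):
    """Stand-in for a slave: echo each job back until a None sentinel arrives."""
    while True:
        job_id = dispatched.get()
        if job_id is None:
            break
        finished.put(job_id)


def run_batch(job_ids, dispatched, finished, timeout=60):
    """Dispatch one batch and block until every job in it has come back."""
    job_ids = list(job_ids)
    for job_id in job_ids:
        dispatched.put(job_id)
    # Count results instead of polling dispatched.empty(), which becomes
    # true while the last jobs of the batch are still running.
    return sorted(finished.get(timeout=timeout) for _ in job_ids)


dispatched, finished = Queue(), Queue()
workers = [threading.Thread(target=worker, args=(dispatched, finished))
           for _ in range(2)]
for t in workers:
    t.start()

first_batch = run_batch(range(1, 11), dispatched, finished)

for _ in workers:
    dispatched.put(None)  # one sentinel per worker tells it to stop
for t in workers:
    t.join()
```

The same counting loop could replace the master's inner while loop, at the cost of waiting for the slowest job in each batch.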
That concludes this walkthrough of building a minimal distributed job scheduling system with Python's multiprocessing. I hope it is helpful.