国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Python > 正文

Python使用multiprocessing實現一個最簡單的分布式作業調度系統

2019-11-25 16:52:57
字體:
來源:轉載
供稿:網友

 mutilprocess像線程一樣管理進程,這個是mutilprocess的核心,他與threading很是相像,對多核CPU的利用率會比threading好的多。

介紹

Python的multiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分布到多臺機器上。一個服務進程可以作為調度者,將任務分布到其他多個機器的多個進程中,依靠網絡通信。

想到這,就在想是不是可以使用此模塊來實現一個簡單的作業調度系統。

實現

Job

首先創建一個Job類,為了測試簡單,只包含一個job id屬性

job.py

#!/usr/bin/env python# -*- coding: utf-8 -*-class Job:def __init__(self, job_id):self.job_id = job_id

Master

Master用來派發作業和顯示運行完成的作業信息

master.py

#!/usr/bin/env python# -*- coding: utf-8 -*-from Queue import Queuefrom multiprocessing.managers import BaseManagerfrom job import Job

class Master:

def __init__(self):# 派發出去的作業隊列self.dispatched_job_queue = Queue()# 完成的作業隊列self.finished_job_queue = Queue()def get_dispatched_job_queue(self):return self.dispatched_job_queuedef get_finished_job_queue(self):return self.finished_job_queuedef start(self):# 把派發作業隊列和完成作業隊列注冊到網絡上BaseManager.register('get_dispatched_job_queue', callable=self.get_dispatched_job_queue)BaseManager.register('get_finished_job_queue', callable=self.get_finished_job_queue)# 監聽端口和啟動服務manager = BaseManager(address=('0.0.0.0', 8888), authkey='jobs')manager.start()# 使用上面注冊的方法獲取隊列dispatched_jobs = manager.get_dispatched_job_queue()finished_jobs = manager.get_finished_job_queue()# 這里一次派發10個作業,等到10個作業都運行完后,繼續再派發10個作業job_id = 0while True:for i in range(0, 10):job_id = job_id + 1job = Job(job_id)print('Dispatch job: %s' % job.job_id)dispatched_jobs.put(job)while not dispatched_jobs.empty():job = finished_jobs.get(60)print('Finished Job: %s' % job.job_id)manager.shutdown()if __name__ == "__main__":master = Master()master.start()

Slave

Slave用來運行master派發的作業并將結果返回

slave.py

#!/usr/bin/env python# -*- coding: utf-8 -*-import timefrom Queue import Queuefrom multiprocessing.managers import BaseManagerfrom job import Job

class Slave:

def __init__(self):# 派發出去的作業隊列self.dispatched_job_queue = Queue()# 完成的作業隊列self.finished_job_queue = Queue()

def start(self):

# 把派發作業隊列和完成作業隊列注冊到網絡上BaseManager.register('get_dispatched_job_queue')BaseManager.register('get_finished_job_queue')# 連接masterserver = '127.0.0.1'print('Connect to server %s...' % server)manager = BaseManager(address=(server, 8888), authkey='jobs')manager.connect()# 使用上面注冊的方法獲取隊列dispatched_jobs = manager.get_dispatched_job_queue()finished_jobs = manager.get_finished_job_queue()# 運行作業并返回結果,這里只是模擬作業運行,所以返回的是接收到的作業while True:job = dispatched_jobs.get(timeout=1)print('Run job: %s ' % job.job_id)time.sleep(1)finished_jobs.put(job)if __name__ == "__main__":slave = Slave()slave.start()

測試

分別打開三個linux終端,第一個終端運行master,第二個和第三個終端用了運行slave,運行結果如下

master

$ python master.py Dispatch job: 1Dispatch job: 2Dispatch job: 3Dispatch job: 4Dispatch job: 5Dispatch job: 6Dispatch job: 7Dispatch job: 8Dispatch job: 9Dispatch job: 10Finished Job: 1Finished Job: 2Finished Job: 3Finished Job: 4Finished Job: 5Finished Job: 6Finished Job: 7Finished Job: 8Finished Job: 9Dispatch job: 11Dispatch job: 12Dispatch job: 13Dispatch job: 14Dispatch job: 15Dispatch job: 16Dispatch job: 17Dispatch job: 18Dispatch job: 19Dispatch job: 20Finished Job: 10Finished Job: 11Finished Job: 12Finished Job: 13Finished Job: 14Finished Job: 15Finished Job: 16Finished Job: 17Finished Job: 18Dispatch job: 21Dispatch job: 22Dispatch job: 23Dispatch job: 24Dispatch job: 25Dispatch job: 26Dispatch job: 27Dispatch job: 28Dispatch job: 29Dispatch job: 30

slave1

$ python slave.py Connect to server 127.0.0.1...Run job: 1 Run job: 2 Run job: 3 Run job: 5 Run job: 7 Run job: 9 Run job: 11 Run job: 13 Run job: 15 Run job: 17 Run job: 19 Run job: 21 Run job: 23 

slave2

$ python slave.py Connect to server 127.0.0.1...Run job: 4 Run job: 6 Run job: 8 Run job: 10 Run job: 12 Run job: 14 Run job: 16 Run job: 18 Run job: 20 Run job: 22 Run job: 24 

以上內容是小編給大家介紹的Python使用multiprocessing實現一個最簡單的分布式作業調度系統,希望對大家有所幫助!

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 砀山县| 武穴市| 郯城县| 武强县| 石门县| 孝感市| 沅陵县| 郓城县| 平利县| 新余市| 昭平县| 双桥区| 罗定市| 合山市| 南阳市| 修水县| 塔城市| 洛阳市| 库尔勒市| 四川省| 通榆县| 天水市| 太保市| 京山县| 沅江市| 彭州市| 巴东县| 黄龙县| 通化县| 新民市| 齐齐哈尔市| 察雅县| 宽甸| 黎平县| 银川市| 柳河县| 柳河县| 南和县| 龙川县| 前郭尔| 澳门|