本文實例講述了Python多進程multiprocessing用法。分享給大家供大家參考,具體如下:
multiprocessing簡介
像線程一樣管理進程,這個是multiprocessing的核心,它與threading很是相像,對多核CPU的利用率會比threading好得多。
簡單的創建進程:
import multiprocessing


def worker(num):
    """Report which worker is running; meant to be the target of a child process.

    Args:
        num: Value handed over by the parent through ``args=(i,)``.
    """
    # A single pre-formatted argument prints "Worker: <num>" both with the
    # Python 2 print statement semantics and the Python 3 print function.
    print('Worker: %s' % num)


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()
    # Wait for every child explicitly so the collected handles are actually used.
    for p in jobs:
        p.join()
確定當前的進程,即是給進程命名,方便標識區分,跟蹤
import multiprocessing
import time


def worker(delay=2):
    """Print start/exit markers tagged with the current process name.

    Args:
        delay: Seconds to sleep between the two messages. Defaults to 2,
            the value this example originally hard-coded.
    """
    name = multiprocessing.current_process().name
    print('%s Starting' % name)
    time.sleep(delay)
    print('%s Exiting' % name)


def my_service(delay=3):
    """Same as worker(), but with a longer default pause.

    Args:
        delay: Seconds to sleep between the two messages. Defaults to 3,
            the value this example originally hard-coded.
    """
    name = multiprocessing.current_process().name
    print('%s Starting' % name)
    time.sleep(delay)
    print('%s Exiting' % name)


if __name__ == '__main__':
    # Two processes get explicit names; the third keeps the default name
    # assigned by multiprocessing.
    service = multiprocessing.Process(name='my_service', target=my_service)
    worker_1 = multiprocessing.Process(name='worker 1', target=worker)
    worker_2 = multiprocessing.Process(target=worker)  # default name

    worker_1.start()
    worker_2.start()
    service.start()
守護進程就是不阻擋主程序退出,自己干自己的;只需在start()之前設置 p.daemon = True 這一句(multiprocessing.Process 沒有 setDaemon 方法)。若要等待守護進程退出,要加上join;join可以傳入浮點數值作為超時,等待n秒后就不再等了。
守護進程:
import multiprocessing
import time
import sys


def daemon(delay=2):
    """Announce start, pause, then announce exit; used as the daemon target.

    Args:
        delay: Seconds to sleep between the two messages. Defaults to 2,
            the value this example originally hard-coded.
    """
    name = multiprocessing.current_process().name
    print('Starting: %s' % name)
    time.sleep(delay)
    print('Exiting : %s' % name)


def non_daemon():
    """Announce start and exit back to back; used as the non-daemon target."""
    name = multiprocessing.current_process().name
    print('Starting: %s' % name)
    print('Exiting : %s' % name)


if __name__ == '__main__':
    d = multiprocessing.Process(name='daemon', target=daemon)
    d.daemon = True

    n = multiprocessing.Process(name='non-daemon', target=non_daemon)
    n.daemon = False

    d.start()
    n.start()

    # Wait at most 1 second for the daemon; it sleeps for 2 by default, so it
    # is expected to still be alive when the timeout expires.
    d.join(1)
    print('d.is_alive() %s' % d.is_alive())
    n.join()
最好使用 poison pill(毒丸)讓進程自行退出;若要強制結束,則使用terminate()。注意:terminate之后要join,使其可以更新狀態。
終止進程:
import multiprocessing
import time


def slow_worker(delay=0.1):
    """Print a start marker, pause, then print a finish marker.

    Args:
        delay: Seconds to sleep between the two messages. Defaults to 0.1,
            the value this example originally hard-coded.
    """
    print('Starting worker')
    time.sleep(delay)
    print('Finished worker')


if __name__ == '__main__':
    p = multiprocessing.Process(target=slow_worker)
    print('BEFORE: %s %s' % (p, p.is_alive()))

    p.start()
    print('DURING: %s %s' % (p, p.is_alive()))

    p.terminate()
    print('TERMINATED: %s %s' % (p, p.is_alive()))

    # join() after terminate() so the process object can update its status.
    p.join()
    print('JOINED: %s %s' % (p, p.is_alive()))
①. == 0 未生成任何錯誤
②. > 0 進程有一個錯誤,并以該錯誤碼退出
③. < 0 進程由一個-1 * exitcode信號結束
進程的退出狀態:
import multiprocessing
import sys
import time


def exit_error():
    """Exit the process through sys.exit(1)."""
    sys.exit(1)


def exit_ok():
    """Return normally without a value."""
    return


def return_value():
    """Return 1 from the process target."""
    return 1


def raises():
    """Raise RuntimeError so the process ends with an uncaught exception."""
    raise RuntimeError('There was an error!')


def terminated():
    """Sleep for 3 seconds; the parent terminates this process before it wakes."""
    time.sleep(3)


if __name__ == '__main__':
    jobs = []
    for f in [exit_error, exit_ok, return_value, raises, terminated]:
        # __name__ exists on Python 2 and 3; func_name is Python 2 only.
        print('Starting process for %s' % f.__name__)
        j = multiprocessing.Process(target=f, name=f.__name__)
        jobs.append(j)
        j.start()

    # Kill the last job (terminated) while it is still sleeping.
    jobs[-1].terminate()

    for j in jobs:
        j.join()
        print('%15s.exitcode = %s' % (j.name, j.exitcode))

# NOTE (article text that was fused onto this listing's line):
# For convenient debugging, logging can be used.
日志:
import multiprocessing
import logging
import sys


def worker():
    """Print a message and flush stdout so it appears immediately."""
    print('Doing some work')
    sys.stdout.flush()


if __name__ == '__main__':
    # Send multiprocessing's own log messages to stderr at INFO level.
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)

    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()
利用class來創建進程,定制子類
派生進程:
import multiprocessing


class Worker(multiprocessing.Process):
    """Process subclass whose work is defined by overriding run()."""

    def run(self):
        """Print the name of this process object."""
        print('In %s' % self.name)
        return


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Worker()
        jobs.append(p)
        p.start()

    for j in jobs:
        j.join()
python進程間傳遞消息:
# --- Example 1: hand an object to a child process through a Queue ---
import multiprocessing


class MyFancyClass(object):
    """Small named object that is sent through the queue to the worker."""

    def __init__(self, name):
        self.name = name

    def do_something(self):
        """Print which process is handling this object."""
        proc_name = multiprocessing.current_process().name
        # The original listing had a garbled line continuation ("% /") here.
        print('Doing something fancy in %s for %s!' % (proc_name, self.name))


def worker(q):
    """Take one object off the queue and call its do_something()."""
    obj = q.get()
    obj.do_something()


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()

    queue.put(MyFancyClass('Fancy Dan'))

    # Wait for the worker to finish
    queue.close()
    queue.join_thread()
    p.join()


# --- Example 2: consumer processes fed from a JoinableQueue ---
import multiprocessing
import time


class Consumer(multiprocessing.Process):
    """Process that runs tasks from task_queue and puts answers on result_queue."""

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        """Handle tasks until a None sentinel arrives."""
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print('%s: Exiting' % proc_name)
                self.task_queue.task_done()
                break
            print('%s: %s' % (proc_name, next_task))
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return


class Task(object):
    """Callable unit of work that multiplies two values."""

    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        time.sleep(0.1)  # pretend to take some time to do the work
        return '%s * %s = %s' % (self.a, self.b, self.a * self.b)

    def __str__(self):
        return '%s * %s' % (self.a, self.b)


if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print('Creating %d consumers' % num_consumers)
    # range() instead of the Python 2-only xrange(); the counts here are small.
    consumers = [Consumer(tasks, results) for _ in range(num_consumers)]
    for w in consumers:
        w.start()

    # Enqueue jobs
    num_jobs = 10
    for i in range(num_jobs):
        tasks.put(Task(i, i))

    # Add a poison pill for each consumer
    for _ in range(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()

    # Start printing results
    while num_jobs:
        result = results.get()
        print('Result: %s' % result)
        num_jobs -= 1

# NOTE (article text that was fused onto this listing's last line):
# Event provides a simple way to pass state information between processes.
# An event can be toggled between the set and unset states. By using an
# optional timeout value, users of the event object can wait for its state
# to change from unset to set.
進程間信號傳遞:
import multiprocessing
import time


def wait_for_event(e):
    """Wait for the event to be set before doing anything.

    Args:
        e: Event-like object providing wait() and is_set().
    """
    print('wait_for_event: starting')
    e.wait()
    print('wait_for_event: e.is_set()-> %s' % e.is_set())


def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout.

    Args:
        e: Event-like object providing wait(timeout) and is_set().
        t: Maximum number of seconds to wait.
    """
    print('wait_for_event_timeout: starting')
    e.wait(t)
    print('wait_for_event_timeout: e.is_set()-> %s' % e.is_set())


if __name__ == '__main__':
    e = multiprocessing.Event()

    w1 = multiprocessing.Process(name='block',
                                 target=wait_for_event,
                                 args=(e,))
    w1.start()

    w2 = multiprocessing.Process(name='nonblock',
                                 target=wait_for_event_timeout,
                                 args=(e, 2))
    w2.start()

    print('main: waiting before calling Event.set()')
    # Sleep longer than w2's 2-second timeout before setting the event.
    time.sleep(3)
    e.set()
    print('main: event is set')
Python多進程之間,一般的情況是用Queue來傳遞數據。
Queue:
from multiprocessing import Process, Queue


def f(q):
    """Put one sample list on the queue for the parent to read.

    Args:
        q: Queue-like object providing put().
    """
    q.put([42, None, 'hello'])


if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())  # prints "[42, None, 'hello']"
    p.join()
多線程隊列Queue(示例使用的是先進先出的Queue.Queue,并非優先隊列PriorityQueue):
try:
    import Queue  # Python 2 module name
except ImportError:
    import queue as Queue  # same module under its Python 3 name
import threading
import time

# Set to 1 by the main thread to tell the workers to leave their loop.
exitFlag = 0


class myThread(threading.Thread):
    """Thread that processes items from the queue it was given."""

    def __init__(self, threadID, name, q):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.q = q

    def run(self):
        """Announce start, process the queue until told to stop, announce exit."""
        print("Starting " + self.name)
        process_data(self.name, self.q)
        print("Exiting " + self.name)


def process_data(threadName, q, poll_interval=1):
    """Take items off ``q`` and print them until ``exitFlag`` becomes true.

    Args:
        threadName: Label printed in front of every processed item.
        q: Queue to consume. The original listing tested the global
            ``workQueue`` for emptiness but read from ``q``; both now use ``q``.
        poll_interval: Seconds to sleep after each pass. Defaults to 1, the
            value this example originally hard-coded.
    """
    while not exitFlag:
        # Hold the lock across check-and-get so two workers cannot both see a
        # non-empty queue and then race for the same last item.
        with queueLock:
            has_item = not q.empty()
            if has_item:
                data = q.get()
        if has_item:
            print("%s processing %s" % (threadName, data))
        time.sleep(poll_interval)


threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = Queue.Queue(10)
threads = []
threadID = 1

# The driver is guarded so that importing this module does not start threads.
if __name__ == '__main__':
    # Create new threads
    for tName in threadList:
        thread = myThread(threadID, tName, workQueue)
        thread.start()
        threads.append(thread)
        threadID += 1

    # Fill the queue
    with queueLock:
        for word in nameList:
            workQueue.put(word)

    # Wait for queue to empty; sleep briefly instead of spinning at full CPU.
    while not workQueue.empty():
        time.sleep(0.1)

    # Notify threads it's time to exit
    exitFlag = 1

    # Wait for all threads to complete
    for t in threads:
        t.join()
    print("Exiting Main Thread")
多進程使用Queue通信的例子
import time
from multiprocessing import Process, Queue

MSG_QUEUE = Queue(5)


def startA(msgQueue):
    """Consumer loop: once per second, report an empty queue or print one message.

    Args:
        msgQueue: Queue shared with the producer process.
    """
    while True:
        if msgQueue.empty():
            print('queue is empty %d' % (msgQueue.qsize()))
        else:
            msg = msgQueue.get()
            print('get msg %s' % (msg,))
        time.sleep(1)


def startB(msgQueue):
    """Producer loop: every three seconds, put 'hello world' on the queue.

    Args:
        msgQueue: Queue shared with the consumer process.
    """
    while True:
        msgQueue.put('hello world')
        print('put hello world queue size is %d' % (msgQueue.qsize(),))
        time.sleep(3)


if __name__ == '__main__':
    processA = Process(target=startA, args=(MSG_QUEUE,))
    processB = Process(target=startB, args=(MSG_QUEUE,))

    processA.start()
    print('processA start..')
    # The original listing created processB but never started it, so the
    # consumer only ever saw an empty queue.
    processB.start()
    print('processB start..')

# NOTE (article text that was fused onto this listing's line):
# The main process defines a Queue variable and passes it, via the Process
# args parameter, to the child processes processA and processB; one process
# writes data into the queue and the other reads it.
更多關于Python相關內容感興趣的讀者可查看本站專題:《Python進程與線程操作技巧總結》、《Python Socket編程技巧總結》、《Python數據結構與算法教程》、《Python函數使用技巧總結》、《Python字符串操作技巧匯總》、《Python入門與進階經典教程》及《Python文件與目錄操作技巧匯總》
希望本文所述對大家Python程序設計有所幫助。
新聞熱點
疑難解答
圖片精選