国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Python > 正文

Python多進程multiprocessing用法實例分析

2019-11-25 15:55:31
字體:
來源:轉載
供稿:網友

本文實例講述了Python多進程multiprocessing用法。分享給大家供大家參考,具體如下:

mutilprocess簡介

像線程一樣管理進程,這個是mutilprocess的核心,他與threading很是相像,對多核CPU的利用率會比threading好的多。

簡單的創建進程:

import multiprocessingdef worker(num):  """thread worker function"""  print 'Worker:', num  returnif __name__ == '__main__':  jobs = []  for i in range(5):    p = multiprocessing.Process(target=worker, args=(i,))    jobs.append(p)    p.start()

確定當前的進程,即是給進程命名,方便標識區分,跟蹤

import multiprocessingimport timedef worker():  name = multiprocessing.current_process().name  print name, 'Starting'  time.sleep(2)  print name, 'Exiting'def my_service():  name = multiprocessing.current_process().name  print name, 'Starting'  time.sleep(3)  print name, 'Exiting'if __name__ == '__main__':  service = multiprocessing.Process(name='my_service',                   target=my_service)  worker_1 = multiprocessing.Process(name='worker 1',                    target=worker)  worker_2 = multiprocessing.Process(target=worker) # default name  worker_1.start()  worker_2.start()  service.start()

守護進程就是不阻擋主程序退出,自己干自己的 mutilprocess.setDaemon(True)就這句等待守護進程退出,要加上join,join可以傳入浮點數值,等待n久就不等了

守護進程:

import multiprocessingimport timeimport sysdef daemon():  name = multiprocessing.current_process().name  print 'Starting:', name  time.sleep(2)  print 'Exiting :', namedef non_daemon():  name = multiprocessing.current_process().name  print 'Starting:', name  print 'Exiting :', nameif __name__ == '__main__':  d = multiprocessing.Process(name='daemon',                target=daemon)  d.daemon = True  n = multiprocessing.Process(name='non-daemon',                target=non_daemon)  n.daemon = False  d.start()  n.start()  d.join(1)  print 'd.is_alive()', d.is_alive()  n.join()

最好使用 poison pill,強制的使用terminate()注意 terminate之后要join,使其可以更新狀態

終止進程:

import multiprocessingimport timedef slow_worker():  print 'Starting worker'  time.sleep(0.1)  print 'Finished worker'if __name__ == '__main__':  p = multiprocessing.Process(target=slow_worker)  print 'BEFORE:', p, p.is_alive()  p.start()  print 'DURING:', p, p.is_alive()  p.terminate()  print 'TERMINATED:', p, p.is_alive()  p.join()  print 'JOINED:', p, p.is_alive()

①. == 0 未生成任何錯誤 
②. 0 進程有一個錯誤,并以該錯誤碼退出
③. < 0 進程由一個-1 * exitcode信號結束

進程的退出狀態:

import multiprocessingimport sysimport timedef exit_error():  sys.exit(1)def exit_ok():  returndef return_value():  return 1def raises():  raise RuntimeError('There was an error!')def terminated():  time.sleep(3)if __name__ == '__main__':  jobs = []  for f in [exit_error, exit_ok, return_value, raises, terminated]:    print 'Starting process for', f.func_name    j = multiprocessing.Process(target=f, name=f.func_name)    jobs.append(j)    j.start()  jobs[-1].terminate()  for j in jobs:    j.join()    print '%15s.exitcode = %s' % (j.name, j.exitcode)

方便的調試,可以用logging

日志:

import multiprocessingimport loggingimport sysdef worker():  print 'Doing some work'  sys.stdout.flush()if __name__ == '__main__':  multiprocessing.log_to_stderr()  logger = multiprocessing.get_logger()  logger.setLevel(logging.INFO)  p = multiprocessing.Process(target=worker)  p.start()  p.join()

利用class來創建進程,定制子類

派生進程:

import multiprocessingclass Worker(multiprocessing.Process):  def run(self):    print 'In %s' % self.name    returnif __name__ == '__main__':  jobs = []  for i in range(5):    p = Worker()    jobs.append(p)    p.start()  for j in jobs:    j.join()

python進程間傳遞消息:

import multiprocessingclass MyFancyClass(object):  def __init__(self, name):    self.name = name  def do_something(self):    proc_name = multiprocessing.current_process().name    print 'Doing something fancy in %s for %s!' % /      (proc_name, self.name)def worker(q):  obj = q.get()  obj.do_something()if __name__ == '__main__':  queue = multiprocessing.Queue()  p = multiprocessing.Process(target=worker, args=(queue,))  p.start()  queue.put(MyFancyClass('Fancy Dan'))  # Wait for the worker to finish  queue.close()  queue.join_thread()  p.join()import multiprocessingimport timeclass Consumer(multiprocessing.Process):  def __init__(self, task_queue, result_queue):    multiprocessing.Process.__init__(self)    self.task_queue = task_queue    self.result_queue = result_queue  def run(self):    proc_name = self.name    while True:      next_task = self.task_queue.get()      if next_task is None:        # Poison pill means shutdown        print '%s: Exiting' % proc_name        self.task_queue.task_done()        break      print '%s: %s' % (proc_name, next_task)      answer = next_task()      self.task_queue.task_done()      self.result_queue.put(answer)    returnclass Task(object):  def __init__(self, a, b):    self.a = a    self.b = b  def __call__(self):    time.sleep(0.1) # pretend to take some time to do the work    return '%s * %s = %s' % (self.a, self.b, self.a * self.b)  def __str__(self):    return '%s * %s' % (self.a, self.b)if __name__ == '__main__':  # Establish communication queues  tasks = multiprocessing.JoinableQueue()  results = multiprocessing.Queue()  # Start consumers  num_consumers = multiprocessing.cpu_count() * 2  print 'Creating %d consumers' % num_consumers  consumers = [ Consumer(tasks, results)         for i in xrange(num_consumers) ]  for w in consumers:    w.start()  # Enqueue jobs  num_jobs = 10  for i in xrange(num_jobs):    tasks.put(Task(i, i))  # Add a poison pill for each consumer  for i in xrange(num_consumers):    tasks.put(None)  # Wait for all of the tasks to finish  tasks.join()  # Start printing results  while num_jobs:    result = results.get()    print 'Result:', result    num_jobs -= 1

Event提供一種簡單的方法,可以在進程間傳遞狀態信息。事件可以切換設置和未設置狀態。通過使用一個可選的超時值,時間對象的用戶可以等待其狀態從未設置變為設置。

進程間信號傳遞:

import multiprocessingimport timedef wait_for_event(e):  """Wait for the event to be set before doing anything"""  print 'wait_for_event: starting'  e.wait()  print 'wait_for_event: e.is_set()->', e.is_set()def wait_for_event_timeout(e, t):  """Wait t seconds and then timeout"""  print 'wait_for_event_timeout: starting'  e.wait(t)  print 'wait_for_event_timeout: e.is_set()->', e.is_set()if __name__ == '__main__':  e = multiprocessing.Event()  w1 = multiprocessing.Process(name='block',                  target=wait_for_event,                 args=(e,))  w1.start()  w2 = multiprocessing.Process(name='nonblock',                  target=wait_for_event_timeout,                  args=(e, 2))  w2.start()  print 'main: waiting before calling Event.set()'  time.sleep(3)  e.set()  print 'main: event is set'

Python多進程,一般的情況是Queue來傳遞。

Queue:

from multiprocessing import Process, Queuedef f(q):  q.put([42, None, 'hello'])if __name__ == '__main__':  q = Queue()  p = Process(target=f, args=(q,))  p.start()  print q.get()  # prints "[42, None, 'hello']"  p.join()

多線程優先隊列Queue:

import Queueimport threadingimport timeexitFlag = 0class myThread (threading.Thread):  def __init__(self, threadID, name, q):    threading.Thread.__init__(self)    self.threadID = threadID    self.name = name    self.q = q  def run(self):    print "Starting " + self.name    process_data(self.name, self.q)    print "Exiting " + self.namedef process_data(threadName, q):  while not exitFlag:    queueLock.acquire()    if not workQueue.empty():      data = q.get()      queueLock.release()      print "%s processing %s" % (threadName, data)    else:      queueLock.release()    time.sleep(1)threadList = ["Thread-1", "Thread-2", "Thread-3"]nameList = ["One", "Two", "Three", "Four", "Five"]queueLock = threading.Lock()workQueue = Queue.Queue(10)threads = []threadID = 1# Create new threadsfor tName in threadList:  thread = myThread(threadID, tName, workQueue)  thread.start()  threads.append(thread)  threadID += 1# Fill the queuequeueLock.acquire()for word in nameList:  workQueue.put(word)queueLock.release()# Wait for queue to emptywhile not workQueue.empty():  pass# Notify threads it's time to exitexitFlag = 1# Wait for all threads to completefor t in threads:  t.join()print "Exiting Main Thread"

多進程使用Queue通信的例子

import timefrom multiprocessing import Process,QueueMSG_QUEUE = Queue(5)def startA(msgQueue):  while True:    if msgQueue.empty() > 0:      print ('queue is empty %d' % (msgQueue.qsize()))    else:      msg = msgQueue.get()      print( 'get msg %s' % (msg,))    time.sleep(1)def startB(msgQueue):  while True:    msgQueue.put('hello world')    print( 'put hello world queue size is %d' % (msgQueue.qsize(),))    time.sleep(3)if __name__ == '__main__':  processA = Process(target=startA,args=(MSG_QUEUE,))  processB = Process(target=startB,args=(MSG_QUEUE,))  processA.start()  print( 'processA start..')

主進程定義了一個Queue類型的變量,并作為Process的args參數傳給子進程processA和processB,兩個進程一個向隊列中寫數據,一個讀數據。

更多關于Python相關內容感興趣的讀者可查看本站專題:《Python進程與線程操作技巧總結》、《Python Socket編程技巧總結》、《Python數據結構與算法教程》、《Python函數使用技巧總結》、《Python字符串操作技巧匯總》、《Python入門與進階經典教程》及《Python文件與目錄操作技巧匯總

希望本文所述對大家Python程序設計有所幫助。

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 罗城| 威远县| 荔波县| 梅州市| 宜都市| 梨树县| 拉孜县| 梨树县| 武川县| 老河口市| 呼伦贝尔市| 无极县| 绍兴县| 茌平县| 荥经县| 凤庆县| 松原市| 洛浦县| 阳曲县| 桦南县| 博乐市| 时尚| 丰顺县| 通州区| 东方市| 五大连池市| 顺平县| 岳池县| 华亭县| 文安县| 当阳市| 乐平市| 合作市| 浑源县| 余江县| 南靖县| 深州市| 辰溪县| 来安县| 万盛区| 山西省|