国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁(yè) > 編程 > Python > 正文

Python并發(fā)之多進(jìn)程的方法實(shí)例代碼

2020-01-04 14:57:10
字體:
供稿:網(wǎng)友

一,進(jìn)程的理論基礎(chǔ)

一個(gè)應(yīng)用程序,歸根結(jié)底是一堆代碼,是靜態(tài)的,而進(jìn)程才是執(zhí)行中的程序,在一個(gè)程序運(yùn)行的時(shí)候會(huì)有多個(gè)進(jìn)程并發(fā)執(zhí)行。

進(jìn)程和線程的區(qū)別:

  • 進(jìn)程是系統(tǒng)資源分配的基本單位。
  • 一個(gè)進(jìn)程內(nèi)可以包含多個(gè)線程,屬于一對(duì)多的關(guān)系,進(jìn)程內(nèi)的資源,被其內(nèi)的線程共享
  • 線程是進(jìn)程運(yùn)行的最小單位,如果說進(jìn)程是完成一個(gè)功能,那么其線程就是完成這個(gè)功能的基本單位
  • 進(jìn)程間資源不共享,多進(jìn)程切換資源開銷,難度大,同一進(jìn)程內(nèi)的線程資源共享,多線程切換資源開銷,難度小

進(jìn)程與線程的共同點(diǎn):

都是為了提高程序運(yùn)行效率,都有執(zhí)行的優(yōu)先權(quán)

二,Python的多進(jìn)程( multiprocessing模塊)

創(chuàng)建一個(gè)進(jìn)程(和創(chuàng)建線程類似)

方法一:創(chuàng)建Process對(duì)象,通過對(duì)象調(diào)用start()方法啟動(dòng)進(jìn)程

from multiprocessing import Processdef foo(name): print('hello,%s'%name)if __name__ == '__main__': p1=Process(target=foo,args=('world',)) p2 = Process(target=foo, args=('China',)) p1.start() p2.start() print('=====主進(jìn)程=====') # == == =主進(jìn)程 == == = # hello, world # hello, China #主進(jìn)程和子進(jìn)程并發(fā)執(zhí)行 

注意:Process對(duì)象只能在在 if __name__ == '__main__':下創(chuàng)建,不然會(huì)報(bào)錯(cuò)。

方法二:自定義一個(gè)類繼承Process類,并重寫run()方法,將執(zhí)行代碼放在其內(nèi)

from multiprocessing import Processclass MyProcess(Process): def __init__(self,name):  super().__init__()  self.name = name def run(self):  print('hello,%s'%self.name)if __name__ == '__main__': myprocess1 = MyProcess('world') myprocess2 = MyProcess('world') myprocess1.start() myprocess2.start()

Process內(nèi)置方法

實(shí)例方法:

p.start():?jiǎn)?dòng)進(jìn)程,并調(diào)用該子進(jìn)程中的p.run()

p.run():進(jìn)程啟動(dòng)時(shí)運(yùn)行的方法,正是它去調(diào)用target指定的函數(shù),我們自定義類的類中一定要實(shí)現(xiàn)該方法 

p.terminate():強(qiáng)制終止進(jìn)程p,不會(huì)進(jìn)行任何清理操作,如果p創(chuàng)建了子進(jìn)程,該子進(jìn)程就成了僵尸進(jìn)程,使用該方法需要特別小心這種情況。如果p還保存了一個(gè)鎖那么也將不會(huì)被釋放,進(jìn)而導(dǎo)致死鎖

p.is_alive():如果p仍然運(yùn)行,返回True

p.join([timeout]):主線程等待p終止。timeout是可選的超時(shí)時(shí)間
Process屬性

p.daemon:默認(rèn)值為False,如果設(shè)為True,代表p為后臺(tái)運(yùn)行的守護(hù)進(jìn)程,當(dāng)p的父進(jìn)程終止時(shí),p也隨之終止,并且設(shè)定為True后,p不能創(chuàng)建自己的新進(jìn)程,必須在p.start()之前設(shè)置

p.name:進(jìn)程的名稱

p.pid:進(jìn)程的pid

p.exitcode:進(jìn)程在運(yùn)行時(shí)為None、如果為–N,表示被信號(hào)N結(jié)束(了解即可)

守護(hù)進(jìn)程

類似于守護(hù)線程,只不過守護(hù)線程是對(duì)象的一個(gè)方法,而守護(hù)進(jìn)程封裝成對(duì)象的屬性。

from multiprocessing import Processimport timeclass MyProcess(Process): def __init__(self,name):  super().__init__()  self.name = name def run(self):  time.sleep(3)  print('hello,%s'%self.name)if __name__ == '__main__': myprocess1=MyProcess('world') myprocess1.daemon = True myprocess1.start() print('結(jié)束')#不會(huì)輸出‘hello world',因?yàn)樵O(shè)置為守護(hù)進(jìn)程,主進(jìn)程不會(huì)等待

也可以使用join方法,使主進(jìn)程等待

from multiprocessing import Processimport timeclass MyProcess(Process): def __init__(self,name):  super().__init__()  self.name = name def run(self):  time.sleep(3)  print('hello,%s'%self.name)if __name__ == '__main__': myprocess1=MyProcess('world') myprocess1.daemon = True myprocess1.start() myprocess1.join() #程序阻塞 print('結(jié)束')join()

進(jìn)程同步和鎖

進(jìn)程雖然不像線程共享資源,但是這并不意味著進(jìn)程間不 需要加鎖,比如不同進(jìn)程會(huì)共享同一個(gè)終端 ( 屏幕),或者操作同一個(gè)文件,數(shù)據(jù)庫(kù),那么數(shù)據(jù)安全還是很有必要的,因此我們可以加鎖,

from multiprocessing import Process,Lockimport timedef a_print(l): #需要傳入對(duì)象,因?yàn)樾畔⒉还蚕?l.acquire() print('我要打印信息') time.sleep(1) print('我打印完了') l.release()if __name__ == '__main__': l = Lock() for i in range(20):  p = Process(target=a_print,args=(l,))  p.start()

信號(hào)量(Semaphore)

能夠并發(fā)執(zhí)行的進(jìn)程數(shù),超出的進(jìn)程阻塞,直到有進(jìn)程運(yùn)行完成。

Semaphore管理一個(gè)內(nèi)置的計(jì)數(shù)器,

每當(dāng)調(diào)用acquire()時(shí)內(nèi)置計(jì)數(shù)器-1;

調(diào)用release() 時(shí)內(nèi)置計(jì)數(shù)器+1;

計(jì)數(shù)器不能小于0;當(dāng)計(jì)數(shù)器為0時(shí),acquire()將阻塞進(jìn)程直到其他進(jìn)程調(diào)用release()。

from multiprocessing import Process,Queue,Semaphoreimport time,randomdef seat(s,n): s.acquire() print('學(xué)生%d坐下了'%n) time.sleep(random.randint(1,2)) s.release()if __name__ == '__main__': s = Semaphore(5) for i in range(20):  p = Process(target=seat,args=(s,i))  p.start() print('-----主進(jìn)程-------')

注意:其實(shí)信號(hào)量和鎖類似,只是限制進(jìn)程運(yùn)行某個(gè)代碼塊的數(shù)量(鎖為1個(gè)),并不是能限制并發(fā)的進(jìn)程,如上述代碼,一次性還是創(chuàng)建了20個(gè)進(jìn)程

事件(Event)

from multiprocessing import Process,Eventimport time, randomdef eating(event): event.wait() print('去吃飯的路上...')def makeing(event): print('做飯中') time.sleep(random.randint(1,2)) print('做好了,快來...') event.set()if __name__ == '__main__': event=Event() t1 = Process(target=eating,args=(event,)) t2 = Process(target=makeing,args=(event,)) t1.start() t2.start() # 做飯中 # 做好了,快來... # 去吃飯的路上...

和線程事件幾乎一致

進(jìn)程隊(duì)列(Queue)

進(jìn)程隊(duì)列是進(jìn)程通訊的方式之一。使用multiprocessing 下的Queue

from multiprocessing import Process,Queueimport timedef func1(queue): while True:  info=queue.get()  if info == None:   return   print(info)def func2(queue): for i in range(10):  time.sleep(1)  queue.put('is %d'%i) queue.put(None) #結(jié)束的標(biāo)志if __name__ == '__main__': q = Queue() p1 = Process(target=func1,args=(q,)) p2 = Process(target=func2, args=(q,)) p1.start() p2.start()Queue類的方法,源碼如下:class Queue(object): def __init__(self, maxsize=-1): #可以傳參設(shè)置隊(duì)列最大容量  self._maxsize = maxsize def qsize(self): #返回當(dāng)前時(shí)刻隊(duì)列中的個(gè)數(shù)  return 0 def empty(self): #是否為空  return False def full(self): 是否滿了  return False def put(self, obj, block=True, timeout=None): #放值,blocked和timeout。如果blocked為True(默認(rèn)值),并且timeout為正值,該方法會(huì)阻塞timeout指定的時(shí)間,直到該隊(duì)列有剩余的空間。如果超時(shí),會(huì)拋出Queue.Full異常。如果blocked為False,但該Queue已滿,會(huì)立即拋出Queue.Full異常  pass def put_nowait(self, obj): #=put(False)  pass def get(self, block=True, timeout=None): 獲取值,get方法有兩個(gè)可選參數(shù):blocked和timeout。如果blocked為True(默認(rèn)值),并且timeout為正值,那么在等待時(shí)間內(nèi)沒有取到任何元素,會(huì)拋出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個(gè)值可用,則立即返回該值,否則,如果隊(duì)列為空,則立即拋出Queue.Empty異常.  pass def get_nowait(self): # = get(False)  pass def close(self): #將隊(duì)列關(guān)閉  pass def join_thread(self): #略,幾乎不用  pass def cancel_join_thread(self):  pass

進(jìn)程隊(duì)列源碼注釋

進(jìn)程池

進(jìn)程的消耗是很大的,因此我們不能無節(jié)制的開啟新進(jìn)程,因此我們可以 通過維護(hù)一個(gè)進(jìn)程池來控制進(jìn)程的數(shù)量 。這就不同于信號(hào)量,進(jìn)程池可以從源頭控制進(jìn)程數(shù)量。在Python中可以通過如下方法使用

同步調(diào)用

from multiprocessing import Poolimport time, random, osdef func(n): pid = os.getpid() print('進(jìn)程%s正在處理第%d個(gè)任務(wù)'%(pid,n),'時(shí)間%s'%time.strftime('%H-%M-%S')) time.sleep(2) res = '處理%s'%random.choice(['成功','失敗']) return resif __name__ == '__main__': p = Pool(4) #創(chuàng)建4個(gè)進(jìn)程, li = [] for i in range(10):  res = p.apply(func,args=(i,)) 交給進(jìn)程池處理,處理完成才返回值,會(huì)阻塞,即使池內(nèi)還有空余進(jìn)程,相當(dāng)于順序執(zhí)行  li.append(res) for i in li:  print(i)

#進(jìn)程1916正在處理第0個(gè)任務(wù) 時(shí)間21-02-53
#進(jìn)程1240正在處理第1個(gè)任務(wù) 時(shí)間21-02-55
#進(jìn)程3484正在處理第2個(gè)任務(wù) 時(shí)間21-02-57
#進(jìn)程7512正在處理第3個(gè)任務(wù) 時(shí)間21-02-59
#進(jìn)程1916正在處理第4個(gè)任務(wù) 時(shí)間21-03-01
#進(jìn)程1240正在處理第5個(gè)任務(wù) 時(shí)間21-03-03
#進(jìn)程3484正在處理第6個(gè)任務(wù) 時(shí)間21-03-05
#進(jìn)程7512正在處理第7個(gè)任務(wù) 時(shí)間21-03-07
#進(jìn)程1916正在處理第8個(gè)任務(wù) 時(shí)間21-03-09
#進(jìn)程1240正在處理第9個(gè)任務(wù) 時(shí)間21-03-11

從結(jié)果可以發(fā)現(xiàn)兩點(diǎn):

  1. 不是并發(fā)處理
  2. 一直都只有四個(gè)進(jìn)程,串行執(zhí)行

因此進(jìn)程池提供了 異步處理 的方式

from multiprocessing import Poolimport time, random, osdef func(n): pid = os.getpid() print('進(jìn)程%s正在處理第%d個(gè)任務(wù)'%(pid,n),'時(shí)間%s'%time.strftime('%H-%M-%S')) time.sleep(2) res = '處理%s'%random.choice(['成功','失敗']) return resif __name__ == '__main__': p = Pool(4) li = [] for i in range(10):  res = p.apply_async(func,args=(i,)) 結(jié)果不會(huì)立刻返回,遇到阻塞,開啟下一個(gè)進(jìn)程,在這,相當(dāng)于幾乎同時(shí)出現(xiàn)四個(gè)打印結(jié)果(一個(gè)線程處理一個(gè)任務(wù),處理完下個(gè)任務(wù)才能進(jìn)來)  li.append(res) p.close() #join之前需要關(guān)閉進(jìn)程池 p.join() #因?yàn)楫惒?,所以需要等待池?nèi)進(jìn)程工作結(jié)束再繼續(xù) for i in li:  print(i.get()) #i是一個(gè)對(duì)象,通過get方法獲取返回值,而同步則沒有該方法

關(guān)于回調(diào)函數(shù)

from multiprocessing import Poolimport time, random, osdef func(n): pid = os.getpid() print('進(jìn)程%s正在處理第%d個(gè)任務(wù)'%(pid,n),'時(shí)間%s'%time.strftime('%H-%M-%S')) time.sleep(2) res = '處理%s'%random.choice(['成功','失敗']) return resdef foo(info): print(info) #傳入值為進(jìn)程執(zhí)行結(jié)果if __name__ == '__main__': p = Pool(4) li = [] for i in range(10):  res = p.apply_async(func,args=(i,),callback = foo) callback()回調(diào)函數(shù)會(huì)在進(jìn)程執(zhí)行完之后調(diào)用(主進(jìn)程調(diào)用)   li.append(res) p.close()  p.join()  for i in li:  print(i.get()) 

有回調(diào)函數(shù)

總結(jié)

以上所述是小編給大家介紹的Python并發(fā)之多進(jìn)程的方法實(shí)例代碼,希望對(duì)大家有所幫助,如果大家有任何疑問請(qǐng)給我留言,小編會(huì)及時(shí)回復(fù)大家的。在此也非常感謝大家對(duì)VEVB武林網(wǎng)網(wǎng)站的支持!


注:相關(guān)教程知識(shí)閱讀請(qǐng)移步到python教程頻道。
發(fā)表評(píng)論 共有條評(píng)論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 松原市| 右玉县| 台湾省| 金坛市| 衡水市| 邵武市| 南康市| 贺州市| 社会| 兴仁县| 湘潭市| 太和县| 剑川县| 久治县| 盘山县| 嫩江县| 广灵县| 建宁县| 藁城市| 左贡县| 绥化市| 开封市| 策勒县| 宜黄县| 驻马店市| 上杭县| 湄潭县| 同心县| 南皮县| 运城市| 沈阳市| 岢岚县| 南充市| 蒙山县| 开平市| 嘉善县| 卓尼县| 新宾| 山东省| 延津县| 大同市|