問題起因
最近要將一個文本分割成好幾個topic,每個topic設計一個regressor,各regressor是相互獨立的,最后匯總所有topic的regressor得到總得預測結果。沒錯!類似bagging ensemble!只是我沒有抽樣。文本不大,大概3000行,topic個數為8,于是我寫了一個串行的程序,一個topic算完之后再算另一個topic。可是我在每個topic中用了GridSearchCV來調參,又要選特征又要調整regressor的參數,導致參數組合一共有1782種。我真是低估了調參的時間,程序跑了一天一夜最后因為忘記import一個庫導致最終的預測精度沒有算出來。后來想到,既然每個topic的預測都是獨立的,那是不是可以并行呢?
python/51236.html">Python中的多線程與多進程
但是聽聞Python的多線程實際上并不能真正利用多核,所以如果使用多線程實際上還是在一個核上做并發處理。不過,如果使用多進程就可以真正利用多核,因為各進程之間是相互獨立的,不共享資源,可以在不同的核上執行不同的進程,達到并行的效果。同時在我的問題中,各topic相互獨立,不涉及進程間的通信,只需最后匯總結果,因此使用多進程是個不錯的選擇。
multiprocessing
一個子進程
multiprocessing模塊提供process類實現新建進程。下述代碼是新建一個子進程。
from multiprocessing import Processdef f(name): print 'hello', nameif __name__ == '__main__': p = Process(target=f, args=('bob',)) # 新建一個子進程p,目標函數是f,args是函數f的參數列表 p.start() # 開始執行進程 p.join() # 等待子進程結束上述代碼中p.join()的意思是等待子進程結束后才執行后續的操作,一般用于進程間通信。例如有一個讀進程pw和一個寫進程pr,在調用pw之前需要先寫pr.join(),表示等待寫進程結束之后才開始執行讀進程。
多個子進程
如果要同時創建多個子進程可以使用multiprocessing.Pool類。該類可以創建一個進程池,然后在多個核上執行這些進程。
import multiprocessingimport timedef func(msg): print multiprocessing.current_process().name + '-' + msgif __name__ == "__main__": pool = multiprocessing.Pool(processes=4) # 創建4個進程 for i in xrange(10): msg = "hello %d" %(i) pool.apply_async(func, (msg, )) pool.close() # 關閉進程池,表示不能在往進程池中添加進程 pool.join() # 等待進程池中的所有進程執行完畢,必須在close()之后調用 print "Sub-process(es) done."
輸出結果如下:
Sub-process(es) done.PoolWorker-34-hello 1PoolWorker-33-hello 0PoolWorker-35-hello 2PoolWorker-36-hello 3PoolWorker-34-hello 7PoolWorker-33-hello 4PoolWorker-35-hello 5PoolWorker-36-hello 6PoolWorker-33-hello 8PoolWorker-36-hello 9
上述代碼中的pool.apply_async()是apply()函數的變體,apply_async()是apply()的并行版本,apply()是apply_async()的阻塞版本,使用apply()主進程會被阻塞直到函數執行結束,所以說是阻塞版本。apply()既是Pool的方法,也是Python內置的函數,兩者等價。可以看到輸出結果并不是按照代碼for循環中的順序輸出的。
多個子進程并返回值
apply_async()本身就可以返回被進程調用的函數的返回值。上一個創建多個子進程的代碼中,如果在函數func中返回一個值,那么pool.apply_async(func, (msg, ))的結果就是返回pool中所有進程的值的對象(注意是對象,不是值本身)。
import multiprocessingimport timedef func(msg): return multiprocessing.current_process().name + '-' + msgif __name__ == "__main__": pool = multiprocessing.Pool(processes=4) # 創建4個進程 results = [] for i in xrange(10): msg = "hello %d" %(i) results.append(pool.apply_async(func, (msg, ))) pool.close() # 關閉進程池,表示不能再往進程池中添加進程,需要在join之前調用 pool.join() # 等待進程池中的所有進程執行完畢 print ("Sub-process(es) done.") for res in results: print (res.get())上述代碼輸出結果如下:
Sub-process(es) done.PoolWorker-37-hello 0PoolWorker-38-hello 1PoolWorker-39-hello 2PoolWorker-40-hello 3PoolWorker-37-hello 4PoolWorker-38-hello 5PoolWorker-39-hello 6PoolWorker-37-hello 7PoolWorker-40-hello 8PoolWorker-38-hello 9
與之前的輸出不同,這次的輸出是有序的。
如果電腦是八核,建立8個進程,在Ubuntu下輸入top命令再按下大鍵盤的1,可以看到每個CPU的使用率是比較平均的,如下圖:

在system monitor中也可以清楚看到執行多進程前后CPU使用率曲線的差異。

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持VEVB武林網。
新聞熱點
疑難解答