為什么需要線程池呢?
設想一下,如果我們使用有任務就開啟一個子線程處理,處理完成后,銷毀子線程或等得子線程自然死亡,那么如果我們的任務所需時間比較短,但是任務數量比較多,那么更多的時間是花在線程的創建和結束上面,效率肯定就低了。
線程池的原理:
既然是線程池(Thread pool),其實名字很形象,就是把指定數量的可用子線程放進一個"池里",有任務時取出一個線程執行,任務執行完后,并不立即銷毀線程,而是放進線程池中,等待接收下一個任務。這樣內存和cpu的開銷也比較小,并且我們可以控制線程的數量。
線程池的實現:
線程池有很多種實現方式,在python/278454.html">python中,已經給我們提供了一個很好的實現方式:Queue-隊列。因為python中Queue本身就是同步的,所以也就是線程安全的,所以我們可以放心的讓多個線程共享一個Queue。
那么說到線程池,那么理應也得有一個任務池,任務池中存放著待執行的任務,各個線程到任務池中取任務執行,那么用Queue來實現任務池是最好不過的。
1.low版線程池
設計思路:運用隊列queue
將線程類名放入隊列中,執行一個就拿一個出來
import queueimport threadingclass ThreadPool(object): def __init__(self, max_num=20): self.queue = queue.Queue(max_num) #創建隊列,最大數為20 for i in range(max_num): self.queue.put(threading.Thread) #將類名放入隊列中 def get_thread(self): return self.queue.get() #從隊列中取出類名 def add_thread(self): self.queue.put(threading.Thread) #進類名放入隊列中def func(arg, p): #定義一個函數 print(arg) import time time.sleep(2) p.add_thread()pool = ThreadPool(10) #創建對象,并執行該類的構造方法,即將線程的類名放入隊列中for i in range(30): thread = pool.get_thread() #調用該對象的get_thread方法,取出類名 t = thread(target=func, args=(i, pool)) #創建對象,執行func,參數在args中 t.start()
由于此方法要求使用者修改原函數,并在原函數里傳參數,且調用方法也發生了改變,并且有空閑線程浪費資源,實際操作中并不方便,故設計了下一版線程池。
2.絕版線程池
設計思路:運用隊列queue
a.隊列里面放任務
b.線程一次次去取任務,線程一空閑就去取任務
import queueimport threadingimport contextlibimport timeStopEvent = object()class ThreadPool(object): def __init__(self, max_num, max_task_num = None): if max_task_num: self.q = queue.Queue(max_task_num) else: self.q = queue.Queue() self.max_num = max_num self.cancel = False self.terminal = False self.generate_list = [] self.free_list = [] def run(self, func, args, callback=None): """ 線程池執行一個任務 :param func: 任務函數 :param args: 任務函數所需參數 :param callback: 任務執行失敗或成功后執行的回調函數,回調函數有兩個參數1、任務函數執行狀態;2、任務函數返回值(默認為None,即:不執行回調函數) :return: 如果線程池已經終止,則返回True否則None """ if self.cancel: return if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: self.generate_thread() w = (func, args, callback,) self.q.put(w) def generate_thread(self): """ 創建一個線程 """ t = threading.Thread(target=self.call) t.start() def call(self): """ 循環去獲取任務函數并執行任務函數 """ current_thread = threading.currentThread() self.generate_list.append(current_thread) event = self.q.get() while event != StopEvent: func, args, callback = event try: result = func(*args) success = True except Exception as e: success = False result = None if callback is not None: try: callback(success, result) except Exception as e: pass with self.worker_state(self.free_list, current_thread): if self.terminal: event = StopEvent else: event = self.q.get() else: self.generate_list.remove(current_thread) def close(self): """ 執行完所有的任務后,所有線程停止 """ self.cancel = True count = len(self.generate_list) while count: self.q.put(StopEvent) count -= 1 def terminate(self): """ 無論是否還有任務,終止線程 """ self.terminal = True while self.generate_list: self.q.put(StopEvent) self.q.queue.clear() @contextlib.contextmanager def worker_state(self, state_list, worker_thread): """ 用于記錄線程中正在等待的線程數 """ state_list.append(worker_thread) try: yield finally: state_list.remove(worker_thread)# How to usepool = ThreadPool(5)def callback(status, result): # status, execute action status # result, execute action return value passdef action(i): print(i)for i in range(30): ret = pool.run(action, (i,), callback)time.sleep(3)print(len(pool.generate_list), len(pool.free_list))print(len(pool.generate_list), len(pool.free_list))pool.close()# pool.terminate()
總結
以上就是本文關于Python探索之自定義實現線程池的全部內容,希望對大家有所幫助。歡迎留言指出。感謝朋友們對本站的支持!
新聞熱點
疑難解答