国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Python > 正文

Python實現簡單多線程任務隊列

2020-01-04 17:42:16
字體:
來源:轉載
供稿:網友
本文給大家介紹的是使用很簡單的代碼實現的多線程任務隊列,給大家一個思路,希望對大家學習python能夠有所幫助
 

最近我在用梯度下降算法繪制神經網絡的數據時,遇到了一些算法性能的問題。梯度下降算法的代碼如下(偽代碼):

def gradient_descent():  # the gradient descent code  plotly.write(X, Y)

一般來說,當網絡請求 plot.ly 繪圖時會阻塞等待返回,于是也會影響到其他的梯度下降函數的執行速度。

一種解決辦法是每調用一次 plotly.write 函數就開啟一個新的線程,但是這種方法感覺不是很好。 我不想用一個像 cerely(一種分布式任務隊列)一樣大而全的任務隊列框架,因為框架對于我的這點需求來說太重了,并且我的繪圖也并不需要 redis 來持久化數據。

那用什么辦法解決呢?我在 python 中寫了一個很小的任務隊列,它可以在一個單獨的線程中調用 plotly.write函數。下面是程序代碼。

from threading import Threadimport Queue import timeclass TaskQueue(Queue.Queue):

首先我們繼承 Queue.Queue 類。從 Queue.Queue 類可以繼承 get 和 put 方法,以及隊列的行為。

def __init__(self, num_workers=1):  Queue.Queue.__init__(self)  self.num_workers = num_workers  self.start_workers()

初始化的時候,我們可以不用考慮工作線程的數量。

def add_task(self, task, *args, **kwargs):  args = args or ()  kwargs = kwargs or {}  self.put((task, args, kwargs))

我們把 task, args, kwargs 以元組的形式存儲在隊列中。*args 可以傳遞數量不等的參數,**kwargs 可以傳遞命名參數。

def start_workers(self):  for i in range(self.num_workers):    t = Thread(target=self.worker)    t.daemon = True    t.start()

我們為每個 worker 創建一個線程,然后在后臺刪除。

下面是 worker 函數的代碼:

def worker(self):  while True:    tupl = self.get()    item, args, kwargs = self.get()    item(*args, **kwargs)     self.task_done()

worker 函數獲取隊列頂端的任務,并根據輸入參數運行,除此之外,沒有其他的功能。下面是隊列的代碼:

我們可以通過下面的代碼測試:

def blokkah(*args, **kwargs):  time.sleep(5)  print “Blokkah mofo!”q = TaskQueue(num_workers=5)for item in range(1):  q.add_task(blokkah)q.join() # wait for all the tasks to finish.print “All done!”

Blokkah 是我們要做的任務名稱。隊列已經緩存在內存中,并且沒有執行很多任務。下面的步驟是把主隊列當做單獨的進程來運行,這樣主程序退出以及執行數據庫持久化時,隊列任務不會停止運行。但是這個例子很好地展示了如何從一個很簡單的小任務寫成像工作隊列這樣復雜的程序。

def gradient_descent():  # the gradient descent code  queue.add_task(plotly.write, x=X, y=Y)

修改之后,我的梯度下降算法工作效率似乎更高了。如果你很感興趣的話,可以參考下面的代碼。

from threading import Threadimport Queueimport timeclass TaskQueue(Queue.Queue):def __init__(self, num_workers=1):Queue.Queue.__init__(self)self.num_workers = num_workersself.start_workers()def add_task(self, task, *args, **kwargs):args = args or ()kwargs = kwargs or {}self.put((task, args, kwargs))def start_workers(self):for i in range(self.num_workers):t = Thread(target=self.worker)t.daemon = Truet.start()def worker(self):while True:tupl = self.get()item, args, kwargs = self.get()item(*args, **kwargs)self.task_done()def tests():def blokkah(*args, **kwargs):time.sleep(5)print "Blokkah mofo!"q = TaskQueue(num_workers=5)for item in range(10):q.add_task(blokkah)q.join() # block until all tasks are doneprint "All done!"if __name__ == "__main__":tests()

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 平陆县| 枝江市| 错那县| 毕节市| 靖边县| 兰溪市| 张北县| 阳新县| 集安市| 广宁县| 麦盖提县| 绵阳市| 三河市| 沁源县| 临西县| 桓台县| 砀山县| 玉环县| 遂川县| 建平县| 个旧市| 马公市| 石城县| 兰州市| 酉阳| 当涂县| 深水埗区| 泰和县| 陆丰市| 河津市| 新民市| 温州市| 嘉鱼县| 张家界市| 吉木乃县| 井研县| 台东县| 舞钢市| 郓城县| 宁晋县| 古田县|