国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁(yè) > 編程 > Python > 正文

Python實(shí)現(xiàn)簡(jiǎn)單多線程任務(wù)隊(duì)列

2019-11-25 16:55:10
字體:
來(lái)源:轉(zhuǎn)載
供稿:網(wǎng)友

最近我在用梯度下降算法繪制神經(jīng)網(wǎng)絡(luò)的數(shù)據(jù)時(shí),遇到了一些算法性能的問(wèn)題。梯度下降算法的代碼如下(偽代碼):

def gradient_descent():  # the gradient descent code  plotly.write(X, Y)

一般來(lái)說(shuō),當(dāng)網(wǎng)絡(luò)請(qǐng)求 plot.ly 繪圖時(shí)會(huì)阻塞等待返回,于是也會(huì)影響到其他的梯度下降函數(shù)的執(zhí)行速度。

一種解決辦法是每調(diào)用一次 plotly.write 函數(shù)就開啟一個(gè)新的線程,但是這種方法感覺不是很好。 我不想用一個(gè)像 cerely(一種分布式任務(wù)隊(duì)列)一樣大而全的任務(wù)隊(duì)列框架,因?yàn)榭蚣軐?duì)于我的這點(diǎn)需求來(lái)說(shuō)太重了,并且我的繪圖也并不需要 redis 來(lái)持久化數(shù)據(jù)。

那用什么辦法解決呢?我在 python 中寫了一個(gè)很小的任務(wù)隊(duì)列,它可以在一個(gè)單獨(dú)的線程中調(diào)用 plotly.write函數(shù)。下面是程序代碼。

from threading import Threadimport Queue import timeclass TaskQueue(Queue.Queue):

首先我們繼承 Queue.Queue 類。從 Queue.Queue 類可以繼承 get 和 put 方法,以及隊(duì)列的行為。

def __init__(self, num_workers=1):  Queue.Queue.__init__(self)  self.num_workers = num_workers  self.start_workers()

初始化的時(shí)候,我們可以不用考慮工作線程的數(shù)量。

def add_task(self, task, *args, **kwargs):  args = args or ()  kwargs = kwargs or {}  self.put((task, args, kwargs))

我們把 task, args, kwargs 以元組的形式存儲(chǔ)在隊(duì)列中。*args 可以傳遞數(shù)量不等的參數(shù),**kwargs 可以傳遞命名參數(shù)。

def start_workers(self):  for i in range(self.num_workers):    t = Thread(target=self.worker)    t.daemon = True    t.start()

我們?yōu)槊總€(gè) worker 創(chuàng)建一個(gè)線程,然后在后臺(tái)刪除。

下面是 worker 函數(shù)的代碼:

def worker(self):  while True:    tupl = self.get()    item, args, kwargs = self.get()    item(*args, **kwargs)     self.task_done()

worker 函數(shù)獲取隊(duì)列頂端的任務(wù),并根據(jù)輸入?yún)?shù)運(yùn)行,除此之外,沒有其他的功能。下面是隊(duì)列的代碼:

我們可以通過(guò)下面的代碼測(cè)試:

def blokkah(*args, **kwargs):  time.sleep(5)  print “Blokkah mofo!”q = TaskQueue(num_workers=5)for item in range(1):  q.add_task(blokkah)q.join() # wait for all the tasks to finish.print “All done!”

Blokkah 是我們要做的任務(wù)名稱。隊(duì)列已經(jīng)緩存在內(nèi)存中,并且沒有執(zhí)行很多任務(wù)。下面的步驟是把主隊(duì)列當(dāng)做單獨(dú)的進(jìn)程來(lái)運(yùn)行,這樣主程序退出以及執(zhí)行數(shù)據(jù)庫(kù)持久化時(shí),隊(duì)列任務(wù)不會(huì)停止運(yùn)行。但是這個(gè)例子很好地展示了如何從一個(gè)很簡(jiǎn)單的小任務(wù)寫成像工作隊(duì)列這樣復(fù)雜的程序。

def gradient_descent():  # the gradient descent code  queue.add_task(plotly.write, x=X, y=Y)

修改之后,我的梯度下降算法工作效率似乎更高了。如果你很感興趣的話,可以參考下面的代碼。

from threading import Threadimport Queueimport timeclass TaskQueue(Queue.Queue):def __init__(self, num_workers=1):Queue.Queue.__init__(self)self.num_workers = num_workersself.start_workers()def add_task(self, task, *args, **kwargs):args = args or ()kwargs = kwargs or {}self.put((task, args, kwargs))def start_workers(self):for i in range(self.num_workers):t = Thread(target=self.worker)t.daemon = Truet.start()def worker(self):while True:tupl = self.get()item, args, kwargs = self.get()item(*args, **kwargs)self.task_done()def tests():def blokkah(*args, **kwargs):time.sleep(5)print "Blokkah mofo!"q = TaskQueue(num_workers=5)for item in range(10):q.add_task(blokkah)q.join() # block until all tasks are doneprint "All done!"if __name__ == "__main__":tests()

發(fā)表評(píng)論 共有條評(píng)論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 浮梁县| 兴安县| 娄烦县| 萝北县| 旅游| 鹰潭市| 穆棱市| 和政县| 玉林市| 屏边| 京山县| 文成县| 丹东市| 建平县| 确山县| 卢龙县| 乌兰察布市| 康定县| 阿瓦提县| 阳朔县| 北海市| 缙云县| 太和县| 钟山县| 大埔县| 聊城市| 古田县| 怀来县| 合作市| 民乐县| 马山县| 临汾市| 桃源县| 大英县| 玛纳斯县| 宜君县| 遵义市| 喀喇沁旗| 邻水| 大足县| 建德市|