国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Python > 正文

用Python的線程來解決生產者消費問題的示例

2019-11-25 17:51:42
字體:
來源:轉載
供稿:網友

我們將使用Python線程來解決Python中的生產者―消費者問題。這個問題完全不像他們在學校中說的那么難。

如果你對生產者―消費者問題有了解,看這篇博客會更有意義。

為什么要關心生產者―消費者問題:

  •     可以幫你更好地理解并發和不同概念的并發。
  •     信息隊列中的實現中,一定程度上使用了生產者―消費者問題的概念,而你某些時候必然會用到消息隊列。

當我們在使用線程時,你可以學習以下的線程概念:

  •     Condition:線程中的條件。
  •     wait():在條件實例中可用的wait()。
  •     notify() :在條件實例中可用的notify()。

我假設你已經有這些基本概念:線程、競態條件,以及如何解決靜態條件(例如使用lock)。否則的話,你建議你去看我上一篇文章basics of Threads

引用維基百科:

生產者的工作是產生一塊數據,放到buffer中,如此循環。與此同時,消費者在消耗這些數據(例如從buffer中把它們移除),每次一塊。

這里的關鍵詞是“同時”。所以生產者和消費者是并發運行的,我們需要對生產者和消費者做線程分離。
 

from threading import Thread class ProducerThread(Thread):  def run(self):    pass class ConsumerThread(Thread):  def run(self):    pass

再次引用維基百科:

這個為描述了兩個共享固定大小緩沖隊列的進程,即生產者和消費者。

假設我們有一個全局變量,可以被生產者和消費者線程修改。生產者產生數據并把它加入到隊列。消費者消耗這些數據(例如把它移出)。

queue = []

在剛開始,我們不會設置固定大小的條件,而在實際運行時加入(指下述例子)。

一開始帶bug的程序:

from threading import Thread, Lockimport timeimport random queue = []lock = Lock() class ProducerThread(Thread):  def run(self):    nums = range(5) #Will create the list [0, 1, 2, 3, 4]    global queue    while True:      num = random.choice(nums) #Selects a random number from list [0, 1, 2, 3, 4]      lock.acquire()      queue.append(num)      print "Produced", num      lock.release()      time.sleep(random.random()) class ConsumerThread(Thread):  def run(self):    global queue    while True:      lock.acquire()      if not queue:        print "Nothing in queue, but consumer will try to consume"      num = queue.pop(0)      print "Consumed", num      lock.release()      time.sleep(random.random()) ProducerThread().start()ConsumerThread().start()

運行幾次并留意一下結果。如果程序在IndexError異常后并沒有自動結束,用Ctrl+Z結束運行。

樣例輸出:
 

Produced 3Consumed 3Produced 4Consumed 4Produced 1Consumed 1Nothing in queue, but consumer will try to consumeException in thread Thread-2:Traceback (most recent call last): File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner  self.run() File "producer_consumer.py", line 31, in run  num = queue.pop(0)IndexError: pop from empty list

解釋:

  •     我們開始了一個生產者線程(下稱生產者)和一個消費者線程(下稱消費者)。
  •     生產者不停地添加(數據)到隊列,而消費者不停地消耗。
  •     由于隊列是一個共享變量,我們把它放到lock程序塊內,以防發生競態條件。
  •     在某一時間點,消費者把所有東西消耗完畢而生產者還在掛起(sleep)。消費者嘗試繼續進行消耗,但此時隊列為空,出現IndexError異常。
  •     在每次運行過程中,在發生IndexError異常之前,你會看到print語句輸出”Nothing in queue, but consumer will try to consume”,這是你出錯的原因。

我們把這個實現作為錯誤行為(wrong behavior)。

什么是正確行為?

當隊列中沒有任何數據的時候,消費者應該停止運行并等待(wait),而不是繼續嘗試進行消耗。而當生產者在隊列中加入數據之后,應該有一個渠道去告訴(notify)消費者。然后消費者可以再次從隊列中進行消耗,而IndexError不再出現。

關于條件

    條件(condition)可以讓一個或多個線程進入wait,直到被其他線程notify。參考:?http://docs.python.org/2/library/threading.html#condition-objects

這就是我們所需要的。我們希望消費者在隊列為空的時候wait,只有在被生產者notify后恢復。生產者只有在往隊列中加入數據后進行notify。因此在生產者notify后,可以確保隊列非空,因此消費者消費時不會出現異常。

  •     condition內含lock。
  •     condition有acquire()和release()方法,用以調用內部的lock的對應方法。

condition的acquire()和release()方法內部調用了lock的acquire()和release()。所以我們可以用condiction實例取代lock實例,但lock的行為不會改變。
生產者和消費者需要使用同一個condition實例, 保證wait和notify正常工作。

重寫消費者代碼:
 

from threading import Condition condition = Condition() class ConsumerThread(Thread):  def run(self):    global queue    while True:      condition.acquire()      if not queue:        print "Nothing in queue, consumer is waiting"        condition.wait()        print "Producer added something to queue and notified the consumer"      num = queue.pop(0)      print "Consumed", num      condition.release()      time.sleep(random.random())

重寫生產者代碼:
 

class ProducerThread(Thread):  def run(self):    nums = range(5)    global queue    while True:      condition.acquire()      num = random.choice(nums)      queue.append(num)      print "Produced", num      condition.notify()      condition.release()      time.sleep(random.random())

樣例輸出:
 

Produced 3Consumed 3Produced 1Consumed 1Produced 4Consumed 4Produced 3Consumed 3Nothing in queue, consumer is waitingProduced 2Producer added something to queue and notified the consumerConsumed 2Nothing in queue, consumer is waitingProduced 2Producer added something to queue and notified the consumerConsumed 2Nothing in queue, consumer is waitingProduced 3Producer added something to queue and notified the consumerConsumed 3Produced 4Consumed 4Produced 1Consumed 1

解釋:

  •     對于消費者,在消費前檢查隊列是否為空。
  •     如果為空,調用condition實例的wait()方法。
  •     消費者進入wait(),同時釋放所持有的lock。
  •     除非被notify,否則它不會運行。
  •     生產者可以acquire這個lock,因為它已經被消費者release。
  •     當調用了condition的notify()方法后,消費者被喚醒,但喚醒不意味著它可以開始運行。
  •     notify()并不釋放lock,調用notify()后,lock依然被生產者所持有。
  •     生產者通過condition.release()顯式釋放lock。
  •     消費者再次開始運行,現在它可以得到隊列中的數據而不會出現IndexError異常。

為隊列增加大小限制

生產者不能向一個滿隊列繼續加入數據。

它可以用以下方式來實現:

  •     在加入數據前,生產者檢查隊列是否為滿。
  •     如果不為滿,生產者可以繼續正常流程。
  •     如果為滿,生產者必須等待,調用condition實例的wait()。
  •     消費者可以運行。消費者消耗隊列,并產生一個空余位置。
  •     然后消費者notify生產者。
  •     當消費者釋放lock,消費者可以acquire這個lock然后往隊列中加入數據。

最終程序如下:

from threading import Thread, Conditionimport timeimport random queue = []MAX_NUM = 10condition = Condition() class ProducerThread(Thread):  def run(self):    nums = range(5)    global queue    while True:      condition.acquire()      if len(queue) == MAX_NUM:        print "Queue full, producer is waiting"        condition.wait()        print "Space in queue, Consumer notified the producer"      num = random.choice(nums)      queue.append(num)      print "Produced", num      condition.notify()      condition.release()      time.sleep(random.random()) class ConsumerThread(Thread):  def run(self):    global queue    while True:      condition.acquire()      if not queue:        print "Nothing in queue, consumer is waiting"        condition.wait()        print "Producer added something to queue and notified the consumer"      num = queue.pop(0)      print "Consumed", num      condition.notify()      condition.release()      time.sleep(random.random()) ProducerThread().start()ConsumerThread().start()

樣例輸出:
 

Produced 0Consumed 0Produced 0Produced 4Consumed 0Consumed 4Nothing in queue, consumer is waitingProduced 4Producer added something to queue and notified the consumerConsumed 4Produced 3Produced 2Consumed 3

更新:
很多網友建議我在lock和condition下使用Queue來代替使用list。我同意這種做法,但我的目的是展示Condition,wait()和notify()如何工作,所以使用了list。

以下用Queue來更新一下代碼。

Queue封裝了Condition的行為,如wait(),notify(),acquire()。

現在不失為一個好機會讀一下Queue的文檔(http://docs.python.org/2/library/queue.html)。

更新程序:

from threading import Threadimport timeimport randomfrom Queue import Queue queue = Queue(10) class ProducerThread(Thread):  def run(self):    nums = range(5)    global queue    while True:      num = random.choice(nums)      queue.put(num)      print "Produced", num      time.sleep(random.random()) class ConsumerThread(Thread):  def run(self):    global queue    while True:      num = queue.get()      queue.task_done()      print "Consumed", num      time.sleep(random.random()) ProducerThread().start()ConsumerThread().start()

解釋:

  •     在原來使用list的位置,改為使用Queue實例(下稱隊列)。
  •     這個隊列有一個condition,它有自己的lock。如果你使用Queue,你不需要為condition和lock而煩惱。
  •     生產者調用隊列的put方法來插入數據。
  •     put()在插入數據前有一個獲取lock的邏輯。
  •     同時,put()也會檢查隊列是否已滿。如果已滿,它會在內部調用wait(),生產者開始等待。
  •     消費者使用get方法。
  •     get()從隊列中移出數據前會獲取lock。
  •     get()會檢查隊列是否為空,如果為空,消費者進入等待狀態。
  •     get()和put()都有適當的notify()。現在就去看Queue的源碼吧。
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 镇平县| 镇江市| 资溪县| 丹江口市| 泉州市| 隆回县| 南丹县| 陕西省| 东方市| 荆门市| 茌平县| 军事| 长宁县| 九江县| 饶河县| 天柱县| 宜阳县| 于田县| 新郑市| 惠来县| 陵川县| 隆回县| 个旧市| 阜康市| 深水埗区| 佳木斯市| 英吉沙县| 香港| 友谊县| 弋阳县| 湖南省| 邹平县| 武强县| 安西县| 汝南县| 德清县| 兰州市| 镇赉县| 焦作市| 蕉岭县| 敦煌市|