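We will use Python threads to solve the producer-consumer problem. The problem is nowhere near as hard as they make it sound in school.
This post will make more sense if you already have some idea of what the producer-consumer problem is.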
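Why should you care about the producer-consumer problem?
While solving it with threads, you can learn the following threading concepts:
Condition objects, and the wait() and notify() methods they provide.
I assume you already know the basics: threads, race conditions, and how to prevent race conditions (for example with a Lock). If not, I suggest reading my previous post, basics of Threads.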
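Quoting Wikipedia:
The producer's job is to generate a piece of data, put it into the buffer, and start again. At the same time, the consumer is consuming the data (i.e., removing it from the buffer) one piece at a time.
The key phrase here is "at the same time". The producer and the consumer run concurrently, so we need to put them in separate threads.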
from threading import Thread

class ProducerThread(Thread):
    def run(self):
        pass

class ConsumerThread(Thread):
    def run(self):
        pass
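Quoting Wikipedia again:
The problem describes two processes, the producer and the consumer, who share a common, fixed-size buffer used as a queue.
Suppose we have a global variable that both the producer and the consumer threads can modify. The producer generates data and appends it to the queue. The consumer consumes the data (i.e., removes it from the queue).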
queue = []
At first we will not impose the fixed-size condition. We add it later, in the examples below.
A first version, with a bug:
from threading import Thread, Lock
import time
import random

queue = []
lock = Lock()

class ProducerThread(Thread):
    def run(self):
        nums = range(5) # Will create the list [0, 1, 2, 3, 4]
        global queue
        while True:
            num = random.choice(nums) # Selects a random number from list [0, 1, 2, 3, 4]
            lock.acquire()
            queue.append(num)
            print "Produced", num
            lock.release()
            time.sleep(random.random())

class ConsumerThread(Thread):
    def run(self):
        global queue
        while True:
            lock.acquire()
            if not queue:
                print "Nothing in queue, but consumer will try to consume"
            num = queue.pop(0)
            print "Consumed", num
            lock.release()
            time.sleep(random.random())

ProducerThread().start()
ConsumerThread().start()
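Run it a few times and watch the results. The program may hang after the IndexError: the consumer thread died while still holding the lock, so the producer blocks forever in lock.acquire(). In that case stop it with Ctrl+Z (in a Unix shell this suspends the job; follow up with `kill %1`, or the job's actual number, to terminate it).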
Sample output:
Produced 3
Consumed 3
Produced 4
Consumed 4
Produced 1
Consumed 1
Nothing in queue, but consumer will try to consume
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "producer_consumer.py", line 31, in run
    num = queue.pop(0)
IndexError: pop from empty list
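Explanation:
We start one producer thread and one consumer thread. The producer keeps appending to the queue and the consumer keeps removing from it. Because the queue is shared, both threads touch it only while holding the lock, to avoid a race condition. At some point the consumer has consumed everything while the producer is still sleeping. The consumer tries to consume anyway, and since the queue is empty, pop(0) raises IndexError. The line "Nothing in queue, but consumer will try to consume", printed just before the traceback, shows why.
We consider this implementation's behavior wrong.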
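The hang happens because lock.release() is never reached once pop(0) raises. Below is a minimal Python 3 sketch, not the author's code, of why a `with lock:` block avoids that particular failure: the lock is released even when the body raises, so other threads are not blocked forever. The names `consume_once` and `run_demo` are hypothetical. The sketch does not fix the real bug, because the consumer still fails on an empty queue.

```python
import threading

lock = threading.Lock()
items = []  # empty on purpose, so pop(0) fails


def consume_once():
    # The with-statement acquires the lock and guarantees its release,
    # even though items.pop(0) raises IndexError here.
    with lock:
        items.pop(0)


def run_demo():
    errors = []

    def worker():
        try:
            consume_once()
        except IndexError as exc:
            errors.append(exc)

    t = threading.Thread(target=worker)
    t.start()
    t.join()
    # If the lock had leaked, this acquire would time out and return False.
    acquired = lock.acquire(timeout=1)
    if acquired:
        lock.release()
    return acquired, len(errors)


print(run_demo())  # (True, 1)
```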
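What is the correct behavior?
When there is no data in the queue, the consumer should stop and wait instead of trying to consume. Once the producer adds data to the queue, it needs a way to notify the consumer. The consumer can then consume from the queue again, and the IndexError no longer occurs.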
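About Condition
A Condition allows one or more threads to wait until they are notified by another thread. Reference: http://docs.python.org/2/library/threading.html#condition-objects
This is exactly what we need. We want the consumer to wait while the queue is empty and to resume only after the producer notifies it. The producer notifies only after it has added data to the queue. So, with one producer and one consumer, the queue is guaranteed to be non-empty once the producer has notified, and the consumer will not hit an exception when it consumes.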
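Below is a minimal Python 3 sketch of this handshake. The helper names `consumer`, `producer` and `demo` are hypothetical. The consumer waits on an empty list until the producer appends one item and calls notify(). The emptiness check is written as a `while` loop, which is the form the Python threading documentation recommends around wait(). With exactly one producer and one consumer, it behaves the same as the `if` used in this post.

```python
import threading

condition = threading.Condition()
items = []  # shared list, guarded by `condition`


def consumer(results):
    with condition:
        # wait() releases the lock while blocked and reacquires it before returning.
        while not items:
            condition.wait()
        results.append(items.pop(0))


def producer(value):
    with condition:
        items.append(value)
        condition.notify()  # wakes one waiter; the lock is released when the with-block ends


def demo():
    results = []
    c = threading.Thread(target=consumer, args=(results,))
    c.start()
    producer(42)
    c.join()
    return results


print(demo())  # [42]
```

If the producer happens to run first, the list is already non-empty when the consumer checks it, so the consumer never waits. The result is the same either way.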
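A Condition's acquire() and release() methods call acquire() and release() on its underlying lock. So we can use a Condition instance in place of the Lock instance, and the locking behavior stays the same.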
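To check that a Condition really wraps a lock, this Python 3 sketch passes an explicit Lock to Condition(), whose constructor accepts an optional lock argument. It then inspects the lock's state with locked(). `with condition:` is shorthand for the acquire()/release() pair used in this post. `lock_states` is a hypothetical helper.

```python
import threading

lock = threading.Lock()
condition = threading.Condition(lock)  # reuse an existing lock instead of creating a new one


def lock_states():
    states = [lock.locked()]        # before: nobody holds the lock
    condition.acquire()
    states.append(lock.locked())    # acquiring the condition acquired the lock
    condition.release()
    states.append(lock.locked())    # released again
    with condition:                 # same as acquire() ... release()
        states.append(lock.locked())
    states.append(lock.locked())
    return states


print(lock_states())  # [False, True, False, True, False]
```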
The producer and the consumer must share the same Condition instance for wait() and notify() to work.
The rewritten consumer:
from threading import Condition

condition = Condition()

class ConsumerThread(Thread):
    def run(self):
        global queue
        while True:
            condition.acquire()
            if not queue:
                print "Nothing in queue, consumer is waiting"
                # wait() releases the lock and blocks until the producer calls notify()
                condition.wait()
                print "Producer added something to queue and notified the consumer"
            num = queue.pop(0)
            print "Consumed", num
            condition.release()
            time.sleep(random.random())
The rewritten producer:
class ProducerThread(Thread):
    def run(self):
        nums = range(5)
        global queue
        while True:
            condition.acquire()
            num = random.choice(nums)
            queue.append(num)
            print "Produced", num
            # Wake the consumer if it is waiting; it resumes once we release the lock
            condition.notify()
            condition.release()
            time.sleep(random.random())
Sample output:
Produced 3
Consumed 3
Produced 1
Consumed 1
Produced 4
Consumed 4
Produced 3
Consumed 3
Nothing in queue, consumer is waiting
Produced 2
Producer added something to queue and notified the consumer
Consumed 2
Nothing in queue, consumer is waiting
Produced 2
Producer added something to queue and notified the consumer
Consumed 2
Nothing in queue, consumer is waiting
Produced 3
Producer added something to queue and notified the consumer
Consumed 3
Produced 4
Consumed 4
Produced 1
Consumed 1
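Explanation: before consuming, the consumer checks whether the queue is empty. If it is, the consumer calls wait() on the condition. wait() releases the condition's lock and blocks the consumer until it is notified. Because the lock has been released, the producer can acquire it, append a number to the queue, and call notify(), which wakes the waiting consumer. notify() itself does not release the lock. The consumer's wait() returns only after it has reacquired the lock, that is, after the producer calls release(). The consumer then pops an item from the queue, which is no longer empty.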
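Adding a size limit to the queue
The producer should not keep adding data to a full queue.
This can be done as follows. Before adding data, the producer checks whether the queue is full. If it is not, the producer continues as usual; if it is, the producer must wait. Correspondingly, the consumer must call notify() after consuming data, so that a waiting producer can resume.
The final program: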
from threading import Thread, Condition
import time
import random

queue = []
MAX_NUM = 10
condition = Condition()

class ProducerThread(Thread):
    def run(self):
        nums = range(5)
        global queue
        while True:
            condition.acquire()
            if len(queue) == MAX_NUM:
                print "Queue full, producer is waiting"
                condition.wait()
                print "Space in queue, Consumer notified the producer"
            num = random.choice(nums)
            queue.append(num)
            print "Produced", num
            condition.notify()
            condition.release()
            time.sleep(random.random())

class ConsumerThread(Thread):
    def run(self):
        global queue
        while True:
            condition.acquire()
            if not queue:
                print "Nothing in queue, consumer is waiting"
                condition.wait()
                print "Producer added something to queue and notified the consumer"
            num = queue.pop(0)
            print "Consumed", num
            condition.notify()
            condition.release()
            time.sleep(random.random())

ProducerThread().start()
ConsumerThread().start()
Sample output:
Produced 0
Consumed 0
Produced 0
Produced 4
Consumed 0
Consumed 4
Nothing in queue, consumer is waiting
Produced 4
Producer added something to queue and notified the consumer
Consumed 4
Produced 3
Produced 2
Consumed 3
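The final program relies on there being exactly one producer and one consumer. With more threads, two problems appear:

- A woken thread may find the queue full or empty again, because another thread grabbed the lock first. The `if` check then lets it proceed anyway.
- Because everyone waits on the same Condition, a notify() can wake a thread of the wrong kind.

The Python 3 sketch below shows one common fix. It is not from the original post. Each thread re-checks the state in a `while` loop, and every change wakes all waiters with notify_all(). The thread counts, item counts, and the `producer`, `consumer` and `run` helpers are assumptions chosen so that the demo terminates.

```python
import threading

MAX_NUM = 3
queue = []
condition = threading.Condition()
max_seen = [0]  # largest queue length observed, for the check below


def producer(values):
    for num in values:
        with condition:
            # Re-check after every wakeup: another producer may have refilled the queue.
            while len(queue) == MAX_NUM:
                condition.wait()
            queue.append(num)
            max_seen[0] = max(max_seen[0], len(queue))
            # Wake every waiter so that a consumer is sure to be among them.
            condition.notify_all()


def consumer(count, out):
    for _ in range(count):
        with condition:
            while not queue:
                condition.wait()
            out.append(queue.pop(0))
            condition.notify_all()


def run(n_producers=2, n_consumers=2, per_producer=50):
    batches = [list(range(i * per_producer, (i + 1) * per_producer))
               for i in range(n_producers)]
    total = n_producers * per_producer
    per_consumer = total // n_consumers  # assumes the work divides evenly
    consumed = []
    threads = [threading.Thread(target=producer, args=(b,)) for b in batches]
    threads += [threading.Thread(target=consumer, args=(per_consumer, consumed))
                for _ in range(n_consumers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # Every item consumed exactly once, and the size limit never exceeded.
    return sorted(consumed) == list(range(total)), max_seen[0] <= MAX_NUM


print(run())  # (True, True)
```

notify_all() trades some needless wakeups for not losing a wakeup. Using two conditions (one for "not full", one for "not empty") is another common way to avoid waking the wrong kind of thread.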
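Update:
Many readers suggested using a Queue instead of a list guarded by a lock and a condition. I agree, but my goal was to show how Condition, wait() and notify() work, so I used a list.
Here is the code updated to use Queue.
Queue encapsulates the Condition handling internally: the acquire(), wait() and notify() calls.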
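To make "encapsulates" concrete, here is a hedged Python 3 sketch of a tiny blocking queue. `BoundedQueue` is a hypothetical class for illustration, not the standard library's. It uses one lock shared by two Conditions, `not_empty` and `not_full`. This is roughly how CPython's queue.Queue is organized internally, though the real class also supports timeouts, task_done() and join().

```python
import threading
from collections import deque


class BoundedQueue:
    """Hypothetical minimal blocking FIFO, for illustration only."""

    def __init__(self, maxsize):
        self.maxsize = maxsize
        self.items = deque()
        self.mutex = threading.Lock()
        # Both conditions share one lock, so a single lock guards self.items.
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)

    def put(self, item):
        with self.not_full:
            while len(self.items) >= self.maxsize:
                self.not_full.wait()
            self.items.append(item)
            self.not_empty.notify()  # a consumer may be waiting for data

    def get(self):
        with self.not_empty:
            while not self.items:
                self.not_empty.wait()
            item = self.items.popleft()
            self.not_full.notify()  # a producer may be waiting for space
            return item


def demo(n=20):
    q = BoundedQueue(maxsize=2)
    received = []

    def consume():
        for _ in range(n):
            received.append(q.get())

    t = threading.Thread(target=consume)
    t.start()
    for i in range(n):
        q.put(i)  # blocks whenever two items are already waiting
    t.join()
    return received


print(demo(5))  # [0, 1, 2, 3, 4]
```

Producers wait only on `not_full` and consumers only on `not_empty`, so a plain notify() always wakes a thread of the right kind.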
Now is a good time to read the Queue documentation (http://docs.python.org/2/library/queue.html).
The updated program:
from threading import Thread
import time
import random
from Queue import Queue

queue = Queue(10)

class ProducerThread(Thread):
    def run(self):
        nums = range(5)
        global queue
        while True:
            num = random.choice(nums)
            queue.put(num)  # blocks while the queue already holds 10 items
            print "Produced", num
            time.sleep(random.random())

class ConsumerThread(Thread):
    def run(self):
        global queue
        while True:
            num = queue.get()  # blocks while the queue is empty
            queue.task_done()
            print "Consumed", num
            time.sleep(random.random())

ProducerThread().start()
ConsumerThread().start()
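Explanation: instead of the list, we use a Queue created with a maximum size of 10. put() blocks while the queue is full, and get() blocks while it is empty. As a result, the producer and the consumer no longer acquire a condition, check the queue, or call wait() and notify() themselves. task_done() tells the queue that the item returned by get() has been processed; it matters only if some thread calls join() on the queue.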
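The updated program runs forever. Below is a hedged Python 3 sketch of one common way to let it finish (the module is named queue in Python 3, Queue in Python 2):

- The producer puts a sentinel object after its last item.
- The consumer stops when it sees the sentinel.
- Queue.join() blocks until task_done() has been called for every item that was put().

The `STOP` sentinel, the item count, and the `producer`, `consumer` and `run` helpers are assumptions for the demo.

```python
import random
import threading
import time
from queue import Queue

STOP = object()  # hypothetical sentinel meaning "no more items"


def producer(q, count):
    for _ in range(count):
        q.put(random.choice(range(5)))  # blocks while the queue is full
        time.sleep(random.random() / 100)
    q.put(STOP)


def consumer(q, consumed):
    while True:
        num = q.get()  # blocks while the queue is empty
        try:
            if num is STOP:
                return
            consumed.append(num)
        finally:
            q.task_done()  # one task_done() per get(), including the sentinel


def run(count=20):
    q = Queue(10)
    consumed = []
    p = threading.Thread(target=producer, args=(q, count))
    c = threading.Thread(target=consumer, args=(q, consumed))
    p.start()
    c.start()
    p.join()  # every item, including the sentinel, has been put()
    q.join()  # every put() item has been marked done with task_done()
    c.join()
    return consumed


print(len(run()))  # 20
```

The producer is joined before q.join() is called. Otherwise q.join() could return immediately, before anything has been put.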