Let's start with a piece of code:
# ~*~ Twisted - A Python tale ~*~

from time import sleep

# Hello, I'm a developer and I mainly set up Wordpress.
def install_wordpress(customer):
    # Our hosting company Threads Ltd. is bad. I start installation and...
    print("Start installation for", customer)
    # ...then wait till the installation finishes successfully. It is
    # boring and I'm spending most of my time waiting while consuming
    # resources (memory and some CPU cycles). It's because the process
    # is *blocking*.
    sleep(3)
    print("All done for", customer)

# I do this all day long for our customers
def developer_day(customers):
    for customer in customers:
        install_wordpress(customer)

developer_day(["Bill", "Elon", "Steve", "Mark"])
Running it gives the following output:
$ ./deferreds.py 1
------ Running example 1 ------
Start installation for Bill
All done for Bill
Start installation...
* Elapsed time: 12.03 seconds
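This code runs sequentially. There are four customers and one installation takes 3 seconds, so four of them take 12 seconds. That is not very satisfying, so let's look at a second example that uses threads: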
import threading

# The company grew. We now have many customers and I can't handle the
# workload. We are now 5 developers doing exactly the same thing.
# (install_wordpress() is the function from the first example.)
def developers_day(customers):
    # But we now have to synchronize... a.k.a. bureaucracy
    lock = threading.Lock()

    def dev_day(id):
        print("Goodmorning from developer", id)
        # Yuck - I hate locks...
        lock.acquire()
        while customers:
            customer = customers.pop(0)
            lock.release()
            # My Python is less readable
            install_wordpress(customer)
            lock.acquire()
        lock.release()
        print("Bye from developer", id)

    # We go to work in the morning
    devs = [threading.Thread(target=dev_day, args=(i,)) for i in range(5)]
    for dev in devs:
        dev.start()
    # We leave for the evening
    for dev in devs:
        dev.join()

# We now get more done in the same time but our dev process got more
# complex. As we grew we spend more time managing queues than doing dev
# work. We even had occasional deadlocks when processes got extremely
# complex. The fact is that we are still mostly pressing buttons and
# waiting but now we also spend some time in meetings.
developers_day(["Customer %d" % i for i in range(15)])
Run it:
$ ./deferreds.py 2
------ Running example 2 ------
Goodmorning from developer 0Goodmorning from developer1Start installation forGoodmorning from developer 2Goodmorning from developer 3Customer 0
...
from developerCustomer 13 3Bye from developer 2
* Elapsed time: 9.02 seconds
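This time the code runs in parallel on 5 worker threads. Fifteen customers at 3 s each means 45 s of work in total, but with 5 threads working in parallel it takes only 9 s. The code is more complex, though: a large part of it manages concurrency instead of focusing on the algorithm or the business logic. The output is also jumbled and hard to read, because the threads' print calls interleave. Even simple multithreaded code is hard to get right, so let's switch to Twisted: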
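The timings in this article follow a simple idealised formula: with n jobs that each take t seconds and at most w of them running at once, the wall-clock time is about ceil(n / w) × t. A small sketch of that arithmetic (the function name expected_elapsed is made up for illustration; it ignores scheduling overhead, which is why the measured times are slightly higher):

```python
import math


def expected_elapsed(n_jobs, n_workers, seconds_each):
    """Idealised wall-clock time for n_jobs equal-length jobs when at most
    n_workers of them run at the same time. Overheads are ignored."""
    rounds = math.ceil(n_jobs / n_workers)  # number of "waves" of work
    return rounds * seconds_each


print(expected_elapsed(4, 1, 3))    # sequential, 4 customers: 12
print(expected_elapsed(15, 5, 3))   # 5 workers, 15 customers: 9
print(expected_elapsed(15, 15, 3))  # all 15 customers at once: 3
```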
# For years we thought this was all there was... We kept hiring more
# developers, more managers and buying servers. We were trying harder
# optimising processes and fire-fighting while getting mediocre
# performance in return. Till luckily one day our hosting
# company decided to increase their fees and we decided to
# switch to Twisted Ltd.!

from twisted.internet import reactor
from twisted.internet import defer
from twisted.internet import task

# Twisted has a slightly different approach
def schedule_install(customer):
    # They are calling us back when a Wordpress installation completes.
    # They connected the caller recognition system with our CRM and
    # we know exactly what a call is about and what has to be done next.
    #
    # We now design processes of what has to happen on certain events.
    def schedule_install_wordpress():
        def on_done():
            print("Callback: Finished installation for", customer)
        print("Scheduling: Installation for", customer)
        return task.deferLater(reactor, 3, on_done)

    def all_done(_):
        print("All done for", customer)

    # For each customer, we schedule these processes on the CRM
    # and that is all our chief-Twisted developer has to do
    d = schedule_install_wordpress()
    d.addCallback(all_done)

    return d

# Yes, we don't need many developers anymore or any synchronization.
# ~~ Super-powered Twisted developer ~~
def twisted_developer_day(customers):
    print("Goodmorning from Twisted developer")

    # Here's what has to be done today
    work = [schedule_install(customer) for customer in customers]
    # Turn off the lights when done
    join = defer.DeferredList(work)
    join.addCallback(lambda _: reactor.stop())

    print("Bye from Twisted developer!")

# Even his day is particularly short!
twisted_developer_day(["Customer %d" % i for i in range(15)])

# Reactor, our secretary uses the CRM and follows-up on events!
reactor.run()
Output:
------ Running example 3 ------
Goodmorning from Twisted developer
Scheduling: Installation for Customer 0
...
Scheduling: Installation for Customer 14
Bye from Twisted developer!
Callback: Finished installation for Customer 0
All done for Customer 0
Callback: Finished installation for Customer 1
All done for Customer 1
...
All done for Customer 14
* Elapsed time: 3.18 seconds
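This time we get fully parallel execution and readable output, without using any threads. We served all 15 customers in parallel, so work that would take 45 s sequentially finished in about 3 s. The trick is that we replaced every blocking call to sleep() with its Twisted counterpart, task.deferLater(), plus callback functions. Since the waiting now happens somewhere else, we can serve 15 customers at the same time with no extra effort.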
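We said above that the work now happens "somewhere else". Here is what that means. The arithmetic still happens on the CPU, but CPUs are very fast compared with disk and network operations, so most of the time goes into getting data to the CPU, or sending it from the CPU to memory or to another CPU. Non-blocking operations let us stop wasting that waiting time. For example, task.deferLater() returns immediately and fires its callback later, the same way a real network operation fires its callback once the data has been transferred.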
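Another important point is the "Goodmorning from Twisted developer" and "Bye from Twisted developer!" messages in the output: both are printed right at the start. If the code reaches that point so early, when does our application actually run? The answer is that a Twisted application (Scrapy included) runs inside reactor.run(). Before calling it, you must have set up the Deferred chains your application starts with; reactor.run() then monitors them and fires the callbacks as events occur.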
Note the reactor's main rule: you can do anything you like, as long as it is fast and non-blocking.
Good: there is no longer any code for managing threads, but the callbacks still look a bit messy. We can rewrite the code like this:
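from twisted.internet import defer, reactor, task

# Twisted gave us utilities that make our code way more readable!
@defer.inlineCallbacks
def inline_install(customer):
    print("Scheduling: Installation for", customer)
    yield task.deferLater(reactor, 3, lambda: None)
    print("Callback: Finished installation for", customer)
    print("All done for", customer)

def twisted_developer_day(customers):
    # Same as previously, but using inline_install() instead of schedule_install()
    print("Goodmorning from Twisted developer")
    work = [inline_install(customer) for customer in customers]
    join = defer.DeferredList(work)
    join.addCallback(lambda _: reactor.stop())
    print("Bye from Twisted developer!")

twisted_developer_day(["Customer %d" % i for i in range(15)])
reactor.run()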
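The output is the same as in the previous example. The code does the same thing, but it is much cleaner. The @defer.inlineCallbacks decorator uses Python's generator machinery to pause and resume inline_install(). Calling inline_install() now returns a Deferred, and one instance runs concurrently for each customer. At each yield, that inline_install() instance is suspended until the yielded Deferred fires, and then it resumes.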
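The only remaining problem: what if we have not 15 customers but, say, 10,000? This code would start 10,000 concurrent sequences at once (HTTP requests, database writes and so on). That might work, or it might cause all kinds of failures. In applications with massive concurrency, such as Scrapy, we often need to limit concurrency to an acceptable level. In the following example we use task.Cooperator() to do that. Scrapy uses the same mechanism in its Item Pipeline to limit concurrency (the CONCURRENT_ITEMS setting):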
from twisted.internet import defer, reactor, task

@defer.inlineCallbacks
def inline_install(customer):
    # Same as above
    print("Scheduling: Installation for", customer)
    yield task.deferLater(reactor, 3, lambda: None)
    print("Callback: Finished installation for", customer)
    print("All done for", customer)

# The new "problem" is that we have to manage all this concurrency to
# avoid causing problems to others, but this is a nice problem to have.
def twisted_developer_day(customers):
    print("Goodmorning from Twisted developer")
    # A lazy generator: inline_install() is only called when a slot asks for the next item
    work = (inline_install(customer) for customer in customers)
    #
    # We use the Cooperator mechanism to make the secretary not
    # service more than 5 customers simultaneously.
    coop = task.Cooperator()
    join = defer.DeferredList([coop.coiterate(work) for i in range(5)])
    #
    join.addCallback(lambda _: reactor.stop())
    print("Bye from Twisted developer!")

twisted_developer_day(["Customer %d" % i for i in range(15)])
reactor.run()

# We are now more lean than ever, our customers happy, our hosting
# bills ridiculously low and our performance stellar.
# ~*~ THE END ~*~
Output:
$ ./deferreds.py 5
------ Running example 5 ------
Goodmorning from Twisted developer
Bye from Twisted developer!
Scheduling: Installation for Customer 0
...
Callback: Finished installation for Customer 4
All done for Customer 4
Scheduling: Installation for Customer 5
...
Callback: Finished installation for Customer 14
All done for Customer 14
* Elapsed time: 9.19 seconds
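The output shows that the program behaves as if it has 5 slots for serving customers: the next customer is not started until a slot frees up. Because every installation here takes 3 seconds, it looks as if customers are processed in batches of 5. The resulting performance is the same as with threads, but this time there is only one thread, and the code is simpler and easier to get right.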
PS: making synchronous functions non-blocking with deferToThread
Twisted's defer.Deferred (from twisted.internet import defer) is the deferred object that asynchronous functions return.
Note: deferToThread is implemented with threads, so don't overuse it.
***Turning a synchronous function into an asynchronous one (that returns a Deferred)***
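Twisted's deferToThread (from twisted.internet.threads import deferToThread) also returns a Deferred, but the function you pass to it runs in a separate thread from the reactor's thread pool; the callbacks you add to that Deferred still run in the reactor thread. It is mainly used for database and file operations.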
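# Code fragment from a Twisted Protocol subclass. self.fpcodec, self.redis,
# self.devid, self.msg and self._doResult are defined elsewhere in the class.
def dataReceived(self, data):
    now = int(time.time())
    for ftype, data in self.fpcodec.feed(data):
        if ftype == 'oob':
            self.msg('OOB:', repr(data))
        elif ftype == 0x81:
            # Heartbeat reply to the server's request (decoded from the anti-fatigue
            # driving monitor's message, sent to the GPS host, which forwards it to the server)
            self.msg('FP.PONG:', repr(data))
        else:
            self.msg('TODO:', (ftype, data))
    # Run the blocking Redis call in a thread so the reactor is not blocked
    d = deferToThread(self.redis.zadd, "beier:fpstat:fps", now, self.devid)
    # `extra` is not defined in this fragment
    d.addCallback(self._doResult, extra)

Here is a complete example for reference: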
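from twisted.internet import reactor
from twisted.internet.threads import deferToThread
import functools
import time

# A slow operation: this is a synchronous, blocking function
def mySleep(timeout):
    time.sleep(timeout)
    # The return value is passed on to the callbacks
    return 3

def say(result):
    print("The slow operation has finished and handed me its result:", result)

# Wrap the call with functools.partial to pass the argument in
# (deferToThread(mySleep, 3) works too: it forwards extra arguments)
cb = functools.partial(mySleep, 3)
d = deferToThread(cb)
d.addCallback(say)
# Stop the reactor when done; otherwise reactor.run() never returns
d.addBoth(lambda _: reactor.stop())
print("I run before you finish, ha ha")
reactor.run()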