国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Python > 正文

使用Python的Twisted框架編寫非阻塞程序的代碼示例

2019-11-25 16:46:49
字體:
來源:轉載
供稿:網友

先來看一段代碼:

# ~*~ Twisted - A Python tale ~*~from time import sleep# Hello, I'm a developer and I mainly setup Wordpress.def install_wordpress(customer):  # Our hosting company Threads Ltd. is bad. I start installation and...  print "Start installation for", customer  # ...then wait till the installation finishes successfully. It is  # boring and I'm spending most of my time waiting while consuming  # resources (memory and some CPU cycles). It's because the process  # is *blocking*.  sleep(3)  print "All done for", customer# I do this all day long for our customersdef developer_day(customers):  for customer in customers:    install_wordpress(customer)developer_day(["Bill", "Elon", "Steve", "Mark"])

運行一下,結果如下所示:

$ ./deferreds.py 1
------ Running example 1 ------Start installation for BillAll done for BillStart installation...* Elapsed time: 12.03 seconds

這是一段順序執行的代碼。四個消費者,為一個人安裝需要3秒的時間,那么四個人就是12秒。這樣處理不是很令人滿意,所以看一下第二個使用了線程的例子:

import threading# The company grew. We now have many customers and I can't handle the# workload. We are now 5 developers doing exactly the same thing.def developers_day(customers):  # But we now have to synchronize... a.k.a. bureaucracy  lock = threading.Lock()  #  def dev_day(id):    print "Goodmorning from developer", id    # Yuck - I hate locks...    lock.acquire()    while customers:      customer = customers.pop(0)      lock.release()      # My Python is less readable      install_wordpress(customer)      lock.acquire()    lock.release()    print "Bye from developer", id  # We go to work in the morning  devs = [threading.Thread(target=dev_day, args=(i,)) for i in range(5)]  [dev.start() for dev in devs]  # We leave for the evening  [dev.join() for dev in devs]# We now get more done in the same time but our dev process got more# complex. As we grew we spend more time managing queues than doing dev# work. We even had occasional deadlocks when processes got extremely# complex. The fact is that we are still mostly pressing buttons and# waiting but now we also spend some time in meetings.developers_day(["Customer %d" % i for i in xrange(15)])

運行一下:

$ ./deferreds.py 2
------ Running example 2 ------Goodmorning from developer 0Goodmorning from developer1Start installation forGoodmorning from developer 2Goodmorning from developer 3Customer 0...from developerCustomer 13 3Bye from developer 2* Elapsed time: 9.02 seconds

這次是一段并行執行的代碼,使用了5個工作線程。15個消費者每個花費3s意味著總共45s的時間,不過用了5個線程并行執行總共只花費了9s的時間。這段代碼有點復雜,很大一部分代碼是用于管理并發,而不是專注于算法或者業務邏輯。另外,程序的輸出結果看起來也很混雜,可讀性也天津市。即使是簡單的多線程的代碼同樣也難以寫得很好,所以我們轉為使用Twisted:

# For years we thought this was all there was... We kept hiring more# developers, more managers and buying servers. We were trying harder# optimising processes and fire-fighting while getting mediocre# performance in return. Till luckily one day our hosting# company decided to increase their fees and we decided to# switch to Twisted Ltd.!from twisted.internet import reactorfrom twisted.internet import deferfrom twisted.internet import task# Twisted has a slightly different approachdef schedule_install(customer):  # They are calling us back when a Wordpress installation completes.  # They connected the caller recognition system with our CRM and  # we know exactly what a call is about and what has to be done next.  #  # We now design processes of what has to happen on certain events.  def schedule_install_wordpress():      def on_done():        print "Callback: Finished installation for", customer    print "Scheduling: Installation for", customer    return task.deferLater(reactor, 3, on_done)  #  def all_done(_):    print "All done for", customer  #  # For each customer, we schedule these processes on the CRM  # and that  # is all our chief-Twisted developer has to do  d = schedule_install_wordpress()  d.addCallback(all_done)  #  return d# Yes, we don't need many developers anymore or any synchronization.# ~~ Super-powered Twisted developer ~~def twisted_developer_day(customers):  print "Goodmorning from Twisted developer"  #  # Here's what has to be done today  work = [schedule_install(customer) for customer in customers]  # Turn off the lights when done  join = defer.DeferredList(work)  join.addCallback(lambda _: reactor.stop())  #  print "Bye from Twisted developer!"# Even his day is particularly short!twisted_developer_day(["Customer %d" % i for i in xrange(15)])# Reactor, our secretary uses the CRM and follows-up on events!reactor.run()

運行結果:

------ Running example 3 ------Goodmorning from Twisted developerScheduling: Installation for Customer 0....Scheduling: Installation for Customer 14Bye from Twisted developer!Callback: Finished installation for Customer 0All done for Customer 0Callback: Finished installation for Customer 1All done for Customer 1...All done for Customer 14* Elapsed time: 3.18 seconds

這次我們得到了完美的執行代碼和可讀性強的輸出結果,并且沒有使用線程。我們并行地處理了15個消費者,也就是說,本來需要45s的執行時間在3s之內就已經完成。這個竅門就是我們把所有的阻塞的對sleep()的調用都換成了Twisted中對等的task.deferLater()和回調函數。由于現在處理的操作在其他地方進行,我們就可以毫不費力地同時服務于15個消費者。
前面提到處理的操作發生在其他的某個地方。現在來解釋一下,算術運算仍然發生在CPU內,但是現在的CPU處理速度相比磁盤和網絡操作來說非常快。所以給CPU提供數據或者從CPU向內存或另一個CPU發送數據花費了大多數時間。我們使用了非阻塞的操作節省了這方面的時間,例如,task.deferLater()使用了回調函數,當數據已經傳輸完成的時候會被激活。
另一個很重要的一點是輸出中的Goodmorning from Twisted developer和Bye from Twisted developer!信息。在代碼開始執行時就已經打印出了這兩條信息。如果代碼如此早地執行到了這個地方,那么我們的應用真正開始運行是在什么時候呢?答案是,對于一個Twisted應用(包括Scrapy)來說是在reactor.run()里運行的。在調用這個方法之前,必須把應用中可能用到的每個Deferred鏈準備就緒,然后reactor.run()方法會監視并激活回調函數。
注意,reactor的主要一條規則就是,你可以執行任何操作,只要它足夠快并且是非阻塞的。
現在好了,代碼中沒有那么用于管理多線程的部分了,不過這些回調函數看起來還是有些雜亂。可以修改成這樣:

# Twisted gave us utilities that make our code way more readable!@defer.inlineCallbacksdef inline_install(customer):  print "Scheduling: Installation for", customer  yield task.deferLater(reactor, 3, lambda: None)  print "Callback: Finished installation for", customer  print "All done for", customerdef twisted_developer_day(customers):  ... same as previously but using inline_install() instead of schedule_install()twisted_developer_day(["Customer %d" % i for i in xrange(15)])reactor.run()

運行的結果和前一個例子相同。這段代碼的作用和上一個例子是一樣的,但是看起來更加簡潔明了。inlineCallbacks生成器可以使用一些一些Python的機制來使得inline_install()函數暫停或者恢復執行。inline_install()函數變成了一個Deferred對象并且并行地為每個消費者運行。每次yield的時候,運行就會中止在當前的inline_install()實例上,直到yield的Deferred對象完成后再恢復運行。
現在唯一的問題是,如果我們不止有15個消費者,而是有,比如10000個消費者時又該怎樣?這段代碼會同時開始10000個同時執行的序列(比如HTTP請求、數據庫的寫操作等等)。這樣做可能沒什么問題,但也可能會產生各種失敗。在有巨大并發請求的應用中,例如Scrapy,我們經常需要把并發的數量限制到一個可以接受的程度上。在下面的一個例子中,我們使用task.Cooperator()來完成這樣的功能。Scrapy在它的Item Pipeline中也使用了相同的機制來限制并發的數目(即CONCURRENT_ITEMS設置):

@defer.inlineCallbacksdef inline_install(customer):  ... same as above# The new "problem" is that we have to manage all this concurrency to# avoid causing problems to others, but this is a nice problem to have.def twisted_developer_day(customers):  print "Goodmorning from Twisted developer"  work = (inline_install(customer) for customer in customers)  #  # We use the Cooperator mechanism to make the secretary not  # service more than 5 customers simultaneously.  coop = task.Cooperator()  join = defer.DeferredList([coop.coiterate(work) for i in xrange(5)])  #  join.addCallback(lambda _: reactor.stop())  print "Bye from Twisted developer!"twisted_developer_day(["Customer %d" % i for i in xrange(15)])reactor.run()# We are now more lean than ever, our customers happy, our hosting# bills ridiculously low and our performance stellar.# ~*~ THE END ~*~

運行結果:

$ ./deferreds.py 5------ Running example 5 ------Goodmorning from Twisted developerBye from Twisted developer!Scheduling: Installation for Customer 0...Callback: Finished installation for Customer 4All done for Customer 4Scheduling: Installation for Customer 5...Callback: Finished installation for Customer 14All done for Customer 14* Elapsed time: 9.19 seconds

從上面的輸出中可以看到,程序運行時好像有5個處理消費者的槽。除非一個槽空出來,否則不會開始處理下一個消費者的請求。在本例中,處理時間都是3秒,所以看起來像是5個一批次地處理一樣。最后得到的性能跟使用線程是一樣的,但是這次只有一個線程,代碼也更加簡潔更容易寫出正確的代碼。

PS:deferToThread使同步函數實現非阻塞
wisted的defer.Deferred (from twisted.internet import defer)可以返回一個deferred對象.

注:deferToThread使用線程實現的,不推薦過多使用
***把同步函數變為異步(返回一個Deferred)***
twisted的deferToThread(from twisted.internet.threads import deferToThread)也返回一個deferred對象,不過回調函數在另一個線程處理,主要用于數據庫/文件讀取操作

..# 代碼片段  def dataReceived(self, data):    now = int(time.time())    for ftype, data in self.fpcodec.feed(data):      if ftype == 'oob':        self.msg('OOB:', repr(data))      elif ftype == 0x81: # 對服務器請求的心跳應答(這個是解析 防疲勞駕駛儀,發給gps上位機的,然后上位機發給服務器的)        self.msg('FP.PONG:', repr(data))      else:        self.msg('TODO:', (ftype, data))      d = deferToThread(self.redis.zadd, "beier:fpstat:fps", now, self.devid)      d.addCallback(self._doResult, extra)

下面這兒完整的例子可以給大家參考一下

# -*- coding: utf-8 -*-from twisted.internet import defer, reactorfrom twisted.internet.threads import deferToThreadimport functoolsimport time# 耗時操作 這是一個同步阻塞函數def mySleep(timeout):  time.sleep(timeout)  # 返回值相當于加進了callback里  return 3 def say(result):  print "耗時操作結束了, 并把它返回的結果給我了", result# 用functools.partial包裝一下, 傳遞參數進去cb = functools.partial(mySleep, 3)d = deferToThread(cb) d.addCallback(say)print "你還沒有結束我就執行了, 哈哈"reactor.run()

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 新泰市| 虹口区| 沁阳市| 凤阳县| 吉木萨尔县| 庆城县| 金坛市| 灵台县| 睢宁县| 阳信县| 汉中市| 喜德县| 深泽县| 义乌市| 龙岩市| 漯河市| 仪征市| 孝感市| 宜都市| 上高县| 鸡泽县| 邹平县| 罗田县| 潍坊市| 夏邑县| 泰兴市| 建平县| 孟津县| 晴隆县| 怀集县| 桂阳县| 万州区| 项城市| 天津市| 习水县| 辉县市| 太康县| 揭阳市| 肥西县| 宣城市| 佛学|