国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Python > 正文

剖析Python的Twisted框架的核心特性

2019-11-25 16:46:44
字體:
來源:轉載
供稿:網友

一. reactor
twisted的核心是reactor,而提到reactor不可避免的是同步/異步,阻塞/非阻塞,在Dave的第一章概念性介紹中,對同步/異步的界限有點模糊,關于同步/異步,阻塞/非阻塞可參見知乎討論。而關于proactor(主動器)和reactor(反應堆),這里有一篇推薦博客有比較詳細的介紹。
就reactor模式的網絡IO而言,應該是同步IO而不是異步IO。而Dave第一章中提到的異步,核心在于:顯式地放棄對任務的控制權而不是被操作系統隨機地停止,程序員必須將任務組織成序列來交替的小步完成。因此,若其中一個任務用到另外一個任務的輸出,則依賴的任務(即接收輸出的任務)需要被設計成為要接收系列比特或分片而不是一下全部接收。
顯式主動地放棄任務的控制權有點類似協程的思考方式,reactor可看作協程的調度器。reactor是一個事件循環,我們可以向reactor注冊自己感興趣的事件(如套接字可讀/可寫)和處理器(如執行讀寫操作),reactor會在事件發生時回調我們的處理器,處理器執行完成之后,相當于協程掛起(yield),回到reactor的事件循環中,等待下一個事件來臨并回調。reactor本身有一個同步事件多路分解器(Synchronous Event Demultiplexer),可用select/epoll等機制實現,當然twisted reactor的事件觸發不一定是基于IO,也可以由定時器等其它機制觸發。
twisted的reactor無需我們主動注冊事件和回調函數,而是通過多態(繼承特定類,并實現所關心的事件接口,然后傳給twisted reactor)來實現。關于twisted的reactor,有幾個需要注意的地方:
twisted.internet.reactor是單例模式,每個程序只能有一個reactor;
盡量在reactor回調函數盡快完成操作,不要執行阻塞任務,reactor本質是單線程,用戶回調代碼與twisted代碼運行在同一個上下文,某個回調函數中阻塞,會導致reactor整個事件循環阻塞;
reactor會一直運行,除非通過reactor.stop()顯示停止它,但一般調用reactor.stop(),也就意味著應用程序結束;

二. twisted簡單使用
twisted的本質是reactor,我們可以使用twisted的底層API(避開twisted便利的高層抽象)來使用reactor:

# 示例一 twisted底層API的使用from twisted.internet import reactofrom twisted.internet import mainfrom twisted.internet.interfaces import IReadDescriptorimport socketclass MySocket(IReadDescriptor):  def __init__(self, address):    # 連接服務器    self.address = address    self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)    self.sock.connect(address)    self.sock.setblocking(0)    # tell the Twisted reactor to monitor this socket for reading    reactor.addReader(self)  # 接口: 告訴reactor 監聽的套接字描述符  def fileno(self):    try:      return self.sock.fileno()    except socket.error:      return -1       # 接口: 在連接斷開時的回調  def connectionLost(self, reason):    self.sock.close()    reactor.removeReader(self)  # 當應用程序需要終止時 調用:    # reactor.stop() # 接口: 當套接字描述符有數據可讀時  def doRead(self):    bytes = '' # 盡可能多的讀取數據    while True:      try:        bytesread = self.sock.recv(1024)        if not bytesread:          break        else:          bytes += bytesread      except socket.error, e:        if e.args[0] == errno.EWOULDBLOCK:          break        return main.CONNECTION_LOST    if not bytes:       return main.CONNECTION_DONE    else:      # 在這里解析協議并處理數據      print bytes

示例一可以很清晰的看到twisted的reactor本質:添加監聽描述符,監聽可讀/可寫事件,當事件來臨時回調函數,回調完成之后繼續監聽事件。
需要注意:
套接字為非阻塞,如果為阻塞則失去了reactor的意義
我們通過繼承IReadDescriptor來提供reactor所需要的接口
通過reactor.addReader將套接字類加入reactor的監聽對象中
main.CONNECTION_LOST是twisted預定義的值,通過這些值它我們可以一定程度控制下一步回調(類似于模擬一個事件)
但是上面的MySocket類不夠好,主要有以下缺點:
需要我們自己去讀取數據,而不是框架幫我們讀好,并處理異常
網絡IO和數據處理混為一塊,沒有剝離開來

三. twisted抽象
twisted在reactor的基礎上,建立了更高的抽象,對一個網絡連接而言,twisted建立了如下三個概念:
Transports:網絡連接層,僅負責網絡連接和讀/寫字節數據
Protocols: 協議層,服務業務相關的網絡協議,將字節流轉換成應用所需數據
Protocol Factories:協議工廠,負責創建Protocols,每個網絡連接都有一個Protocols對象(因為要保存協議解析狀態)
twisted的這些概念和erlang中的ranch網絡框架很像,ranch框架也抽象了Transports和Protocols概念,在有新的網絡連接時,ranch自動創建Transports和Protocols,其中Protocols由用戶在啟動ranch時傳入,是一個實現了ranch_protocol behaviour的模塊,Protocols初始化時,會收到該連接對應的Transports,如此我們可以在Protocols中處理字節流數據,按照我們的協議解析并處理數據。同時可通過Transports來發送數據(ranch已經幫你讀取了字節流數據了)。
和ranch類似,twisted也會在新連接到達時創建Protocols并且將Transport傳入,twisted會幫我們讀取字節流數據,我們只需在dataReceived(self, data)接口中處理字節流數據即可。此時的twisted在網絡IO上可以算是真正的異步了,它幫我們處理了網絡IO和可能遇到的異常,并且將網絡IO和數據處理剝離開來,抽象為Transports和Protocols,提高了程序的清晰性和健壯性。

# 示例二 twisted抽象的使用from twisted.internet import reactorfrom twisted.internet.protocol import Protocol, ClientFactoryclass MyProtocol(Protocol):  # 接口: Protocols初始化時調用,并傳入Transports # 另外 twisted會自動將Protocols的factory對象成員設為ProtocolsFactory實例的引用 #   如此就可以通過factory來與MyProtocolFactory交互 def makeConnection(self,trans):    print 'make connection: get transport: ', trans    print 'my factory is: ', self.factory     # 接口: 有數據到達  def dataReceived(self, data):    self.poem += data    msg = 'Task %d: got %d bytes of poetry from %s'    print msg % (self.task_num, len(data), self.transport.getPeer())  # 接口: 連接斷開  def connectionLost(self, reason):    # 連接斷開的處理class MyProtocolFactory(ClientFactory): # 接口: 通過protocol類成員指出需要創建的Protocols  protocol = PoetryProtocol # tell base class what proto to build  def __init__(self, address):    self.poetry_count = poetry_count    self.poems = {} # task num -> poem     # 接口: 在創建Protocols的回調  def buildProtocol(self, address):    proto = ClientFactory.buildProtocol(self, address)    # 在這里對proto做一些初始化....    return proto     # 接口: 連接Server失敗時的回調  def clientConnectionFailed(self, connector, reason):    print 'Failed to connect to:', connector.getDestination()    def main(address): factory = MyClientFactory(address)  host, port = address  # 連接服務端時傳入ProtocolsFactory  reactor.connectTCP(host, port, factory)   reactor.run()

示例二要比示例一要簡單清晰很多,因為它無需處理網絡IO,并且邏輯上更為清晰,實際上ClientFactory和Protocol提供了更多的接口用于實現更靈活強大的邏輯控制,具體的接口可參見twisted源代碼。

四. twisted Deferred
twisted Deferred對象用于解決這樣的問題:有時候我們需要在ProtocolsFactory中嵌入自己的回調,以便Protocols中發生某個事件(如所有Protocols都處理完成)時,回調我們指定的函數(如TaskFinished)。如果我們自己來實現回調,需要處理幾個問題:
如何區分回調的正確返回和錯誤返回?(我們在使用異步調用時,要尤其注意錯誤返回的重要性)
如果我們的正確返回和錯誤返回都需要執行一個公共函數(如關閉連接)呢?
如果保證該回調只被調用一次?
Deferred對象便用于解決這種問題,它提供兩個回調鏈,分別對應于正確返回和錯誤返回,在正確返回或錯誤返回時,它會依次調用對應鏈中的函數,并且保證回調的唯一性。

d = Deferred()# 添加正確回調和錯誤回調d.addCallbacks(your_ok_callback, your_err_callback)# 添加公共回調函數d.addBoth(your_common_callback)# 正確返回 將依次調用 your_ok_callback(Res) -> common_callback(Res)d.callback(Res)# 錯誤返回 將依次調用 your_err_callback(Err) -> common_callback(Err)d.errback(Err)# 注意,對同一個Defered對象,只能返回一次,嘗試多次返回將會報錯

twisted的defer是異步的一種變現方式,可以這么理解,他和thread的區別是,他是基于時間event的。
有了deferred,即可對任務的執行進行管理控制。防止程序的運行,由于等待某項任務的完成而陷入阻塞停滯,提高整體運行的效率。
Deferred能幫助你編寫異步代碼,但并不是為自動生成異步或無阻塞的代碼!要想將一個同步函數編程異步函數,必須在函數中返回Deferred并正確注冊回調。

五.綜合示例

下面的例子,你們自己跑跑,我上面說的都是一些個零散的例子,大家對照下面完整的,走一遍。 twisted理解其實卻是有點麻煩,大家只要知道他是基于事件的后,慢慢理解就行了。

#coding:utf-8#xiaorui.ccfrom twisted.internet import reactor, deferfrom twisted.internet.threads import deferToThreadimport os,sysfrom twisted.python import threadable; threadable.init(1)deferred =deferToThread.__get__import timedef todoprint_(result):  print resultdef running():  "Prints a few dots on stdout while the reactor is running."#   sys.stdout.write("."); sys.stdout.flush()  print '.'  reactor.callLater(.1, running)@deferreddef sleep(sec):  "A blocking function magically converted in a non-blocking one."  print 'start sleep %s'%sec  time.sleep(sec)  print '/nend sleep %s'%sec  return "ok"def test(n,m):  print "fun test() is start"  m=m  vals = []  keys = []  for i in xrange(m):    vals.append(i)    keys.append('a%s'%i)  d = None  for i in xrange(n):    d = dict(zip(keys, vals))  print "fun test() is end"  return dif __name__== "__main__":#one  sleep(10).addBoth(todoprint_)  reactor.callLater(.1, running)  reactor.callLater(3, reactor.stop)  print "go go !!!"  reactor.run()#two  aa=time.time()  de = defer.Deferred()  de.addCallback(test)  reactor.callInThread(de.callback,10000000,100 )  print time.time()-aa  print "我這里先做別的事情"  print de  print "go go end"

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 庄河市| 汤阴县| 仲巴县| 昭觉县| 金门县| 长垣县| 乐山市| 德庆县| 响水县| 广河县| 上饶市| 石屏县| 鄂托克旗| 类乌齐县| 石渠县| 克什克腾旗| 广饶县| 洱源县| 永和县| 宁津县| 南岸区| 轮台县| 新宾| 天水市| 遵化市| 洛浦县| 安康市| 筠连县| 诸城市| 平陆县| 酒泉市| 墨竹工卡县| 延吉市| 肃宁县| 兴宁市| 鄱阳县| 西安市| 双城市| 额济纳旗| 庐江县| 兴和县|