一. reactor
twisted的核心是reactor,而提到reactor不可避免的是同步/異步,阻塞/非阻塞,在Dave的第一章概念性介紹中,對同步/異步的界限有點模糊,關于同步/異步,阻塞/非阻塞可參見知乎討論。而關于proactor(主動器)和reactor(反應堆),這里有一篇推薦博客有比較詳細的介紹。
就reactor模式的網絡IO而言,應該是同步IO而不是異步IO。而Dave第一章中提到的異步,核心在于:顯式地放棄對任務的控制權而不是被操作系統隨機地停止,程序員必須將任務組織成序列來交替的小步完成。因此,若其中一個任務用到另外一個任務的輸出,則依賴的任務(即接收輸出的任務)需要被設計成為要接收系列比特或分片而不是一下全部接收。
顯式主動地放棄任務的控制權有點類似協程的思考方式,reactor可看作協程的調度器。reactor是一個事件循環,我們可以向reactor注冊自己感興趣的事件(如套接字可讀/可寫)和處理器(如執行讀寫操作),reactor會在事件發生時回調我們的處理器,處理器執行完成之后,相當于協程掛起(yield),回到reactor的事件循環中,等待下一個事件來臨并回調。reactor本身有一個同步事件多路分解器(Synchronous Event Demultiplexer),可用select/epoll等機制實現,當然twisted reactor的事件觸發不一定是基于IO,也可以由定時器等其它機制觸發。
twisted的reactor無需我們主動注冊事件和回調函數,而是通過多態(繼承特定類,并實現所關心的事件接口,然后傳給twisted reactor)來實現。關于twisted的reactor,有幾個需要注意的地方:
twisted.internet.reactor是單例模式,每個程序只能有一個reactor;
盡量在reactor回調函數盡快完成操作,不要執行阻塞任務,reactor本質是單線程,用戶回調代碼與twisted代碼運行在同一個上下文,某個回調函數中阻塞,會導致reactor整個事件循環阻塞;
reactor會一直運行,除非通過reactor.stop()顯示停止它,但一般調用reactor.stop(),也就意味著應用程序結束;
二. twisted簡單使用
twisted的本質是reactor,我們可以使用twisted的底層API(避開twisted便利的高層抽象)來使用reactor:
# 示例一 twisted底層API的使用from twisted.internet import reactofrom twisted.internet import mainfrom twisted.internet.interfaces import IReadDescriptorimport socketclass MySocket(IReadDescriptor): def __init__(self, address): # 連接服務器 self.address = address self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect(address) self.sock.setblocking(0) # tell the Twisted reactor to monitor this socket for reading reactor.addReader(self) # 接口: 告訴reactor 監聽的套接字描述符 def fileno(self): try: return self.sock.fileno() except socket.error: return -1 # 接口: 在連接斷開時的回調 def connectionLost(self, reason): self.sock.close() reactor.removeReader(self) # 當應用程序需要終止時 調用: # reactor.stop() # 接口: 當套接字描述符有數據可讀時 def doRead(self): bytes = '' # 盡可能多的讀取數據 while True: try: bytesread = self.sock.recv(1024) if not bytesread: break else: bytes += bytesread except socket.error, e: if e.args[0] == errno.EWOULDBLOCK: break return main.CONNECTION_LOST if not bytes: return main.CONNECTION_DONE else: # 在這里解析協議并處理數據 print bytes
示例一可以很清晰的看到twisted的reactor本質:添加監聽描述符,監聽可讀/可寫事件,當事件來臨時回調函數,回調完成之后繼續監聽事件。
需要注意:
套接字為非阻塞,如果為阻塞則失去了reactor的意義
我們通過繼承IReadDescriptor來提供reactor所需要的接口
通過reactor.addReader將套接字類加入reactor的監聽對象中
main.CONNECTION_LOST是twisted預定義的值,通過這些值它我們可以一定程度控制下一步回調(類似于模擬一個事件)
但是上面的MySocket類不夠好,主要有以下缺點:
需要我們自己去讀取數據,而不是框架幫我們讀好,并處理異常
網絡IO和數據處理混為一塊,沒有剝離開來
三. twisted抽象
twisted在reactor的基礎上,建立了更高的抽象,對一個網絡連接而言,twisted建立了如下三個概念:
Transports:網絡連接層,僅負責網絡連接和讀/寫字節數據
Protocols: 協議層,服務業務相關的網絡協議,將字節流轉換成應用所需數據
Protocol Factories:協議工廠,負責創建Protocols,每個網絡連接都有一個Protocols對象(因為要保存協議解析狀態)
twisted的這些概念和erlang中的ranch網絡框架很像,ranch框架也抽象了Transports和Protocols概念,在有新的網絡連接時,ranch自動創建Transports和Protocols,其中Protocols由用戶在啟動ranch時傳入,是一個實現了ranch_protocol behaviour的模塊,Protocols初始化時,會收到該連接對應的Transports,如此我們可以在Protocols中處理字節流數據,按照我們的協議解析并處理數據。同時可通過Transports來發送數據(ranch已經幫你讀取了字節流數據了)。
和ranch類似,twisted也會在新連接到達時創建Protocols并且將Transport傳入,twisted會幫我們讀取字節流數據,我們只需在dataReceived(self, data)接口中處理字節流數據即可。此時的twisted在網絡IO上可以算是真正的異步了,它幫我們處理了網絡IO和可能遇到的異常,并且將網絡IO和數據處理剝離開來,抽象為Transports和Protocols,提高了程序的清晰性和健壯性。
# 示例二 twisted抽象的使用from twisted.internet import reactorfrom twisted.internet.protocol import Protocol, ClientFactoryclass MyProtocol(Protocol): # 接口: Protocols初始化時調用,并傳入Transports # 另外 twisted會自動將Protocols的factory對象成員設為ProtocolsFactory實例的引用 # 如此就可以通過factory來與MyProtocolFactory交互 def makeConnection(self,trans): print 'make connection: get transport: ', trans print 'my factory is: ', self.factory # 接口: 有數據到達 def dataReceived(self, data): self.poem += data msg = 'Task %d: got %d bytes of poetry from %s' print msg % (self.task_num, len(data), self.transport.getPeer()) # 接口: 連接斷開 def connectionLost(self, reason): # 連接斷開的處理class MyProtocolFactory(ClientFactory): # 接口: 通過protocol類成員指出需要創建的Protocols protocol = PoetryProtocol # tell base class what proto to build def __init__(self, address): self.poetry_count = poetry_count self.poems = {} # task num -> poem # 接口: 在創建Protocols的回調 def buildProtocol(self, address): proto = ClientFactory.buildProtocol(self, address) # 在這里對proto做一些初始化.... return proto # 接口: 連接Server失敗時的回調 def clientConnectionFailed(self, connector, reason): print 'Failed to connect to:', connector.getDestination() def main(address): factory = MyClientFactory(address) host, port = address # 連接服務端時傳入ProtocolsFactory reactor.connectTCP(host, port, factory) reactor.run()示例二要比示例一要簡單清晰很多,因為它無需處理網絡IO,并且邏輯上更為清晰,實際上ClientFactory和Protocol提供了更多的接口用于實現更靈活強大的邏輯控制,具體的接口可參見twisted源代碼。
四. twisted Deferred
twisted Deferred對象用于解決這樣的問題:有時候我們需要在ProtocolsFactory中嵌入自己的回調,以便Protocols中發生某個事件(如所有Protocols都處理完成)時,回調我們指定的函數(如TaskFinished)。如果我們自己來實現回調,需要處理幾個問題:
如何區分回調的正確返回和錯誤返回?(我們在使用異步調用時,要尤其注意錯誤返回的重要性)
如果我們的正確返回和錯誤返回都需要執行一個公共函數(如關閉連接)呢?
如果保證該回調只被調用一次?
Deferred對象便用于解決這種問題,它提供兩個回調鏈,分別對應于正確返回和錯誤返回,在正確返回或錯誤返回時,它會依次調用對應鏈中的函數,并且保證回調的唯一性。
d = Deferred()# 添加正確回調和錯誤回調d.addCallbacks(your_ok_callback, your_err_callback)# 添加公共回調函數d.addBoth(your_common_callback)# 正確返回 將依次調用 your_ok_callback(Res) -> common_callback(Res)d.callback(Res)# 錯誤返回 將依次調用 your_err_callback(Err) -> common_callback(Err)d.errback(Err)# 注意,對同一個Defered對象,只能返回一次,嘗試多次返回將會報錯
twisted的defer是異步的一種變現方式,可以這么理解,他和thread的區別是,他是基于時間event的。
有了deferred,即可對任務的執行進行管理控制。防止程序的運行,由于等待某項任務的完成而陷入阻塞停滯,提高整體運行的效率。
Deferred能幫助你編寫異步代碼,但并不是為自動生成異步或無阻塞的代碼!要想將一個同步函數編程異步函數,必須在函數中返回Deferred并正確注冊回調。
五.綜合示例
下面的例子,你們自己跑跑,我上面說的都是一些個零散的例子,大家對照下面完整的,走一遍。 twisted理解其實卻是有點麻煩,大家只要知道他是基于事件的后,慢慢理解就行了。
#coding:utf-8#xiaorui.ccfrom twisted.internet import reactor, deferfrom twisted.internet.threads import deferToThreadimport os,sysfrom twisted.python import threadable; threadable.init(1)deferred =deferToThread.__get__import timedef todoprint_(result): print resultdef running(): "Prints a few dots on stdout while the reactor is running."# sys.stdout.write("."); sys.stdout.flush() print '.' reactor.callLater(.1, running)@deferreddef sleep(sec): "A blocking function magically converted in a non-blocking one." print 'start sleep %s'%sec time.sleep(sec) print '/nend sleep %s'%sec return "ok"def test(n,m): print "fun test() is start" m=m vals = [] keys = [] for i in xrange(m): vals.append(i) keys.append('a%s'%i) d = None for i in xrange(n): d = dict(zip(keys, vals)) print "fun test() is end" return dif __name__== "__main__":#one sleep(10).addBoth(todoprint_) reactor.callLater(.1, running) reactor.callLater(3, reactor.stop) print "go go !!!" reactor.run()#two aa=time.time() de = defer.Deferred() de.addCallback(test) reactor.callInThread(de.callback,10000000,100 ) print time.time()-aa print "我這里先做別的事情" print de print "go go end"新聞熱點
疑難解答
圖片精選