第一個twisted支持的詩歌服務器
盡管Twisted大多數情況下用來寫服務器代碼,但為了一開始盡量從簡單處著手,我們首先從簡單的客戶端講起。
讓我們來試試使用Twisted的客戶端。源碼在twisted-client-1/get-poetry.py。首先像前面一樣要開啟三個服務器:
python blocking-server/slowpoetry.py --port 10000 poetry/ecstasy.txt --num-bytes 30python blocking-server/slowpoetry.py --port 10001 poetry/fascination.txtpython blocking-server/slowpoetry.py --port 10002 poetry/science.txt
并且運行客戶端:
python twisted-client-1/get-poetry.py 10000 10001 10002
你會看到在客戶端的命令行打印出:
Task 1: got 60 bytes of poetry from 127.0.0.1:10000Task 2: got 10 bytes of poetry from 127.0.0.1:10001Task 3: got 10 bytes of poetry from 127.0.0.1:10002Task 1: got 30 bytes of poetry from 127.0.0.1:10000 Task 3: got 10 bytes of poetry from 127.0.0.1:10002Task 2: got 10 bytes of poetry from 127.0.0.1:10001 ... Task 1: 3003 bytes of poetryTask 2: 623 bytes of poetryTask 3: 653 bytes of poetryGot 3 poems in 0:00:10.134220
和我們的沒有使用Twisted的非阻塞模式客戶端打印的內容接近。這并不奇怪,因為它們的工作方式是一樣的。
下面,我們來仔細研究一下它的源代碼。
注意:我們開始學習使用Twisted時會使用一些低層Twisted的APIs。這樣做是為揭去Twisted的抽象層,這樣我們就可以從內向外的來學習Tiwsted。但是這就意味著,我們在學習中所使用的APIs在實際應用中可能都不會見到。記住這么一點就行:前面這些代碼只是用作練習,而不是寫真實軟件的例子。
可以看到,首先創建了一組PoetrySocket的實例。在PoetrySocket初始化時,其創建了一個網絡socket作為自己的屬性字段來連接服務器,并且選擇了非阻塞模式:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)self.sock.connect(address)self.sock.setblocking(0)
最終我們雖然會提高到不使用socket的抽象層次上,但這里我們仍然需要使用它。在創建完socket后,PoetrySocket通過方法addReader將自己傳遞給 reactor:
# tell the Twisted reactor to monitor this socket for readingfrom twisted.internet import reactorreactor.addReader(self)
這個方法給Twisted提供了一個文件描述符來監視要發送來的數據。為什么我們不傳遞給Twisted一個文件描述符或回調函數而是一個對象實例?并且Twisted內部沒有任何與這個詩歌服務相關的代碼,它怎么知道該如何與我們的對象實例交互?相信我,我已經查看過了,打開twisted.internet.interfaces模塊,和我一起來搞清楚是怎么回事。
Twisted接口
在twisted內部有很多被稱作接口的子模塊。每個都定義了一組接口類。由于在8.0版本中,Twisted使用zope.interface作為這些類的基類。但我們這里并不來討論它其中的細節。我們只關心其在Twisted的子類,就是你看到的那些。
使用接口的核心目的之一就是文檔化。作為一個python程序員,你肯定知道Duck Typing。(python哲學思想:“如果看起來像鴨子,聽起來像鴨子,就可以把它當作鴨子”。因此python對象的接口力求簡單而且統一,類似其他語言中面向接口編程思想。) 翻閱twisted.internet.interfaces找到方法的addReader定義,它的定義在IReactorFDSet中可以找到:
def addReader(reader): """ I add reader to the set of file descriptors to get read events for. @param reader: An L{IReadDescriptor} provider that will be checked for read events until it is removed from the reactor with L{removeReader}. @return: C{None}. """IReactorFDSet是一個Twisted的reactor實現的接口。因此任何一個Twisted的reactor都會一個 addReader的方法,如同上面描述的一樣工作。這個方法聲明之所以沒有self參數是因為它僅僅關心一個公共接口定義,self參數僅僅是接口實現時的一部分(在調用它時,也沒有顯式地傳入一個self參數)。接口類永遠不會被實例化或作為基類來繼承實現。
技術上講,IReactorFDSet只會由reactor實現用來監聽文件描述符。具我所知,現在所有已實現reactor都會實現這個接口。
使用接口并不僅僅是為了文檔化。zope.interface允許你顯式地來聲明一個類實現一個或多個接口,并提供運行時檢查這些實現的機制。同樣也提供代理這一機制,它可以動態地為一個沒有實現某接口的類直接提供該接口。但我們這里就不做深入學習了。
你可能已經注意到接口與最近添加到Python中虛基類的相似性了。這里我們并不去分析它們之間的相似性與差異。若你有興趣,可以讀讀Python項目的創始人Glyph寫的一篇關于這個話題的文章。
根據文檔的描述可以看出,addReader的reader參數是要實現IReadDescriptor接口的。這也就意味我們的PoetrySocket也必須這樣做。
閱讀接口模塊我們可以看到下面這段代碼:
class IReadDescriptor(IFileDescriptor): def doRead(): """ Some data is available for reading on your descriptor. """
同時你會看到在我們的PoetrySocket類中有一個doRead方法。當其被Twisted的reactor調用時,就會采用異步的方式從socket中讀取數據。因此,doRead其實就是一個回調函數,只是沒有直接將其傳遞給reactor,而是傳遞一個實現此方法的對象實例。這也是Twisted框架中的慣例―不是直接傳遞實現某個接口的函數而是傳遞實現它的對象。這樣我們通過一個參數就可以傳遞一組相關的回調函數。而且也可以讓回調函數之間通過存儲在對象中的數據進行通信。
那在PoetrySocket中實現其它的回調函數呢?注意到IReadDescriptor是IFileDescriptor的一個子類。這也就意味任何一個實現IReadDescriptor都必須實現IFileDescriptor。若是你仔細閱讀代碼會看到下面的內容:
class IFileDescriptor(ILoggingContext): """ A file descriptor. """ def fileno(): ... def connectionLost(reason): …
我將文檔描述省略掉了,但這些函數的功能從字面上就可以理解:fileno返回我們想監聽的文件描述符,connectionLost是當連接關閉時被調用。你也看到了,PoetrySocket實現了這些方法。
最后,IFileDescriptor繼承了ILoggingContext,這里我不想再展現其源碼。我想說的是,這就是為什么我們要實現一個logPrefix回調函數。你可以在interface模塊中找到答案。
注意:你也許注意到了,當連接關閉時,在doRead中返回了一個特殊的值。我是如何知道的?說實話,沒有它程序是無法正常工作的。我是在分析Twisted源碼中發現其它相應的方法采取相同的方法。你也許想好好研究一下:但有時一些文檔或書的解釋是錯誤的或不完整的。
更多關于回調的知識
我們使用Twisted的異步客戶端和前面的沒有使用Twisted的異步客戶非常的相似。兩者都要連接它們自己的socket,并以異步的方式從中讀取數據。最大的區別在于:使用Twisted的客戶端并沒有使用自己的select循環-而使用了Twisted的reactor。 doRead回調函數是非常重要的一個回調。Twisted調用它來告訴我們已經有數據在socket接收完畢。我可以通過圖7來形象地說明這一過程:

每當回調被激活,就輪到我們的代碼將所有能夠讀的數據讀回來然后非阻塞式的停止。Twisted是不會因為什么異常狀況(如沒有必要的阻塞)而終止我們的代碼。那么我們就故意寫個會產生異常狀況的客戶端看看到底能發生什么事情。可以在twisted-client-1/get-poetry-broken.py中看到源代碼。這個客戶端與你前面看到的同樣有兩個異常狀況出現:
這個客戶端并沒有選擇非阻塞式的socket
doRead回調方法在socket關閉連接前一直在不停地讀socket
現在讓我們運行一下這個客戶端:
python twisted-client-1/get-poetry-broken.py 10000 10001 10002
我們出得到如同下面一樣的輸出:
Task 1: got 3003 bytes of poetry from 127.0.0.1:10000Task 3: got 653 bytes of poetry from 127.0.0.1:10002 Task 2: got 623 bytes of poetry from 127.0.0.1:10001Task 1: 3003 bytes of poetry Task 2: 623 bytes of poetryTask 3: 653 bytes of poetryGot 3 poems in 0:00:10.132753
可能除了任務的完成順序不太一致外,和我前面阻塞式客戶端是一樣的。這是因為這個客戶端是一個阻塞式的。
由于使用了阻塞式的連接,就將我們的非阻塞式客戶端變成了阻塞式的客戶端。這樣一來,我們盡管遭受了使用select的復雜但卻沒有享受到其帶來的異步優勢。
像諸如Twisted這樣的事件循環所提供的多任務的能力是需要用戶的合作來實現的。Twisted會告訴我們什么時候讀或寫一個文件描述符,但我們必須要盡可能高效而沒有阻塞地完成讀寫工作。同樣我們應該禁止使用其它各類的阻塞函數,如os.system中的函數。除此之外,當我們遇到計算型的任務(長時間占用CPU),最好是將任務切成若干個部分執行以讓I/O操作盡可能地執行。
你也許已經注意到這個客戶端所花費的時間少于先前那個阻塞的客戶端。這是由于這個在一開始就與所有的服務建立連接,由于服務是一旦連接建立就立即發送數據,而且我們的操作系統會緩存一部分發送過來但尚讀不到的數據到緩沖區中(緩沖區大小是有上限的)。因此就明白了為什么前面那個會慢了:它是在完成一個后再建立下一個連接并接收數據。
但這種小優勢僅僅在小數據量的情況下才會得以體現。如果我們下載三首20M個單詞的詩,那時OS的緩沖區會在瞬間填滿,這樣一來我們這個客戶端與前面那個阻塞式客戶端相比就沒有什么優勢可言了。
抽象地構建客戶端
首先是,這個客戶端竟然有創建網絡端口并接收端口處的數據這樣枯燥的代碼。Twisted理應為我們實現這些例程性功能,省得我們每次寫一個新的程序時都要自己去實現。這樣做特別有用,可以將我們從異步I/O涉及的一些棘手的異常處理中解放出來(參看前面的客戶端) , 如果要跨平臺就涉及到更多更加棘手的細節。如果你哪天下午有空,可以翻翻Twisted的WIN32實現源代碼,看看里面有多少小針線是來處理跨平臺的。
另一問題是與錯誤處理有關。當運行版本1的Twisted客戶端從并沒有提供服務的端口上下載詩歌時,它就會崩潰。當然我們是可以修正這個錯誤,但通過下面我們要介紹Twisted的APIs來處理這些類型的錯誤會更簡單。
最后,那個客戶端也不能復用。如果有另一個模塊需要通過我們的客戶端下載詩歌呢?人家怎么知道你的詩歌已經下載完畢?我們不能用一個方法簡單地將一首詩下載完成后再傳給人家,而在之前讓人家處于等待狀態。這確實是一個問題,但我們不準備在這個部分解決這個問題―在未來的部分中一定會解決這個問題。
我們將會使用一些高層次的APIs和接口來解決第一、二個問題。Twisted框架是由眾多抽象層松散地組合起來的。因此,學習Twisted也就意味著需要學習這些層都提供什么功能,例如每層都有哪些APIs,接口和實例可供使用。接下來我們會通過剖析Twisted最最重要的部分來更好地感受一下Twisted都是怎么組織的。一旦你對Twisted的整個結構熟悉了,學習新的部分會簡單多了。
一般來說,每個Twisted的抽象都只與一個特定的概念相關。例如,第四部分中的客戶端使用的IReadDescriptor,它就是"一個可以讀取字節的文件描述符"的抽象。一個抽象往往會通過定義接口來指定那些想實現這個抽象(也就是實現這個接口)的對象的形為。在學習新的Twisted抽象概念時,最需要謹記的就是:
多數高層次抽象都是在低層次抽象的基礎上建立的,很少有另立門戶的。
因此,你在學習新的Twisted抽象概念時,始終要記住它做什么和不做什么。特別是,如果一個早期的抽象A實現了F特性,那么F特性不太可能再由其它任何抽象來實現。另外,如果另外一個抽象需要F特性,那么它會使用A而不是自己再去實現F。(通常的做法,B可能會通過繼承A或獲得一個指向A實例的引用)
網絡非常的復雜,因此Twisted包含很多抽象的概念。通過從低層的抽象講起,我們希望能更清楚起看到在一個Twisted程序中各個部分是怎么組織起來的。
核心的循環體
第一個我們要學習的抽象,也是Twisted中最重要的,就是reactor。在每個通過Twisted搭建起來的程序中心處,不管你這個程序有多少層,總會有一個reactor循環在不停止地驅動程序的運行。再也沒有比reactor提供更加基礎的支持了。實際上,Twisted的其它部分(即除了reactor循環體)可以這樣理解:它們都是來輔助X來更好地使用reactor,這里的X可以是提供Web網頁、處理一個數據庫查詢請求或其它更加具體的內容。盡管堅持像上一個客戶端一樣使用低層APIs是可能的,但如果我們執意那樣做,那么我們必需自己來實現非常多的內容。而在更高的層次上,意味著我們可以少寫很多代碼。
但是當在外層思考與處理問題時, 很容易就忘記了reactor的存在了。在任何一個常見大小的Twisted程序中 ,確實很少會有直接與reactor的APIs交互。低層的抽象也是一樣(即我們很少會直接與其交互)。我們在上一個客戶端中用到的文件描述符抽象,就被更高層的抽象更好的歸納以至于我們很少會在真正的Twisted程序中遇到。(他們在內部依然在被使用,只是我們看不到而已)
至于文件描述符抽象的消息,這并不是一個問題。讓Twisted掌舵異步I/O處理,這樣我們就可以更加關注我們實際要解決的問題。但對于reactor不一樣,它永遠都不會消失。當你選擇使用Twisted,也就意味著你選擇使用Reactor模式,并且意味著你需要使用回調與多任務合作的"交互式"編程方式。
Transports
Transports抽象是通過Twisted中interfaces模塊中ITransport接口定義的。一個Twisted的Transport代表一個可以收發字節的單條連接。對于我們的詩歌下載客戶端而言,就是對一條TCP連接的抽象。但是Twisted也支持諸如Unix中管道和UDP。Transport抽象可以代表任何這樣的連接并為其代表的連接處理具體的異步I/O操作細節。
如果你瀏覽一下ITransport中的方法,可能找不到任何接收數據的方法。這是因為Transports總是在低層完成從連接中異步讀取數據的許多細節工作,然后通過回調將數據發給我們。相似的原理,Transport對象的寫相關的方法為避免阻塞也不會選擇立即寫我們要發送的數據。告訴一個Transport要發送數據,只是意味著:盡快將這些數據發送出去,別產生阻塞就行。當然,數據會按照我們提交的順序發送。
通常我們不會自己實現一個Transport。我們會去使用Twisted提供的實現類,即在傳遞給reactor時會為我們創建一個對象實例。
Protocols
Twisted的Protocols抽象由interfaces模塊中的IProtocol定義。也許你已經想到,Protocol對象實現協議內容。也就是說,一個具體的Twisted的Protocol的實現應該對應一個具體網絡協議的實現,像FTP、IMAP或其它我們自己制定的協議。我們的詩歌下載協議,正如它表現的那樣,就是在連接建立后將所有的詩歌內容全部發送出去并且在發送完畢后關閉連接。
嚴格意義上講,每一個Twisted的Protocols類實例都為一個具體的連接提供協議解析。因此我們的程序每建立一條連接(對于服務方就是每接受一條連接),都需要一個協議實例。這就意味著,Protocol實例是存儲協議狀態與間斷性(由于我們是通過異步I/O方式以任意大小來接收數據的)接收并累積數據的地方。
因此,Protocol實例如何得知它為哪條連接服務呢?如果你閱讀IProtocol定義會發現一個makeConnection函數。這是一個回調函數,Twisted會在調用它時傳遞給其一個也是僅有的一個參數,即Transport實例。這個Transport實例就代表Protocol將要使用的連接。
Twisted內置了很多實現了通用協議的Protocol。你可以在twisted.protocols.basic中找到一些稍微簡單點的。在你嘗試寫新Protocol時,最好是看看Twisted源碼是不是已經有現成的存在。如果沒有,那實現一個自己的協議是非常好的,正如我們為詩歌下載客戶端做的那樣。
Protocol Factories
因此每個連接需要一個自己的Protocol,而且這個Protocol是我們自己定義的類的實例。由于我們會將創建連接的工作交給Twisted來完成,Twisted需要一種方式來為一個新的連接創建一個合適的協議。創建協議就是Protocol Factories的工作了。
也許你已經猜到了,Protocol Factory的API由IProtocolFactory來定義,同樣在interfaces模塊中。Protocol Factory就是Factory模式的一個具體實現。buildProtocol方法在每次被調用時返回一個新Protocol實例,它就是Twisted用來為新連接創建新Protocol實例的方法。
詩歌下載客戶端2.0:第一滴心血
好吧,讓我們來看看由Twisted支持的詩歌下載客戶端2.0。源碼可以在這里twisted-client-2/get-poetry.py。你可以像前面一樣運行它,并得到相同的輸出。這也是最后一個在接收到數據時打印其任務的客戶端版本了。到現在為止,對于所有Twisted程序都是交替執行任務并處理相對較少數量數據的,應該很清晰了。我們依然通過print函數來展示在關鍵時刻在進行什么內容,但將來客戶端不會在這樣繁鎖。
在第二個版本中,sockets不會再出現了。我們甚至不需要引入socket模塊也不用引用socket對象和文件描述符。取而代之的是,我們告訴reactor來創建到詩歌服務器的連接,代碼如下面所示:
factory = PoetryClientFactory(len(addresses))from twisted.internet import reactorfor address in addresses: host, port = address reactor.connectTCP(host, port, factory)
我們需要關注的是connectTCP這個函數。前兩個參數的含義很明顯,不解釋了。第三個參數是我們自定義的PoetryClientFactory類的實例對象。這是一個專門針對詩歌下載客戶端的Protocol Factory,將它傳遞給reactor可以讓Twisted為我們創建一個PoetryProtocol實例。
值得注意的是,從一開始我們既沒有實現Factory也沒有去實現Protocol,不像在前面那個客戶端中我們去實例化我們PoetrySocket類。我們只是繼承了Twisted在twisted.internet.protocol 中提供的基類。Factory的基類是twisted.internet.protocol.Factory,但我們使用客戶端專用(即不像服務器端那樣監聽一個連接,而是主動創建一個連接)的ClientFactory子類來繼承。
我們同樣利用了Twisted的Factory已經實現了buildProtocol方法這一優勢來為我們所用。我們要在子類中調用基類中的實現:
def buildProtocol(self, address): proto = ClientFactory.buildProtocol(self, address) proto.task_num = self.task_num self.task_num += 1 return proto
基類怎么會知道我們要創建什么樣的Protocol呢?注意,我們的PoetryClientFactory中有一個protocol類變量:
class PoetryClientFactory(ClientFactory): task_num = 1 protocol = PoetryProtocol # tell base class what proto to build
基類Factory實現buildProtocol的過程是:安裝(創建一個實例)我們設置在protocol變量上的Protocol類與在這個實例(此處即PoetryProtocol的實例)的factory屬性上設置一個產生它的Factory的引用(此處即實例化PoetryProtocol的PoetryClientFactory)。這個過程如圖

正如我們提到的那樣,位于Protocol對象內的factory屬性字段允許在都由同一個factory產生的Protocol之間共享數據。由于Factories都是由用戶代碼來創建的(即在用戶的控制中),因此這個屬性也可以實現Protocol對象將數據傳遞回一開始初始化請求的代碼中來,這將在第六部分看到。
值得注意的是,雖然在Protocol中有一個屬性指向生成其的Protocol Factory,在Factory中也有一個變量指向一個Protocol類,但通常來說,一個Factory可以生成多個Protocol。
在Protocol創立的第二步便是通過makeConnection與一個Transport聯系起來。我們無需自己來實現這個函數而使用Twisted提供的默認實現。默認情況是,makeConnection將Transport的一個引用賦給(Protocol的)transport屬性,同時置(同樣是Protocol的)connected屬性為True

一旦初始化到這一步后,Protocol開始其真正的工作―將低層的數據流翻譯成高層的協議規定格式的消息。處理接收到數據的主要方法是dataReceived,我們的客戶端是這樣實現的:
def dataReceived(self, data): self.poem += data msg = 'Task %d: got %d bytes of poetry from %s' print msg % (self.task_num, len(data), self.transport.getHost())
每次dateReceved被調用就意味著我們得到一個新字符串。由于與異步I/O交互,我們不知道能接收到多少數據,因此將接收到的數據緩存下來直到完成一個完整的協議規定格式的消息。在我們的例子中,詩歌只有在連接關閉時才下載完畢,因此我們只是不斷地將接收到的數據添加到我們的.poem屬性字段中。
注意我們使用了Transport的getHost方法來取得數據來自的服務器信息。我們這樣做只是與前面的客戶端保持一致。相反,我們的代碼沒有必要這樣做,因為我們沒有向服務器發送任何消息,也就沒有必要知道服務器的信息了。
我們來看一下dataReceved運行時的快照。在2.0版本相同的目錄下有一個twisted-client-2/get-poetry-stack.py。它與2.0版本的不同之處只在于:
def dataReceived(self, data): traceback.print_stack() os._exit(0)
這樣一改,我們就能打印出跟蹤堆棧的信息,然后離開程序,可以用下面的命令來運行它:
python twisted-client-2/get-poetry-stack.py 10000
你會得到內容如下的跟蹤堆棧:
File "twisted-client-2/get-poetry-stack.py", line 125, in poetry_main()... # I removed a bunch of lines hereFile ".../twisted/internet/tcp.py", line 463, in doRead # Note the doRead callback return self.protocol.dataReceived(data)File "twisted-client-2/get-poetry-stack.py", line 58, in dataReceived traceback.print_stack()
看見沒,有我們在1.0版本客戶端的doRead回調函數。我們前面也提到過,Twisted在建立新抽象層會使用已有的實現而不是另起爐灶。因此必然會有一個IReadDescriptor的實例在辛苦的工作,它是由Twisted代碼而非我們自己的代碼來實現。如果你表示懷疑,那么就看看twisted.internet.tcp中的實現吧。如果你瀏覽代碼會發現,由同一個類實現了IWriteDescriptor與ITransport。因此 IReadDescriptor實際上就是變相的Transport類。可以用圖10來形象地說明dateReceived的回調過程:

一旦詩歌下載完成,PoetryProtocol就會通知它的PooetryClientFactory:
def connectionLost(self, reason): self.poemReceived(self.poem) def poemReceived(self, poem): self.factory.poem_finished(self.task_num, poem)
當transport的連接關閉時,conncetionLost回調會被激活。reason參數是一個twisted.python.failure.Failure的實例對象,其攜帶的信息能夠說明連接是被安全的關閉還是由于出錯被關閉的。我們的客戶端因認為總是能完整地下載完詩歌而忽略了這一參數。
工廠會在所有的詩歌都下載完畢后關閉reactor。再次重申:我們代碼的工作就是用來下載詩歌-這意味我們的PoetryClientFactory缺少復用性。我們將在下一部分修正這一缺陷。值得注意的是,poem_finish回調函數是如何通過跟蹤剩余詩歌數的:
... self.poetry_count -= 1 if self.poetry_count == 0: ...
如果我們采用多線程以讓每個線程分別下載詩歌,這樣我們就必須使用一把鎖來管理這段代碼以免多個線程在同一時間調用poem_finish。但是在交互式體系下就不必擔心了。由于reactor只能一次啟用一個回調。
新的客戶端實現在處理錯誤上也比先前的優雅的多,下面是PoetryClientFactory處理錯誤連接的回調實現代碼:
def clientConnectionFailed(self, connector, reason): print 'Failed to connect to:', connector.getDestination() self.poem_finished()
注意,回調是在工廠內部而不是協議內部實現。由于協議是在連接建立后才創建的,而工廠能夠在連接未能成功建立時捕獲消息。
新聞熱點
疑難解答
圖片精選