引言
解釋器環境:python3.5.1
我們都知道python網絡編程的兩大必學模塊socket和socketserver,其中的socketserver是一個支持IO多路復用和多線程、多進程的模塊。一般我們在socketserver服務端代碼中都會寫這么一句:
server = socketserver.ThreadingTCPServer(settings.IP_PORT, MyServer)
ThreadingTCPServer這個類是一個支持多線程和TCP協議的socketserver,它的繼承關系是這樣的:
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
右邊的TCPServer實際上是它主要的功能父類,而左邊的ThreadingMixIn則是實現了多線程的類,它自己本身則沒有任何代碼。
MixIn在python的類命名中,很常見,一般被稱為“混入”,戲稱“亂入”,通常為了某種重要功能被子類繼承。
class ThreadingMixIn: daemon_threads = False def process_request_thread(self, request, client_address): try: self.finish_request(request, client_address) self.shutdown_request(request) except: self.handle_error(request, client_address) self.shutdown_request(request) def process_request(self, request, client_address): t = threading.Thread(target = self.process_request_thread, args = (request, client_address)) t.daemon = self.daemon_threads t.start()
在ThreadingMixIn類中,其實就定義了一個屬性,兩個方法。在process_request方法中實際調用的正是python內置的多線程模塊threading。這個模塊是python中所有多線程的基礎,socketserver本質上也是利用了這個模塊。
一、線程
以上那一段,可以不用看!舉個例子,廠家要生產某個產品,在它的生產基地建設了很多廠房,每個廠房內又有多條流水生產線。所有廠房配合將整個產品生產出來,某個廠房內的所有流水線將這個廠房負責的產品部分生產出來。每個廠房擁有自己的材料庫,廠房內的生產線共享這些材料。而每一個廠家要實現生產必須擁有至少一個廠房一條生產線。那么這個廠家就是某個應用程序;每個廠房就是一個進程;每條生產線都是一個線程。
1.1 普通的多線程
在python中,threading模塊提供線程的功能。通過它,我們可以輕易的在進程中創建多個線程。下面是個例子:
import threadingimport time def show(arg): time.sleep(1) print('thread'+str(arg)) for i in range(10): t = threading.Thread(target=show, args=(i,)) t.start() print('main thread stop')上述代碼創建了10個“前臺”線程,然后控制器就交給了CPU,CPU根據指定算法進行調度,分片執行指令。
下面是Thread類的主要方法:
start 線程準備就緒,等待CPU調度
setName 為線程設置名稱
getName 獲取線程名稱
setDaemon 設置為后臺線程或前臺線程(默認)
如果是后臺線程,主線程執行過程中,后臺線程也在進行,主線程執行完畢后,后臺線程不論成功與否,均停止。如果是前臺線程,主線程執行過程中,前臺線程也在進行,主線程執行完畢后,等待前臺線程也執行完成后,程序停止。
join 逐個執行每個線程,執行完畢后繼續往下執行,該方法使得多線程變得無意義。
run 線程被cpu調度后自動執行線程對象的run方法
1.2 自定義線程類
對于threading模塊中的Thread類,本質上是執行了它的run方法。因此可以自定義線程類,讓它繼承Thread類,然后重寫run方法。
import threadingclass MyThreading(threading.Thread): def __init__(self,func,arg): super(MyThreading,self).__init__() self.func = func self.arg = arg def run(self): self.func(self.arg)def f1(args): print(args)obj = MyThreading(f1, 123)obj.start()
1.3 線程鎖
CPU執行任務時,在線程之間是進行隨機調度的,并且每個線程可能只執行n條代碼后就轉而執行另外一條線程。由于在一個進程中的多個線程之間是共享資源和數據的,這就容易造成資源搶奪或臟數據,于是就有了鎖的概念,限制某一時刻只有一個線程能訪問某個指定的數據。
1.3.1 未使用鎖
#!/usr/bin/env python# -*- coding:utf-8 -*-import threadingimport timeNUM = 0def show(): global NUM NUM += 1 name = t.getName() time.sleep(1) # 注意,這行語句的位置很重要,必須在NUM被修改后,否則觀察不到臟數據的現象。 print(name, "執行完畢后,NUM的值為: ", NUM)for i in range(10): t = threading.Thread(target=show) t.start()print('main thread stop')上述代碼運行后,結果如下:
main thread stopThread-1 執行完畢后,NUM的值為: 10Thread-2 執行完畢后,NUM的值為: 10Thread-4 執行完畢后,NUM的值為: 10Thread-9 執行完畢后,NUM的值為: 10Thread-3 執行完畢后,NUM的值為: 10Thread-6 執行完畢后,NUM的值為: 10Thread-8 執行完畢后,NUM的值為: 10Thread-7 執行完畢后,NUM的值為: 10Thread-5 執行完畢后,NUM的值為: 10Thread-10 執行完畢后,NUM的值為: 10
由此可見,由于線程同時訪問一個數據,產生了錯誤的結果。為了解決這個問題,python在threading模塊中定義了幾種線程鎖類,分別是:
1.3.2 普通鎖Lock和RLock
類名:Lock或RLock
普通鎖,也叫互斥鎖,是獨占的,同一時刻只有一個線程被放行。
import timeimport threadingNUM = 10def func(lock): global NUM lock.acquire() # 讓鎖開始起作用 NUM -= 1 time.sleep(1) print(NUM) lock.release() # 釋放鎖 lock = threading.Lock() # 實例化一個鎖對象for i in range(10): t = threading.Thread(target=func, args=(lock,)) # 記得把鎖當作參數傳遞給func參數 t.start()
以上是threading模塊的Lock類,它不支持嵌套鎖。RLcok類的用法和Lock一模一樣,但它支持嵌套,因此我們一般直接使用RLcok類。
1.3.3 信號量(Semaphore)
類名:BoundedSemaphore
這種鎖允許一定數量的線程同時更改數據,它不是互斥鎖。比如地鐵安檢,排隊人很多,工作人員只允許一定數量的人進入安檢區,其它的人繼續排隊。
#!/usr/bin/env python# -*- coding:utf-8 -*-import timeimport threadingdef run(n): semaphore.acquire() print("run the thread: %s" % n) time.sleep(1) semaphore.release()num = 0semaphore = threading.BoundedSemaphore(5) # 最多允許5個線程同時運行for i in range(20): t = threading.Thread(target=run, args=(i,)) t.start()1.3.4 事件(Event)
類名:Event
事件主要提供了三個方法 set、wait、clear。
事件機制:全局定義了一個“Flag”,如果“Flag”的值為False,那么當程序執行wait方法時就會阻塞,如果“Flag”值為True,那么wait方法時便不再阻塞。這種鎖,類似交通紅綠燈(默認是紅燈),它屬于在紅燈的時候一次性阻擋所有線程,在綠燈的時候,一次性放行所有的排隊中的線程。
clear:將“Flag”設置為False
set:將“Flag”設置為True
import threadingdef func(e,i): print(i) e.wait() # 檢測當前event是什么狀態,如果是紅燈,則阻塞,如果是綠燈則繼續往下執行。默認是紅燈。 print(i+100)event = threading.Event()for i in range(10): t = threading.Thread(target=func, args=(event, i)) t.start()event.clear() # 主動將狀態設置為紅燈inp = input(">>>")if inp == "1": event.set() # 主動將狀態設置為綠燈1.3.5 條件(condition)
類名:Condition
該機制會使得線程等待,只有滿足某條件時,才釋放n個線程。
import threadingdef condition(): ret = False r = input(">>>") if r == "yes": ret = True return retdef func(conn, i): print(i) conn.acquire() conn.wait_for(condition) # 這個方法接受一個函數的返回值 print(i+100) conn.release()c = threading.Condition()for i in range(10): t = threading.Thread(target=func, args=(c, i,)) t.start()上面的例子,每輸入一次“yes”放行了一個線程。下面這個,可以選擇一次放行幾個線程。
#!/usr/bin/env python# -*- coding:utf-8 -*-import threadingdef run(n): con.acquire() con.wait() print("run the thread: %s" %n) con.release()if __name__ == '__main__': con = threading.Condition() for i in range(10): t = threading.Thread(target=run, args=(i,)) t.start() while True: inp = input('>>>') if inp == "q": break # 下面這三行是固定語法 con.acquire() con.notify(int(inp)) # 這個方法接收一個整數,表示讓多少個線程通過 con.release()1.3 全局解釋器鎖(GIL)
既然介紹了多線程和線程鎖,那就不得不提及python的GIL,也就是全局解釋器鎖。在編程語言的世界,python因為GIL的問題廣受詬病,因為它在解釋器的層面限制了程序在同一時間只有一個線程被CPU實際執行,而不管你的程序里實際開了多少條線程。所以我們經常能發現,python中的多線程編程有時候效率還不如單線程,就是因為這個原因。那么,對于這個GIL,一些普遍的問題如下:
每種編程語言都有GIL嗎?
以python官方Cpython解釋器為代表....其他語言好像未見。
為什么要有GIL?
作為解釋型語言,Python的解釋器必須做到既安全又高效。我們都知道多線程編程會遇到的問題。解釋器要留意的是避免在不同的線程操作內部共享的數據。同時它還要保證在管理用戶線程時總是有最大化的計算資源。那么,不同線程同時訪問時,數據的保護機制是怎樣的呢?答案是解釋器全局鎖GIL。GIL對諸如當前線程狀態和為垃圾回收而用的堆分配對象這樣的東西的訪問提供著保護。
為什么不能去掉GIL?
首先,在早期的python解釋器依賴較多的全局狀態,傳承下來,使得想要移除當今的GIL變得更加困難。其次,對于程序員而言,僅僅是想要理解它的實現就需要對操作系統設計、多線程編程、C語言、解釋器設計和CPython解釋器的實現有著非常徹底的理解。
在1999年,針對Python1.5,一個“freethreading”補丁已經嘗試移除GIL,用細粒度的鎖來代替。然而,GIL的移除給單線程程序的執行速度帶來了一定的負面影響。當用單線程執行時,速度大約降低了40%。雖然使用兩個線程時在速度上得到了提高,但這個提高并沒有隨著核數的增加而線性增長。因此這個補丁沒有被采納。
另外,在python的不同解釋器實現中,如PyPy就移除了GIL,其執行速度更快(不單單是去除GIL的原因)。然而,我們通常使用的CPython占有著統治地位的使用量,所以,你懂的。
在Python 3.2中實現了一個新的GIL,并且帶著一些積極的結果。這是自1992年以來,GIL的一次最主要改變。舊的GIL通過對Python指令進行計數來確定何時放棄GIL。在新的GIL實現中,用一個固定的超時時間來指示當前的線程以放棄這個鎖。在當前線程保持這個鎖,且當第二個線程請求這個鎖的時候,當前線程就會在5ms后被強制釋放掉這個鎖(這就是說,當前線程每5ms就要檢查其是否需要釋放這個鎖)。當任務是可行的時候,這會使得線程間的切換更加可預測。
GIL對我們有什么影響?
最大的影響是我們不能隨意使用多線程。要區分任務場景。
在單核cpu情況下對性能的影響可以忽略不計,多線程多進程都差不多。在多核CPU時,多線程效率較低。GIL對單進程和多進程沒有影響。
在實際使用中有什么好的建議?
建議在IO密集型任務中使用多線程,在計算密集型任務中使用多進程。深入研究python的協程機制,你會有驚喜的。
更多的詳細介紹和說明請參考下面的文獻:
1.4 定時器(Timer)
定時器,指定n秒后執行某操作。很簡單但很使用的東西。
from threading import Timerdef hello(): print("hello, world")t = Timer(1, hello) # 表示1秒后執行hello函數t.start() 1.5 隊列
通常而言,隊列是一種先進先出的數據結構,與之對應的是堆棧這種后進先出的結構。但是在python中,它內置了一個queue模塊,它不但提供普通的隊列,還提供一些特殊的隊列。具體如下:
1.5.1 Queue:先進先出隊列
這是最常用也是最普遍的隊列,先看一個例子。
import queueq = queue.Queue(5)q.put(11)q.put(22)q.put(33)print(q.get())print(q.get())print(q.get())
Queue類的參數和方法:
maxsize 隊列的最大元素個數,也就是queue.Queue(5)中的5。當隊列內的元素達到這個值時,后來的元素默認會阻塞,等待隊列騰出位置。
def __init__(self, maxsize=0):self.maxsize = maxsizeself._init(maxsize)
qsize() 獲取當前隊列中元素的個數,也就是隊列的大小empty() 判斷當前隊列是否為空,返回True或者Falsefull() 判斷當前隊列是否已滿,返回True或者False
put(self, block=True, timeout=None)
往隊列里放一個元素,默認是阻塞和無時間限制的。如果,block設置為False,則不阻塞,這時,如果隊列是滿的,放不進去,就會彈出異常。如果timeout設置為n秒,則會等待這個秒數后才put,如果put不進去則彈出異常。
get(self, block=True, timeout=None)
從隊列里獲取一個元素。參數和put是一樣的意思。
join() 阻塞進程,直到所有任務完成,需要配合另一個方法task_done。
def join(self): with self.all_tasks_done: while self.unfinished_tasks: self.all_tasks_done.wait()
task_done() 表示某個任務完成。每一條get語句后需要一條task_done。
import queueq = queue.Queue(5)q.put(11)q.put(22)print(q.get())q.task_done()print(q.get())q.task_done()q.join()
1.5.2 LifoQueue:后進先出隊列
類似于“堆?!?,后進先出。也較常用。
import queueq = queue.LifoQueue()q.put(123)q.put(456)print(q.get())
上述代碼運行結果是:456
1.5.3 PriorityQueue:優先級隊列
帶有權重的隊列,每個元素都是一個元組,前面的數字表示它的優先級,數字越小優先級越高,同樣的優先級先進先出
q = queue.PriorityQueue()q.put((1,"alex1"))q.put((1,"alex2"))q.put((1,"alex3"))q.put((3,"alex3"))print(q.get())
1.5.4 deque:雙向隊列
Queue和LifoQueue的“綜合體”,雙向進出。方法較多,使用復雜,慎用!
q = queue.deque()q.append(123)q.append(333)q.appendleft(456)q.pop()q.popleft()
1.6 生產者消費者模型
利用多線程和隊列可以搭建一個生產者消費者模型,用于處理大并發的服務。
在并發編程中使用生產者和消費者模式能夠解決絕大多數并發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序的整體處理數據的速度。
為什么要使用生產者和消費者模式
在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大于生產者,那么消費者就必須等待生產者。為了解決這個問題于是引入了生產者和消費者模式。
什么是生產者消費者模式
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區,平衡了生產者和消費者的處理能力。
這個阻塞隊列就是用來給生產者和消費者解耦的。縱觀大多數設計模式,都會找一個第三者出來進行解耦,如工廠模式的第三者是工廠類,模板模式的第三者是模板類。在學習一些設計模式的過程中,如果先找到這個模式的第三者,能幫助我們快速熟悉一個設計模式。
以上摘自方騰飛的《聊聊并發――生產者消費者模式》
下面是一個簡單的廚師做包子,顧客吃包子的例子。
#!/usr/bin/env python# -*- coding:utf-8 -*-# Author:Liu Jiangimport timeimport queueimport threadingq = queue.Queue(10)def productor(i): while True: q.put("廚師 %s 做的包子!"%i) time.sleep(2)def consumer(k): while True: print("顧客 %s 吃了一個 %s"%(k,q.get())) time.sleep(1)for i in range(3): t = threading.Thread(target=productor,args=(i,)) t.start()for k in range(10): v = threading.Thread(target=consumer,args=(k,)) v.start()1.7 線程池
在使用多線程處理任務時也不是線程越多越好,由于在切換線程的時候,需要切換上下文環境,依然會造成cpu的大量開銷。為解決這個問題,線程池的概念被提出來了。預先創建好一個較為優化的數量的線程,讓過來的任務立刻能夠使用,就形成了線程池。在python中,沒有內置的較好的線程池模塊,需要自己實現或使用第三方模塊。下面是一個簡單的線程池:
#!/usr/bin/env python# -*- coding:utf-8 -*-# Author:Liu Jiangimport queueimport timeimport threadingclass MyThreadPool: def __init__(self, maxsize=5): self.maxsize = maxsize self._q = queue.Queue(maxsize) for i in range(maxsize): self._q.put(threading.Thread) def get_thread(self): return self._q.get() def add_thread(self): self._q.put(threading.Thread)def task(i, pool): print(i) time.sleep(1) pool.add_thread()pool = MyThreadPool(5)for i in range(100): t = pool.get_thread() obj = t(target=task, args=(i,pool)) obj.start()
上面的例子是把線程類當做元素添加到隊列內。實現方法比較糙,每個線程使用后就被拋棄,一開始就將線程開到滿,因此性能較差。下面是一個相對好一點的例子:
#!/usr/bin/env python# -*- coding:utf-8 -*-import queueimport threadingimport contextlibimport timeStopEvent = object() # 創建空對象class ThreadPool(object): def __init__(self, max_num, max_task_num = None): if max_task_num: self.q = queue.Queue(max_task_num) else: self.q = queue.Queue() self.max_num = max_num self.cancel = False self.terminal = False self.generate_list = [] self.free_list = [] def run(self, func, args, callback=None): """ 線程池執行一個任務 :param func: 任務函數 :param args: 任務函數所需參數 :param callback: 任務執行失敗或成功后執行的回調函數,回調函數有兩個參數1、任務函數執行狀態;2、任務函數返回值(默認為None,即:不執行回調函數) :return: 如果線程池已經終止,則返回True否則None """ if self.cancel: return if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: self.generate_thread() w = (func, args, callback,) self.q.put(w) def generate_thread(self): """ 創建一個線程 """ t = threading.Thread(target=self.call) t.start() def call(self): """ 循環去獲取任務函數并執行任務函數 """ current_thread = threading.currentThread self.generate_list.append(current_thread) event = self.q.get() while event != StopEvent: func, arguments, callback = event try: result = func(*arguments) success = True except Exception as e: success = False result = None if callback is not None: try: callback(success, result) except Exception as e: pass with self.worker_state(self.free_list, current_thread): if self.terminal: event = StopEvent else: event = self.q.get() else: self.generate_list.remove(current_thread) def close(self): """ 執行完所有的任務后,所有線程停止 """ self.cancel = True full_size = len(self.generate_list) while full_size: self.q.put(StopEvent) full_size -= 1 def terminate(self): """ 無論是否還有任務,終止線程 """ self.terminal = True while self.generate_list: self.q.put(StopEvent) self.q.empty() @contextlib.contextmanager def worker_state(self, state_list, worker_thread): """ 用于記錄線程中正在等待的線程數 """ state_list.append(worker_thread) try: yield finally: state_list.remove(worker_thread)# How to usepool = ThreadPool(5)def callback(status, result): # status, execute action status # result, execute action return value passdef action(i): print(i)for i in range(30): ret = pool.run(action, (i,), callback)time.sleep(5)print(len(pool.generate_list), len(pool.free_list))print(len(pool.generate_list), len(pool.free_list))# pool.close()# pool.terminate()
二、進程
在python中multiprocess模塊提供了Process類,實現進程相關的功能。但是,由于它是基于fork機制的,因此不被windows平臺支持。想要在windows中運行,必須使用if __name__ == '__main__:的方式,顯然這只能用于調試和學習,不能用于實際環境。
(PS:在這里我必須吐槽一下python的包、模塊和類的組織結構。在multiprocess中你既可以import大寫的Process,也可以import小寫的process,這兩者是完全不同的東西。這種情況在python中很多,新手容易傻傻分不清。)
下面是一個簡單的多進程例子,你會發現Process的用法和Thread的用法幾乎一模一樣。
from multiprocessing import Processdef foo(i): print("This is Process ", i)if __name__ == '__main__': for i in range(5): p = Process(target=foo, args=(i,)) p.start()2.1 進程的數據共享
每個進程都有自己獨立的數據空間,不同進程之間通常是不能共享數據,創建一個進程需要非常大的開銷。
from multiprocessing import Processlist_1 = []def foo(i): list_1.append(i) print("This is Process ", i," and list_1 is ", list_1)if __name__ == '__main__': for i in range(5): p = Process(target=foo, args=(i,)) p.start() print("The end of list_1:", list_1)運行上面的代碼,你會發現列表list_1在各個進程中只有自己的數據,完全無法共享。想要進程之間進行資源共享可以使用queues/Array/Manager這三個multiprocess模塊提供的類。
2.1.1 使用Array共享數據
from multiprocessing import Processfrom multiprocessing import Arraydef Foo(i,temp): temp[0] += 100 for item in temp: print(i,'----->',item)if __name__ == '__main__': temp = Array('i', [11, 22, 33, 44]) for i in range(2): p = Process(target=Foo, args=(i,temp)) p.start()對于Array數組類,括號內的“i”表示它內部的元素全部是int類型,而不是指字符i,列表內的元素可以預先指定,也可以指定列表長度。概括的來說就是Array類在實例化的時候就必須指定數組的數據類型和數組的大小,類似temp = Array('i', 5)。對于數據類型有下面的表格對應:
'c': ctypes.c_char, 'u': ctypes.c_wchar,
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
'h': ctypes.c_short, 'H': ctypes.c_ushort,
'i': ctypes.c_int, 'I': ctypes.c_uint,
'l': ctypes.c_long, 'L': ctypes.c_ulong,
'f': ctypes.c_float, 'd': ctypes.c_double
2.1.2 使用Manager共享數據
from multiprocessing import Process,Managerdef Foo(i,dic): dic[i] = 100+i print(dic.values())if __name__ == '__main__': manage = Manager() dic = manage.dict() for i in range(10): p = Process(target=Foo, args=(i,dic)) p.start() p.join()
Manager比Array要好用一點,因為它可以同時保存多種類型的數據格式。
2.1.3 使用queues的Queue類共享數據
import multiprocessingfrom multiprocessing import Processfrom multiprocessing import queuesdef foo(i,arg): arg.put(i) print('The Process is ', i, "and the queue's size is ", arg.qsize())if __name__ == "__main__": li = queues.Queue(20, ctx=multiprocessing) for i in range(10): p = Process(target=foo, args=(i,li,)) p.start()這里就有點類似上面的隊列了。從運行結果里,你還能發現數據共享中存在的臟數據問題。另外,比較悲催的是multiprocessing里還有一個Queue,一樣能實現這個功能。
2.2 進程鎖
為了防止和多線程一樣的出現數據搶奪和臟數據的問題,同樣需要設置進程鎖。與threading類似,在multiprocessing里也有同名的鎖類RLock, Lock, Event, Condition, Semaphore,連用法都是一樣樣的?。ㄟ@個我喜歡)
from multiprocessing import Processfrom multiprocessing import queuesfrom multiprocessing import Arrayfrom multiprocessing import RLock, Lock, Event, Condition, Semaphoreimport multiprocessingimport timedef foo(i,lis,lc): lc.acquire() lis[0] = lis[0] - 1 time.sleep(1) print('say hi',lis[0]) lc.release()if __name__ == "__main__": # li = [] li = Array('i', 1) li[0] = 10 lock = RLock() for i in range(10): p = Process(target=foo,args=(i,li,lock)) p.start()2.3 進程池
既然有線程池,那必然也有進程池。但是,python給我們內置了一個進程池,不需要像線程池那樣需要自定義,你只需要簡單的from multiprocessing import Pool。
#!/usr/bin/env python# -*- coding:utf-8 -*-from multiprocessing import Poolimport timedef f1(args): time.sleep(1) print(args)if __name__ == '__main__': p = Pool(5) for i in range(30): p.apply_async(func=f1, args= (i,)) p.close() # 等子進程執行完畢后關閉線程池 # time.sleep(2) # p.terminate() # 立刻關閉線程池 p.join()
進程池內部維護一個進程序列,當使用時,去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進程,那么程序就會等待,直到進程池中有可用進程為止。
進程池中有以下幾個主要方法:
三、協程
線程和進程的操作是由程序觸發系統接口,最后的執行者是系統,它本質上是操作系統提供的功能。而協程的操作則是程序員指定的,在python中通過yield,人為的實現并發處理。
協程存在的意義:對于多線程應用,CPU通過切片的方式來切換線程間的執行,線程切換時需要耗時。協程,則只使用一個線程,分解一個線程成為多個“微線程”,在一個線程中規定某個代碼塊的執行順序。
協程的適用場景:當程序中存在大量不需要CPU的操作時(IO)。
在不需要自己“造輪子”的年代,同樣有第三方模塊為我們提供了高效的協程,這里介紹一下greenlet和gevent。本質上,gevent是對greenlet的高級封裝,因此一般用它就行,這是一個相當高效的模塊。
在使用它們之前,需要先安裝,可以通過源碼,也可以通過pip。
3.1 greenlet
from greenlet import greenletdef test1(): print(12) gr2.switch() print(34) gr2.switch()def test2(): print(56) gr1.switch() print(78)gr1 = greenlet(test1)gr2 = greenlet(test2)gr1.switch()
實際上,greenlet就是通過switch方法在不同的任務之間進行切換。
3.2 gevent
from gevent import monkey; monkey.patch_all()import geventimport requestsdef f(url): print('GET: %s' % url) resp = requests.get(url) data = resp.text print('%d bytes received from %s.' % (len(data), url))gevent.joinall([ gevent.spawn(f, 'https://www.python.org/'), gevent.spawn(f, 'https://www.yahoo.com/'), gevent.spawn(f, 'https://github.com/'),])通過joinall將任務f和它的參數進行統一調度,實現單線程中的協程。代碼封裝層次很高,實際使用只需要了解它的幾個主要方法即可。
新聞熱點
疑難解答
圖片精選