国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Python > 正文

Python使用asyncio包處理并發(fā)詳解

2020-01-04 16:42:43
字體:
供稿:網(wǎng)友

阻塞型I/O和GIL

CPython 解釋器本身就不是線程安全的,因此有全局解釋器鎖(GIL),一次只允許使用一個線程執(zhí)行 Python 字節(jié)碼。因此,一個 Python 進(jìn)程通常不能同時使用多個 CPU 核心。

然而,標(biāo)準(zhǔn)庫中所有執(zhí)行阻塞型 I/O 操作的函數(shù),在等待操作系統(tǒng)返回結(jié)果時都會釋放GIL。這意味著在 Python 語言這個層次上可以使用多線程,而 I/O 密集型 Python 程序能從中受益:一個 Python 線程等待網(wǎng)絡(luò)響應(yīng)時,阻塞型 I/O 函數(shù)會釋放 GIL,再運行一個線程。

asyncio

這個包使用事件循環(huán)驅(qū)動的協(xié)程實現(xiàn)并發(fā)。 asyncio 大量使用 yield from 表達(dá)式,因此與Python 舊版不兼容。

asyncio 包使用的“協(xié)程”是較嚴(yán)格的定義。適合asyncio API 的協(xié)程在定義體中必須使用 yield from,而不能使用 yield。此外,適合 asyncio 的協(xié)程要由調(diào)用方驅(qū)動,并由調(diào)用方通過 yield from 調(diào)用;

示例1

import threadingimport asyncio@asyncio.coroutinedef hello():  print('Start Hello', threading.currentThread())  yield from asyncio.sleep(5)  print('End Hello', threading.currentThread())@asyncio.coroutinedef world():  print('Start World', threading.currentThread())  yield from asyncio.sleep(3)  print('End World', threading.currentThread())# 獲取EventLoop:loop = asyncio.get_event_loop()tasks = [hello(), world()]# 執(zhí)行coroutineloop.run_until_complete(asyncio.wait(tasks))loop.close()

@asyncio.coroutine把生成器函數(shù)標(biāo)記為協(xié)程類型。
asyncio.sleep(3) 創(chuàng)建一個3秒后完成的協(xié)程。
loop.run_until_complete(future),運行直到future完成;如果參數(shù)是 coroutine object,則需要使用 ensure_future()函數(shù)包裝。
loop.close() 關(guān)閉事件循環(huán)

示例2

import asyncio@asyncio.coroutinedef worker(text):  """  協(xié)程運行的函數(shù)  :param text:  :return:  """  i = 0  while True:    print(text, i)    try:      yield from asyncio.sleep(.1)    except asyncio.CancelledError:      break    i += 1@asyncio.coroutinedef client(text, io_used):  worker_fu = asyncio.ensure_future(worker(text))  # 假裝等待I/O一段時間  yield from asyncio.sleep(io_used)  # 結(jié)束運行協(xié)程  worker_fu.cancel()  return 'done'loop = asyncio.get_event_loop()tasks = [client('xiaozhe', 3), client('zzzz', 5)]result = loop.run_until_complete(asyncio.wait(tasks))loop.close()print('Answer:', result)

解釋:

1. asyncio.ensure_future(coro_or_future, *, loop=None):計劃安排一個 coroutine object的執(zhí)行,返回一個 asyncio.Task object。
2. worker_fu.cancel(): 取消一個協(xié)程的執(zhí)行,拋出CancelledError異常。
3. asyncio.wait():協(xié)程的參數(shù)是一個由期物或協(xié)程構(gòu)成的可迭代對象; wait 會分別把各個協(xié)程包裝進(jìn)一個 Task 對象。

asyncio.Task 對象與threading.Thread對象的比較

asyncio.Task 對象差不多與 threading.Thread 對象等效。
Task 對象用于驅(qū)動協(xié)程, Thread 對象用于調(diào)用可調(diào)用的對象。
Task 對象不由自己動手實例化,而是通過把協(xié)程傳給 asyncio.ensure_future(…) 函數(shù)或loop.create_task(…) 方法獲取。
獲取的 Task 對象已經(jīng)排定了運行時間;Thread 實例則必須調(diào)用 start 方法,明確告知讓它運行。
如果想終止任務(wù),可以使用 Task.cancel() 實例方法,在協(xié)程內(nèi)部拋出CancelledError 異常。

線程與協(xié)程的安全比較

如果使用線程做過重要的編程,因為調(diào)度程序任何時候都能中斷線程。必須記住保留鎖,去保護(hù)程序中的重要部分,防止多步操作在執(zhí)行的過程中中斷,防止數(shù)據(jù)處于無效狀態(tài)。

協(xié)程默認(rèn)會做好全方位保護(hù),以防止中斷。我們必須顯式產(chǎn)出才能讓程序的余下部分運行。對協(xié)程來說,無需保留鎖,在多個線程之間同步操作,協(xié)程自身就會同步,因為在任意時刻只有一個協(xié)程運行。想交出控制權(quán)時,可以使用 yield 或 yield from 把控制權(quán)交還調(diào)度程序。這就是能夠安全地取消協(xié)程的原因:按照定義,協(xié)程只能在暫停的 yield處取消,因此可以處理 CancelledError 異常,執(zhí)行清理操作。

Future(期物)

通常情況下自己不應(yīng)該創(chuàng)建期物,而只能由并發(fā)框架(concurrent.futures 或 asyncio)實例化。原因很簡單:期物表示終將發(fā)生的事情,而確定某件事會發(fā)生的唯一方式是執(zhí)行的時間已經(jīng)排定。

asyncio.Future

在 asyncio 包中, BaseEventLoop.create_task(…) 方法接收一個協(xié)程,排定它的運行時間,然后返回一個 asyncio.Task 實例——也是 asyncio.Future 類的實例,因為 Task 是Future 的子類,用于包裝協(xié)程。

asyncio.ensure_future(coro_or_future, *, loop=None)

這個函數(shù)統(tǒng)一了協(xié)程和期物:第一個參數(shù)可以是二者中的任何一個。如果是 Future 或 Task 對象,那就原封不動地返回。如果是協(xié)程,那么 async 函數(shù)會調(diào)用loop.create_task(…) 方法創(chuàng)建 Task 對象。 loop= 關(guān)鍵字參數(shù)是可選的,用于傳入事件循環(huán);如果沒有傳入,那么 async 函數(shù)會通過調(diào)用 asyncio.get_event_loop() 函數(shù)獲取循環(huán)對象。

BaseEventLoop.create_task(coro)

這個方法排定協(xié)程的執(zhí)行時間,返回一個 asyncio.Task 對象。

asyncio 包中有多個函數(shù)會自動把參數(shù)指定的協(xié)程包裝在 asyncio.Task 對象中,例如 BaseEventLoop.run_until_complete(…) 方法。

asyncio.as_completed

為了集成進(jìn)度條,我們可以使用的是 as_completed 生成器函數(shù);幸好, asyncio 包提供了這個生成器函數(shù)的相應(yīng)版本。

使用asyncio和aiohttp包

從 Python 3.4 起, asyncio 包只直接支持 TCP 和 UDP。如果想使用 HTTP 或其他協(xié)議,那么要借助第三方包 aiohttp 。

cc_list = ['China', 'USA']@asyncio.coroutinedef get_flag(cc):  url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())  resp = yield from aiohttp.request('GET', url)  image = yield from resp.read()  return image@asyncio.coroutinedef download_one(name):   image = yield from get_flag(name)   save_flag(image, name.lower() + '.gif')  return nameloop = asyncio.get_event_loop() wait_coro = asyncio.wait([download_one(cc) for cc in sorted(cc_list)]) res, _ = loop.run_until_complete(wait_coro) loop.close()

使用 asyncio 包時,我們編寫的異步代碼中包含由 asyncio 本身驅(qū)動的協(xié)程(即委派生成器),而生成器最終把職責(zé)委托給 asyncio 包或第三方庫(如aiohttp)中的協(xié)程。這種處理方式相當(dāng)于架起了管道,讓 asyncio 事件循環(huán)(通過我們編寫的協(xié)程)驅(qū)動執(zhí)行低層異步 I/O 操作的庫函數(shù)。

避免阻塞型調(diào)用

有兩種方法能避免阻塞型調(diào)用中止整個應(yīng)用程序的進(jìn)程:
1. 在單獨的線程中運行各個阻塞型操作
2. 把每個阻塞型操作轉(zhuǎn)換成非阻塞的異步調(diào)用使用

多個線程是可以的,但是各個操作系統(tǒng)線程(Python 使用的是這種線程)消耗的內(nèi)存達(dá)兆字節(jié)(具體的量取決于操作系統(tǒng)種類)。如果要處理幾千個連接,而每個連接都使用一個線程的話,我們負(fù)擔(dān)不起。

把生成器當(dāng)作協(xié)程使用是異步編程的另一種方式。對事件循環(huán)來說,調(diào)用回調(diào)與在暫停的協(xié)程上調(diào)用 .send() 方法效果差不多。各個暫停的協(xié)程是要消耗內(nèi)存,但是比線程消耗的內(nèi)存數(shù)量級小。

上面的腳本為什么會很快

在上面的腳本中,調(diào)用 loop.run_until_complete 方法時,事件循環(huán)驅(qū)動各個download_one 協(xié)程,運行到第一個 yield from 表達(dá)式處時,那個表達(dá)式驅(qū)動各個get_flag 協(xié)程,然后在get_flag協(xié)程里面運行到第一個 yield from 表達(dá)式處時,調(diào)用 aiohttp.request(…)函數(shù)。這些調(diào)用都不會阻塞,因此在零點幾秒內(nèi)所有請求全部開始。

asyncio 的基礎(chǔ)設(shè)施獲得第一個響應(yīng)后,事件循環(huán)把響應(yīng)發(fā)給等待結(jié)果的 get_flag 協(xié)程。得到響應(yīng)后, get_flag 向前執(zhí)行到下一個 yield from 表達(dá)式處,調(diào)用resp.read() 方法,然后把控制權(quán)還給主循環(huán)。其他響應(yīng)會陸續(xù)返回。所有 get_ flag 協(xié)程都獲得結(jié)果后,委派生成器 download_one 恢復(fù),保存圖像文件。

async和await

為了簡化并更好地標(biāo)識異步IO,從Python 3.5開始引入了新的語法async和await,可以讓coroutine的代碼更簡潔易讀。

async和await是針對coroutine的新語法,要使用新的語法,只需要做兩步簡單的替換。
1. 把@asyncio.coroutine替換為async
2. 把yield from替換為await

例如:

@asyncio.coroutinedef hello():  print("Hello world!")  r = yield from asyncio.sleep(1)  print("Hello again!")

等同于

async def hello():  print("Hello world!")  r = await asyncio.sleep(1)  print("Hello again!")

網(wǎng)站請求實例

import asyncioimport aiohttpurls = [  'http://www.163.com/',  'http://www.sina.com.cn/',  'https://www.hupu.com/',  'http://www.csdn.net/']async def get_url_data(u):  """  讀取url的數(shù)據(jù)  :param u:  :return:  """  print('running ', u)  async with aiohttp.ClientSession() as session:    async with session.get(u) as resp:      print(u, resp.status, type(resp.text()))      # print(await resp.text())  return resp.headersasync def request_url(u):  """  主調(diào)度函數(shù)  :param u:  :return:  """  res = await get_url_data(u)  return resloop = asyncio.get_event_loop()task_lists = asyncio.wait([request_url(u) for u in urls])all_res, _ = loop.run_until_complete(task_lists)loop.close()print(all_res)

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持VEVB武林網(wǎng)。

發(fā)表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發(fā)表
主站蜘蛛池模板: 哈尔滨市| 温宿县| 丹东市| 林口县| 绥宁县| 全南县| 桃园市| 遂平县| 中江县| 正蓝旗| 乌鲁木齐市| 内黄县| 肥城市| 绥芬河市| 科尔| 上林县| 平遥县| 来宾市| 林西县| 濉溪县| 夏津县| 南雄市| 杂多县| 翼城县| 威海市| 临武县| 织金县| 横山县| 西乡县| 自治县| 南木林县| 宜君县| 闽侯县| 交城县| 乐安县| 宜州市| 通辽市| 甘德县| 道孚县| 文登市| 鄂伦春自治旗|