国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁 > 編程 > Python > 正文

python并發(fā)和異步編程實(shí)例

2020-01-04 14:04:11
字體:
供稿:網(wǎng)友

關(guān)于并發(fā)、并行、同步阻塞、異步非阻塞、線程、進(jìn)程、協(xié)程等這些概念,單純通過文字恐怕很難有比較深刻的理解,本文就通過代碼一步步實(shí)現(xiàn)這些并發(fā)和異步編程,并進(jìn)行比較。解釋器方面本文選擇python3,畢竟python3才是python的未來,并且python3用原生的庫實(shí)現(xiàn)協(xié)程已經(jīng)非常方便了。 

1、準(zhǔn)備階段 

下面為所有測試代碼所需要的包

#! python3# coding:utf-8import socketfrom concurrent import futuresfrom selectors import DefaultSelector,EVENT_WRITE,EVENT_READimport asyncioimport aiohttpimport timefrom time import ctime

在進(jìn)行不同實(shí)現(xiàn)方式的比較時,實(shí)現(xiàn)場景就是在進(jìn)行爬蟲開發(fā)的時候通過向?qū)Ψ骄W(wǎng)站發(fā)起一系列的http請求訪問,統(tǒng)計(jì)耗時來判斷實(shí)現(xiàn)方式的優(yōu)劣,具體地,通過建立通信套接字,訪問新浪主頁,返回源碼,作為一次請求。先實(shí)現(xiàn)一個裝飾器用來統(tǒng)計(jì)函數(shù)的執(zhí)行時間:

def tsfunc(func):  def wrappedFunc(*args,**kargs):    start = time.clock()    action = func(*args,**kargs)    time_delta = time.clock() - start    print ('[{0}] {1}() called, time delta: {2}'.format(ctime(),func.__name__,time_delta))    return action  return wrappedFunc

輸出的格式為:當(dāng)前時間,調(diào)用的函數(shù),函數(shù)的執(zhí)行時間。 

2、阻塞/非阻塞和同步/異步 

這兩對概念不是很好區(qū)分,從定義上理解: 

阻塞:在進(jìn)行socket通信過程中,一個線程發(fā)起請求,如果當(dāng)前請求沒有返回結(jié)果,則進(jìn)入sleep狀態(tài),期間線程掛起不能做其他操作,直到有返回結(jié)果,或者超時(如果設(shè)置超時的話)。 
非阻塞:與阻塞相似,只不過在等待請求結(jié)果時,線程并不掛起而是進(jìn)行其他操作,即在不能立刻得到結(jié)果之前,該函數(shù)不會阻掛起當(dāng)前線程,而會立刻返回。 
同步:同步和阻塞比較相似,但是二者并不是同一個概念,同步是指完成事件的邏輯,是指一件事完成之后,再完成第二件事,以此類推… 
異步:異步和非阻塞比較類似,異步的概念和同步相對。當(dāng)一個異步過程調(diào)用發(fā)出后,調(diào)用者不能立刻得到結(jié)果。實(shí)際處理這個調(diào)用的部件在完成后,通過狀態(tài)、通知和回調(diào)來通知調(diào)用者,實(shí)現(xiàn)異步的方式通俗講就是“等會再告訴你”。 

1)阻塞方式 

回到代碼上,首先實(shí)現(xiàn)阻塞方式的請求函數(shù):

def blocking_way():  sock = socket.socket()  sock.connect(('www.sina.com',80))  request = 'GET / HTTP/1.0/r/nHOST:www.sina.com/r/n/r/n'  sock.send(request.encode('ascii'))  response = b''  chunk = sock.recv(4096)  while chunk:    response += chunk    chunk = sock.recv(4096)  return response

測試線程、多進(jìn)程和多線程

# 阻塞無并發(fā)@tsfuncdef sync_way():  res = []  for i in range(10):    res.append(blocking_way())  return len(res)@tsfunc# 阻塞、多進(jìn)程def process_way():  worker = 10  with futures.ProcessPoolExecutor(worker) as executor:    futs = {executor.submit(blocking_way) for i in range(10)}  return len([fut.result() for fut in futs])# 阻塞、多線程@tsfuncdef thread_way():  worker = 10  with futures.ThreadPoolExecutor(worker) as executor:    futs = {executor.submit(blocking_way) for i in range(10)}  return len([fut.result() for fut in futs])

運(yùn)行結(jié)果:

[Wed Dec 13 16:52:25 2017] sync_way() called, time delta: 0.06371647809425328[Wed Dec 13 16:52:28 2017] process_way() called, time delta: 2.31437644946734[Wed Dec 13 16:52:28 2017] thread_way() called, time delta: 0.010172946070299727

可見與非并發(fā)的方式相比,啟動10個進(jìn)程完成10次請求訪問耗費(fèi)的時間最長,進(jìn)程確實(shí)需要很大的系統(tǒng)開銷,相比多線程則效果好得多,啟動10個線程并發(fā)請求,比順序請求速度快了6倍左右。 

2)非阻塞方式 

實(shí)現(xiàn)非阻塞的請求代碼,與阻塞方式的區(qū)別在于等待請求時并不掛起而是直接返回,為了確保能正確讀取消息,最原始的方式就是循環(huán)讀取,知道讀取完成為跳出循環(huán),代碼如下:

 

def nonblocking_way():  sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)  sock.setblocking(False)  try:    sock.connect(('www.sina.com', 80))  except BlockingIOError:    pass  request = 'GET / HTTP/1.0/r/nHost: www.sina.com/r/n/r/n'  data = request.encode('ascii')  while True:    try:      sock.send(data)      break    except OSError:      pass  response = b''  while True:    try:      chunk = sock.recv(4096)      while chunk:        response += chunk        chunk = sock.recv(4096)      break    except OSError:      pass  return response

測試單線程異步非阻塞方式:

@tsfuncdef async_way():  res = []  for i in range(10):    res.append(nonblocking_way())  return len(res)

測試結(jié)果與單線程同步阻塞方式相比:

[Wed Dec 13 17:18:30 2017] sync_way() called, time delta: 0.07342884475822574[Wed Dec 13 17:18:30 2017] async_way() called, time delta: 0.06509009095694886

非阻塞方式起到了一定的效果,但是并不明顯,原因肯定是讀取消息的時候雖然不是在線程掛起的時候而是在循環(huán)讀取消息的時候浪費(fèi)了時間,如果大部分時間讀浪費(fèi)了并沒有發(fā)揮異步編程的威力,解決的辦法就是后面要說的【事件驅(qū)動】 

3、回調(diào)、生成器和協(xié)程 

a、回調(diào)

 

class Crawler():  def __init__(self,url):    self.url = url    self.sock = None    self.response = b''  def fetch(self):    self.sock = socket.socket()    self.sock.setblocking(False)    try:      self.sock.connect(('www.sina.com',80))    except BlockingIOError:      pass    selector.register(self.sock.fileno(),EVENT_WRITE,self.connected)  def connected(self,key,mask):    selector.unregister(key.fd)    get = 'GET {0} HTTP/1.0/r/nHost:www.sina.com/r/n/r/n'.format(self.url)    self.sock.send(get.encode('ascii'))    selector.register(key.fd,EVENT_READ,self.read_response)  def read_response(self,key,mask):    global stopped    while True:      try:        chunk = self.sock.recv(4096)        if chunk:          self.response += chunk          chunk = self.sock.recv(4096)        else:          selector.unregister(key.fd)          urls_todo.remove(self.url)          if not urls_todo:            stopped = True        break      except:        passdef loop():  while not stopped:    events = selector.select()    for event_key,event_mask in events:      callback = event_key.data      callback(event_key,event_mask) @tsfuncdef callback_way():  for url in urls_todo:    crawler = Crawler(url)    crawler.fetch()  loop1()

這是通過傳統(tǒng)回調(diào)方式實(shí)現(xiàn)的異步編程,結(jié)果如下: 

[Tue Mar 27 17:52:49 2018] callback_way() called, time delta: 0.054735804048789374 

b、生成器

class Crawler2:  def __init__(self, url):    self.url = url    self.response = b''  def fetch(self):    global stopped    sock = socket.socket()    yield from connect(sock, ('www.sina.com', 80))    get = 'GET {0} HTTP/1.0/r/nHost: www.sina.com/r/n/r/n'.format(self.url)    sock.send(get.encode('ascii'))    self.response = yield from read_all(sock)    urls_todo.remove(self.url)    if not urls_todo:      stopped = Trueclass Task:  def __init__(self, coro):    self.coro = coro    f = Future1()    f.set_result(None)    self.step(f)  def step(self, future):    try:      # send會進(jìn)入到coro執(zhí)行, 即fetch, 直到下次yield      # next_future 為yield返回的對象      next_future = self.coro.send(future.result)    except StopIteration:      return    next_future.add_done_callback(self.step)def loop1():  while not stopped:    events = selector.select()    for event_key,event_mask in events:      callback = event_key.data      callback()

運(yùn)行結(jié)果如下: 

[Tue Mar 27 17:54:27 2018] generate_way() called, time delta: 0.2914336347673473

c、協(xié)程

def nonblocking_way():  sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)  sock.setblocking(False)  try:    sock.connect(('www.sina.com', 80))  except BlockingIOError:    pass  request = 'GET / HTTP/1.0/r/nHost: www.sina.com/r/n/r/n'  data = request.encode('ascii')  while True:    try:      sock.send(data)      break    except OSError:      pass  response = b''  while True:    try:      chunk = sock.recv(4096)      while chunk:        response += chunk        chunk = sock.recv(4096)      break    except OSError:      pass  return response@tsfuncdef asyncio_way():    tasks = [fetch(host+url) for url in urls_todo]    loop.run_until_complete(asyncio.gather(*tasks))    return (len(tasks))

運(yùn)行結(jié)果: 

[Tue Mar 27 17:56:17 2018] asyncio_way() called, time delta: 0.43688060698484166

到此終于把并發(fā)和異步編程實(shí)例代碼測試完,下邊貼出全部代碼,共讀者自行測試,在任務(wù)量加大時,相信結(jié)果會大不一樣。

#! python3# coding:utf-8import socketfrom concurrent import futuresfrom selectors import DefaultSelector,EVENT_WRITE,EVENT_READimport asyncioimport aiohttpimport timefrom time import ctimedef tsfunc(func):  def wrappedFunc(*args,**kargs):    start = time.clock()    action = func(*args,**kargs)    time_delta = time.clock() - start    print ('[{0}] {1}() called, time delta: {2}'.format(ctime(),func.__name__,time_delta))    return action  return wrappedFuncdef blocking_way():  sock = socket.socket()  sock.connect(('www.sina.com',80))  request = 'GET / HTTP/1.0/r/nHOST:www.sina.com/r/n/r/n'  sock.send(request.encode('ascii'))  response = b''  chunk = sock.recv(4096)  while chunk:    response += chunk    chunk = sock.recv(4096)  return responsedef nonblocking_way():  sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)  sock.setblocking(False)  try:    sock.connect(('www.sina.com', 80))  except BlockingIOError:    pass  request = 'GET / HTTP/1.0/r/nHost: www.sina.com/r/n/r/n'  data = request.encode('ascii')  while True:    try:      sock.send(data)      break    except OSError:      pass  response = b''  while True:    try:      chunk = sock.recv(4096)      while chunk:        response += chunk        chunk = sock.recv(4096)      break    except OSError:      pass  return responseselector = DefaultSelector()stopped = Falseurls_todo = ['/','/1','/2','/3','/4','/5','/6','/7','/8','/9']class Crawler():  def __init__(self,url):    self.url = url    self.sock = None    self.response = b''  def fetch(self):    self.sock = socket.socket()    self.sock.setblocking(False)    try:      self.sock.connect(('www.sina.com',80))    except BlockingIOError:      pass    selector.register(self.sock.fileno(),EVENT_WRITE,self.connected)  def connected(self,key,mask):    selector.unregister(key.fd)    get = 'GET {0} HTTP/1.0/r/nHost:www.sina.com/r/n/r/n'.format(self.url)    self.sock.send(get.encode('ascii'))    selector.register(key.fd,EVENT_READ,self.read_response)  def read_response(self,key,mask):    global stopped    while True:      try:        chunk = self.sock.recv(4096)        if chunk:          self.response += chunk          chunk = self.sock.recv(4096)        else:          selector.unregister(key.fd)          urls_todo.remove(self.url)          if not urls_todo:            stopped = True        break      except:        passdef loop():  while not stopped:    events = selector.select()    for event_key,event_mask in events:      callback = event_key.data      callback(event_key,event_mask)# 基于生成器的協(xié)程class Future:  def __init__(self):    self.result = None    self._callbacks = []  def add_done_callback(self,fn):    self._callbacks.append(fn)  def set_result(self,result):    self.result = result    for fn in self._callbacks:      fn(self)class Crawler1():  def __init__(self,url):    self.url = url    self.response = b''  def fetch(self):    sock = socket.socket()    sock.setblocking(False)    try:      sock.connect(('www.sina.com',80))    except BlockingIOError:      pass    f = Future()    def on_connected():      f.set_result(None)    selector.register(sock.fileno(),EVENT_WRITE,on_connected)    yield f    selector.unregister(sock.fileno())    get = 'GET {0} HTTP/1.0/r/nHost: www.sina.com/r/n/r/n'.format(self.url)    sock.send(get.encode('ascii'))    global stopped    while True:      f = Future()      def on_readable():        f.set_result(sock.recv(4096))      selector.register(sock.fileno(),EVENT_READ,on_readable)      chunk = yield f      selector.unregister(sock.fileno())      if chunk:        self.response += chunk      else:        urls_todo.remove(self.url)        if not urls_todo:          stopped = True        break# yield from 改進(jìn)的生成器協(xié)程class Future1:  def __init__(self):    self.result = None    self._callbacks = []  def add_done_callback(self,fn):    self._callbacks.append(fn)  def set_result(self,result):    self.result = result    for fn in self._callbacks:      fn(self)  def __iter__(self):    yield self    return self.resultdef connect(sock, address):  f = Future1()  sock.setblocking(False)  try:    sock.connect(address)  except BlockingIOError:    pass  def on_connected():    f.set_result(None)  selector.register(sock.fileno(), EVENT_WRITE, on_connected)  yield from f  selector.unregister(sock.fileno())def read(sock):  f = Future1()  def on_readable():    f.set_result(sock.recv(4096))  selector.register(sock.fileno(), EVENT_READ, on_readable)  chunk = yield from f  selector.unregister(sock.fileno())  return chunkdef read_all(sock):  response = []  chunk = yield from read(sock)  while chunk:    response.append(chunk)    chunk = yield from read(sock)  return b''.join(response)class Crawler2:  def __init__(self, url):    self.url = url    self.response = b''  def fetch(self):    global stopped    sock = socket.socket()    yield from connect(sock, ('www.sina.com', 80))    get = 'GET {0} HTTP/1.0/r/nHost: www.sina.com/r/n/r/n'.format(self.url)    sock.send(get.encode('ascii'))    self.response = yield from read_all(sock)    urls_todo.remove(self.url)    if not urls_todo:      stopped = Trueclass Task:  def __init__(self, coro):    self.coro = coro    f = Future1()    f.set_result(None)    self.step(f)  def step(self, future):    try:      # send會進(jìn)入到coro執(zhí)行, 即fetch, 直到下次yield      # next_future 為yield返回的對象      next_future = self.coro.send(future.result)    except StopIteration:      return    next_future.add_done_callback(self.step)def loop1():  while not stopped:    events = selector.select()    for event_key,event_mask in events:      callback = event_key.data      callback()# asyncio 協(xié)程host = 'http://www.sina.com'loop = asyncio.get_event_loop()async def fetch(url):  async with aiohttp.ClientSession(loop=loop) as session:    async with session.get(url) as response:      response = await response.read()      return response@tsfuncdef asyncio_way():    tasks = [fetch(host+url) for url in urls_todo]    loop.run_until_complete(asyncio.gather(*tasks))    return (len(tasks))@tsfuncdef sync_way():  res = []  for i in range(10):    res.append(blocking_way())  return len(res)@tsfuncdef process_way():  worker = 10  with futures.ProcessPoolExecutor(worker) as executor:    futs = {executor.submit(blocking_way) for i in range(10)}  return len([fut.result() for fut in futs])@tsfuncdef thread_way():  worker = 10  with futures.ThreadPoolExecutor(worker) as executor:    futs = {executor.submit(blocking_way) for i in range(10)}  return len([fut.result() for fut in futs])@tsfuncdef async_way():  res = []  for i in range(10):    res.append(nonblocking_way())  return len(res)@tsfuncdef callback_way():  for url in urls_todo:    crawler = Crawler(url)    crawler.fetch()  loop1()@tsfuncdef generate_way():  for url in urls_todo:    crawler = Crawler2(url)    Task(crawler.fetch())  loop1()if __name__ == '__main__':  #sync_way()  #process_way()  #thread_way()  #async_way()  #callback_way()  #generate_way()  asyncio_way()

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持VEVB武林網(wǎng)。


注:相關(guān)教程知識閱讀請移步到python教程頻道。
發(fā)表評論 共有條評論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 九龙县| 巴彦县| 炎陵县| 个旧市| 交城县| 彭水| 平陆县| 壤塘县| 肇庆市| 随州市| 来安县| 易门县| 靖边县| 景东| 黔东| 曲水县| 称多县| 大埔县| 化隆| 洪雅县| 卫辉市| 兰坪| 太和县| 上思县| 噶尔县| 台东市| 都匀市| 博野县| 庆云县| 丰都县| 江阴市| 柘城县| 祁东县| 梅州市| 登封市| 阿拉善左旗| 双峰县| 保亭| 沧源| 五峰| 方正县|