国产探花免费观看_亚洲丰满少妇自慰呻吟_97日韩有码在线_资源在线日韩欧美_一区二区精品毛片,辰东完美世界有声小说,欢乐颂第一季,yy玄幻小说排行榜完本

首頁(yè) > 編程 > Python > 正文

Python實(shí)現(xiàn)的異步代理爬蟲及代理池

2019-11-25 16:18:49
字體:
來(lái)源:轉(zhuǎn)載
供稿:網(wǎng)友

使用python asyncio實(shí)現(xiàn)了一個(gè)異步代理池,根據(jù)規(guī)則爬取代理網(wǎng)站上的免費(fèi)代理,在驗(yàn)證其有效后存入redis中,定期擴(kuò)展代理的數(shù)量并檢驗(yàn)池中代理的有效性,移除失效的代理。同時(shí)用aiohttp實(shí)現(xiàn)了一個(gè)server,其他的程序可以通過(guò)訪問(wèn)相應(yīng)的url來(lái)從代理池中獲取代理。

源碼

Github

環(huán)境

  • Python 3.5+
  • Redis
  • PhantomJS(可選)
  • Supervisord(可選)

因?yàn)榇a中大量使用了asyncio的async和await語(yǔ)法,它們是在Python3.5中才提供的,所以最好使用Python3.5及以上的版本,我使用的是Python3.6。

依賴

  • redis
  • aiohttp
  • bs4
  • lxml
  • requests
  • selenium

selenium包主要是用來(lái)操作PhantomJS的。

下面來(lái)對(duì)代碼進(jìn)行說(shuō)明。

1. 爬蟲部分

核心代碼

async def start(self): for rule in self._rules: parser = asyncio.ensure_future(self._parse_page(rule)) # 根據(jù)規(guī)則解析頁(yè)面來(lái)獲取代理 logger.debug('{0} crawler started'.format(rule.__rule_name__)) if not rule.use_phantomjs:  await page_download(ProxyCrawler._url_generator(rule), self._pages, self._stop_flag) # 爬取代理網(wǎng)站的頁(yè)面 else:  await page_download_phantomjs(ProxyCrawler._url_generator(rule), self._pages,rule.phantomjs_load_flag, self._stop_flag) # 使用PhantomJS爬取 await self._pages.join() parser.cancel() logger.debug('{0} crawler finished'.format(rule.__rule_name__))

上面的核心代碼實(shí)際上是一個(gè)用asyncio.Queue實(shí)現(xiàn)的生產(chǎn)-消費(fèi)者模型,下面是該模型的一個(gè)簡(jiǎn)單實(shí)現(xiàn):

import asynciofrom random import randomasync def produce(queue, n): for x in range(1, n + 1): print('produce ', x) await asyncio.sleep(random()) await queue.put(x) # 向queue中放入itemasync def consume(queue): while 1: item = await queue.get() # 等待從queue中獲取item print('consume ', item) await asyncio.sleep(random()) queue.task_done() # 通知queue當(dāng)前item處理完畢 async def run(n): queue = asyncio.Queue() consumer = asyncio.ensure_future(consume(queue)) await produce(queue, n) # 等待生產(chǎn)者結(jié)束 await queue.join() # 阻塞直到queue不為空 consumer.cancel() # 取消消費(fèi)者任務(wù),否則它會(huì)一直阻塞在get方法處def aio_queue_run(n): loop = asyncio.get_event_loop() try: loop.run_until_complete(run(n)) # 持續(xù)運(yùn)行event loop直到任務(wù)run(n)結(jié)束 finally: loop.close()if __name__ == '__main__': aio_queue_run(5)

運(yùn)行上面的代碼,一種可能的輸出如下:

produce 1produce 2consume 1produce 3produce 4consume 2produce 5consume 3consume 4consume 5

爬取頁(yè)面

async def page_download(urls, pages, flag): url_generator = urls async with aiohttp.ClientSession() as session: for url in url_generator:  if flag.is_set():  break  await asyncio.sleep(uniform(delay - 0.5, delay + 1))  logger.debug('crawling proxy web page {0}'.format(url))  try:  async with session.get(url, headers=headers, timeout=10) as response:   page = await response.text()   parsed = html.fromstring(decode_html(page)) # 使用bs4來(lái)輔助lxml解碼網(wǎng)頁(yè):http://lxml.de/elementsoup.html#Using only the encoding detection   await pages.put(parsed)   url_generator.send(parsed) # 根據(jù)當(dāng)前頁(yè)面來(lái)獲取下一頁(yè)的地址  except StopIteration:  break  except asyncio.TimeoutError:  logger.error('crawling {0} timeout'.format(url))  continue # TODO: use a proxy  except Exception as e:  logger.error(e)

使用aiohttp實(shí)現(xiàn)的網(wǎng)頁(yè)爬取函數(shù),大部分代理網(wǎng)站都可以使用上面的方法來(lái)爬取,對(duì)于使用js動(dòng)態(tài)生成頁(yè)面的網(wǎng)站可以使用selenium控制PhantomJS來(lái)爬取――本項(xiàng)目對(duì)爬蟲的效率要求不高,代理網(wǎng)站的更新頻率是有限的,不需要頻繁的爬取,完全可以使用PhantomJS。

解析代理

最簡(jiǎn)單的莫過(guò)于用xpath來(lái)解析代理了,使用Chrome瀏覽器的話,直接通過(guò)右鍵就能獲得選中的頁(yè)面元素的xpath:

 

安裝Chrome的擴(kuò)展“XPath Helper”就可以直接在頁(yè)面上運(yùn)行和調(diào)試xpath,十分方便:

 

BeautifulSoup不支持xpath,使用lxml來(lái)解析頁(yè)面,代碼如下:

async def _parse_proxy(self, rule, page): ips = page.xpath(rule.ip_xpath) # 根據(jù)xpath解析得到list類型的ip地址集合 ports = page.xpath(rule.port_xpath) # 根據(jù)xpath解析得到list類型的ip地址集合 if not ips or not ports: logger.warning('{2} crawler could not get ip(len={0}) or port(len={1}), please check the xpaths or network'.  format(len(ips), len(ports), rule.__rule_name__)) return proxies = map(lambda x, y: '{0}:{1}'.format(x.text.strip(), y.text.strip()), ips, ports) if rule.filters: # 根據(jù)過(guò)濾字段來(lái)過(guò)濾代理,如“高匿”、“透明”等 filters = [] for i, ft in enumerate(rule.filters_xpath):  field = page.xpath(ft)  if not field:  logger.warning('{1} crawler could not get {0} field, please check the filter xpath'.   format(rule.filters[i], rule.__rule_name__))  continue  filters.append(map(lambda x: x.text.strip(), field)) filters = zip(*filters) selector = map(lambda x: x == rule.filters, filters) proxies = compress(proxies, selector)for proxy in proxies:await self._proxies.put(proxy) # 解析后的代理放入asyncio.Queue中

爬蟲規(guī)則

網(wǎng)站爬取、代理解析、濾等等操作的規(guī)則都是由各個(gè)代理網(wǎng)站的規(guī)則類定義的,使用元類和基類來(lái)管理規(guī)則類。基類定義如下:

class CrawlerRuleBase(object, metaclass=CrawlerRuleMeta): start_url = None page_count = 0 urls_format = None next_page_xpath = None next_page_host = '' use_phantomjs = False phantomjs_load_flag = None filters = () ip_xpath = None port_xpath = None filters_xpath = ()

各個(gè)參數(shù)的含義如下:

start_url(必需)

爬蟲的起始頁(yè)面。

ip_xpath(必需)

爬取IP的xpath規(guī)則。

port_xpath(必需)

爬取端口號(hào)的xpath規(guī)則。

page_count

爬取的頁(yè)面數(shù)量。

urls_format

頁(yè)面地址的格式字符串,通過(guò)urls_format.format(start_url, n)來(lái)生成第n頁(yè)的地址,這是比較常見(jiàn)的頁(yè)面地址格式。

next_page_xpath,next_page_host

由xpath規(guī)則來(lái)獲取下一頁(yè)的url(常見(jiàn)的是相對(duì)路徑),結(jié)合host得到下一頁(yè)的地址:next_page_host + url。

use_phantomjs, phantomjs_load_flag

use_phantomjs用于標(biāo)識(shí)爬取該網(wǎng)站是否需要使用PhantomJS,若使用,需定義phantomjs_load_flag(網(wǎng)頁(yè)上的某個(gè)元素,str類型)作為PhantomJS頁(yè)面加載完畢的標(biāo)志。

filters

過(guò)濾字段集合,可迭代類型。用于過(guò)濾代理。

爬取各個(gè)過(guò)濾字段的xpath規(guī)則,與過(guò)濾字段按順序一一對(duì)應(yīng)。

元類CrawlerRuleMeta用于管理規(guī)則類的定義,如:如果定義use_phantomjs=True,則必須定義phantomjs_load_flag,否則會(huì)拋出異常,不在此贅述。

目前已經(jīng)實(shí)現(xiàn)的規(guī)則有西刺代理、快代理、360代理、66代理和 秘密代理。新增規(guī)則類也很簡(jiǎn)單,通過(guò)繼承CrawlerRuleBase來(lái)定義新的規(guī)則類YourRuleClass,放在proxypool/rules目錄下,并在該目錄下的__init__.py中添加from . import YourRuleClass(這樣通過(guò)CrawlerRuleBase.__subclasses__()就可以獲取全部的規(guī)則類了),重啟正在運(yùn)行的proxy pool即可應(yīng)用新的規(guī)則。

2. 檢驗(yàn)部分

免費(fèi)的代理雖然多,但是可用的卻不多,所以爬取到代理后需要對(duì)其進(jìn)行檢驗(yàn),有效的代理才能放入代理池中,而代理也是有時(shí)效性的,還要定期對(duì)池中的代理進(jìn)行檢驗(yàn),及時(shí)移除失效的代理。

這部分就很簡(jiǎn)單了,使用aiohttp通過(guò)代理來(lái)訪問(wèn)某個(gè)網(wǎng)站,若超時(shí),則說(shuō)明代理無(wú)效。

async def validate(self, proxies): logger.debug('validator started') while 1: proxy = await proxies.get() async with aiohttp.ClientSession() as session:  try:  real_proxy = 'http://' + proxy  async with session.get(self.validate_url, proxy=real_proxy, timeout=validate_timeout) as resp:   self._conn.put(proxy)  except Exception as e:  logger.error(e) proxies.task_done()

3. server部分

使用aiohttp實(shí)現(xiàn)了一個(gè)web server,啟動(dòng)后,訪問(wèn)http://host:port即可顯示主頁(yè):

  • 訪問(wèn)http://host:port/get來(lái)從代理池獲取1個(gè)代理,如:'127.0.0.1:1080';
  • 訪問(wèn)http://host:port/get/n來(lái)從代理池獲取n個(gè)代理,如:"['127.0.0.1:1080', '127.0.0.1:443', '127.0.0.1:80']";
  • 訪問(wèn)http://host:port/count來(lái)獲取代理池的容量,如:'42'。

因?yàn)橹黜?yè)是一個(gè)靜態(tài)的html頁(yè)面,為避免每來(lái)一個(gè)訪問(wèn)主頁(yè)的請(qǐng)求都要打開(kāi)、讀取以及關(guān)閉該html文件的開(kāi)銷,將其緩存到了redis中,通過(guò)html文件的修改時(shí)間來(lái)判斷其是否被修改過(guò),如果修改時(shí)間與redis緩存的修改時(shí)間不同,則認(rèn)為html文件被修改了,則重新讀取文件,并更新緩存,否則從redis中獲取主頁(yè)的內(nèi)容。

返回代理是通過(guò)aiohttp.web.Response(text=ip.decode('utf-8'))實(shí)現(xiàn)的,text要求str類型,而從redis中獲取到的是bytes類型,需要進(jìn)行轉(zhuǎn)換。返回的多個(gè)代理,使用eval即可轉(zhuǎn)換為list類型。

返回主頁(yè)則不同,是通過(guò)aiohttp.web.Response(body=main_page_cache, content_type='text/html') ,這里body要求的是bytes類型,直接將從redis獲取的緩存返回即可,conten_type='text/html'必不可少,否則無(wú)法通過(guò)瀏覽器加載主頁(yè),而是會(huì)將主頁(yè)下載下來(lái)――在運(yùn)行官方文檔中的示例代碼的時(shí)候也要注意這點(diǎn),那些示例代碼基本上都沒(méi)有設(shè)置content_type。

這部分不復(fù)雜,注意上面提到的幾點(diǎn),而關(guān)于主頁(yè)使用的靜態(tài)資源文件的路徑,可以參考之前的博客《aiohttp之添加靜態(tài)資源路徑》。

4. 運(yùn)行

將整個(gè)代理池的功能分成了3個(gè)獨(dú)立的部分:

proxypool

定期檢查代理池容量,若低于下限則啟動(dòng)代理爬蟲并對(duì)代理檢驗(yàn),通過(guò)檢驗(yàn)的爬蟲放入代理池,達(dá)到規(guī)定的數(shù)量則停止爬蟲。

proxyvalidator

用于定期檢驗(yàn)代理池中的代理,移除失效代理。

proxyserver

啟動(dòng)server。

這3個(gè)獨(dú)立的任務(wù)通過(guò)3個(gè)進(jìn)程來(lái)運(yùn)行,在Linux下可以使用supervisod來(lái)=管理這些進(jìn)程,下面是supervisord的配置文件示例:

; supervisord.conf[unix_http_server]file=/tmp/supervisor.sock [inet_http_server]  port=127.0.0.1:9001 [supervisord]logfile=/tmp/supervisord.log logfile_maxbytes=5MB logfile_backups=10  loglevel=debug  pidfile=/tmp/supervisord.pid nodaemon=false  minfds=1024   minprocs=200   [rpcinterface:supervisor]supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface[supervisorctl]serverurl=unix:///tmp/supervisor.sock[program:proxyPool]command=python /path/to/ProxyPool/run_proxypool.py  redirect_stderr=truestdout_logfile=NONE[program:proxyValidator]command=python /path/to/ProxyPool/run_proxyvalidator.pyredirect_stderr=true  stdout_logfile=NONE[program:proxyServer]command=python /path/to/ProxyPool/run_proxyserver.pyautostart=falseredirect_stderr=true  stdout_logfile=NONE

因?yàn)轫?xiàng)目自身已經(jīng)配置了日志,所以這里就不需要再用supervisord捕獲stdout和stderr了。通過(guò)supervisord -c supervisord.conf啟動(dòng)supervisord,proxyPool和proxyServer則會(huì)隨之自動(dòng)啟動(dòng),proxyServer需要手動(dòng)啟動(dòng),訪問(wèn)http://127.0.0.1:9001即可通過(guò)網(wǎng)頁(yè)來(lái)管理這3個(gè)進(jìn)程了:

supervisod的官方文檔說(shuō)目前(版本3.3.1)不支持python3,但是我在使用過(guò)程中沒(méi)有發(fā)現(xiàn)什么問(wèn)題,可能也是由于我并沒(méi)有使用supervisord的復(fù)雜功能,只是把它當(dāng)作了一個(gè)簡(jiǎn)單的進(jìn)程狀態(tài)監(jiān)控和啟停工具了。

以上就是本文的全部?jī)?nèi)容,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作能帶來(lái)一定的幫助,同時(shí)也希望多多支持武林網(wǎng)!

發(fā)表評(píng)論 共有條評(píng)論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 楚雄市| 芦山县| 永寿县| 南投市| 遵化市| 株洲市| 罗定市| 永寿县| 沈丘县| 阳朔县| 灌云县| 信宜市| 宜兴市| 虹口区| 石嘴山市| 定安县| 彭阳县| 新化县| 安新县| 阿拉善盟| 泾阳县| 濮阳市| 江山市| 额敏县| 衡阳市| 富锦市| 崇信县| 浠水县| 芒康县| 仙游县| 墨竹工卡县| 宜昌市| 长泰县| 棋牌| 鄂尔多斯市| 鹿邑县| 云和县| 青神县| 东源县| 淮安市| 桂阳县|