

[Python Crawler 4] Concurrent and Parallel Downloading


1 One Million Websites
  1.1 Parsing the Alexa List the Straightforward Way
  1.2 Reusing the Crawler Code to Parse the Alexa List
2 Sequential Crawler
3 Concurrent and Parallel Crawlers
  3.0 How Concurrency and Parallelism Work
  3.1 Multithreaded Crawler
  3.2 Multiprocess Crawler
4 Performance Comparison

This post covers downloading web pages concurrently and in parallel using multithreading and multiprocessing, and compares their performance with sequential downloading.

1 One Million Websites

Alexa, an Amazon subsidiary, publishes a list of the one million most popular websites (http://www.alexa.com/topsites ). A compressed copy of this list can be downloaded directly from http://s3.amazonaws.com/alexa-static/top-1m.csv.zip , so there is no need to scrape the data from Alexa's site.

Rank Domain
1 google.com
2 youtube.com
3 facebook.com
4 baidu.com
5 yahoo.com
6 wikipedia.com
7 google.co.in
8 amazon.com
9 QQ.com
10 google.co.jp
11 live.com
12 taobao.com

1.1 Parsing the Alexa List the Straightforward Way

Extracting the data takes four steps:
- download the .zip file;
- extract the CSV file from the .zip archive;
- parse the CSV file;
- iterate over every row of the CSV file and pull out the domain.

# -*- coding: utf-8 -*-
import csv
from zipfile import ZipFile
from StringIO import StringIO
from downloader import Downloader

def alexa():
    D = Downloader()
    zipped_data = D('http://s3.amazonaws.com/alexa-static/top-1m.csv.zip')
    urls = []  # top 1 million URL's will be stored in this list
    with ZipFile(StringIO(zipped_data)) as zf:
        csv_filename = zf.namelist()[0]
        for _, website in csv.reader(zf.open(csv_filename)):
            urls.append('http://' + website)
    return urls

if __name__ == '__main__':
    print len(alexa())

The downloaded compressed data is wrapped in StringIO before being passed to ZipFile, because ZipFile expects a file-like interface rather than a string. Since the zip archive contains only a single file, we simply take the first entry. The http:// scheme is then prepended to each domain before it is appended to the URL list.

1.2 Reusing the Crawler Code to Parse the Alexa List

To reuse the functionality above in our crawler, it needs to be adapted to the scrape_callback interface.

# -*- coding: utf-8 -*-
import csv
from zipfile import ZipFile
from StringIO import StringIO
from mongo_cache import MongoCache

class AlexaCallback:
    def __init__(self, max_urls=1000):
        self.max_urls = max_urls
        self.seed_url = 'http://s3.amazonaws.com/alexa-static/top-1m.csv.zip'

    def __call__(self, url, html):
        if url == self.seed_url:
            urls = []
            cache = MongoCache()  # used to skip URLs that are already cached
            with ZipFile(StringIO(html)) as zf:
                csv_filename = zf.namelist()[0]
                for _, website in csv.reader(zf.open(csv_filename)):
                    if 'http://' + website not in cache:
                        urls.append('http://' + website)
                        if len(urls) == self.max_urls:
                            break
            return urls

A new input parameter, max_urls, limits the number of URLs extracted from the Alexa file. Actually downloading all one million pages would take about 11 days, so here it is set to just 1000 URLs.
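As a quick sanity check on that estimate, here is a back-of-the-envelope calculation, assuming roughly one second per request for a polite sequential crawl (the assumption is mine, not a measurement from this series):

# rough estimate only: one million URLs at ~1 second each, downloaded one after another
num_urls = 1000000
seconds_per_url = 1.0
days = num_urls * seconds_per_url / (60 * 60 * 24)
print '%.1f days' % days  # about 11.6 days, which matches the 11-day figure above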

2 Sequential Crawler

# -*- coding: utf-8 -*-
from link_crawler import link_crawler
from mongo_cache import MongoCache
from alexa_cb import AlexaCallback

def main():
    scrape_callback = AlexaCallback()
    cache = MongoCache()
    #cache.clear()
    link_crawler(scrape_callback.seed_url, scrape_callback=scrape_callback,
                 cache=cache, timeout=10, ignore_robots=True)

if __name__ == '__main__':
    main()

We time the sequential run with the shell's time command:

$ time python ...

3 Concurrent and Parallel Crawlers

To speed up page downloads, we use multiprocessing and multithreading to extend the sequential downloader into a concurrent one, while keeping the delay flag at a minimum interval of 1 second between requests so that we neither overload the servers nor get our IP address banned.
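The Downloader class used throughout this series enforces this delay internally. Below is a minimal sketch of such a per-domain throttle; the class and attribute names here are illustrative, not necessarily the exact implementation used in the series:

import time
import urlparse

class Throttle:
    """Sleep so that requests to the same domain are at least `delay` seconds apart (sketch)."""
    def __init__(self, delay):
        self.delay = delay   # minimum number of seconds between requests to one domain
        self.domains = {}    # maps each domain to the time of its last request

    def wait(self, url):
        domain = urlparse.urlparse(url).netloc
        last_accessed = self.domains.get(domain)
        if self.delay > 0 and last_accessed is not None:
            sleep_secs = self.delay - (time.time() - last_accessed)
            if sleep_secs > 0:
                time.sleep(sleep_secs)  # recent request to this domain, so wait
        self.domains[domain] = time.time()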

3.0 How Concurrency and Parallelism Work

Parallelism refers to multiple processors or cores genuinely running several programs or processes at the same time. Concurrency, by contrast, describes a single processor: at any instant the processor executes only one process, but it switches rapidly between processes, which at the macro level gives the impression that several programs are running at once, even though at the micro level the processor still works serially. Likewise, within a single process, execution switches between threads, each running a different part of the program. This means that while one thread is waiting for a page to download, the process can switch to another thread instead of wasting processor time. So, to use all of the machine's resources and download data as quickly as possible, we need to distribute the downloads across multiple processes and threads.
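The effect is easy to see with a toy example that replaces a real download with a blocking wait; this is purely an illustration of threads overlapping I/O waits, not part of the crawler code, and the URLs are placeholders:

import time
import threading

def fake_download(url):
    time.sleep(1)  # stand-in for waiting on a network response

urls = ['http://example.com/page%d' % i for i in range(5)]

start = time.time()
threads = [threading.Thread(target=fake_download, args=(url,)) for url in urls]
for t in threads:
    t.start()
for t in threads:
    t.join()
# roughly 1 second with threads, versus about 5 seconds if the waits ran one after another
print 'finished %d downloads in %.1fs' % (len(urls), time.time() - start)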

3.1 Multithreaded Crawler

We can modify the queue-based link crawler from the first article so that the crawl loop process_queue() is started in several threads, downloading links concurrently.

import time
import threading
import urlparse
from downloader import Downloader

SLEEP_TIME = 1

def threaded_crawler(seed_url, delay=5, cache=None, scrape_callback=None,
                     user_agent='Wu_Being', proxies=None, num_retries=1,
                     max_threads=10, timeout=60):
    """Crawl this website in multiple threads
    """
    # the queue of URL's that still need to be crawled
    #crawl_queue = Queue.deque([seed_url])
    crawl_queue = [seed_url]
    # the URL's that have been seen
    seen = set([seed_url])
    D = Downloader(cache=cache, delay=delay, user_agent=user_agent,
                   proxies=proxies, num_retries=num_retries, timeout=timeout)

    def process_queue():
        while True:
            try:
                url = crawl_queue.pop()
            except IndexError:
                # crawl queue is empty
                break
            else:
                html = D(url)
                if scrape_callback:
                    try:
                        links = scrape_callback(url, html) or []
                    except Exception as e:
                        print 'Error in callback for: {}: {}'.format(url, e)
                    else:
                        for link in links:
                            link = normalize(seed_url, link)
                            # check whether already crawled this link
                            if link not in seen:
                                seen.add(link)
                                # add this new link to queue
                                crawl_queue.append(link)

    # wait for all download threads to finish
    threads = []
    while threads or crawl_queue:
        # the crawl is still active
        for thread in threads:
            if not thread.is_alive():
                # remove the stopped threads
                threads.remove(thread)
        while len(threads) < max_threads and crawl_queue:
            # can start some more threads
            thread = threading.Thread(target=process_queue)
            thread.setDaemon(True)  # set daemon so main thread can exit when receives ctrl-c
            thread.start()
            threads.append(thread)
        # all threads have been processed
        # sleep temporarily so CPU can focus execution on other threads
        time.sleep(SLEEP_TIME)

def normalize(seed_url, link):
    """Normalize this URL by removing hash and adding domain
    """
    link, _ = urlparse.urldefrag(link)  # remove hash to avoid duplicates
    return urlparse.urljoin(seed_url, link)

The loop in the code above keeps creating threads until the thread pool threads reaches its maximum size. During the crawl, a thread stops early if the queue currently has no more URLs to crawl. For example, suppose there are two threads and two URLs waiting to be downloaded. When the first thread finishes its download the queue is empty, so that thread exits. The second thread finishes a little later but discovers another URL to download. The thread loop then notices that there are still URLs to download and that the number of threads is below the maximum, so it creates a new download thread.

# -*- coding: utf-8 -*-
import sys
from threaded_crawler import threaded_crawler
from mongo_cache import MongoCache
from alexa_cb import AlexaCallback

def main(max_threads):
    scrape_callback = AlexaCallback()
    cache = MongoCache()
    #cache.clear()
    threaded_crawler(scrape_callback.seed_url, scrape_callback=scrape_callback,
                     cache=cache, max_threads=max_threads, timeout=10)

if __name__ == '__main__':
    max_threads = int(sys.argv[1])
    main(max_threads)

$ time python 3threaded_test.py 5

This run uses 5 threads, so downloading is nearly five times as fast as the sequential version.

3.2 Multiprocess Crawler

On a CPU with multiple cores, we can go further and start several processes.

# -*- coding: utf-8 -*-
import sys
from process_crawler import process_crawler
from mongo_cache import MongoCache
from alexa_cb import AlexaCallback

def main(max_threads):
    scrape_callback = AlexaCallback()
    cache = MongoCache()
    cache.clear()
    process_crawler(scrape_callback.seed_url, scrape_callback=scrape_callback,
                    cache=cache, max_threads=max_threads, timeout=10)

if __name__ == '__main__':
    max_threads = int(sys.argv[1])
    main(max_threads)

The code below first determines the number of CPU cores and then starts that many processes, each of which runs the multithreaded crawler. The previous crawl queue was stored in local memory, so other processes could not work on the same crawl. To solve this, the crawl queue is moved into MongoDB. Storing the queue separately also means that crawlers on different servers can collaborate on the same crawl. A more robust queue could be used instead, such as the dedicated message-transport tool Celery; here we stick with a queue implemented on top of MongoDB. threaded_crawler needs the following changes:
- the built-in queue is replaced with the new MongoDB-backed MongoQueue;
- the seen set is no longer needed, because the queue handles duplicate URLs internally;
- the complete() method is called when a URL has finished processing, to record that it was parsed successfully.

import time
import urlparse
import threading
import multiprocessing
from mongo_cache import MongoCache
from mongo_queue import MongoQueue
from downloader import Downloader

SLEEP_TIME = 1

# called as: process_crawler(scrape_callback.seed_url, scrape_callback=scrape_callback,
#                            cache=cache, max_threads=max_threads, timeout=10)
def process_crawler(args, **kwargs):
    # args: the seed URL; kwargs: keyword arguments forwarded to threaded_crawler
    num_cpus = multiprocessing.cpu_count()
    #pool = multiprocessing.Pool(processes=num_cpus)
    print 'Starting {} processes...'.format(num_cpus)
    processes = []
    for i in range(num_cpus):
        p = multiprocessing.Process(target=threaded_crawler, args=[args], kwargs=kwargs)
        #parsed = pool.apply_async(threaded_link_crawler, args, kwargs)
        p.start()
        processes.append(p)
    # wait for processes to complete
    for p in processes:
        p.join()

def threaded_crawler(seed_url, delay=5, cache=None, scrape_callback=None,
                     user_agent='wu_being', proxies=None, num_retries=1,
                     max_threads=10, timeout=60):
    """Crawl using multiple threads
    """
    # the queue of URL's that still need to be crawled, now shared via MongoDB
    crawl_queue = MongoQueue()
    crawl_queue.clear()
    crawl_queue.push(seed_url)
    D = Downloader(cache=cache, delay=delay, user_agent=user_agent,
                   proxies=proxies, num_retries=num_retries, timeout=timeout)

    def process_queue():
        while True:
            # keep track of which URL is currently being processed
            try:
                url = crawl_queue.pop()
            except KeyError:
                # currently no urls to process
                break
            else:
                html = D(url)
                if scrape_callback:
                    try:
                        links = scrape_callback(url, html) or []
                    except Exception as e:
                        print 'Error in callback for: {}: {}'.format(url, e)
                    else:
                        for link in links:
                            # add this new link to queue
                            crawl_queue.push(normalize(seed_url, link))
                crawl_queue.complete(url)

    # wait for all download threads to finish
    threads = []
    while threads or crawl_queue:
        for thread in threads:
            if not thread.is_alive():
                threads.remove(thread)
        while len(threads) < max_threads and crawl_queue.peek():
            # can start some more threads
            thread = threading.Thread(target=process_queue)
            thread.setDaemon(True)  # set daemon so main thread can exit when receives ctrl-c
            thread.start()
            threads.append(thread)
        time.sleep(SLEEP_TIME)

def normalize(seed_url, link):
    """Normalize this URL by removing hash and adding domain
    """
    link, _ = urlparse.urldefrag(link)  # remove hash to avoid duplicates
    return urlparse.urljoin(seed_url, link)

MongoQueue defines three states:
- OUTSTANDING: a new URL has just been added;
- PROCESSING: the URL has been popped from the queue and is being downloaded;
- COMPLETE: the download is finished.

A URL can get stuck in the PROCESSING state, for example when the thread handling it is terminated before it finishes. To handle this, the class takes a timeout parameter, which defaults to 300 seconds. In the repair() method, any URL that has been processing for longer than this timeout is assumed to have failed; its status is reset to OUTSTANDING so that it can be processed again.

from datetime import datetime, timedelta
from pymongo import MongoClient, errors

class MongoQueue:
    """
    >>> timeout = 1
    >>> url = 'http://example.webscraping.com'
    >>> q = MongoQueue(timeout=timeout)
    >>> q.clear() # ensure empty queue
    >>> q.push(url) # add test URL
    >>> q.peek() == q.pop() == url # pop back this URL
    True
    >>> q.repair() # immediate repair will do nothing
    >>> q.pop() # another pop should be empty
    >>> q.peek()
    >>> import time; time.sleep(timeout) # wait for timeout
    >>> q.repair() # now repair will release URL
    Released: test
    >>> q.pop() == url # pop URL again
    True
    >>> bool(q) # queue is still active while outstanding
    True
    >>> q.complete(url) # complete this URL
    >>> bool(q) # queue is not complete
    False
    """
    # possible states of a download
    OUTSTANDING, PROCESSING, COMPLETE = range(3)

    def __init__(self, client=None, timeout=300):
        """
        client: MongoClient instance to use (defaults to a local connection)
        timeout: the number of seconds to allow for a timeout
        """
        self.client = MongoClient() if client is None else client
        self.db = self.client.cache
        self.timeout = timeout

    def __nonzero__(self):
        """Returns True if there are more jobs to process
        """
        record = self.db.crawl_queue.find_one(
            {'status': {'$ne': self.COMPLETE}}
        )
        return True if record else False

    def push(self, url):
        """Add new URL to queue if does not exist
        """
        try:
            self.db.crawl_queue.insert({'_id': url, 'status': self.OUTSTANDING})
        except errors.DuplicateKeyError as e:
            pass  # this is already in the queue

    def pop(self):
        """Get an outstanding URL from the queue and set its status to processing.
        If the queue is empty a KeyError exception is raised.
        """
        record = self.db.crawl_queue.find_and_modify(
            query={'status': self.OUTSTANDING},
            update={'$set': {'status': self.PROCESSING, 'timestamp': datetime.now()}}
        )
        if record:
            return record['_id']
        else:
            self.repair()
            raise KeyError()

    def peek(self):
        record = self.db.crawl_queue.find_one({'status': self.OUTSTANDING})
        if record:
            return record['_id']

    def complete(self, url):
        self.db.crawl_queue.update({'_id': url}, {'$set': {'status': self.COMPLETE}})

    def repair(self):
        """Release stalled jobs
        """
        record = self.db.crawl_queue.find_and_modify(
            query={
                'timestamp': {'$lt': datetime.now() - timedelta(seconds=self.timeout)},
                'status': {'$ne': self.COMPLETE}
            },
            update={'$set': {'status': self.OUTSTANDING}}
        )
        if record:
            print 'Released:', record['_id']

    def clear(self):
        self.db.crawl_queue.drop()

4 Performance Comparison

Script         Threads  Processes  Time  Ratio to sequential
Sequential     1        1
Multithreaded  5        1
Multithreaded  10       1
Multithreaded  20       1
Multiprocess   5        2
Multiprocess   10       2
Multiprocess   20       2

In addition, download bandwidth is limited, so beyond a certain point adding more threads will no longer speed up downloading. To get better crawler performance, the crawler has to be deployed across multiple servers in a distributed fashion, with every server pointing at the same MongoDB queue instance.
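Since MongoQueue accepts an existing MongoClient, pointing several servers at one queue is mostly a matter of connecting them all to the same MongoDB host. Below is a minimal sketch of that idea; 'mongo-server' is a placeholder hostname, and note that the threaded_crawler above constructs MongoQueue() with the default local connection, so in practice the shared client would need to be passed through or configured as the default:

from pymongo import MongoClient
from mongo_queue import MongoQueue

# every crawl server connects to the same MongoDB instance,
# so they all pop from and push to one shared crawl queue
client = MongoClient('mongo-server', 27017)  # placeholder hostname
crawl_queue = MongoQueue(client=client)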

Wu_Being's blog notice: you are welcome to repost this article; please credit the original post and include a link back. Thanks! [Python Crawler Series] "[Python Crawler 4] Concurrent and Parallel Downloading": http://blog.csdn.net/u014134180/article/details/55506994 . Code for the Python crawler series on GitHub: https://github.com/1040003585/WebScrapingWithPython


發(fā)表評(píng)論 共有條評(píng)論
用戶名: 密碼:
驗(yàn)證碼: 匿名發(fā)表
主站蜘蛛池模板: 高雄县| 江川县| 乡宁县| 乡城县| 南充市| 金堂县| 新余市| 淮北市| 米泉市| 郧西县| 基隆市| 桂东县| 贡山| 齐齐哈尔市| 宁化县| 双流县| 拉孜县| 达尔| 建水县| 白朗县| 天峻县| 呼图壁县| 肇源县| 瓮安县| 浪卡子县| 城口县| 政和县| 武乡县| 吉隆县| 襄汾县| 聊城市| 隆林| 远安县| 息烽县| 柳河县| 龙南县| 海丰县| 增城市| 博罗县| 霍邱县| 英吉沙县|