于異步IO這個概念,可能有些小猿們不是非常明白,那就先來看看異步IO是怎么回事兒。
為了大家能夠更形象得理解這個概念,我們拿放羊來打個比方:
同步放羊的過程就是這樣的:
羊倌兒小同要放100只羊,他就先放一只羊出去吃草,等羊吃飽了回來在放第二只羊,等第二只羊吃飽了回來再放第三只羊出去吃草……這樣放羊的羊倌兒實在是……
再看看異步放羊的過程:
羊倌兒小異也要放100只羊,他觀察后發現,小同放羊的方法比較笨,他覺得草地一下能容下10只羊(帶寬)吃草,所以它就一次放出去10只羊等它們回來,然后他還可以給羊剪剪羊毛。有的羊吃得快回來的早,他就把羊關到羊圈接著就再放出去幾只,盡量保證草地上都有10只羊在吃草。
很明顯,異步放羊的效率高多了。同樣的,網絡世界里也是異步的效率高。
到了這里,可能有小猿要問,為什么不用多線程、多進程實現爬蟲呢? 沒錯,多線程和多進程也可以提高前面那個同步爬蟲的抓取效率,但是異步IO提高的更多,也更適合爬蟲這個場景。后面機會我們可以對比一下三者抓取的效率。
還記得我們之前使用requests實現的那個downloader嗎?同步情況下,它很好用,但不適合異步,所以我們要先改造它。幸運的是,已經有aiohttp模塊來支持異步http請求了,那么我們就用aiohttp來實現異步downloader。
async def fetch(session, url, headers=None, timeout=9): _headers = { 'User-Agent': ('Mozilla/5.0 (compatible; MSIE 9.0; ' 'Windows NT 6.1; Win64; x64; Trident/5.0)'), } if headers: _headers = headers try: async with session.get(url, headers=_headers, timeout=timeout) as response: status = response.status html = await response.read() encoding = response.get_encoding() if encoding == 'gb2312': encoding = 'gbk' html = html.decode(encoding, errors='ignore') redirected_url = str(response.url) except Exception as e: msg = 'Failed download: {} | exception: {}, {}'.format(url, str(type(e)), str(e)) print(msg) html = '' status = 0 redirected_url = url return status, html, redirected_url
實現中使用了異步上下文管理器(async with),編碼的判斷我們還是用cchardet來實現。
有了異步下載器,我們的異步爬蟲就可以寫起來啦~
跟同步爬蟲一樣,我們還是把整個爬蟲定義為一個類,它的主要成員有:
通過這幾個主要成員來達到異步控制、異步下載、異步存儲(數據庫)的目的,其它成員作為輔助。爬蟲類的相關方法,參加下面的完整實現代碼:
#!/usr/bin/env python3 # File: news-crawler-async.py # Author: veelion import traceback import time import asyncio import aiohttp import urllib.parse as urlparse import farmhash import lzma import uvloop asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) import sanicdb from urlpool import UrlPool import functions as fn import config class NewsCrawlerAsync: def __init__(self, name): self._workers = 0 self._workers_max = 30 self.logger = fn.init_file_logger(name+ '.log') self.urlpool = UrlPool(name) self.loop = asyncio.get_event_loop() self.session = aiohttp.ClientSession(loop=self.loop) self.db = sanicdb.SanicDB( config.db_host, config.db_db, config.db_user, config.db_password, loop=self.loop ) async def load_hubs(self,): sql = 'select url from crawler_hub' data = await self.db.query(sql) self.hub_hosts = set() hubs = [] for d in data: host = urlparse.urlparse(d['url']).netloc self.hub_hosts.add(host) hubs.append(d['url']) self.urlpool.set_hubs(hubs, 300) async def save_to_db(self, url, html): urlhash = farmhash.hash64(url) sql = 'select url from crawler_html where urlhash=%s' d = await self.db.get(sql, urlhash) if d: if d['url'] != url: msg = 'farmhash collision: %s <=> %s' % (url, d['url']) self.logger.error(msg) return True if isinstance(html, str): html = html.encode('utf8') html_lzma = lzma.compress(html) sql = ('insert into crawler_html(urlhash, url, html_lzma) ' 'values(%s, %s, %s)') good = False try: await self.db.execute(sql, urlhash, url, html_lzma) good = True except Exception as e: if e.args[0] == 1062: # Duplicate entry good = True pass else: traceback.print_exc() raise e return good def filter_good(self, urls): goodlinks = [] for url in urls: host = urlparse.urlparse(url).netloc if host in self.hub_hosts: goodlinks.append(url) return goodlinks async def process(self, url, ishub): status, html, redirected_url = await fn.fetch(self.session, url) self.urlpool.set_status(url, status) if redirected_url != url: self.urlpool.set_status(redirected_url, status) # 提取hub網頁中的鏈接, 新聞網頁中也有“相關新聞”的鏈接,按需提取 if status != 200: return if ishub: newlinks = fn.extract_links_re(redirected_url, html) goodlinks = self.filter_good(newlinks) print("%s/%s, goodlinks/newlinks" % (len(goodlinks), len(newlinks))) self.urlpool.addmany(goodlinks) else: await self.save_to_db(redirected_url, html) self._workers -= 1 async def loop_crawl(self,): await self.load_hubs() last_rating_time = time.time() counter = 0 while 1: tasks = self.urlpool.pop(self._workers_max) if not tasks: print('no url to crawl, sleep') await asyncio.sleep(3) continue for url, ishub in tasks.items(): self._workers += 1 counter += 1 print('crawl:', url) asyncio.ensure_future(self.process(url, ishub)) gap = time.time() - last_rating_time if gap > 5: rate = counter / gap print('\tloop_crawl() rate:%s, counter: %s, workers: %s' % (round(rate, 2), counter, self._workers)) last_rating_time = time.time() counter = 0 if self._workers > self._workers_max: print('====== got workers_max, sleep 3 sec to next worker =====') await asyncio.sleep(3) def run(self): try: self.loop.run_until_complete(self.loop_crawl()) except KeyboardInterrupt: print('stopped by yourself!') del self.urlpool pass if __name__ == '__main__': nc = NewsCrawlerAsync('yrx-async') nc.run()
爬蟲的主流程是在方法loop_crawl()里面實現的。它的主體是一個while循環,每次從self.urlpool里面獲取定量的爬蟲作為下載任務(從羊圈里面選出一批羊),通過ensure_future()開始異步下載(把這些羊都放出去)。而process()這個方法的流程是下載網頁并存儲、提取新的url,這就類似羊吃草、下崽等。
通過self._workers和self._workers_max來控制并發量。不能一直并發,給本地CPU、網絡帶寬帶來壓力,同樣也會給目標服務器帶來壓力。
至此,我們實現了同步和異步兩個新聞爬蟲,分別實現了NewsCrawlerSync和NewsCrawlerAsync兩個爬蟲類,他們的結構幾乎完全一樣,只是抓取流程一個是順序的,一個是并發的。小猿們可以通過對比兩個類的實現,來更好的理解異步的流程。
爬蟲知識點
uvloop這個模塊是用Cython編寫建立在libuv庫之上,它是asyncio內置事件循環的替代,使用它僅僅是多兩行代碼而已:
import uvloop asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
uvloop使得asyncio很快,比odejs、gevent和其它Python異步框架的快至少2倍,接近于Go語言的性能。
uvloop作者的性能測試
這是uvloop作者的性能對比測試。
目前,uvloop不支持Windows系統和Python 3.5 及其以上版本,這在它源碼的setup.py文件中可以看到:
if sys.platform in ('win32', 'cygwin', 'cli'): raise RuntimeError('uvloop does not support Windows at the moment') vi = sys.version_info if vi < (3, 5): raise RuntimeError('uvloop requires Python 3.5 or greater')
所以,使用Windows的小猿們要運行異步爬蟲,就要把uvloop那兩行注釋掉哦。
思考題
或許有些小猿還沒見過這樣的html代碼,它出現在<head>里面:
<meta http-equiv="refresh" content="5; url=https://example.com/">
它的意思是,告訴瀏覽器在5秒之后跳轉到另外一個url:https://example.com/。
那么問題來了,請給downloader(fetch())添加代碼,讓它支持這個跳轉。
這是我們寫新聞爬蟲要考慮的一個很重要的問題,我們實現的新聞爬蟲中并沒有實現這個機制,小猿們來思考一下,并對手實現實現。
??Crawlee 是Apify SDK的繼承者。用TypeScript完全重寫,以獲得更好的開發者體驗,并具有更強大的抗阻塞功能。界面與 Apify SDK 幾乎相同,因此升級輕而易舉。閱讀升級指南以了解更改。
Crawlee 涵蓋了端到端的爬行和抓取,并幫助您構建可靠的抓取工具。快速地。
即使使用默認配置,您的爬蟲也會像人類一樣在現代機器人保護的雷達下飛行。Crawlee 為您提供了在 Web 上抓取鏈接、抓取數據并將其存儲到磁盤或云中的工具,同時保持可配置以滿足您的項目需求。
Crawlee 以crawleeNPM 包的形式提供。
我們建議您訪問Crawlee 文檔中的介紹教程以獲取更多信息。
Crawlee 需要Node.js 16 或更高版本。
試用 Crawlee 的最快方法是使用Crawlee CLI并選擇Getting started example。CLI 將安裝所有必要的依賴項并添加樣板代碼供您使用。
npx crawlee create my-crawler
cd my-crawler npm start
如果您更喜歡將 Crawlee 添加到您自己的項目中,請嘗試以下示例。因為它使用PlaywrightCrawler我們還需要安裝Playwright。它沒有與 Crawlee 捆綁以減少安裝大小。
npm install crawlee playwright
import { PlaywrightCrawler, Dataset } from 'crawlee';
// PlaywrightCrawler crawls the web using a headless
// browser controlled by the Playwright library.
const crawler = new PlaywrightCrawler({
// Use the requestHandler to process each of the crawled pages.
async requestHandler({ request, page, enqueueLinks, log }) {
const title = await page.title();
log.info(`Title of ${request.loadedUrl} is '${title}'`);
// Save results as JSON to ./storage/datasets/default
await Dataset.pushData({ title, url: request.loadedUrl });
// Extract links from the current page
// and add them to the crawling queue.
await enqueueLinks();
},
// Uncomment this option to see the browser window.
// headless: false,
});
// Add first URL to the queue and start the crawl.
await crawler.run(['https://crawlee.dev']);
默認情況下,Crawlee 將數據存儲到./storage當前工作目錄中。您可以通過 Crawlee 配置覆蓋此目錄。詳見配置指南、請求存儲和結果存儲。
網友留言問怎么用python抓取小說,今天小編就給大家分享一下用python抓取起點中文網的免費小說教程,用到的庫有urllib2、BeautifulSoup,下面就來看看吧!(關注并私信我python,給你發價值萬元的python學習教程。
*請認真填寫需求信息,我們會在24小時內與您取得聯系。