整合營銷服務商

          電腦端+手機端+微信端=數據同步管理

          免費咨詢熱線:

          python:大規模異步爬蟲用asyncio實現異步爬蟲

          于異步IO這個概念,可能有些小猿們不是非常明白,那就先來看看異步IO是怎么回事兒。

          為了大家能夠更形象得理解這個概念,我們拿放羊來打個比方:

          • 下載請求開始,就是放羊出去吃草;
          • 下載任務完成,就是羊吃飽回羊圈。

          同步放羊的過程就是這樣的:

          羊倌兒小同要放100只羊,他就先放一只羊出去吃草,等羊吃飽了回來在放第二只羊,等第二只羊吃飽了回來再放第三只羊出去吃草……這樣放羊的羊倌兒實在是……

          再看看異步放羊的過程:

          羊倌兒小異也要放100只羊,他觀察后發現,小同放羊的方法比較笨,他覺得草地一下能容下10只羊(帶寬)吃草,所以它就一次放出去10只羊等它們回來,然后他還可以給羊剪剪羊毛。有的羊吃得快回來的早,他就把羊關到羊圈接著就再放出去幾只,盡量保證草地上都有10只羊在吃草。

          很明顯,異步放羊的效率高多了。同樣的,網絡世界里也是異步的效率高。

          到了這里,可能有小猿要問,為什么不用多線程、多進程實現爬蟲呢? 沒錯,多線程和多進程也可以提高前面那個同步爬蟲的抓取效率,但是異步IO提高的更多,也更適合爬蟲這個場景。后面機會我們可以對比一下三者抓取的效率。

          1. 異步的downloader

          還記得我們之前使用requests實現的那個downloader嗎?同步情況下,它很好用,但不適合異步,所以我們要先改造它。幸運的是,已經有aiohttp模塊來支持異步http請求了,那么我們就用aiohttp來實現異步downloader。

          async def fetch(session, url, headers=None, timeout=9):
           _headers = {
           'User-Agent': ('Mozilla/5.0 (compatible; MSIE 9.0; '
           'Windows NT 6.1; Win64; x64; Trident/5.0)'),
           }
           if headers:
           _headers = headers
           try:
           async with session.get(url, headers=_headers, timeout=timeout) as response:
           status = response.status
           html = await response.read()
           encoding = response.get_encoding()
           if encoding == 'gb2312':
           encoding = 'gbk'
           html = html.decode(encoding, errors='ignore')
           redirected_url = str(response.url)
           except Exception as e:
           msg = 'Failed download: {} | exception: {}, {}'.format(url, str(type(e)), str(e))
           print(msg)
           html = ''
           status = 0
           redirected_url = url
           return status, html, redirected_url
          
          • 這個異步的downloader,我們稱之為fetch(),它有兩個必須參數:
          • seesion: 這是一個aiohttp.ClientSession的對象,這個對象的初始化在crawler里面完成,每次調用fetch()時,作為參數傳遞。

          url:這是需要下載的網址。

          實現中使用了異步上下文管理器(async with),編碼的判斷我們還是用cchardet來實現。

          有了異步下載器,我們的異步爬蟲就可以寫起來啦~

          2. 異步新聞爬蟲

          跟同步爬蟲一樣,我們還是把整個爬蟲定義為一個類,它的主要成員有:

          • self.urlpool 網址池
          • self.loop 異步的事件循環
          • self.seesion aiohttp.ClientSession的對象,用于異步下載
          • self.db 基于aiomysql的異步數據庫連接
          • self._workers 當前并發下載(放出去的羊)的數量

          通過這幾個主要成員來達到異步控制、異步下載、異步存儲(數據庫)的目的,其它成員作為輔助。爬蟲類的相關方法,參加下面的完整實現代碼:

          #!/usr/bin/env python3
          # File: news-crawler-async.py
          # Author: veelion
          import traceback
          import time
          import asyncio
          import aiohttp
          import urllib.parse as urlparse
          import farmhash
          import lzma
          import uvloop
          asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
          import sanicdb
          from urlpool import UrlPool
          import functions as fn
          import config
          class NewsCrawlerAsync:
           def __init__(self, name):
           self._workers = 0
           self._workers_max = 30
           self.logger = fn.init_file_logger(name+ '.log')
           self.urlpool = UrlPool(name)
           self.loop = asyncio.get_event_loop()
           self.session = aiohttp.ClientSession(loop=self.loop)
           self.db = sanicdb.SanicDB(
           config.db_host,
           config.db_db,
           config.db_user,
           config.db_password,
           loop=self.loop
           )
           async def load_hubs(self,):
           sql = 'select url from crawler_hub'
           data = await self.db.query(sql)
           self.hub_hosts = set()
           hubs = []
           for d in data:
           host = urlparse.urlparse(d['url']).netloc
           self.hub_hosts.add(host)
           hubs.append(d['url'])
           self.urlpool.set_hubs(hubs, 300)
           async def save_to_db(self, url, html):
           urlhash = farmhash.hash64(url)
           sql = 'select url from crawler_html where urlhash=%s'
           d = await self.db.get(sql, urlhash)
           if d:
           if d['url'] != url:
           msg = 'farmhash collision: %s <=> %s' % (url, d['url'])
           self.logger.error(msg)
           return True
           if isinstance(html, str):
           html = html.encode('utf8')
           html_lzma = lzma.compress(html)
           sql = ('insert into crawler_html(urlhash, url, html_lzma) '
           'values(%s, %s, %s)')
           good = False
           try:
           await self.db.execute(sql, urlhash, url, html_lzma)
           good = True
           except Exception as e:
           if e.args[0] == 1062:
           # Duplicate entry
           good = True
           pass
           else:
           traceback.print_exc()
           raise e
           return good
           def filter_good(self, urls):
           goodlinks = []
           for url in urls:
           host = urlparse.urlparse(url).netloc
           if host in self.hub_hosts:
           goodlinks.append(url)
           return goodlinks
           async def process(self, url, ishub):
           status, html, redirected_url = await fn.fetch(self.session, url)
           self.urlpool.set_status(url, status)
           if redirected_url != url:
           self.urlpool.set_status(redirected_url, status)
           # 提取hub網頁中的鏈接, 新聞網頁中也有“相關新聞”的鏈接,按需提取
           if status != 200:
           return
           if ishub:
           newlinks = fn.extract_links_re(redirected_url, html)
           goodlinks = self.filter_good(newlinks)
           print("%s/%s, goodlinks/newlinks" % (len(goodlinks), len(newlinks)))
           self.urlpool.addmany(goodlinks)
           else:
           await self.save_to_db(redirected_url, html)
           self._workers -= 1
           async def loop_crawl(self,):
           await self.load_hubs()
           last_rating_time = time.time()
           counter = 0
           while 1:
           tasks = self.urlpool.pop(self._workers_max)
           if not tasks:
           print('no url to crawl, sleep')
           await asyncio.sleep(3)
           continue
           for url, ishub in tasks.items():
           self._workers += 1
           counter += 1
           print('crawl:', url)
           asyncio.ensure_future(self.process(url, ishub))
           gap = time.time() - last_rating_time
           if gap > 5:
           rate = counter / gap
           print('\tloop_crawl() rate:%s, counter: %s, workers: %s' % (round(rate, 2), counter, self._workers))
           last_rating_time = time.time()
           counter = 0
           if self._workers > self._workers_max:
           print('====== got workers_max, sleep 3 sec to next worker =====')
           await asyncio.sleep(3)
           def run(self):
           try:
           self.loop.run_until_complete(self.loop_crawl())
           except KeyboardInterrupt:
           print('stopped by yourself!')
           del self.urlpool
           pass
          if __name__ == '__main__':
           nc = NewsCrawlerAsync('yrx-async')
           nc.run()
          

          爬蟲的主流程是在方法loop_crawl()里面實現的。它的主體是一個while循環,每次從self.urlpool里面獲取定量的爬蟲作為下載任務(從羊圈里面選出一批羊),通過ensure_future()開始異步下載(把這些羊都放出去)。而process()這個方法的流程是下載網頁并存儲、提取新的url,這就類似羊吃草、下崽等。

          通過self._workers和self._workers_max來控制并發量。不能一直并發,給本地CPU、網絡帶寬帶來壓力,同樣也會給目標服務器帶來壓力。

          至此,我們實現了同步和異步兩個新聞爬蟲,分別實現了NewsCrawlerSync和NewsCrawlerAsync兩個爬蟲類,他們的結構幾乎完全一樣,只是抓取流程一個是順序的,一個是并發的。小猿們可以通過對比兩個類的實現,來更好的理解異步的流程。

          爬蟲知識點

          1. uvloop模塊

          uvloop這個模塊是用Cython編寫建立在libuv庫之上,它是asyncio內置事件循環的替代,使用它僅僅是多兩行代碼而已:

          import uvloop
          asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
          

          uvloop使得asyncio很快,比odejs、gevent和其它Python異步框架的快至少2倍,接近于Go語言的性能。

          uvloop作者的性能測試

          這是uvloop作者的性能對比測試。

          目前,uvloop不支持Windows系統和Python 3.5 及其以上版本,這在它源碼的setup.py文件中可以看到:

          if sys.platform in ('win32', 'cygwin', 'cli'):
           raise RuntimeError('uvloop does not support Windows at the moment')
          vi = sys.version_info
          if vi < (3, 5):
           raise RuntimeError('uvloop requires Python 3.5 or greater')
          

          所以,使用Windows的小猿們要運行異步爬蟲,就要把uvloop那兩行注釋掉哦。

          思考題

          1. 給同步的downloader()或異步的fetch()添加功能

          或許有些小猿還沒見過這樣的html代碼,它出現在<head>里面:

          <meta http-equiv="refresh" content="5; url=https://example.com/">
          

          它的意思是,告訴瀏覽器在5秒之后跳轉到另外一個url:https://example.com/。

          那么問題來了,請給downloader(fetch())添加代碼,讓它支持這個跳轉。

          2. 如何控制hub的刷新頻率,及時發現最新新聞

          這是我們寫新聞爬蟲要考慮的一個很重要的問題,我們實現的新聞爬蟲中并沒有實現這個機制,小猿們來思考一下,并對手實現實現。

          ??Crawlee 是Apify SDK的繼承者。用TypeScript完全重寫,以獲得更好的開發者體驗,并具有更強大的抗阻塞功能。界面與 Apify SDK 幾乎相同,因此升級輕而易舉。閱讀升級指南以了解更改。

          Crawlee 涵蓋了端到端的爬行和抓取,并幫助您構建可靠的抓取工具。快速地。

          即使使用默認配置,您的爬蟲也會像人類一樣在現代機器人保護的雷達下飛行。Crawlee 為您提供了在 Web 上抓取鏈接、抓取數據并將其存儲到磁盤或云中的工具,同時保持可配置以滿足您的項目需求。

          Crawlee 以crawleeNPM 包的形式提供。

          安裝

          我們建議您訪問Crawlee 文檔中的介紹教程以獲取更多信息。

          Crawlee 需要Node.js 16 或更高版本

          使用 Crawlee CLI

          試用 Crawlee 的最快方法是使用Crawlee CLI并選擇Getting started example。CLI 將安裝所有必要的依賴項并添加樣板代碼供您使用。

          npx crawlee create my-crawler
          cd my-crawler npm start

          手動安裝

          如果您更喜歡將 Crawlee 添加到您自己的項目中,請嘗試以下示例。因為它使用PlaywrightCrawler我們還需要安裝Playwright。它沒有與 Crawlee 捆綁以減少安裝大小。

          npm install crawlee playwright
          import { PlaywrightCrawler, Dataset } from 'crawlee';
          
          // PlaywrightCrawler crawls the web using a headless
          // browser controlled by the Playwright library.
          const crawler = new PlaywrightCrawler({
              // Use the requestHandler to process each of the crawled pages.
              async requestHandler({ request, page, enqueueLinks, log }) {
                  const title = await page.title();
                  log.info(`Title of ${request.loadedUrl} is '${title}'`);
          
                  // Save results as JSON to ./storage/datasets/default
                  await Dataset.pushData({ title, url: request.loadedUrl });
          
                  // Extract links from the current page
                  // and add them to the crawling queue.
                  await enqueueLinks();
              },
              // Uncomment this option to see the browser window.
              // headless: false,
          });
          
          // Add first URL to the queue and start the crawl.
          await crawler.run(['https://crawlee.dev']);

          默認情況下,Crawlee 將數據存儲到./storage當前工作目錄中。您可以通過 Crawlee 配置覆蓋此目錄。詳見配置指南請求存儲結果存儲

          特征

          • 用于HTTP 和無頭瀏覽器抓取的單一界面
          • 用于抓取 URL 的持久隊列(廣度和深度優先)
          • 表格數據和文件的可插拔存儲
          • 使用可用系統資源自動擴展
          • 集成代理輪換和會話管理
          • 使用鉤子可定制的生命周期
          • CLI引導您的項目
          • 可配置的路由錯誤處理重試
          • 準備部署的Dockerfile
          • 用TypeScript用泛型編寫

          HTTP 爬取

          • 零配置HTTP2 支持,即使是代理
          • 自動生成類似瀏覽器的標題
          • 復制瀏覽器TLS 指紋
          • 集成快速HTML 解析器。Cheerio 和 JSDOM
          • 是的,您也可以抓取JSON API

          真正的瀏覽器爬取

          • JavaScript渲染截圖
          • 無頭有頭支持
          • 零配置生成類人指紋
          • 自動瀏覽器管理
          • 使用相同界面的PlaywrightPuppeteer
          • Chrome , Firefox , Webkit等等

          網友留言問怎么用python抓取小說,今天小編就給大家分享一下用python抓取起點中文網的免費小說教程,用到的庫有urllib2、BeautifulSoup,下面就來看看吧!(關注并私信我python,給你發價值萬元的python學習教程。


          主站蜘蛛池模板: 久久se精品一区二区国产| 3D动漫精品啪啪一区二区下载| 后入内射国产一区二区| 精品国产日韩亚洲一区91| 免费一区二区三区在线视频| 极品少妇伦理一区二区| 日本亚洲成高清一区二区三区| 亚洲国产情侣一区二区三区| 在线免费一区二区| 91福利国产在线观看一区二区 | 亚洲视频一区在线播放| 精品香蕉一区二区三区| 无码AV一区二区三区无码| 亚洲AV综合色区无码一区| 亚洲AV成人精品一区二区三区| 亚洲一区视频在线播放| 国产福利91精品一区二区| V一区无码内射国产| 国产福利一区二区三区| 久久综合一区二区无码| 国产精品福利一区二区| 中文字幕一区视频一线| 蜜芽亚洲av无码一区二区三区| 国产伦精品一区二区三区视频猫咪 | 国产一区在线观看免费| 伊人久久大香线蕉av一区| 99精品国产一区二区三区2021| 亚州国产AV一区二区三区伊在| 日韩高清国产一区在线 | 视频一区二区中文字幕| 3d动漫精品成人一区二区三| 国产亚洲福利一区二区免费看| 熟妇人妻AV无码一区二区三区| 日韩人妻无码免费视频一区二区三区| 99久久精品国产高清一区二区| 欧美日韩国产免费一区二区三区| 亚洲AV无码一区二区三区DV| 日本免费精品一区二区三区| 中文字幕AV一区二区三区| 亚洲国产精品一区二区成人片国内 | 日本无码一区二区三区白峰美|