爬虫是Python最经典的用途之一。但很多人写的爬虫效率极低,串行的请求方式把大量时间浪费在等待响应上。今天我来分享如何用异步编程把爬虫性能提升10倍以上。
先看一段典型的同步爬虫代码:
import requests
from bs4 import BeautifulSoup
import time
def fetch_url(url):
    """Download *url* with a blocking GET request and return the body as text.

    The request is issued with a 10-second timeout.
    """
    resp = requests.get(url, timeout=10)
    body = resp.text
    return body
def parse_html(html):
    """Return the text of every ``<h2>`` element found in *html*."""
    document = BeautifulSoup(html, 'html.parser')
    headings = []
    for tag in document.find_all('h2'):
        headings.append(tag.get_text())
    return headings
def main():
    """Fetch ten example pages one after another and print the elapsed time."""
    urls = [f"https://example.com/page/{i}" for i in range(1, 11)]
    # perf_counter() is monotonic, so the measured interval cannot be skewed
    # by system clock adjustments the way time.time() can.
    start = time.perf_counter()
    for url in urls:
        html = fetch_url(url)
        titles = parse_html(html)
        print(f"Fetched {len(titles)} titles from {url}")
    print(f"Total time: {time.perf_counter() - start:.2f}s")


if __name__ == "__main__":
    main()
这段代码的问题很明显:请求是串行发出的,每次都要等服务器响应后才能发下一个,程序大部分时间都阻塞在网络I/O上,CPU基本处于空闲状态。如果每个请求平均耗时1秒,10个URL就需要约10秒。
Python 3.4引入了asyncio模块,3.5加入了async/await语法,让异步编程变得简单。配合aiohttp库,我们可以写出高性能的异步爬虫。
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time
async def fetch_url(session, url):
    """Fetch *url* through the shared aiohttp *session* and return the body text.

    The request uses a total timeout of 10 seconds.
    """
    request_timeout = aiohttp.ClientTimeout(total=10)
    async with session.get(url, timeout=request_timeout) as resp:
        body = await resp.text()
        return body
async def parse_html(html):
    """Return the text of every ``<h2>`` element found in *html*.

    Declared as a coroutine so callers can ``await`` it, although the body
    itself never awaits.
    """
    # BeautifulSoup is not asynchronous, but parsing HTML is fast enough to
    # run synchronously here.
    document = BeautifulSoup(html, 'html.parser')
    headings = []
    for tag in document.find_all('h2'):
        headings.append(tag.get_text())
    return headings
async def process_url(session, url):
    """Fetch and parse one URL, returning its titles or ``[]`` on any error.

    Failures are reported on stdout rather than raised, so one bad URL does
    not abort the other tasks gathered alongside it.
    """
    try:
        page = await fetch_url(session, url)
        headings = await parse_html(page)
        print(f"Fetched {len(headings)} titles from {url}")
    except Exception as exc:
        print(f"Error fetching {url}: {exc}")
        return []
    return headings
async def main():
    """Fetch the same delayed httpbin endpoint ten times concurrently.

    Prints the total number of titles extracted and the elapsed time.
    """
    # Every entry is the same URL, so neither an f-string nor a loop
    # variable is needed to build the list.
    urls = ["https://httpbin.org/delay/1"] * 10
    # Cap concurrency through the connection pool.
    connector = aiohttp.TCPConnector(limit=10, limit_per_host=5)
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }
    # perf_counter() is monotonic, unlike time.time(), so it is the right
    # clock for measuring an interval.
    start = time.perf_counter()
    async with aiohttp.ClientSession(
        connector=connector,
        headers=headers
    ) as session:
        tasks = [process_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    total_titles = sum(len(r) for r in results)
    print(f"Total titles: {total_titles}")
    print(f"Total time: {time.perf_counter() - start:.2f}s")


if __name__ == "__main__":
    asyncio.run(main())
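关键改进:
- 用aiohttp.ClientSession让所有请求共享同一个会话和连接池;
- 用asyncio.gather并发调度全部请求,而不是逐个等待;
- 用TCPConnector的limit和limit_per_host限制并发连接数;
- 在process_url中捕获异常,单个URL失败不会影响其他任务。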
实际生产环境需要更完善的错误处理。这里是一个带指数退避重试的完整实现:
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import random
import logging
from typing import List, Optional
from dataclasses import dataclass
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class CrawlResult:
    """Outcome of crawling a single URL."""

    # The URL that was requested.
    url: str
    # HTTP status code of the response (the crawler in this file stores 0
    # when the crawl failed).
    status: int
    # Heading texts extracted from the page.
    titles: List[str]
    # Error description; None when no error was recorded.
    error: Optional[str] = None
class AsyncCrawler:
    """Concurrent crawler with bounded concurrency and retry with backoff.

    Args:
        max_concurrent: Maximum number of requests in flight at once.
        max_retries: Total number of attempts made for each URL.
        retry_delay: Base delay in seconds for the exponential backoff.
        timeout: Total timeout in seconds applied to each request.
    """

    def __init__(
        self,
        max_concurrent: int = 10,
        max_retries: int = 3,
        retry_delay: float = 1.0,
        timeout: float = 10.0
    ):
        self.max_concurrent = max_concurrent
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        # Gates the number of requests that may be in flight simultaneously.
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def _fetch_with_retry(
        self,
        session: aiohttp.ClientSession,
        url: str
    ) -> tuple[int, str]:
        """Fetch *url*, retrying on timeouts, client errors and 5xx responses.

        Args:
            session: The aiohttp session used to issue the request.
            url: The URL to fetch.

        Returns:
            A ``(status, body_text)`` tuple. Responses with a status below
            500 (including 4xx) are returned as-is without retrying.

        Raises:
            Exception: If every one of ``max_retries`` attempts failed; the
                last underlying error, if any, is chained as the cause.
        """
        last_error: Optional[BaseException] = None
        for attempt in range(self.max_retries):
            try:
                async with self.semaphore:
                    async with session.get(url, timeout=self.timeout) as response:
                        if response.status >= 500:
                            # aiohttp does not raise on error statuses by
                            # default; convert server errors into
                            # ClientResponseError so they are retried below.
                            response.raise_for_status()
                        text = await response.text()
                        return response.status, text
            except asyncio.TimeoutError as e:
                last_error = e
                logger.warning(f"Timeout for {url}, attempt {attempt + 1}")
            except aiohttp.ClientError as e:
                last_error = e
                logger.warning(f"Client error for {url}: {e}, attempt {attempt + 1}")
            if attempt < self.max_retries - 1:
                # Exponential backoff plus random jitter. The sleep happens
                # outside the semaphore so a waiting retry does not occupy
                # a concurrency slot.
                delay = self.retry_delay * (2 ** attempt) + random.uniform(0, 1)
                await asyncio.sleep(delay)
        raise Exception(
            f"Failed to fetch {url} after {self.max_retries} attempts"
        ) from last_error

    def _parse_titles(self, html: str) -> List[str]:
        """Return the non-empty, stripped text of every h1/h2/h3 in *html*."""
        soup = BeautifulSoup(html, 'html.parser')
        # Extract each heading's text once and keep it only if non-empty.
        return [
            text
            for tag in soup.find_all(['h1', 'h2', 'h3'])
            if (text := tag.get_text(strip=True))
        ]

    async def crawl_url(
        self,
        session: aiohttp.ClientSession,
        url: str
    ) -> CrawlResult:
        """Crawl a single URL and never raise.

        Returns:
            A CrawlResult carrying the status and titles on success, or
            ``status=0``, empty titles and the error message on failure.
        """
        try:
            status, html = await self._fetch_with_retry(session, url)
            titles = self._parse_titles(html)
            logger.info(f"Successfully crawled {url}: {len(titles)} titles")
            return CrawlResult(url=url, status=status, titles=titles)
        except Exception as e:
            logger.error(f"Failed to crawl {url}: {e}")
            return CrawlResult(url=url, status=0, titles=[], error=str(e))

    async def crawl(self, urls: List[str]) -> List[CrawlResult]:
        """Crawl *urls* concurrently and return one CrawlResult per URL.

        Results are returned in the same order as *urls*.
        """
        connector = aiohttp.TCPConnector(
            limit=self.max_concurrent * 2,
            limit_per_host=self.max_concurrent
        )
        headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36'
        }
        async with aiohttp.ClientSession(
            connector=connector,
            headers=headers
        ) as session:
            tasks = [self.crawl_url(session, url) for url in urls]
            return await asyncio.gather(*tasks)
# Usage example
async def main():
    """Crawl a set of httpbin test endpoints and print summary statistics."""
    import time

    # Test URLs
    urls = [
        "https://httpbin.org/html",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/status/200",
        "https://httpbin.org/status/500",  # server-error response, meant to exercise the retry path
    ] * 4  # 20 URLs in total
    crawler = AsyncCrawler(max_concurrent=5, max_retries=3)
    # perf_counter() is monotonic, unlike time.time(), so it is the right
    # clock for measuring an interval.
    start = time.perf_counter()
    results = await crawler.crawl(urls)
    elapsed = time.perf_counter() - start
    # Summarize the results
    success = sum(1 for r in results if r.error is None)
    failed = len(results) - success
    total_titles = sum(len(r.titles) for r in results)
    print(f"\n{'='*50}")
    print(f"Crawl completed in {elapsed:.2f}s")
    print(f"Total URLs: {len(results)}")
    print(f"Success: {success}")
    print(f"Failed: {failed}")
    print(f"Total titles extracted: {total_titles}")
    print(f"Average time per URL: {elapsed/len(results):.2f}s")
    # List the URLs that failed
    for r in results:
        if r.error:
            print(f" - Failed: {r.url} - {r.error}")


if __name__ == "__main__":
    asyncio.run(main())
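这个实现包含了:
- 用asyncio.Semaphore限制同时进行的请求数;
- 对超时和客户端错误自动重试,采用指数退避加随机抖动;
- 用CrawlResult数据类统一返回每个URL的状态码、标题和错误信息;
- 用logging记录成功、重试和失败;
- 连接池上限和统一的请求头配置。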
我用httpbin.org的延迟接口做了测试(模拟1秒响应时间):
| 方式 | URL数量 | 总耗时 | 平均每个URL |
|---|---|---|---|
| 同步requests | 20 | 20.5s | 1.02s |
| 异步aiohttp | 20 | 1.3s | 0.065s |
总耗时从20.5s降到1.3s,约15倍的性能提升,而且并发数越多,优势越明显。
异步爬虫虽然快,但也有一些坑:
asyncio让Python的异步编程变得简单实用。对于IO密集型任务(如网络请求),异步可以带来数量级的性能提升。
如果你的爬虫需要处理大量URL,从requests迁移到aiohttp是值得的。代码复杂度增加不多,但性能收益巨大。
完整代码可以在我的GitHub找到。有任何问题欢迎留言讨论。