Python Web Scraping

2025年的反爬技术已经进化到了令人发指的地步:Cloudflare Turnstile、指纹检测、行为分析、Canvas指纹……传统的requests+BeautifulSoup已经完全不够用了。本文将教你如何构建一个能应对现代反爬的企业级爬虫系统。

2025年反爬技术全景

网站常用的反爬手段

  1. Cloudflare Turnstile:替代传统验证码的行为验证
  2. 浏览器指纹:Canvas、WebGL、AudioContext指纹
  3. TLS指纹:JA3/JA4指纹检测
  4. 行为分析:鼠标轨迹、滚动模式、点击间隔
  5. IP信誉:代理IP黑名单、频率限制
  6. Honeypot:隐藏链接、蜜罐元素

技术栈选择

工具 用途 推荐度
Playwright 浏览器自动化 ⭐⭐⭐⭐⭐
httpx HTTP客户端 ⭐⭐⭐⭐
Scrapy 爬虫框架 ⭐⭐⭐⭐
curl_cffi TLS指纹伪装 ⭐⭐⭐⭐⭐
undetected-chromedriver 反检测浏览器 ⭐⭐⭐

方案一:轻量级反检测HTTP爬虫

适合不需要JS渲染的静态页面:

import asyncio
import random
from typing import Optional

from curl_cffi.requests import AsyncSession

class StealthScraper:
    """HTTP scraper whose requests carry a Chrome-like TLS fingerprint.

    Wraps one ``curl_cffi`` ``AsyncSession`` created with
    ``impersonate="chrome124"``. The session holds network resources, so
    call :meth:`close` when finished or use the scraper as an async
    context manager (``async with StealthScraper() as scraper: ...``).
    """

    # Browser-like headers sent with every request; per-call headers
    # passed to ``fetch`` override entries with the same name.
    DEFAULT_HEADERS = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        "Accept-Encoding": "gzip, deflate, br",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "none",
    }

    def __init__(self):
        # Impersonate Chrome's TLS handshake.
        self.session = AsyncSession(impersonate="chrome124")

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.close()

    async def close(self) -> None:
        """Close the underlying HTTP session."""
        await self.session.close()

    async def fetch(
        self,
        url: str,
        headers: Optional[dict] = None,
        timeout: float = 30,
    ) -> str:
        """Send a GET request and return the response body as text.

        Args:
            url: Page to request.
            headers: Extra headers merged over ``DEFAULT_HEADERS``.
            timeout: Request timeout in seconds.

        Returns:
            The response text. The HTTP status code is not inspected, so
            error pages are returned like any other body.
        """
        request_headers = {**self.DEFAULT_HEADERS, **(headers or {})}
        response = await self.session.get(
            url, headers=request_headers, timeout=timeout
        )
        return response.text

    async def fetch_many(
        self, urls: list[str], delay: float = 2.0
    ) -> list[Optional[str]]:
        """Fetch URLs one after another with a randomized pause in between.

        Args:
            urls: URLs to request, in order.
            delay: Base pause in seconds between two requests; a random
                0.5-2.0 s jitter is added on top.

        Returns:
            One entry per URL, in input order: the HTML text, or ``None``
            if fetching that URL raised.
        """
        results: list[Optional[str]] = []
        last_index = len(urls) - 1
        for index, url in enumerate(urls):
            try:
                results.append(await self.fetch(url))
            except Exception as e:
                # Best-effort batch: report the failure and keep going.
                print(f"Error fetching {url}: {e}")
                results.append(None)
            # Pause after failures too, so a failing target is not hit
            # back-to-back; skip the pointless pause after the last URL.
            if index < last_index:
                await asyncio.sleep(delay + random.uniform(0.5, 2.0))
        return results

# Usage example
async def main():
    """Fetch example.com once and print the first 500 characters of the HTML."""
    scraper = StealthScraper()
    try:
        html = await scraper.fetch("https://example.com")
        print(html[:500])
    finally:
        # Release the underlying curl_cffi session even if the fetch fails.
        await scraper.session.close()


# Only run the demo when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())

为什么用curl_cffi?

curl_cffi使用C语言级别的TLS指纹伪装,可以完美模拟Chrome、Firefox等浏览器的TLS握手特征。这是绕过Cloudflare等WAF的关键技术。

方案二:Playwright反检测浏览器爬虫

适合需要JS渲染的动态页面:

from playwright.async_api import async_playwright
import asyncio
import random

class BrowserScraper:
    """Playwright-driven Chromium scraper with basic anti-detection tweaks.

    Call :meth:`setup` before scraping and :meth:`close` afterwards, or use
    the scraper as an async context manager, which does both.
    """

    def __init__(self):
        # Kept so the Playwright driver can be stopped in ``close``.
        self.playwright = None
        self.browser = None
        self.context = None

    async def __aenter__(self):
        await self.setup()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.close()

    async def setup(self):
        """Start Playwright, launch headless Chromium and create the context.

        If any step fails, whatever was already started is shut down again
        before the exception propagates.
        """
        self.playwright = await async_playwright().start()
        try:
            self.browser = await self.playwright.chromium.launch(
                headless=True,
                args=[
                    '--disable-blink-features=AutomationControlled',
                    '--no-sandbox',
                    '--disable-setuid-sandbox',
                ]
            )
            self.context = await self.browser.new_context(
                viewport={'width': 1920, 'height': 1080},
                user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36',
                locale='zh-CN',
                timezone_id='Asia/Shanghai',
            )
            # Mask the properties automation detectors commonly probe.
            await self.context.add_init_script("""
            Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
            Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]});
            Object.defineProperty(navigator, 'languages', {get: () => ['zh-CN', 'zh', 'en']});
            window.chrome = { runtime: {} };
        """)
        except Exception:
            await self.close()
            raise

    async def human_scroll(self, page):
        """Scroll from the top to the bottom of the page in irregular steps.

        The page height is read once before scrolling starts; each step
        moves 200-500 px and is followed by a 0.3-1.0 s pause.
        """
        await page.evaluate("window.scrollTo(0, 0)")
        await asyncio.sleep(random.uniform(0.5, 1.5))

        total_height = await page.evaluate("document.body.scrollHeight")
        position = 0

        while position < total_height:
            position += random.randint(200, 500)
            await page.evaluate(f"window.scrollTo(0, {position})")
            await asyncio.sleep(random.uniform(0.3, 1.0))

    async def scrape(self, url: str) -> str:
        """Open ``url`` in a new page, scroll through it and return its HTML.

        The browser is set up on first use if :meth:`setup` was not called.
        The page is always closed, even when navigation or scrolling fails.
        """
        if self.context is None:
            await self.setup()

        page = await self.context.new_page()
        try:
            await page.goto(url, wait_until='networkidle', timeout=30000)
            await self.human_scroll(page)
            await asyncio.sleep(random.uniform(1, 3))
            return await page.content()
        finally:
            await page.close()

    async def close(self):
        """Close the browser and stop the Playwright driver.

        Safe to call when :meth:`setup` never ran or failed part-way, and
        safe to call more than once.
        """
        try:
            if self.browser is not None:
                await self.browser.close()
        finally:
            self.browser = None
            self.context = None
            if self.playwright is not None:
                # Without this the driver started in ``setup`` is leaked.
                await self.playwright.stop()
                self.playwright = None

# Usage example
async def main():
    """Scrape example.com with a real browser and print the first 500 characters."""
    scraper = BrowserScraper()
    await scraper.setup()
    try:
        html = await scraper.scrape("https://example.com")
        print(html[:500])
    finally:
        # Shut the browser down even if scraping raised.
        await scraper.close()


# Only run the demo when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())

方案三:企业级分布式爬虫架构

对于大规模数据采集需求:

# 爬虫调度器
import redis
import json
from typing import Optional

class CrawlScheduler:
    """Redis-backed crawl queue with a visited set and a result store.

    Pending tasks live in a sorted set scored by priority, visited URLs in
    a set, and results in a hash keyed by URL.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.queue_key = "crawl:queue"
        self.result_key = "crawl:results"
        self.visited_key = "crawl:visited"

    def add_url(self, url: str, priority: int = 5, meta: dict = None):
        """Queue a URL unless it has already been marked visited.

        Args:
            url: URL to crawl.
            priority: Queue score; tasks with a higher value are handed
                out first by ``get_next_task``.
            meta: Optional extra data stored with the task.

        Returns:
            ``False`` if the URL is in the visited set, ``True`` otherwise.
        """
        if self.redis.sismember(self.visited_key, url):
            return False
        # sort_keys gives a canonical serialization, so re-adding the same
        # task with ``meta`` keys in a different order maps to the same
        # sorted-set member instead of creating a duplicate entry.
        task = json.dumps(
            {
                "url": url,
                "priority": priority,
                "meta": meta or {},
                "retry_count": 0,
            },
            sort_keys=True,
        )
        self.redis.zadd(self.queue_key, {task: priority})
        return True

    def get_next_task(self) -> Optional[dict]:
        """Pop and return the highest-priority task, or ``None`` if empty."""
        popped = self.redis.zpopmax(self.queue_key)
        if not popped:
            return None
        # zpopmax yields a list of (member, score) pairs.
        task_json, _score = popped[0]
        return json.loads(task_json)

    def mark_visited(self, url: str):
        """Record the URL in the visited set."""
        self.redis.sadd(self.visited_key, url)

    def save_result(self, url: str, data: dict):
        """Store the scraped data for a URL as JSON in the result hash."""
        self.redis.hset(self.result_key, url, json.dumps(data))

    def get_queue_size(self) -> int:
        """Return the number of tasks currently queued."""
        return self.redis.zcard(self.queue_key)

# Proxy pool management
class ProxyPool:
    """Redis sorted set of proxies ranked by a health score."""

    # Points removed from a proxy's score on each reported failure.
    FAILURE_PENALTY = 10

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.pool_key = "proxy:pool"

    def add_proxy(self, proxy: str, score: int = 100):
        """Add a proxy to the pool, or reset its score if already present."""
        self.redis.zadd(self.pool_key, {proxy: score})

    def get_proxy(self) -> Optional[str]:
        """Return the highest-scored proxy, or ``None`` if the pool is empty."""
        top = self.redis.zrevrange(self.pool_key, 0, 0)
        if not top:
            return None
        best = top[0]
        # Members are bytes by default but already str when the client was
        # created with decode_responses=True.
        return best.decode() if isinstance(best, bytes) else best

    def report_success(self, proxy: str):
        """Raise the proxy's score by 1."""
        self.redis.zincrby(self.pool_key, 1, proxy)

    def report_failure(self, proxy: str):
        """Lower the proxy's score by ``FAILURE_PENALTY``, never below 0.

        Proxies that are unknown or already at a score of 0 or less are
        left untouched.
        """
        score = self.redis.zscore(self.pool_key, proxy)
        if score and score > 0:
            # Clamp the penalty so a score under the penalty lands on 0
            # instead of going negative.
            penalty = min(self.FAILURE_PENALTY, score)
            self.redis.zincrby(self.pool_key, -penalty, proxy)

数据解析技巧

from parsel import Selector
import json

def parse_article(html: str) -> dict:
    """Extract the main fields of an article page.

    Args:
        html: Raw HTML of the article page.

    Returns:
        A dict with the keys ``title``, ``author``, ``content``,
        ``publish_date``, ``tags`` and ``views``. Text fields fall back to
        an empty string when their selector matches nothing; ``views`` is
        parsed from ``"0"`` in that case.
    """
    page = Selector(text=html)

    def first_text(query: str) -> str:
        # First match for the CSS query, stripped; "" when nothing matches.
        return page.css(query).get("").strip()

    paragraphs = page.css("div.article-content p::text").getall()
    raw_views = page.css("span.views::text").get("0")

    article = {}
    article["title"] = first_text("h1.article-title::text")
    article["author"] = first_text("span.author-name::text")
    # One line per text node found directly inside the content paragraphs.
    article["content"] = "\n".join(paragraphs)
    article["publish_date"] = page.css("time::attr(datetime)").get("")
    article["tags"] = page.css("a.tag::text").getall()
    article["views"] = parse_views(raw_views)
    return article

def parse_views(text: str) -> int:
    """Parse a view counter such as '1.2万' or '1,234' into an integer.

    Supported forms are plain integers and decimals, optional thousands
    separators (ASCII or full-width commas), and the Chinese unit suffixes
    '万' (10,000) and '亿' (100,000,000). Fractions below one view are
    truncated.

    Args:
        text: Counter text as scraped from the page.

    Returns:
        The view count; 0 when ``text`` is empty or only whitespace.

    Raises:
        ValueError: If the text is not a recognizable number.
    """
    # Local import keeps the function self-contained.
    from decimal import Decimal, InvalidOperation

    cleaned = (text or "").strip().replace(",", "").replace(",", "")
    if not cleaned:
        return 0

    multiplier = 1
    for unit, factor in (("万", 10_000), ("亿", 100_000_000)):
        if unit in cleaned:
            cleaned = cleaned.replace(unit, "")
            multiplier = factor
            break

    try:
        # Decimal keeps the scaling exact; int(float(...) * factor) can
        # truncate to one less than the intended value.
        return int(Decimal(cleaned) * multiplier)
    except (InvalidOperation, ValueError, OverflowError) as exc:
        raise ValueError(f"cannot parse view count: {text!r}") from exc

合规与道德

  1. 遵守robots.txt:尊重网站的爬取规则
  2. 控制频率:每秒请求数不超过2次
  3. 标识身份:User-Agent中包含联系方式
  4. 数据使用:遵守相关法律法规
  5. 缓存策略:避免重复请求相同内容

总结

2025年的爬虫开发已经从“能抓就行”进化到“反检测+分布式+合规”的综合工程。掌握curl_cffi的TLS伪装和Playwright的浏览器自动化,是应对现代反爬的核心技能。

💡 独家建议:对于大多数场景,推荐优先使用curl_cffi方案(轻量高效),只在必须JS渲染时才使用Playwright。同时,考虑使用Scrapy-Playwright插件将两者结合,获得最佳效果。