2026年,爬虫技术已经发展到了一个新阶段。AI驱动的反爬系统越来越智能,法律法规也在不断完善。在这个背景下,写爬虫不再只是技术问题,更是法律和道德问题。这篇文章从法律边界、道德规范、技术实践三个维度,帮你建立完整的“道德爬虫”知识体系。
在中国,与爬虫相关的主要法律包括:
合法爬取的特征:
可能违法的特征:
robots.txt是网站的“门牌”,告诉爬虫哪些页面可以访问,哪些不可以:
# robots.txt示例
User-agent: *
Allow: /public/
Disallow: /private/
Disallow: /api/
Disallow: /user/
# 限制爬取频率
Crawl-delay: 5
import urllib.robotparser
def check_robots_txt(url):
    """Check whether the site's robots.txt permits crawling *url*.

    Args:
        url: The page URL, either as a plain string or as an already-parsed
            result of ``urllib.parse.urlparse`` / ``urlsplit`` (anything
            exposing ``scheme``, ``netloc`` and ``geturl()``).

    Returns:
        A ``(can_fetch, crawl_delay)`` tuple: whether the wildcard agent
        ``"*"`` may fetch the URL, and the ``Crawl-delay`` reported for that
        agent by the parser (``None`` when none is available).

    Note:
        ``RobotFileParser.read()`` performs a blocking network request, and
        any error it raises is not caught here.
    """
    from urllib.parse import urlparse

    # The original body only worked with a parsed URL; a plain string (what
    # the name and docstring suggest) failed with AttributeError on `.scheme`.
    parsed = urlparse(url) if isinstance(url, str) else url
    robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"

    rp = urllib.robotparser.RobotFileParser()
    rp.set_url(robots_url)
    rp.read()

    can_fetch = rp.can_fetch("*", parsed.geturl())
    crawl_delay = rp.crawl_delay("*")
    return can_fetch, crawl_delay
重要提醒:robots.txt是道德约束,不是法律强制。但它代表了网站所有者的意愿,遵守它是道德爬虫的第一步。
import asyncio
import aiohttp
import time
class EthicalCrawler:
    """Rate-limited asynchronous page fetcher.

    Concurrency is capped by a semaphore, and request starts are spaced at
    least ``base_delay`` seconds apart. HTTP 429 responses are retried a
    bounded number of times after waiting for the advertised ``Retry-After``.

    Args:
        base_delay: Minimum number of seconds between two request starts.
        max_concurrent: Maximum number of requests in flight at once.
        max_retries: How many times a 429 response is retried before giving
            up on the URL.
    """

    def __init__(self, base_delay=1.0, max_concurrent=5, max_retries=5):
        self.base_delay = base_delay
        self.max_concurrent = max_concurrent
        self.max_retries = max_retries
        self.semaphore = asyncio.Semaphore(max_concurrent)
        # Start times of the most recent requests (at most 100 are kept).
        self.request_times = []
        # Serialises the pacing step so concurrent tasks cannot all measure
        # against the same "last request" timestamp and fire together.
        self._pace_lock = asyncio.Lock()

    @staticmethod
    def _parse_retry_after(value, default=60):
        """Return the Retry-After header as non-negative seconds, or *default*."""
        if value is None:
            return default
        try:
            return max(0, int(value))
        except (TypeError, ValueError):
            # Not a plain number of seconds (e.g. the HTTP-date form):
            # fall back to the default wait instead of crashing.
            return default

    async def _wait_for_turn(self):
        """Sleep until ``base_delay`` has passed since the last request start."""
        async with self._pace_lock:
            if self.request_times:
                elapsed = time.time() - self.request_times[-1]
                if elapsed < self.base_delay:
                    await asyncio.sleep(self.base_delay - elapsed)
            self.request_times.append(time.time())
            # Keep only the 100 most recent timestamps.
            self.request_times = self.request_times[-100:]

    async def fetch(self, session, url):
        """Fetch *url* with rate limiting and return the body text.

        Args:
            session: An ``aiohttp.ClientSession``-like object whose ``get(url)``
                returns an async context manager yielding the response.
            url: The URL to request.

        Returns:
            The response body as text, or ``None`` when the request fails with
            an ``aiohttp.ClientError`` (including error statuses raised by
            ``raise_for_status``) or is still rate-limited after
            ``max_retries`` retries.
        """
        attempts = max(1, self.max_retries + 1)
        for attempt in range(attempts):
            retry_after = None
            async with self.semaphore:
                await self._wait_for_turn()
                try:
                    async with session.get(url) as response:
                        if response.status == 429:
                            retry_after = self._parse_retry_after(
                                response.headers.get("Retry-After")
                            )
                        else:
                            response.raise_for_status()
                            return await response.text()
                except aiohttp.ClientError as e:
                    print(f"请求失败: {url} - {e}")
                    return None

            # Rate-limited. The semaphore slot and the response are released
            # before sleeping: retrying recursively while still holding the
            # slot deadlocks once every slot is waiting on a 429.
            if attempt == attempts - 1:
                break
            print(f"触发限流,等待{retry_after}秒")
            await asyncio.sleep(retry_after)

        print(f"请求失败: {url} - 限流重试次数已用尽")
        return None
# Good User-Agent: states clearly who you are and how to reach you.
# The value is kept ASCII-only: HTTP field values are expected to be ASCII,
# and http.client-based clients (e.g. requests) cannot encode other
# characters as latin-1 and raise UnicodeEncodeError.
HEADERS = {
    "User-Agent": "MyResearchBot/1.0 (academic research project; contact@example.com)",
    "Accept": "text/html,application/xhtml+xml",
    "Accept-Language": "zh-CN,zh;q=0.9",
}
# Bad User-Agent: pretends to be a regular browser.
BAD_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) ...",
}
原则:诚实表明你的身份。如果网站管理员有问题,他们可以通过联系方式找到你。
import asyncio
import aiohttp
import urllib.robotparser
from urllib.parse import urlparse
import logging
from dataclasses import dataclass
from typing import Optional
import time
# Configure the root logger so that INFO-level (and higher) messages are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the crawler code below.
logger = logging.getLogger(__name__)
@dataclass
class CrawlConfig:
    """Settings bundle consumed by the crawler.

    Only ``base_url`` is required; every other field has a default.
    """

    # Root URL of the target site.
    base_url: str
    # Upper bound on simultaneous requests.
    max_concurrent: int = 5
    # Politeness delay in seconds.
    base_delay: float = 2.0
    # Per-request timeout in seconds.
    timeout: int = 30
    # Number of attempts made per URL.
    max_retries: int = 3
    # Whether robots.txt rules are consulted before fetching.
    respect_robots: bool = True
    # Value sent as the User-Agent header; identifies the bot and a contact.
    user_agent: str = "EthicalCrawler/1.0 (research; contact@example.com)"
class EthicalCrawler:
    """Polite asynchronous crawler driven by a ``CrawlConfig``.

    It restricts crawling to the host of ``config.base_url``, consults
    robots.txt when ``config.respect_robots`` is set, caps concurrency with a
    semaphore, and waits between requests.
    """

    def __init__(self, config: CrawlConfig):
        """Store the configuration and, if requested, load robots.txt.

        Note:
            ``RobotFileParser.read()`` is a blocking network call.
        """
        self.config = config
        self.semaphore = asyncio.Semaphore(config.max_concurrent)
        self.visited = set()
        self.robot_parser = urllib.robotparser.RobotFileParser()
        # Effective delay after each request; raised below when robots.txt
        # asks for a longer Crawl-delay than the configured base delay.
        self.delay = config.base_delay
        if config.respect_robots:
            # robots.txt always lives at the root of the host, so build its
            # URL from scheme + host rather than appending to base_url (which
            # may end in "/" or contain a path).
            origin = urlparse(config.base_url)
            self.robot_parser.set_url(
                f"{origin.scheme}://{origin.netloc}/robots.txt"
            )
            try:
                self.robot_parser.read()
            except Exception as e:
                # NOTE(review): per the stdlib implementation, can_fetch()
                # answers False for every URL until a robots.txt has been
                # read, so after this warning nothing will be crawled.
                logger.warning(f"无法读取robots.txt: {e}")
            else:
                logger.info("已加载robots.txt规则")
                crawl_delay = self.robot_parser.crawl_delay(config.user_agent)
                if crawl_delay:
                    self.delay = max(self.delay, float(crawl_delay))

    @staticmethod
    def _retry_after_seconds(value, default=60):
        """Return the Retry-After header as non-negative seconds, or *default*."""
        if value is None:
            return default
        try:
            return max(0, int(value))
        except (TypeError, ValueError):
            # Not a plain number of seconds (e.g. the HTTP-date form).
            return default

    def is_allowed(self, url: str) -> bool:
        """Return whether *url* may be crawled.

        Always ``True`` when ``respect_robots`` is off. Otherwise the URL must
        be on the same host as ``base_url`` and be permitted by robots.txt for
        the configured user agent.
        """
        if not self.config.respect_robots:
            return True
        parsed = urlparse(url)
        if parsed.netloc != urlparse(self.config.base_url).netloc:
            logger.warning(f"跨域请求: {url}")
            return False
        return self.robot_parser.can_fetch(self.config.user_agent, url)

    async def fetch_page(self, session: aiohttp.ClientSession, url: str) -> Optional[str]:
        """Fetch one page and return its text.

        Returns:
            The body text, or ``None`` when the URL was already visited, is
            not allowed, answered with a status >= 400, or every attempt
            failed.
        """
        if url in self.visited:
            return None
        if not self.is_allowed(url):
            logger.info(f"robots.txt禁止访问: {url}")
            return None
        # Mark as visited *before* waiting for a semaphore slot; otherwise
        # duplicates queued behind the semaphore all pass the check above.
        self.visited.add(url)

        request_headers = {"User-Agent": self.config.user_agent}
        request_timeout = aiohttp.ClientTimeout(total=self.config.timeout)
        last_attempt = self.config.max_retries - 1

        async with self.semaphore:
            logger.info(f"正在爬取: {url}")
            for attempt in range(self.config.max_retries):
                body = None
                try:
                    async with session.get(
                        url,
                        headers=request_headers,
                        timeout=request_timeout,
                    ) as response:
                        status = response.status
                        retry_after_header = response.headers.get("Retry-After")
                        if status < 400:
                            # Read the body right away so the response is not
                            # left open while we sleep.
                            body = await response.text()
                except Exception as e:
                    logger.error(f"请求异常(第{attempt+1}次): {e}")
                    if attempt < last_attempt:
                        await asyncio.sleep(2 ** attempt)  # exponential backoff
                    continue

                if status == 429:
                    retry_after = self._retry_after_seconds(retry_after_header)
                    logger.warning(f"限流,等待{retry_after}秒")
                    await asyncio.sleep(retry_after)
                    continue

                if status >= 400:
                    logger.warning(f"HTTP {status}: {url}")
                # Politeness delay after every completed request, including
                # error responses, while still holding the semaphore slot.
                await asyncio.sleep(self.delay)
                return body
        return None

    async def crawl(self, urls: list[str]) -> list[tuple[str, str]]:
        """Fetch *urls* concurrently.

        Returns:
            ``(url, text)`` pairs for the pages that returned non-empty text,
            in the order of *urls*.
        """
        results = []
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch_page(session, url) for url in urls]
            pages = await asyncio.gather(*tasks, return_exceptions=True)
            for url, page in zip(urls, pages):
                if isinstance(page, str) and page:
                    results.append((url, page))
                elif isinstance(page, Exception):
                    logger.error(f"爬取失败: {url} - {page}")
        logger.info(f"爬取完成: {len(results)}/{len(urls)} 个页面成功")
        return results
# Usage example
async def main():
    """Crawl three example pages and print the size of each fetched page."""
    targets = [
        "https://example.com/page1",
        "https://example.com/page2",
        "https://example.com/page3",
    ]
    settings = CrawlConfig(
        base_url="https://example.com",
        max_concurrent=3,
        base_delay=3.0,
    )
    fetched = await EthicalCrawler(settings).crawl(targets)
    for page_url, text in fetched:
        print(f"成功获取 {page_url}: {len(text)} 字符")


if __name__ == "__main__":
    asyncio.run(main())
爬到数据只是第一步,如何使用同样重要:
在写爬虫之前,先检查目标网站是否提供官方API:
import requests
def check_api_availability(base_url):
    """Probe a site for common API and API-documentation endpoints.

    Each candidate path is requested with GET (5-second timeout) and the
    first one answering HTTP 200 is reported.

    Args:
        base_url: Site root such as ``"https://example.com"``; a trailing
            slash is tolerated.

    Returns:
        The full URL of the first endpoint that returned 200, or ``None``
        when none did. A 200 only shows that the path exists, not that it is
        actually an API.
    """
    common_paths = (
        "/api", "/api/v1", "/api/v2",
        "/docs", "/swagger", "/openapi.json",
        "/graphql",
    )
    # Strip trailing slashes so "https://host/" does not produce
    # "https://host//api".
    root = base_url.rstrip("/")
    for path in common_paths:
        candidate = f"{root}{path}"
        try:
            resp = requests.get(candidate, timeout=5)
        except requests.RequestException:
            # Unreachable or failing endpoint: try the next candidate.
            continue
        if resp.status_code == 200:
            print(f"发现API端点: {candidate}")
            return candidate
    return None
官方API通常有明确的使用条款,遵守这些条款比写爬虫更安全、更稳定、更高效。
道德爬虫的核心原则很简单:尊重网站所有者的意愿,不给对方造成负担,不侵犯他人权益。
具体来说:
技术能力越大,责任越大。做一个有底线的爬虫工程师,不仅保护自己,也保护整个技术社区的健康发展。