Python的asyncio库让异步编程变得前所未有的简单,但简单并不意味着容易。在实际项目中,很多开发者会掉进一些看似不起眼的陷阱,导致程序性能下降、死锁甚至崩溃。今天我把这几年踩过的坑和解决方案整理出来,希望能帮你少走弯路。
这是最常见的错误。很多开发者在async函数里直接使用requests.get()、open()等同步IO操作,结果整个事件循环被阻塞,其他协程全部卡住。
# 错误示范
async def fetch_data():
response = requests.get("https://api.example.com/data") # 阻塞整个事件循环
return response.json()
# 正确做法:使用aiohttp
import aiohttp
async def fetch_data():
async with aiohttp.ClientSession() as session:
async with session.get("https://api.example.com/data") as response:
return await response.json()
# 或者用run_in_executor包装同步调用
import asyncio
from functools import partial
async def fetch_data():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
None,
partial(requests.get, "https://api.example.com/data")
)
return result.json()
关键原则:在async函数中,任何可能超过几毫秒的IO操作都应该用异步版本或放到线程池中执行。
忘记加await是asyncio中最隐蔽的bug。代码不会报错,但协程不会被执行,你只会得到一个coroutine对象。
# 错误示范
async def main():
task = asyncio.create_task(some_work()) # 创建了任务但没有等待
# some_work可能还没执行完,main就结束了
# 正确做法
async def main():
task = asyncio.create_task(some_work())
result = await task # 等待任务完成
更隐蔽的情况是在函数调用链中遗漏await:
# 错误:中间某层忘记await
async def process(data):
cleaned = clean_data(data) # 忘记await!clean_data是协程
return cleaned
# 正确
async def process(data):
cleaned = await clean_data(data)
return cleaned
建议:使用mypy的asyncio插件或pyright等类型检查工具,它们能帮你检测到遗漏的await。
asyncio.gather默认在某个任务失败时会取消其他所有任务。如果你需要部分失败不影响整体,必须设置return_exceptions=True。
# 默认行为:一个失败全部取消
async def fetch_all():
results = await asyncio.gather(
fetch_user(1),
fetch_user(2),
fetch_user(3)
) # 如果fetch_user(2)抛异常,fetch_user(3)也会被取消
# 更好的做法
async def fetch_all():
results = await asyncio.gather(
fetch_user(1),
fetch_user(2),
fetch_user(3),
return_exceptions=True
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"任务{i}失败: {result}")
else:
print(f"任务{i}成功: {result}")
在Python 3.10+中,asyncio.run()会自动处理事件循环的创建和关闭。但在一些需要手动管理事件循环的场景(如Jupyter Notebook、FastAPI的某些中间件),忘记关闭会导致资源泄漏。
# Python 3.10+ 推荐方式
async def main():
await do_something()
asyncio.run(main()) # 自动创建和关闭事件循环
# 需要手动管理的场景
async def main():
# ... 你的代码
pass
loop = asyncio.new_event_loop()
try:
asyncio.set_event_loop(loop)
loop.run_until_complete(main())
finally:
# 清理所有待处理的任务
pending = asyncio.all_tasks(loop)
for task in pending:
task.cancel()
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
一次性创建上千个并发请求看起来很高效,但实际上可能导致连接池耗尽、内存暴涨、甚至被目标服务器封IP。
# 错误:一次性创建10000个任务
async def fetch_all_urls(urls):
tasks = [fetch(url) for url in urls] # 10000个任务同时运行
return await asyncio.gather(*tasks)
# 正确:使用信号量控制并发数
async def fetch_all_urls(urls, max_concurrent=20):
semaphore = asyncio.Semaphore(max_concurrent)
async def limited_fetch(url):
async with semaphore:
return await fetch(url)
tasks = [limited_fetch(url) for url in urls]
return await asyncio.gather(*tasks)
# 或者使用TaskGroup + 信号量(Python 3.11+)
async def fetch_all_urls(urls, max_concurrent=20):
semaphore = asyncio.Semaphore(max_concurrent)
async with asyncio.TaskGroup() as tg:
for url in urls:
tg.create_task(limited_fetch(url, semaphore))
在asyncio中调用同步代码,而同步代码又尝试调用asyncio函数,这种循环依赖会导致死锁。
# 死锁场景
def sync_function():
# 这个函数在同步上下文中被调用
asyncio.run(async_function()) # 如果已经在事件循环中,这会报错或死锁
# 解决方案1:使用nest_asyncio(适合Jupyter等特殊环境)
import nest_asyncio
nest_asyncio.apply()
# 解决方案2:重新设计,避免嵌套
async def main():
# 把所有逻辑都放在异步上下文中
result = await async_function()
processed = await loop.run_in_executor(None, sync_process, result)
当Task被取消时,会抛出asyncio.CancelledError。如果你在异步上下文管理器中忽略了它,可能导致数据库连接、文件句柄等资源无法正确释放。
# 错误:取消后资源泄漏
async def process_with_db():
conn = await get_db_connection()
try:
result = await conn.execute("SELECT * FROM users")
await asyncio.sleep(10) # 这里可能被取消
return result
except asyncio.CancelledError:
return None # conn没有关闭!
# 正确:使用async with确保资源释放
async def process_with_db():
async with await get_db_connection() as conn:
result = await conn.execute("SELECT * FROM users")
await asyncio.sleep(10)
return result
# async with会确保即使被取消也能正确关闭连接
asyncio.sleep(0)的作用是让出控制权给事件循环,让其他等待中的协程有机会执行。但很多人误以为它能保证公平调度。
# asyncio.sleep(0)的正确用途
async def producer(queue):
for item in items:
await queue.put(item)
await asyncio.sleep(0) # 让消费者有机会处理
# 错误:以为sleep(0)能保证其他任务执行
async def bad_example():
while True:
await do_work()
await asyncio.sleep(0) # 这只是"建议"让出控制权,不保证
# 如果需要真正的定时执行,使用实际的时间间隔
async def timed_example():
while True:
await do_work()
await asyncio.sleep(0.1) # 至少间隔100ms
asyncio的事件循环不能跨进程共享。如果你使用多进程(multiprocessing),每个进程都需要自己独立的事件循环。
# 错误:在子进程中使用父进程的事件循环
import multiprocessing
async def worker():
# 这里会失败,因为子进程没有事件循环
await asyncio.sleep(1)
def start_worker():
multiprocessing.Process(target=worker).start()
# 正确:在子进程中创建新的事件循环
def start_worker():
def run():
asyncio.run(worker()) # 每个进程独立的事件循环
multiprocessing.Process(target=run).start()
# 或者使用多线程代替多进程(GIL对IO密集型任务影响不大)
import threading
async def worker():
await asyncio.sleep(1)
def start_workers(count=5):
for _ in range(count):
threading.Thread(target=lambda: asyncio.run(worker())).start()
协程中的异常如果不被正确捕获,可能会被”吞噬”,只留下一个”Task exception was never retrieved”的警告。
# 错误:异常被吞噬
async def main():
asyncio.create_task(risky_operation()) # 如果risky_operation抛异常,你只会看到警告
await asyncio.sleep(1)
# 正确:添加回调处理异常
def handle_exception(task):
try:
task.result()
except Exception as e:
print(f"任务失败: {e}")
async def main():
task = asyncio.create_task(risky_operation())
task.add_done_callback(handle_exception)
await asyncio.sleep(1)
# 或者使用TaskGroup(Python 3.11+)自动传播异常
async def main():
async with asyncio.TaskGroup() as tg:
tg.create_task(risky_operation())
# 如果risky_operation抛异常,这里会自动抛出
asyncio是一个强大的工具,但需要开发者对事件循环的运行机制有清晰的理解。记住这几个核心原则:
掌握这些要点后,asyncio会成为你构建高性能Python应用的利器。如果你在项目中遇到其他asyncio相关的问题,欢迎在评论区讨论。