在开发 FastAPI 应用时,我们经常需要并发处理多个异步任务,比如同时请求多个 API、并行处理数据等。asyncio.gather()
看起来是个完美的选择,但它隐藏着一个危险的陷阱,特别是在与 FastAPI Background Tasks 结合使用时。
问题场景
假设你的 FastAPI 应用需要处理这样的需求:用户提交一批 URL,后台并发抓取这些网页内容。代码可能是这样的:
from fastapi import FastAPI, BackgroundTasks
import asyncio
import aiohttp
app = FastAPI()
async def fetch_url(session, url):
async with session.get(url, timeout=30) as response:
await asyncio.sleep(2) # 模拟处理时间
return await response.text()
async def process_urls_background(urls: list[str]):
async with aiohttp.ClientSession() as session:
# 看起来很正常的代码
results = await asyncio.gather(
*[fetch_url(session, url) for url in urls],
return_exceptions=True
)
# 处理结果...
print(f"处理完成: {len(results)} 个URL")
@app.post("/process-urls")
async def process_urls(urls: list[str], background_tasks: BackgroundTasks):
background_tasks.add_task(process_urls_background, urls)
return {"message": f"已启动后台任务处理 {len(urls)} 个URL"}
隐藏的危险
1. 僵尸任务问题
当使用 asyncio.gather()
时,如果某个任务失败(即使设置了 return_exceptions=True
),其他任务不会被取消,而是继续在后台运行。在普通的请求-响应场景中,这些任务最终会在垃圾回收时清理,但在 Background Tasks 中,情况就不同了:
# 危险示例
async def dangerous_background_task(urls):
try:
# 如果这里有任务失败,其他任务继续运行
results = await asyncio.gather(
*[fetch_long_running_task(url) for url in urls]
)
except Exception as e:
# 即使捕获了异常,其他任务仍在后台运行!
print(f"出错了: {e}")
return
# 如果函数提前返回,未完成的任务变成"僵尸任务"
2. 资源泄露
这些僵尸任务会持续占用:
- 内存:任务对象和相关数据结构
- 网络连接:HTTP 连接池中的连接
- 文件描述符:可能导致系统资源耗尽
- CPU 时间:无意义的计算继续进行
3. 实际测试
让我们用代码验证这个问题:
import asyncio
import time
import weakref
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
# 用于跟踪活跃任务
active_tasks = weakref.WeakSet()
async def long_running_task(task_id, delay):
current_task = asyncio.current_task()
active_tasks.add(current_task)
try:
print(f"任务 {task_id} 开始 (延迟 {delay}s)")
await asyncio.sleep(delay)
if task_id == 2: # 模拟第2个任务失败
raise Exception(f"任务 {task_id} 失败了!")
print(f"任务 {task_id} 完成")
return f"结果-{task_id}"
except asyncio.CancelledError:
print(f"任务 {task_id} 被取消")
raise
except Exception as e:
print(f"任务 {task_id} 异常: {e}")
raise
finally:
print(f"任务 {task_id} 清理")
@app.post("/dangerous-gather")
async def dangerous_gather(background_tasks: BackgroundTasks):
async def bg_task():
print(f"开始时活跃任务: {len(active_tasks)}")
try:
# 危险的方式:gather 不会取消其他任务
results = await asyncio.gather(
long_running_task(1, 3),
long_running_task(2, 1), # 这个会失败
long_running_task(3, 5), # 这个会继续运行!
long_running_task(4, 4), # 这个也会继续运行!
return_exceptions=True
)
print(f"Gather 结果: {results}")
except Exception as e:
print(f"背景任务异常: {e}")
print(f"结束时活跃任务: {len(active_tasks)}")
background_tasks.add_task(bg_task)
return {"message": "启动了危险的 gather 任务"}
@app.get("/task-status")
async def task_status():
return {"active_tasks": len(active_tasks)}
**访问 **/dangerous-gather
后再查看 /task-status
,你会发现即使背景任务"完成"了,仍有任务在后台运行。
解决方案
1. 使用 asyncio.TaskGroup(推荐)
**Python 3.11+ 引入的 **TaskGroup
提供了结构化并发:
@app.post("/safe-taskgroup")
async def safe_taskgroup(background_tasks: BackgroundTasks):
async def bg_task():
try:
results = []
async with asyncio.TaskGroup() as tg:
tasks = [
tg.create_task(long_running_task(i, i+1))
for i in range(1, 5)
]
# 只有所有任务都成功,才会执行到这里
results = [task.result() for task in tasks]
print(f"所有任务成功: {results}")
except* Exception as eg:
# 一个任务失败,所有任务都被自动取消
print(f"TaskGroup 中有任务失败: {[str(e) for e in eg.exceptions]}")
# 此时所有任务都已经被清理
background_tasks.add_task(bg_task)
return {"message": "启动了安全的 TaskGroup 任务"}
2. 手动管理任务生命周期
对于需要更细粒度控制的场景:
async def manual_task_management(urls):
tasks = []
try:
# 创建所有任务
for i, url in enumerate(urls):
task = asyncio.create_task(fetch_url(url, i))
tasks.append(task)
# 使用 as_completed 逐个处理完成的任务
results = []
for coro in asyncio.as_completed(tasks, timeout=30):
try:
result = await coro
results.append(result)
except Exception as e:
print(f"任务失败: {e}")
# 可选择是否取消其他任务
# cancel_remaining_tasks(tasks)
return results
except Exception as e:
# 确保所有任务都被取消
for task in tasks:
if not task.done():
task.cancel()
# 等待所有任务完成取消
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
raise e
def cancel_remaining_tasks(tasks):
"""取消所有未完成的任务"""
for task in tasks:
if not task.done():
task.cancel()
3. 带超时的安全 gather
如果必须使用 gather,至少要添加适当的错误处理:
async def safe_gather_with_timeout(urls, timeout=30):
tasks = [asyncio.create_task(fetch_url(url)) for url in urls]
try:
# 添加总体超时
results = await asyncio.wait_for(
asyncio.gather(*tasks, return_exceptions=True),
timeout=timeout
)
return results
except asyncio.TimeoutError:
print("操作超时,取消所有任务")
# 取消所有任务
for task in tasks:
if not task.done():
task.cancel()
# 等待取消完成
await asyncio.gather(*tasks, return_exceptions=True)
raise
except Exception as e:
# 其他异常也要清理任务
for task in tasks:
if not task.done():
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
raise
最佳实践总结
- 优先使用 TaskGroup:提供结构化并发,自动管理任务生命周期
- 避免裸露的 gather:特别是在 Background Tasks 中
- 总是处理异常情况:确保异常时能正确清理资源
- 添加超时机制:防止任务无限期运行
- 监控资源使用:定期检查是否有任务泄露
监控和调试
添加任务监控来及时发现问题:
import weakref
import logging
class TaskTracker:
def __init__(self):
self.tasks = weakref.WeakSet()
self.created_count = 0
def track_task(self, task, name=None):
self.tasks.add(task)
self.created_count += 1
task.add_done_callback(lambda t: logging.info(f"任务完成: {name}"))
def get_stats(self):
return {
"active_tasks": len(self.tasks),
"total_created": self.created_count
}
# 全局任务跟踪器
task_tracker = TaskTracker()
@app.middleware("http")
async def track_background_tasks(request, call_next):
before_stats = task_tracker.get_stats()
response = await call_next(request)
after_stats = task_tracker.get_stats()
if after_stats["active_tasks"] > before_stats["active_tasks"]:
logging.warning(f"请求后活跃任务增加: {after_stats}")
return response
结论
asyncio.gather()
在简单场景下很有用,但在 FastAPI Background Tasks 中使用时需要格外小心。它不会自动清理失败任务,可能导致资源泄露和系统不稳定。
选择合适的并发模式:
- 简单并发:使用
TaskGroup
- 复杂控制:手动管理任务
- 必须用 gather:添加完整的错误处理和清理机制
评论区