目 录CONTENT

文章目录

【后端】FastAPI 中 asyncio.gather 的隐藏陷阱与最佳实践

EulerBlind
2025-09-15 / 0 评论 / 0 点赞 / 0 阅读 / 0 字

在开发 FastAPI 应用时,我们经常需要并发处理多个异步任务,比如同时请求多个 API、并行处理数据等。asyncio.gather() 看起来是个完美的选择,但它隐藏着一个危险的陷阱,特别是在与 FastAPI Background Tasks 结合使用时。

问题场景

假设你的 FastAPI 应用需要处理这样的需求:用户提交一批 URL,后台并发抓取这些网页内容。代码可能是这样的:

from fastapi import FastAPI, BackgroundTasks
import asyncio
import aiohttp
​
app = FastAPI()
​
async def fetch_url(session, url):
    async with session.get(url, timeout=30) as response:
        await asyncio.sleep(2)  # 模拟处理时间
        return await response.text()
​
async def process_urls_background(urls: list[str]):
    async with aiohttp.ClientSession() as session:
        # 看起来很正常的代码
        results = await asyncio.gather(
            *[fetch_url(session, url) for url in urls],
            return_exceptions=True
        )
    
    # 处理结果...
    print(f"处理完成: {len(results)} 个URL")
​
@app.post("/process-urls")
async def process_urls(urls: list[str], background_tasks: BackgroundTasks):
    background_tasks.add_task(process_urls_background, urls)
    return {"message": f"已启动后台任务处理 {len(urls)} 个URL"}

隐藏的危险

1. 僵尸任务问题

当使用 asyncio.gather() 时,如果某个任务失败(即使设置了 return_exceptions=True),其他任务不会被取消,而是继续在后台运行。在普通的请求-响应场景中,这些任务最终会在垃圾回收时清理,但在 Background Tasks 中,情况就不同了:

# 危险示例
async def dangerous_background_task(urls):
    try:
        # 如果这里有任务失败,其他任务继续运行
        results = await asyncio.gather(
            *[fetch_long_running_task(url) for url in urls]
        )
    except Exception as e:
        # 即使捕获了异常,其他任务仍在后台运行!
        print(f"出错了: {e}")
        return
    
    # 如果函数提前返回,未完成的任务变成"僵尸任务"

2. 资源泄露

这些僵尸任务会持续占用:

  • 内存:任务对象和相关数据结构
  • 网络连接:HTTP 连接池中的连接
  • 文件描述符:可能导致系统资源耗尽
  • CPU 时间:无意义的计算继续进行

3. 实际测试

让我们用代码验证这个问题:

import asyncio
import time
import weakref
from fastapi import FastAPI, BackgroundTasks
​
app = FastAPI()
​
# 用于跟踪活跃任务
active_tasks = weakref.WeakSet()
​
async def long_running_task(task_id, delay):
    current_task = asyncio.current_task()
    active_tasks.add(current_task)
    
    try:
        print(f"任务 {task_id} 开始 (延迟 {delay}s)")
        await asyncio.sleep(delay)
        
        if task_id == 2:  # 模拟第2个任务失败
            raise Exception(f"任务 {task_id} 失败了!")
            
        print(f"任务 {task_id} 完成")
        return f"结果-{task_id}"
        
    except asyncio.CancelledError:
        print(f"任务 {task_id} 被取消")
        raise
    except Exception as e:
        print(f"任务 {task_id} 异常: {e}")
        raise
    finally:
        print(f"任务 {task_id} 清理")
​
@app.post("/dangerous-gather")
async def dangerous_gather(background_tasks: BackgroundTasks):
    async def bg_task():
        print(f"开始时活跃任务: {len(active_tasks)}")
        
        try:
            # 危险的方式:gather 不会取消其他任务
            results = await asyncio.gather(
                long_running_task(1, 3),
                long_running_task(2, 1),  # 这个会失败
                long_running_task(3, 5),  # 这个会继续运行!
                long_running_task(4, 4),  # 这个也会继续运行!
                return_exceptions=True
            )
            print(f"Gather 结果: {results}")
        except Exception as e:
            print(f"背景任务异常: {e}")
        
        print(f"结束时活跃任务: {len(active_tasks)}")
    
    background_tasks.add_task(bg_task)
    return {"message": "启动了危险的 gather 任务"}
​
@app.get("/task-status")
async def task_status():
    return {"active_tasks": len(active_tasks)}

**访问 **/dangerous-gather 后再查看 /task-status,你会发现即使背景任务"完成"了,仍有任务在后台运行。

解决方案

1. 使用 asyncio.TaskGroup(推荐)

**Python 3.11+ 引入的 **TaskGroup 提供了结构化并发:

@app.post("/safe-taskgroup")
async def safe_taskgroup(background_tasks: BackgroundTasks):
    async def bg_task():
        try:
            results = []
            async with asyncio.TaskGroup() as tg:
                tasks = [
                    tg.create_task(long_running_task(i, i+1)) 
                    for i in range(1, 5)
                ]
            
            # 只有所有任务都成功,才会执行到这里
            results = [task.result() for task in tasks]
            print(f"所有任务成功: {results}")
            
        except* Exception as eg:
            # 一个任务失败,所有任务都被自动取消
            print(f"TaskGroup 中有任务失败: {[str(e) for e in eg.exceptions]}")
            # 此时所有任务都已经被清理
    
    background_tasks.add_task(bg_task)
    return {"message": "启动了安全的 TaskGroup 任务"}

2. 手动管理任务生命周期

对于需要更细粒度控制的场景:

async def manual_task_management(urls):
    tasks = []
    try:
        # 创建所有任务
        for i, url in enumerate(urls):
            task = asyncio.create_task(fetch_url(url, i))
            tasks.append(task)
        
        # 使用 as_completed 逐个处理完成的任务
        results = []
        for coro in asyncio.as_completed(tasks, timeout=30):
            try:
                result = await coro
                results.append(result)
            except Exception as e:
                print(f"任务失败: {e}")
                # 可选择是否取消其他任务
                # cancel_remaining_tasks(tasks)
        
        return results
        
    except Exception as e:
        # 确保所有任务都被取消
        for task in tasks:
            if not task.done():
                task.cancel()
        
        # 等待所有任务完成取消
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
        
        raise e
​
def cancel_remaining_tasks(tasks):
    """取消所有未完成的任务"""
    for task in tasks:
        if not task.done():
            task.cancel()

3. 带超时的安全 gather

如果必须使用 gather,至少要添加适当的错误处理:

async def safe_gather_with_timeout(urls, timeout=30):
    tasks = [asyncio.create_task(fetch_url(url)) for url in urls]
    
    try:
        # 添加总体超时
        results = await asyncio.wait_for(
            asyncio.gather(*tasks, return_exceptions=True),
            timeout=timeout
        )
        return results
        
    except asyncio.TimeoutError:
        print("操作超时,取消所有任务")
        # 取消所有任务
        for task in tasks:
            if not task.done():
                task.cancel()
        
        # 等待取消完成
        await asyncio.gather(*tasks, return_exceptions=True)
        raise
    
    except Exception as e:
        # 其他异常也要清理任务
        for task in tasks:
            if not task.done():
                task.cancel()
        
        await asyncio.gather(*tasks, return_exceptions=True)
        raise

最佳实践总结

  1. 优先使用 TaskGroup:提供结构化并发,自动管理任务生命周期
  2. 避免裸露的 gather:特别是在 Background Tasks 中
  3. 总是处理异常情况:确保异常时能正确清理资源
  4. 添加超时机制:防止任务无限期运行
  5. 监控资源使用:定期检查是否有任务泄露

监控和调试

添加任务监控来及时发现问题:

import weakref
import logging
​
class TaskTracker:
    def __init__(self):
        self.tasks = weakref.WeakSet()
        self.created_count = 0
    
    def track_task(self, task, name=None):
        self.tasks.add(task)
        self.created_count += 1
        task.add_done_callback(lambda t: logging.info(f"任务完成: {name}"))
    
    def get_stats(self):
        return {
            "active_tasks": len(self.tasks),
            "total_created": self.created_count
        }
​
# 全局任务跟踪器
task_tracker = TaskTracker()
​
@app.middleware("http")
async def track_background_tasks(request, call_next):
    before_stats = task_tracker.get_stats()
    response = await call_next(request)
    after_stats = task_tracker.get_stats()
    
    if after_stats["active_tasks"] > before_stats["active_tasks"]:
        logging.warning(f"请求后活跃任务增加: {after_stats}")
    
    return response

结论

asyncio.gather() 在简单场景下很有用,但在 FastAPI Background Tasks 中使用时需要格外小心。它不会自动清理失败任务,可能导致资源泄露和系统不稳定。

选择合适的并发模式:

  • 简单并发:使用 TaskGroup
  • 复杂控制:手动管理任务
  • 必须用 gather:添加完整的错误处理和清理机制
0

评论区