目 录CONTENT

文章目录

【后端】接口性能优化实战

EulerBlind
2025-10-23 / 0 评论 / 0 点赞 / 2 阅读 / 0 字

问题背景

最近遇到一个让人头疼的性能问题:生产环境中的用户统计接口 /statistics 一直处于pending状态,用户反馈页面加载非常慢,体验很差。这个接口的功能是统计所有用户的Agent使用情况,包括任务数、简历数、转化率等关键指标。

初步检查发现,这个接口在测试环境就已经出现超时现象,完全无法返回数据。作为一个需要实时展示的统计面板,这样的性能表现显然是不可接受的。

问题排查:找出真正的瓶颈

面对一个pending的接口,第一步是要找出它到底卡在哪里。我的排查思路是:先测试各个外部依赖,再逐步深入到具体的查询逻辑。

测试外部服务连通性

首先测试了应用服务的基础连通性,结果很理想:

async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=5)) as session:
    start_time = time.time()
    async with session.get(f"{base_url}/health") as response:
        elapsed = time.time() - start_time
        # 结果: 0.058秒,响应正常

服务响应很快,这说明网络和服务本身没问题。接着测试了get_current_user_info接口,耗时0.14秒,也很正常。

定位到数据库查询

经过一系列测试,最终锁定了问题:复杂的4表JOIN查询导致了超时。原始代码是这样的:

query = (
    select(
        ac_subquery.c.user_id,
        ac_subquery.c.session_count,
        ac_subquery.c.cost,
        func.count(func.distinct(case((Resume.site == "a", AISearchTaskItem.candidate_id)))).label("a_total_count"),
        func.count(func.distinct(case((Resume.site != "a", AISearchTaskItem.candidate_id)))).label("other_total_count"),
        func.count(func.distinct(AISearchTaskItem.candidate_id)).label("total_count"),
    )
    .select_from(ac_subquery)
    .outerjoin(AISearchTask, AISearchTask.user_id == ac_subquery.c.user_id)
    .outerjoin(AISearchTaskItem, AISearchTaskItem.task_id == AISearchTask.id)
    .outerjoin(Resume, Resume.id == AISearchTaskItem.resume_id)
    .group_by(ac_subquery.c.user_id, ac_subquery.c.session_count, ac_subquery.c.cost)
)

查看数据量后,我明白了问题所在:

  • AgentConversation: 823条
  • AISearchTask: 12,352,324条(1235万)
  • AISearchTaskItem: 12,584,411条(1258万)
  • Resume: 9,722,825条(972万)

这是一个涉及千万级数据的4表JOIN查询!数据库在处理这样的查询时,即使有索引,也需要扫描和匹配大量数据,导致查询超时。

优化过程:一步步接近目标

找到瓶颈后,我开始了一系列优化尝试。每次优化都基于前一次的经验,逐步改进。

优化v1:分步查询,移除大表JOIN(32.9秒)

第一个想法是拆分这个复杂的JOIN。既然Resume表有972万条记录,那就不要JOIN它了。我将查询改为两步:

第一步,只查询AgentConversation和AISearchTask的数据:

ac_subquery = (
    select(
        AgentConversation.user_id.label("user_id"),
        func.count(func.distinct(AgentConversation.session_id)).label("session_count"),
        cast(func.sum(AgentConversation.total_cost) * 14, Integer).label("cost"),
    )
    .where(and_(*ac_subquery_conditions))
    .group_by(AgentConversation.user_id)
    .subquery()
)

第二步,分批查询AISearchTask和AISearchTaskItem:

batch_size = 50  # 每批50个用户
for i in range(0, len(base_user_ids), batch_size):
    batch_user_ids = base_user_ids[i:i + batch_size]
    batch_stats_query = (
        select(
            AISearchTask.user_id,
            func.count(func.distinct(AISearchTaskItem.candidate_id)).label("total_count"),
        )
        .select_from(AISearchTask)
        .join(AISearchTaskItem, AISearchTaskItem.task_id == AISearchTask.id)
        .where(AISearchTask.user_id.in_(batch_user_ids))
        .group_by(AISearchTask.user_id)
    )
    batch_results = (await self.db.execute(batch_stats_query)).mappings().all()

这个版本的优化思路是:

  1. 移除Resume表的JOIN,减少972万条数据的关联
  2. 串行批处理,每批处理50个用户
  3. 暂时不区分a/other简历(避免JOIN Resume表)

测试结果:32.9秒完成,虽然还是很慢,但至少能返回数据了!这证明了方向是对的。

优化v2:引入并发处理(26.3秒)

既然批处理有效,那能不能并发执行呢?我引入了asyncio的并发机制:

async def query_batch(batch_num: int, batch_user_ids: List[int]):
    """并发查询单个批次"""
    batch_stats_query = (...)  # 查询逻辑
    batch_results = (await self.db.execute(batch_stats_query)).mappings().all()
    return batch_results

# 创建所有批次的任务
tasks = []
for i in range(0, len(base_user_ids), batch_size):
    batch_user_ids = base_user_ids[i:i + batch_size]
    tasks.append(query_batch(i // batch_size + 1, batch_user_ids))

# 并发执行(最多3个批次同时)
max_concurrent = 3
for i in range(0, len(tasks), max_concurrent):
    batch_tasks = tasks[i:i + max_concurrent]
    results = await asyncio.gather(*batch_tasks)

这个版本的改进:

  • 批次大小改为10个用户
  • 最多3个批次并发执行
  • 利用数据库连接池的并发能力

测试结果:26.3秒,提升了20%。但我发现第一批用户用了27秒,因为这批包含了数据量最大的用户(28万+条简历)。

优化v3:更小批次+更高并发(19.5秒)

既然大用户会拖累整个批次,那就让批次更小,并发更高:

batch_size = 5         # 减小到5个用户/批
max_concurrent = 5     # 提高到5个批次并发

这样的好处是:

  • 更小的批次让大用户和小用户分布更均匀
  • 更高的并发充分利用数据库性能
  • 70个用户分成14批,分3轮并发执行

测试结果:19.5秒,提升了41%!从日志看:

  • 第1轮(5批并发):14秒
  • 第2轮(5批并发):2秒
  • 第3轮(4批并发):1.5秒

优化v4:完全避免JOIN(失败:49.9秒)

到这里我想,既然JOIN是瓶颈,能不能完全不用JOIN?我尝试了这样的方案:

步骤1:先查询每个用户的task_ids(单表查询)

task_query = (
    select(AISearchTask.user_id, AISearchTask.id)
    .where(AISearchTask.user_id.in_(batch_user_ids))
)

步骤2:按task_ids查询candidates(单表查询,无JOIN)

count_query = (
    select(func.count(func.distinct(AISearchTaskItem.candidate_id)))
    .where(AISearchTaskItem.task_id.in_(task_ids))  # 直接用IN
)

理论上,两次单表查询应该比JOIN快。但测试结果让我大跌眼镜:49.9秒

原因分析:某些用户有非常多的task_ids(几千个),导致 WHERE task_id IN (...) 的IN语句过长。数据库在处理这种超长IN语句时,反而比JOIN更慢。

这个失败的尝试给了我一个重要启示:JOIN本身不是问题,多用户批量JOIN才是瓶颈

优化v5:单用户查询+超高并发(最终版:17.5秒)

既然多用户批量JOIN慢,那就每个用户单独查询:

async def count_single_user(user_id: int):
    """统计单个用户的candidate数量"""
    count_query = (
        select(func.count(func.distinct(AISearchTaskItem.candidate_id)))
        .select_from(AISearchTask)
        .join(AISearchTaskItem, AISearchTaskItem.task_id == AISearchTask.id)
        .where(AISearchTask.user_id == user_id)  # 单用户查询
    )
    count = await self.db.scalar(count_query)
    return user_id, count or 0

# 并发查询所有用户
tasks = [count_single_user(user_id) for user_id in base_user_ids]

# 超高并发(15个用户同时)
max_concurrent = 15
for i in range(0, len(tasks), max_concurrent):
    batch_tasks = tasks[i:i + max_concurrent]
    results = await asyncio.gather(*batch_tasks)

这个方案的核心优势:

  1. 单用户JOIN很快WHERE user_id = X 的JOIN有索引支持,非常高效
  2. 避免大用户拖累:每个用户独立查询,互不影响
  3. 超高并发:15个连接同时执行,充分利用数据库性能
  4. 负载均衡:每轮等待最慢的用户,而非最慢的批次

测试结果:17.5秒,相比原始的32.9秒提升了47%

缓存方案:为什么选择内存缓存而非Redis

在考虑缓存方案时,我面临两个选择:内存缓存Redis缓存。最终选择了内存缓存,但这是一个需要权衡的决策。

业务特点分析

首先要明确这个统计面板的业务特点:

数据实时性要求不高。这是一个用户使用情况的统计面板,展示的是累积的任务数、简历数等指标。这些数据有以下特点:

  1. 变化频率低:用户的任务数和简历数不是秒级变化的,即使有新增,5-10分钟的延迟也是可接受的
  2. 查看频率低:管理员不会每分钟都刷新统计面板,通常是每天查看几次
  3. 数据容忍度高:统计数据允许有一定的延迟,不像交易金额那样需要强一致性

访问模式分析

  • 同一个管理员短时间内可能多次查看(刷新页面、切换tab)
  • 不同服务器实例上的请求访问相同的数据
  • 高峰期可能有多个管理员同时访问

内存缓存方案(当前实现)

基于项目的实际情况(没有Redis配置),我实现了类级别的内存缓存:

class AgentConversationHistoryService:
    # 类级别缓存,所有实例共享
    _cache = {}
    _cache_expire = {}
    
    @classmethod
    def _get_cache(cls, key: str):
        """获取缓存,如果过期则返回None"""
        from datetime import datetime
        if key in cls._cache:
            expire_time = cls._cache_expire.get(key)
            if expire_time and expire_time > datetime.now():
                return cls._cache[key]
            else:
                cls._cache.pop(key, None)
                cls._cache_expire.pop(key, None)
        return None
    
    @classmethod
    def _set_cache(cls, key: str, value, expire_seconds: int = 300):
        """设置缓存,默认5分钟过期"""
        from datetime import datetime, timedelta
        cls._cache[key] = value
        cls._cache_expire[key] = datetime.now() + timedelta(seconds=expire_seconds)

内存缓存的优势

  1. 零依赖:不需要部署和配置Redis
  2. 极低延迟:内存访问,<1ms
  3. 简单可靠:没有网络故障、连接池等问题
  4. 开发快速:不需要序列化/反序列化

内存缓存的局限

  1. 单机隔离:每个服务实例有独立的缓存,无法共享
  2. 重启丢失:服务重启后缓存清空
  3. 内存占用:数据一直占用进程内存

Redis缓存方案(更优选择)

虽然当前实现使用内存缓存,但从架构角度看,Redis缓存才是更好的长期方案

为什么Redis更适合这个场景

  1. 跨实例共享:多个服务实例共享同一份缓存,避免重复计算

    场景:3个服务实例,内存缓存下需要计算3次
    实例1: 查询数据库 17.5秒,缓存到实例1内存
    实例2: 查询数据库 17.5秒,缓存到实例2内存  ← 重复计算
    实例3: 查询数据库 17.5秒,缓存到实例3内存  ← 重复计算
    
    Redis缓存下只需计算1次:
    实例1: 查询数据库 17.5秒,缓存到Redis
    实例2: 从Redis读取 <0.1秒  ← 命中缓存
    实例3: 从Redis读取 <0.1秒  ← 命中缓存
    
  2. 持久化可选:Redis可以配置持久化,服务重启后缓存仍然有效

  3. 灵活的过期策略:可以精确控制缓存时间,甚至根据业务高峰期动态调整

  4. 缓存预热:可以通过定时任务在凌晨预先计算并缓存数据

Redis方案实现

async def get_user_statistics_with_redis(self, user: ATSUser, context: MCPContext):
    """使用Redis缓存的统计查询"""
    from redis import Redis
    import json
    
    redis_client = Redis(host='localhost', port=6379, db=0)
    cache_key = f"user_statistics:{user.tid}"
    
    # 1. 尝试从Redis获取
    cached_data = redis_client.get(cache_key)
    if cached_data:
        logging.info(f"Redis cache hit: {cache_key}")
        return json.loads(cached_data)
    
    # 2. 缓存未命中,查询数据库
    result = await self._query_statistics(user, context)
    
    # 3. 保存到Redis(5分钟过期)
    redis_client.setex(
        cache_key,
        300,  # 5分钟
        json.dumps(result, default=str)
    )
    
    return result

缓存时间的业务考量

基于业务特点,我建议的缓存策略是:

  • 日间(9:00-18:00):5分钟缓存

    • 理由:工作时间可能有更频繁的数据更新
    • 5分钟的延迟在统计场景完全可接受
  • 夜间(18:00-9:00):30分钟缓存

    • 理由:夜间数据变化少,且访问量低
    • 更长的缓存时间可以减轻数据库压力
  • 凌晨定时刷新:每天凌晨2点主动计算并缓存

    • 理由:避免白天首次访问的17.5秒等待
    • 用户白天访问时直接命中缓存
# 定时任务:凌晨预热缓存
@scheduler.task('cron', hour=2, minute=0)
async def warm_statistics_cache():
    """凌晨2点预热统计数据缓存"""
    # 获取所有需要统计的租户
    for tenant_id in get_all_tenants():
        try:
            # 计算统计数据
            result = await calculate_statistics(tenant_id)
            # 缓存到Redis(缓存到早上9点)
            redis_client.setex(
                f"user_statistics:{tenant_id}",
                25200,  # 7小时
                json.dumps(result, default=str)
            )
            logging.info(f"Cache warmed for tenant {tenant_id}")
        except Exception as e:
            logging.error(f"Failed to warm cache for {tenant_id}: {e}")

方案选择建议

当前阶段(已实现):

  • 使用内存缓存
  • 适合单实例或小规模部署
  • 快速上线,满足基本需求

推荐升级(长期方案):

  • 迁移到Redis缓存
  • 添加缓存预热机制
  • 适合多实例、大规模部署

业务价值对比

场景内存缓存Redis缓存
单实例部署✅ 够用⚠️ 过度设计
多实例部署❌ 重复计算✅ 共享缓存
服务重启❌ 缓存丢失✅ 缓存保留
首次访问17.5秒<0.1秒(预热后)
重复访问<0.1秒<0.1秒
部署成本✅ 无需额外组件⚠️ 需要Redis

最终建议:如果你的服务是多实例部署,或者预期访问量较大,强烈建议使用Redis缓存方案。5分钟的缓存时间配合凌晨预热,可以让99%的请求命中缓存,用户几乎感受不到任何延迟。

缓存效果对比:

  • 首次查询(无缓存):17.5秒
  • 首次查询(Redis预热后):<0.1秒
  • 重复访问:<0.1秒
  • 加速比:175倍

性能对比与总结

让我用一个表格来展示整个优化过程:

版本响应时间提升幅度核心策略
原始版本超时/卡死-4表JOIN,1200万+记录
优化v132.9秒可用分步查询,串行批处理
优化v226.3秒↓20%并发3批,10用户/批
优化v319.5秒↓41%并发5批,5用户/批
优化v449.9秒❌失败完全无JOIN
优化v517.5秒✅↓47%并发15,单用户查询
缓存命中<0.1秒↓99.7%内存缓存,5分钟过期

经验总结

经过这次优化实战,我总结了以下几点经验:

1. 性能瓶颈要用数据说话

不要凭感觉猜测瓶颈在哪里,而是要通过测试获取准确的数据。在这个案例中,我通过逐步测试:

  • 网络连通性:58ms ✅
  • 外部服务接口:140ms ✅
  • 数据库查询:超时 ❌

准确定位到了真正的瓶颈。如果一开始就盲目优化ATS接口或网络,那就是南辕北辙了。

这一点在缓存方案选择上同样适用:我先用内存缓存快速验证了缓存的有效性(从17.5秒降到<0.1秒),证明了缓存策略可行后,再考虑升级到Redis。而不是一开始就引入Redis,增加不必要的复杂度。

2. 复杂查询要拆分,但不是越拆越好

一开始我认为JOIN是罪魁祸首,所以尝试完全避免JOIN。但v4的失败告诉我:JOIN本身不慢,多用户批量JOIN才慢

关键洞察:

  • 单用户JOIN(WHERE user_id = X):有索引,很快
  • 多用户JOIN(WHERE user_id IN (...)):扫描数据量大,慢
  • 超长IN语句(IN (1000个id)):比JOIN更慢

所以最佳方案是:单用户查询 + 高并发

3. 并发不是越高越好,要考虑数据库压力

在优化过程中,我尝试了不同的并发数:

  • 3个批次并发:安全,但提升有限
  • 5个批次并发:效果不错
  • 10个用户并发:单用户查询快,可以更高
  • 15个用户并发:最佳平衡点

过高的并发会导致:

  • 数据库连接池耗尽
  • 查询相互竞争资源
  • 反而降低整体性能

实际测试发现,15个并发是这个场景的最佳值。如果你的场景不同,需要实测找到最佳并发数。

4. 缓存策略要基于业务特性

内存缓存让重复请求从17.5秒降到<0.1秒,效果显著。但缓存方案的选择必须基于业务特性

业务分析是关键

  • 数据实时性:这个统计面板的数据不需要秒级更新,5分钟延迟完全可接受
  • 访问频率:管理员查看频率不高,但可能短时间内多次刷新
  • 部署架构:多实例部署意味着内存缓存会重复计算

方案演进路径

  1. 快速验证:先用内存缓存验证效果(当前)
  2. 生产优化:升级到Redis缓存(推荐)
  3. 极致优化:添加定时任务预热缓存

Redis缓存的业务价值

  • 多实例场景下,避免重复计算(3个实例节省2次×17.5秒)
  • 配合凌晨定时任务,白天首次访问也能秒开
  • 5分钟缓存时间对于统计场景来说,既保证了数据新鲜度,又大幅降低了数据库压力

这不是技术选择,而是业务需求驱动的架构决策。

5. 优化是渐进的过程,不要追求一步到位

回顾这次优化,从32.9秒到17.5秒,经历了5个版本的迭代。每个版本都基于前一版本的经验:

  • v1验证了分步查询的可行性
  • v2证明了并发的有效性
  • v3找到了批次大小的平衡点
  • v4的失败揭示了JOIN的真相
  • v5最终找到了最佳方案

如果一开始就想找到完美方案,很可能会陷入分析瘫痪。不如先做一个可用的版本,然后逐步优化。

6. 监控和日志是优化的基础

在优化过程中,详细的日志帮了大忙:

logging.info(f"[get_user_statistics] Step 1 completed: {len(base_user_ids)} users found")
logging.info(f"[get_user_statistics] Processing batch {batch_num}/{total_batches}...")
logging.info(f"[get_user_statistics] Batch {batch_num} completed: {len(batch_results)} results")

这些日志让我清楚地看到:

  • 每个步骤的耗时
  • 哪个批次最慢
  • 数据量分布

没有这些信息,很难找到真正的瓶颈。建议在关键路径上都添加日志和监控。

写在最后

这次优化让我深刻体会到:性能优化不是玄学,而是科学。通过系统的测试、数据分析和渐进式优化,我们成功将一个超时的接口优化到了17.5秒,缓存命中时更是达到<0.1秒。

更重要的是,这个过程中积累的经验:

  • 如何定位性能瓶颈
  • 如何选择优化策略
  • 如何评估优化效果
  • 如何避免过度优化

这些经验适用于各种性能优化场景。希望这篇文章能给你带来一些启发。如果你也遇到类似的性能问题,不妨试试这些方法。

最后,性能优化永无止境。如果未来有需求,还可以考虑:

  • 升级到Redis缓存(强烈推荐):多实例部署必备,配合定时任务预热,首次访问也能秒开
  • 添加数据库索引:对AISearchTask.user_idAISearchTaskItem.task_id建立联合索引(预计可降到10-12秒)
  • 定时任务预计算:凌晨2点定时计算并缓存,白天全部命中缓存(可降到<0.1秒)
  • 前端分页加载:按团队或时间段分页,每页3-5秒

优先级建议:Redis缓存(立竿见影,多实例必备) > 数据库索引(持续优化) > 定时预计算(完美体验)

但现在17.5秒的首次加载 + <0.1秒的缓存命中,已经能够满足业务需求了。记住:够用就好,不要过度优化。而Redis缓存是从"够用"升级到"完美"的性价比最高的方案。

0
博主关闭了所有页面的评论