问题背景
最近遇到一个让人头疼的性能问题:生产环境中的用户统计接口 /statistics 一直处于pending状态,用户反馈页面加载非常慢,体验很差。这个接口的功能是统计所有用户的Agent使用情况,包括任务数、简历数、转化率等关键指标。
初步检查发现,这个接口在测试环境就已经出现超时现象,完全无法返回数据。作为一个需要实时展示的统计面板,这样的性能表现显然是不可接受的。
问题排查:找出真正的瓶颈
面对一个pending的接口,第一步是要找出它到底卡在哪里。我的排查思路是:先测试各个外部依赖,再逐步深入到具体的查询逻辑。
测试外部服务连通性
首先测试了应用服务的基础连通性,结果很理想:
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=5)) as session:
start_time = time.time()
async with session.get(f"{base_url}/health") as response:
elapsed = time.time() - start_time
# 结果: 0.058秒,响应正常
服务响应很快,这说明网络和服务本身没问题。接着测试了get_current_user_info接口,耗时0.14秒,也很正常。
定位到数据库查询
经过一系列测试,最终锁定了问题:复杂的4表JOIN查询导致了超时。原始代码是这样的:
query = (
select(
ac_subquery.c.user_id,
ac_subquery.c.session_count,
ac_subquery.c.cost,
func.count(func.distinct(case((Resume.site == "a", AISearchTaskItem.candidate_id)))).label("a_total_count"),
func.count(func.distinct(case((Resume.site != "a", AISearchTaskItem.candidate_id)))).label("other_total_count"),
func.count(func.distinct(AISearchTaskItem.candidate_id)).label("total_count"),
)
.select_from(ac_subquery)
.outerjoin(AISearchTask, AISearchTask.user_id == ac_subquery.c.user_id)
.outerjoin(AISearchTaskItem, AISearchTaskItem.task_id == AISearchTask.id)
.outerjoin(Resume, Resume.id == AISearchTaskItem.resume_id)
.group_by(ac_subquery.c.user_id, ac_subquery.c.session_count, ac_subquery.c.cost)
)
查看数据量后,我明白了问题所在:
- AgentConversation: 823条
- AISearchTask: 12,352,324条(1235万)
- AISearchTaskItem: 12,584,411条(1258万)
- Resume: 9,722,825条(972万)
这是一个涉及千万级数据的4表JOIN查询!数据库在处理这样的查询时,即使有索引,也需要扫描和匹配大量数据,导致查询超时。
优化过程:一步步接近目标
找到瓶颈后,我开始了一系列优化尝试。每次优化都基于前一次的经验,逐步改进。
优化v1:分步查询,移除大表JOIN(32.9秒)
第一个想法是拆分这个复杂的JOIN。既然Resume表有972万条记录,那就不要JOIN它了。我将查询改为两步:
第一步,只查询AgentConversation和AISearchTask的数据:
ac_subquery = (
select(
AgentConversation.user_id.label("user_id"),
func.count(func.distinct(AgentConversation.session_id)).label("session_count"),
cast(func.sum(AgentConversation.total_cost) * 14, Integer).label("cost"),
)
.where(and_(*ac_subquery_conditions))
.group_by(AgentConversation.user_id)
.subquery()
)
第二步,分批查询AISearchTask和AISearchTaskItem:
batch_size = 50 # 每批50个用户
for i in range(0, len(base_user_ids), batch_size):
batch_user_ids = base_user_ids[i:i + batch_size]
batch_stats_query = (
select(
AISearchTask.user_id,
func.count(func.distinct(AISearchTaskItem.candidate_id)).label("total_count"),
)
.select_from(AISearchTask)
.join(AISearchTaskItem, AISearchTaskItem.task_id == AISearchTask.id)
.where(AISearchTask.user_id.in_(batch_user_ids))
.group_by(AISearchTask.user_id)
)
batch_results = (await self.db.execute(batch_stats_query)).mappings().all()
这个版本的优化思路是:
- 移除Resume表的JOIN,减少972万条数据的关联
- 串行批处理,每批处理50个用户
- 暂时不区分a/other简历(避免JOIN Resume表)
测试结果:32.9秒完成,虽然还是很慢,但至少能返回数据了!这证明了方向是对的。
优化v2:引入并发处理(26.3秒)
既然批处理有效,那能不能并发执行呢?我引入了asyncio的并发机制:
async def query_batch(batch_num: int, batch_user_ids: List[int]):
"""并发查询单个批次"""
batch_stats_query = (...) # 查询逻辑
batch_results = (await self.db.execute(batch_stats_query)).mappings().all()
return batch_results
# 创建所有批次的任务
tasks = []
for i in range(0, len(base_user_ids), batch_size):
batch_user_ids = base_user_ids[i:i + batch_size]
tasks.append(query_batch(i // batch_size + 1, batch_user_ids))
# 并发执行(最多3个批次同时)
max_concurrent = 3
for i in range(0, len(tasks), max_concurrent):
batch_tasks = tasks[i:i + max_concurrent]
results = await asyncio.gather(*batch_tasks)
这个版本的改进:
- 批次大小改为10个用户
- 最多3个批次并发执行
- 利用数据库连接池的并发能力
测试结果:26.3秒,提升了20%。但我发现第一批用户用了27秒,因为这批包含了数据量最大的用户(28万+条简历)。
优化v3:更小批次+更高并发(19.5秒)
既然大用户会拖累整个批次,那就让批次更小,并发更高:
batch_size = 5 # 减小到5个用户/批
max_concurrent = 5 # 提高到5个批次并发
这样的好处是:
- 更小的批次让大用户和小用户分布更均匀
- 更高的并发充分利用数据库性能
- 70个用户分成14批,分3轮并发执行
测试结果:19.5秒,提升了41%!从日志看:
- 第1轮(5批并发):14秒
- 第2轮(5批并发):2秒
- 第3轮(4批并发):1.5秒
优化v4:完全避免JOIN(失败:49.9秒)
到这里我想,既然JOIN是瓶颈,能不能完全不用JOIN?我尝试了这样的方案:
步骤1:先查询每个用户的task_ids(单表查询)
task_query = (
select(AISearchTask.user_id, AISearchTask.id)
.where(AISearchTask.user_id.in_(batch_user_ids))
)
步骤2:按task_ids查询candidates(单表查询,无JOIN)
count_query = (
select(func.count(func.distinct(AISearchTaskItem.candidate_id)))
.where(AISearchTaskItem.task_id.in_(task_ids)) # 直接用IN
)
理论上,两次单表查询应该比JOIN快。但测试结果让我大跌眼镜:49.9秒!
原因分析:某些用户有非常多的task_ids(几千个),导致 WHERE task_id IN (...) 的IN语句过长。数据库在处理这种超长IN语句时,反而比JOIN更慢。
这个失败的尝试给了我一个重要启示:JOIN本身不是问题,多用户批量JOIN才是瓶颈。
优化v5:单用户查询+超高并发(最终版:17.5秒)
既然多用户批量JOIN慢,那就每个用户单独查询:
async def count_single_user(user_id: int):
"""统计单个用户的candidate数量"""
count_query = (
select(func.count(func.distinct(AISearchTaskItem.candidate_id)))
.select_from(AISearchTask)
.join(AISearchTaskItem, AISearchTaskItem.task_id == AISearchTask.id)
.where(AISearchTask.user_id == user_id) # 单用户查询
)
count = await self.db.scalar(count_query)
return user_id, count or 0
# 并发查询所有用户
tasks = [count_single_user(user_id) for user_id in base_user_ids]
# 超高并发(15个用户同时)
max_concurrent = 15
for i in range(0, len(tasks), max_concurrent):
batch_tasks = tasks[i:i + max_concurrent]
results = await asyncio.gather(*batch_tasks)
这个方案的核心优势:
- 单用户JOIN很快:
WHERE user_id = X的JOIN有索引支持,非常高效 - 避免大用户拖累:每个用户独立查询,互不影响
- 超高并发:15个连接同时执行,充分利用数据库性能
- 负载均衡:每轮等待最慢的用户,而非最慢的批次
测试结果:17.5秒,相比原始的32.9秒提升了47%!
缓存方案:为什么选择内存缓存而非Redis
在考虑缓存方案时,我面临两个选择:内存缓存或Redis缓存。最终选择了内存缓存,但这是一个需要权衡的决策。
业务特点分析
首先要明确这个统计面板的业务特点:
数据实时性要求不高。这是一个用户使用情况的统计面板,展示的是累积的任务数、简历数等指标。这些数据有以下特点:
- 变化频率低:用户的任务数和简历数不是秒级变化的,即使有新增,5-10分钟的延迟也是可接受的
- 查看频率低:管理员不会每分钟都刷新统计面板,通常是每天查看几次
- 数据容忍度高:统计数据允许有一定的延迟,不像交易金额那样需要强一致性
访问模式分析:
- 同一个管理员短时间内可能多次查看(刷新页面、切换tab)
- 不同服务器实例上的请求访问相同的数据
- 高峰期可能有多个管理员同时访问
内存缓存方案(当前实现)
基于项目的实际情况(没有Redis配置),我实现了类级别的内存缓存:
class AgentConversationHistoryService:
# 类级别缓存,所有实例共享
_cache = {}
_cache_expire = {}
@classmethod
def _get_cache(cls, key: str):
"""获取缓存,如果过期则返回None"""
from datetime import datetime
if key in cls._cache:
expire_time = cls._cache_expire.get(key)
if expire_time and expire_time > datetime.now():
return cls._cache[key]
else:
cls._cache.pop(key, None)
cls._cache_expire.pop(key, None)
return None
@classmethod
def _set_cache(cls, key: str, value, expire_seconds: int = 300):
"""设置缓存,默认5分钟过期"""
from datetime import datetime, timedelta
cls._cache[key] = value
cls._cache_expire[key] = datetime.now() + timedelta(seconds=expire_seconds)
内存缓存的优势:
- 零依赖:不需要部署和配置Redis
- 极低延迟:内存访问,<1ms
- 简单可靠:没有网络故障、连接池等问题
- 开发快速:不需要序列化/反序列化
内存缓存的局限:
- 单机隔离:每个服务实例有独立的缓存,无法共享
- 重启丢失:服务重启后缓存清空
- 内存占用:数据一直占用进程内存
Redis缓存方案(更优选择)
虽然当前实现使用内存缓存,但从架构角度看,Redis缓存才是更好的长期方案。
为什么Redis更适合这个场景:
-
跨实例共享:多个服务实例共享同一份缓存,避免重复计算
场景:3个服务实例,内存缓存下需要计算3次 实例1: 查询数据库 17.5秒,缓存到实例1内存 实例2: 查询数据库 17.5秒,缓存到实例2内存 ← 重复计算 实例3: 查询数据库 17.5秒,缓存到实例3内存 ← 重复计算 Redis缓存下只需计算1次: 实例1: 查询数据库 17.5秒,缓存到Redis 实例2: 从Redis读取 <0.1秒 ← 命中缓存 实例3: 从Redis读取 <0.1秒 ← 命中缓存 -
持久化可选:Redis可以配置持久化,服务重启后缓存仍然有效
-
灵活的过期策略:可以精确控制缓存时间,甚至根据业务高峰期动态调整
-
缓存预热:可以通过定时任务在凌晨预先计算并缓存数据
Redis方案实现:
async def get_user_statistics_with_redis(self, user: ATSUser, context: MCPContext):
"""使用Redis缓存的统计查询"""
from redis import Redis
import json
redis_client = Redis(host='localhost', port=6379, db=0)
cache_key = f"user_statistics:{user.tid}"
# 1. 尝试从Redis获取
cached_data = redis_client.get(cache_key)
if cached_data:
logging.info(f"Redis cache hit: {cache_key}")
return json.loads(cached_data)
# 2. 缓存未命中,查询数据库
result = await self._query_statistics(user, context)
# 3. 保存到Redis(5分钟过期)
redis_client.setex(
cache_key,
300, # 5分钟
json.dumps(result, default=str)
)
return result
缓存时间的业务考量:
基于业务特点,我建议的缓存策略是:
-
日间(9:00-18:00):5分钟缓存
- 理由:工作时间可能有更频繁的数据更新
- 5分钟的延迟在统计场景完全可接受
-
夜间(18:00-9:00):30分钟缓存
- 理由:夜间数据变化少,且访问量低
- 更长的缓存时间可以减轻数据库压力
-
凌晨定时刷新:每天凌晨2点主动计算并缓存
- 理由:避免白天首次访问的17.5秒等待
- 用户白天访问时直接命中缓存
# 定时任务:凌晨预热缓存
@scheduler.task('cron', hour=2, minute=0)
async def warm_statistics_cache():
"""凌晨2点预热统计数据缓存"""
# 获取所有需要统计的租户
for tenant_id in get_all_tenants():
try:
# 计算统计数据
result = await calculate_statistics(tenant_id)
# 缓存到Redis(缓存到早上9点)
redis_client.setex(
f"user_statistics:{tenant_id}",
25200, # 7小时
json.dumps(result, default=str)
)
logging.info(f"Cache warmed for tenant {tenant_id}")
except Exception as e:
logging.error(f"Failed to warm cache for {tenant_id}: {e}")
方案选择建议
当前阶段(已实现):
- 使用内存缓存
- 适合单实例或小规模部署
- 快速上线,满足基本需求
推荐升级(长期方案):
- 迁移到Redis缓存
- 添加缓存预热机制
- 适合多实例、大规模部署
业务价值对比:
| 场景 | 内存缓存 | Redis缓存 |
|---|---|---|
| 单实例部署 | ✅ 够用 | ⚠️ 过度设计 |
| 多实例部署 | ❌ 重复计算 | ✅ 共享缓存 |
| 服务重启 | ❌ 缓存丢失 | ✅ 缓存保留 |
| 首次访问 | 17.5秒 | <0.1秒(预热后) |
| 重复访问 | <0.1秒 | <0.1秒 |
| 部署成本 | ✅ 无需额外组件 | ⚠️ 需要Redis |
最终建议:如果你的服务是多实例部署,或者预期访问量较大,强烈建议使用Redis缓存方案。5分钟的缓存时间配合凌晨预热,可以让99%的请求命中缓存,用户几乎感受不到任何延迟。
缓存效果对比:
- 首次查询(无缓存):17.5秒
- 首次查询(Redis预热后):<0.1秒
- 重复访问:<0.1秒
- 加速比:175倍!
性能对比与总结
让我用一个表格来展示整个优化过程:
| 版本 | 响应时间 | 提升幅度 | 核心策略 |
|---|---|---|---|
| 原始版本 | 超时/卡死 | - | 4表JOIN,1200万+记录 |
| 优化v1 | 32.9秒 | 可用 | 分步查询,串行批处理 |
| 优化v2 | 26.3秒 | ↓20% | 并发3批,10用户/批 |
| 优化v3 | 19.5秒 | ↓41% | 并发5批,5用户/批 |
| 优化v4 | 49.9秒 | ❌失败 | 完全无JOIN |
| 优化v5 | 17.5秒 | ✅↓47% | 并发15,单用户查询 |
| 缓存命中 | <0.1秒 | ↓99.7% | 内存缓存,5分钟过期 |
经验总结
经过这次优化实战,我总结了以下几点经验:
1. 性能瓶颈要用数据说话
不要凭感觉猜测瓶颈在哪里,而是要通过测试获取准确的数据。在这个案例中,我通过逐步测试:
- 网络连通性:58ms ✅
- 外部服务接口:140ms ✅
- 数据库查询:超时 ❌
准确定位到了真正的瓶颈。如果一开始就盲目优化ATS接口或网络,那就是南辕北辙了。
这一点在缓存方案选择上同样适用:我先用内存缓存快速验证了缓存的有效性(从17.5秒降到<0.1秒),证明了缓存策略可行后,再考虑升级到Redis。而不是一开始就引入Redis,增加不必要的复杂度。
2. 复杂查询要拆分,但不是越拆越好
一开始我认为JOIN是罪魁祸首,所以尝试完全避免JOIN。但v4的失败告诉我:JOIN本身不慢,多用户批量JOIN才慢。
关键洞察:
- 单用户JOIN(WHERE user_id = X):有索引,很快
- 多用户JOIN(WHERE user_id IN (...)):扫描数据量大,慢
- 超长IN语句(IN (1000个id)):比JOIN更慢
所以最佳方案是:单用户查询 + 高并发。
3. 并发不是越高越好,要考虑数据库压力
在优化过程中,我尝试了不同的并发数:
- 3个批次并发:安全,但提升有限
- 5个批次并发:效果不错
- 10个用户并发:单用户查询快,可以更高
- 15个用户并发:最佳平衡点
过高的并发会导致:
- 数据库连接池耗尽
- 查询相互竞争资源
- 反而降低整体性能
实际测试发现,15个并发是这个场景的最佳值。如果你的场景不同,需要实测找到最佳并发数。
4. 缓存策略要基于业务特性
内存缓存让重复请求从17.5秒降到<0.1秒,效果显著。但缓存方案的选择必须基于业务特性:
业务分析是关键:
- 数据实时性:这个统计面板的数据不需要秒级更新,5分钟延迟完全可接受
- 访问频率:管理员查看频率不高,但可能短时间内多次刷新
- 部署架构:多实例部署意味着内存缓存会重复计算
方案演进路径:
- 快速验证:先用内存缓存验证效果(当前)
- 生产优化:升级到Redis缓存(推荐)
- 极致优化:添加定时任务预热缓存
Redis缓存的业务价值:
- 多实例场景下,避免重复计算(3个实例节省2次×17.5秒)
- 配合凌晨定时任务,白天首次访问也能秒开
- 5分钟缓存时间对于统计场景来说,既保证了数据新鲜度,又大幅降低了数据库压力
这不是技术选择,而是业务需求驱动的架构决策。
5. 优化是渐进的过程,不要追求一步到位
回顾这次优化,从32.9秒到17.5秒,经历了5个版本的迭代。每个版本都基于前一版本的经验:
- v1验证了分步查询的可行性
- v2证明了并发的有效性
- v3找到了批次大小的平衡点
- v4的失败揭示了JOIN的真相
- v5最终找到了最佳方案
如果一开始就想找到完美方案,很可能会陷入分析瘫痪。不如先做一个可用的版本,然后逐步优化。
6. 监控和日志是优化的基础
在优化过程中,详细的日志帮了大忙:
logging.info(f"[get_user_statistics] Step 1 completed: {len(base_user_ids)} users found")
logging.info(f"[get_user_statistics] Processing batch {batch_num}/{total_batches}...")
logging.info(f"[get_user_statistics] Batch {batch_num} completed: {len(batch_results)} results")
这些日志让我清楚地看到:
- 每个步骤的耗时
- 哪个批次最慢
- 数据量分布
没有这些信息,很难找到真正的瓶颈。建议在关键路径上都添加日志和监控。
写在最后
这次优化让我深刻体会到:性能优化不是玄学,而是科学。通过系统的测试、数据分析和渐进式优化,我们成功将一个超时的接口优化到了17.5秒,缓存命中时更是达到<0.1秒。
更重要的是,这个过程中积累的经验:
- 如何定位性能瓶颈
- 如何选择优化策略
- 如何评估优化效果
- 如何避免过度优化
这些经验适用于各种性能优化场景。希望这篇文章能给你带来一些启发。如果你也遇到类似的性能问题,不妨试试这些方法。
最后,性能优化永无止境。如果未来有需求,还可以考虑:
- 升级到Redis缓存(强烈推荐):多实例部署必备,配合定时任务预热,首次访问也能秒开
- 添加数据库索引:对
AISearchTask.user_id和AISearchTaskItem.task_id建立联合索引(预计可降到10-12秒) - 定时任务预计算:凌晨2点定时计算并缓存,白天全部命中缓存(可降到<0.1秒)
- 前端分页加载:按团队或时间段分页,每页3-5秒
优先级建议:Redis缓存(立竿见影,多实例必备) > 数据库索引(持续优化) > 定时预计算(完美体验)
但现在17.5秒的首次加载 + <0.1秒的缓存命中,已经能够满足业务需求了。记住:够用就好,不要过度优化。而Redis缓存是从"够用"升级到"完美"的性价比最高的方案。