FastAPI/Uvicorn 异步服务中使用 multiprocessing.Pool 导致任务取消的问题
问题现象
在使用 FastAPI + Uvicorn 开发文件上传服务时,处理大文件(>30MB)的 PDF 压缩功能出现了诡异的崩溃问题:
压缩结果: 86084.pdf | 原始大小: 33.30MB → 压缩后: 33.44MB
INFO: 10.231.67.203:52002 - "POST /upload_files HTTP/1.1" 200 OK
INFO: Shutting down
INFO: Finished server process [1]
任务被取消: 86084.pdf
asyncio.exceptions.CancelledError
关键特征:
- 小文件处理正常,不触发压缩流程时一切正常
- 大文件压缩后,服务器突然收到关闭信号
- 异步任务被
CancelledError中断 - HTTP 响应返回 500 错误
代码结构
服务启动方式
# Dockerfile
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "7000"]
注意:这里没有使用 --workers 参数,只有单个工作进程。
异步处理流程
# main.py - 文件上传服务
async def download_and_upload_file(self, file_info: Dict, session: aiohttp.ClientSession) -> Dict:
async with self.semaphore:
# 1. 异步下载文件
tmp_path = await self.download_file(file_info['file_url'], session)
# 2. 检查文件大小,决定是否压缩
file_size = os.path.getsize(tmp_path)
if await self.should_compress_file(file_size):
# 3. 使用线程池执行压缩(避免阻塞事件循环)
compressed_path, was_compressed = await self.compress_file(tmp_path, file_info['file_name'])
# 4. 上传到存储
success = await self.upload_to_storage(upload_path, remote_path)
async def compress_file(self, file_path: str, original_filename: str) -> Tuple[str, bool]:
"""异步压缩文件"""
loop = asyncio.get_event_loop()
# 将同步压缩操作放到线程池中执行
return await loop.run_in_executor(
None, # 使用默认线程池
self._sync_compress_file,
file_path,
original_filename
)
压缩函数(问题代码)
# compress_img.py - PDF 压缩模块
from multiprocessing import Pool # ❌ 问题所在
def convert_pdf_to_png(file_path, zoom=2):
cpu = 4
vectors = [(i, cpu, file_path, img_save_dir) for i in range(cpu)]
# ❌ 在异步环境中使用 Pool 会导致严重问题
pool = Pool(cpu)
res = pool.map(render_page, vectors, 1)
# ... 后续处理
return png_list, img_save_dir
问题根源分析
1. asyncio + multiprocessing.Pool 的致命组合
multiprocessing.Pool 默认使用 fork 模式创建子进程,这会导致以下问题:
事件循环状态复制
fork会复制父进程的内存空间,包括 asyncio 的事件循环状态- 子进程中的事件循环处于不一致状态(父进程中的线程、文件描述符等无法正确复制)
- 子进程尝试操作这些状态时可能触发未定义行为
进程状态污染与资源冲突
执行路径:
uvicorn (asyncio 事件循环)
└─> asyncio.run_in_executor (线程池)
└─> compress_img (同步函数)
└─> multiprocessing.Pool (fork 子进程)
└─> render_page (在子进程中执行)
关键问题:Pool 使用 fork 模式时会发生:
文件描述符共享冲突
- fork 的子进程继承了父进程打开的所有文件描述符(socket、文件等)
- uvicorn 的 HTTP 连接、日志文件等会被子进程"共享"
- 子进程结束时可能错误地关闭这些描述符
- 导致父进程的 uvicorn 认为连接断开,触发 shutdown 流程
事件循环损坏
- fork 复制了 asyncio 事件循环的内部状态(但子进程没有运行事件循环的线程)
- 子进程中某些操作可能意外触发事件循环相关的清理代码
- 通过共享内存影响父进程的事件循环状态
从日志看,压缩成功完成并返回了 200 OK,但之后 uvicorn 突然 shutdown,说明是 Pool 的清理阶段影响了 uvicorn 的运行状态。
2. 资源竞争
当多个并发请求同时处理大文件时:
- 每个请求都会创建一个 4 进程的
Pool - 并发度为 10 时,最多可能同时存在 40 个子进程
- 大量进程创建/销毁,资源耗尽,调度混乱
3. 为什么小文件没问题?
小文件不会触发压缩逻辑,不会创建 Pool,因此不会暴露问题。这也是为什么问题难以复现和定位。
解决方案
核心原则
在异步环境中使用多进程,必须使用 spawn 模式,并优先使用 concurrent.futures.ProcessPoolExecutor。
修复代码
# compress_img.py - 修复后
import multiprocessing
from concurrent.futures import ProcessPoolExecutor # ✅ 使用 futures
def convert_pdf_to_png(file_path, zoom=2):
cpu = 4
vectors = [(i, cpu, file_path, img_save_dir) for i in range(cpu)]
# ✅ 使用 ProcessPoolExecutor + spawn 模式
with ProcessPoolExecutor(
max_workers=cpu,
mp_context=multiprocessing.get_context('spawn') # 强制使用 spawn
) as executor:
res = list(executor.map(render_page, vectors, chunksize=1))
png_list = []
for i in res:
png_list.extend(i)
png_list.sort(key=lambda x: int(x.split('.')[-2].split('_')[-1]))
return png_list, img_save_dir
为什么这样修复有效?
ProcessPoolExecutor 的优势
- 专为现代 Python 并发设计,与
asyncio兼容更好 - 使用
with语句管理生命周期,确保资源正确释放 - 更好的异常处理机制
spawn 模式的作用
- 创建全新的 Python 解释器进程,不复制父进程内存
- 子进程环境干净,没有父进程的事件循环状态
- 信号隔离更好,不会误触发父进程关闭
上下文管理器的重要性
with ProcessPoolExecutor(...) as executor:
# 执行任务
# 自动等待所有任务完成并清理资源
即使发生异常,也能保证进程池正确关闭,避免僵尸进程。
最佳实践总结
1. 异步服务中使用多进程的黄金法则
| 场景 | 推荐方案 | 避免方案 |
|---|---|---|
| FastAPI/Uvicorn 服务 | ProcessPoolExecutor + spawn | multiprocessing.Pool |
| asyncio 程序 | loop.run_in_executor(ProcessPoolExecutor, ...) | 直接调用 Pool |
| CPU 密集型任务 | 先用 run_in_executor 进入线程,再启动进程池 | 在协程中直接创建进程 |
2. 完整的异步 + 多进程模板
import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from typing import List, Any
# 子进程中执行的函数(必须在模块顶层定义)
def cpu_intensive_task(data):
# 执行计算密集型任务
return process_data(data)
class AsyncService:
def __init__(self):
# 可以创建一个长期存在的进程池(可选)
self.process_pool = ProcessPoolExecutor(
max_workers=4,
mp_context=multiprocessing.get_context('spawn')
)
async def process_batch(self, items: List[Any]) -> List[Any]:
"""异步批量处理"""
loop = asyncio.get_event_loop()
# 方案 1:使用服务级别的进程池
results = await loop.run_in_executor(
self.process_pool,
self._batch_process,
items
)
# 方案 2:临时创建进程池(适合偶尔执行的任务)
with ProcessPoolExecutor(
max_workers=4,
mp_context=multiprocessing.get_context('spawn')
) as executor:
results = await loop.run_in_executor(
executor,
self._batch_process,
items
)
return results
def _batch_process(self, items: List[Any]) -> List[Any]:
"""在线程池中执行(会进一步调用进程池)"""
with ProcessPoolExecutor(
max_workers=4,
mp_context=multiprocessing.get_context('spawn')
) as executor:
return list(executor.map(cpu_intensive_task, items))
async def cleanup(self):
"""清理资源"""
if hasattr(self, 'process_pool'):
self.process_pool.shutdown(wait=True)
3. 调试技巧
当遇到类似问题时,检查以下几点:
# 1. 检查是否在异步上下文中
import asyncio
if asyncio.current_task() is not None:
print("⚠️ 当前在异步上下文中,不要直接使用 Pool!")
# 2. 强制使用 spawn 模式
multiprocessing.set_start_method('spawn', force=True) # 全局设置
# 3. 捕获进程池相关异常
try:
with ProcessPoolExecutor(...) as executor:
result = executor.submit(task, data)
return result.result(timeout=300) # 设置超时
except Exception as e:
logger.error(f"进程池执行失败: {e}", exc_info=True)
4. 性能优化建议
# ❌ 不好:每次调用都创建进程池
async def process_file(file_path):
with ProcessPoolExecutor(max_workers=4, ...) as executor:
# 创建进程池开销大(~100ms)
result = await loop.run_in_executor(executor, compress, file_path)
# ✅ 更好:复用进程池
class Service:
def __init__(self):
self.executor = ProcessPoolExecutor(
max_workers=4,
mp_context=multiprocessing.get_context('spawn')
)
async def process_file(self, file_path):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
self.executor,
compress,
file_path
)
相关问题排查清单
如果你的 FastAPI/Uvicorn 服务出现以下症状:
- 处理某些请求时服务器突然关闭
- 日志中出现
asyncio.exceptions.CancelledError - 看到
INFO: Shutting down但没有手动停止服务 - 使用了
multiprocessing.Pool或multiprocessing.Process - CPU 密集型任务在异步函数中执行
- 使用了
uvicorn --workers > 1
立即检查:
- 搜索代码中的
from multiprocessing import Pool - 确认所有多进程操作都使用
spawn模式 - 改用
concurrent.futures.ProcessPoolExecutor - 确保多进程代码通过
run_in_executor调用
延伸阅读
总结
在 FastAPI/Uvicorn 等异步服务中使用多进程时:
- 永远不要直接使用
multiprocessing.Pool - 始终使用
concurrent.futures.ProcessPoolExecutor - 强制指定
mp_context=multiprocessing.get_context('spawn') - 通过
loop.run_in_executor桥接异步和多进程 - 使用
with语句管理进程池生命周期
这个问题的隐蔽性在于:只有在特定条件下(大文件、CPU 密集型任务)才会触发,而小规模测试往往无法发现。希望这篇文章能帮助你避免同样的坑。