FastAPI/Uvicorn 异步服务中使用 multiprocessing.Pool 导致任务取消的问题

FastAPI/Uvicorn 异步服务中使用 multiprocessing.Pool 导致任务取消的问题

问题现象

在使用 FastAPI + Uvicorn 开发文件上传服务时,处理大文件(>30MB)的 PDF 压缩功能出现了诡异的崩溃问题:

压缩结果: 86084.pdf | 原始大小: 33.30MB → 压缩后: 33.44MB
INFO:     10.231.67.203:52002 - "POST /upload_files HTTP/1.1" 200 OK
INFO:     Shutting down
INFO:     Finished server process [1]
任务被取消: 86084.pdf
asyncio.exceptions.CancelledError

关键特征:

  • 小文件处理正常,不触发压缩流程时一切正常
  • 大文件压缩后,服务器突然收到关闭信号
  • 异步任务被 CancelledError 中断
  • HTTP 响应返回 500 错误

代码结构

服务启动方式

# Dockerfile
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "7000"]

注意:这里没有使用 --workers 参数,只有单个工作进程。

异步处理流程

# main.py - 文件上传服务
async def download_and_upload_file(self, file_info: Dict, session: aiohttp.ClientSession) -> Dict:
    async with self.semaphore:
        # 1. 异步下载文件
        tmp_path = await self.download_file(file_info['file_url'], session)
        
        # 2. 检查文件大小,决定是否压缩
        file_size = os.path.getsize(tmp_path)
        if await self.should_compress_file(file_size):
            # 3. 使用线程池执行压缩(避免阻塞事件循环)
            compressed_path, was_compressed = await self.compress_file(tmp_path, file_info['file_name'])
        
        # 4. 上传到存储
        success = await self.upload_to_storage(upload_path, remote_path)

async def compress_file(self, file_path: str, original_filename: str) -> Tuple[str, bool]:
    """异步压缩文件"""
    loop = asyncio.get_event_loop()
    # 将同步压缩操作放到线程池中执行
    return await loop.run_in_executor(
        None,  # 使用默认线程池
        self._sync_compress_file,
        file_path,
        original_filename
    )

压缩函数(问题代码)

# compress_img.py - PDF 压缩模块
from multiprocessing import Pool  # ❌ 问题所在

def convert_pdf_to_png(file_path, zoom=2):
    cpu = 4
    vectors = [(i, cpu, file_path, img_save_dir) for i in range(cpu)]
    
    # ❌ 在异步环境中使用 Pool 会导致严重问题
    pool = Pool(cpu)
    res = pool.map(render_page, vectors, 1)
    # ... 后续处理
    return png_list, img_save_dir

问题根源分析

1. asyncio + multiprocessing.Pool 的致命组合

multiprocessing.Pool 默认使用 fork 模式创建子进程,这会导致以下问题:

事件循环状态复制

  • fork 会复制父进程的内存空间,包括 asyncio 的事件循环状态
  • 子进程中的事件循环处于不一致状态(父进程中的线程、文件描述符等无法正确复制)
  • 子进程尝试操作这些状态时可能触发未定义行为

进程状态污染与资源冲突

执行路径:

uvicorn (asyncio 事件循环)
  └─> asyncio.run_in_executor (线程池)
       └─> compress_img (同步函数)
            └─> multiprocessing.Pool (fork 子进程)
                 └─> render_page (在子进程中执行)

关键问题Pool 使用 fork 模式时会发生:

  1. 文件描述符共享冲突

    • fork 的子进程继承了父进程打开的所有文件描述符(socket、文件等)
    • uvicorn 的 HTTP 连接、日志文件等会被子进程"共享"
    • 子进程结束时可能错误地关闭这些描述符
    • 导致父进程的 uvicorn 认为连接断开,触发 shutdown 流程
  2. 事件循环损坏

    • fork 复制了 asyncio 事件循环的内部状态(但子进程没有运行事件循环的线程)
    • 子进程中某些操作可能意外触发事件循环相关的清理代码
    • 通过共享内存影响父进程的事件循环状态

从日志看,压缩成功完成并返回了 200 OK,但之后 uvicorn 突然 shutdown,说明是 Pool 的清理阶段影响了 uvicorn 的运行状态。

2. 资源竞争

当多个并发请求同时处理大文件时:

  • 每个请求都会创建一个 4 进程的 Pool
  • 并发度为 10 时,最多可能同时存在 40 个子进程
  • 大量进程创建/销毁,资源耗尽,调度混乱

3. 为什么小文件没问题?

小文件不会触发压缩逻辑,不会创建 Pool,因此不会暴露问题。这也是为什么问题难以复现和定位。

解决方案

核心原则

在异步环境中使用多进程,必须使用 spawn 模式,并优先使用 concurrent.futures.ProcessPoolExecutor

修复代码

# compress_img.py - 修复后
import multiprocessing
from concurrent.futures import ProcessPoolExecutor  # ✅ 使用 futures

def convert_pdf_to_png(file_path, zoom=2):
    cpu = 4
    vectors = [(i, cpu, file_path, img_save_dir) for i in range(cpu)]
    
    # ✅ 使用 ProcessPoolExecutor + spawn 模式
    with ProcessPoolExecutor(
        max_workers=cpu, 
        mp_context=multiprocessing.get_context('spawn')  # 强制使用 spawn
    ) as executor:
        res = list(executor.map(render_page, vectors, chunksize=1))
    
    png_list = []
    for i in res:
        png_list.extend(i)
    png_list.sort(key=lambda x: int(x.split('.')[-2].split('_')[-1]))
    return png_list, img_save_dir

为什么这样修复有效?

ProcessPoolExecutor 的优势

  • 专为现代 Python 并发设计,与 asyncio 兼容更好
  • 使用 with 语句管理生命周期,确保资源正确释放
  • 更好的异常处理机制

spawn 模式的作用

  • 创建全新的 Python 解释器进程,不复制父进程内存
  • 子进程环境干净,没有父进程的事件循环状态
  • 信号隔离更好,不会误触发父进程关闭

上下文管理器的重要性

with ProcessPoolExecutor(...) as executor:
    # 执行任务
# 自动等待所有任务完成并清理资源

即使发生异常,也能保证进程池正确关闭,避免僵尸进程。

最佳实践总结

1. 异步服务中使用多进程的黄金法则

场景推荐方案避免方案
FastAPI/Uvicorn 服务ProcessPoolExecutor + spawnmultiprocessing.Pool
asyncio 程序loop.run_in_executor(ProcessPoolExecutor, ...)直接调用 Pool
CPU 密集型任务先用 run_in_executor 进入线程,再启动进程池在协程中直接创建进程

2. 完整的异步 + 多进程模板

import asyncio
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from typing import List, Any

# 子进程中执行的函数(必须在模块顶层定义)
def cpu_intensive_task(data):
    # 执行计算密集型任务
    return process_data(data)

class AsyncService:
    def __init__(self):
        # 可以创建一个长期存在的进程池(可选)
        self.process_pool = ProcessPoolExecutor(
            max_workers=4,
            mp_context=multiprocessing.get_context('spawn')
        )
    
    async def process_batch(self, items: List[Any]) -> List[Any]:
        """异步批量处理"""
        loop = asyncio.get_event_loop()
        
        # 方案 1:使用服务级别的进程池
        results = await loop.run_in_executor(
            self.process_pool,
            self._batch_process,
            items
        )
        
        # 方案 2:临时创建进程池(适合偶尔执行的任务)
        with ProcessPoolExecutor(
            max_workers=4,
            mp_context=multiprocessing.get_context('spawn')
        ) as executor:
            results = await loop.run_in_executor(
                executor,
                self._batch_process,
                items
            )
        
        return results
    
    def _batch_process(self, items: List[Any]) -> List[Any]:
        """在线程池中执行(会进一步调用进程池)"""
        with ProcessPoolExecutor(
            max_workers=4,
            mp_context=multiprocessing.get_context('spawn')
        ) as executor:
            return list(executor.map(cpu_intensive_task, items))
    
    async def cleanup(self):
        """清理资源"""
        if hasattr(self, 'process_pool'):
            self.process_pool.shutdown(wait=True)

3. 调试技巧

当遇到类似问题时,检查以下几点:

# 1. 检查是否在异步上下文中
import asyncio
if asyncio.current_task() is not None:
    print("⚠️ 当前在异步上下文中,不要直接使用 Pool!")

# 2. 强制使用 spawn 模式
multiprocessing.set_start_method('spawn', force=True)  # 全局设置

# 3. 捕获进程池相关异常
try:
    with ProcessPoolExecutor(...) as executor:
        result = executor.submit(task, data)
        return result.result(timeout=300)  # 设置超时
except Exception as e:
    logger.error(f"进程池执行失败: {e}", exc_info=True)

4. 性能优化建议

# ❌ 不好:每次调用都创建进程池
async def process_file(file_path):
    with ProcessPoolExecutor(max_workers=4, ...) as executor:
        # 创建进程池开销大(~100ms)
        result = await loop.run_in_executor(executor, compress, file_path)

# ✅ 更好:复用进程池
class Service:
    def __init__(self):
        self.executor = ProcessPoolExecutor(
            max_workers=4,
            mp_context=multiprocessing.get_context('spawn')
        )
    
    async def process_file(self, file_path):
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(
            self.executor, 
            compress, 
            file_path
        )

相关问题排查清单

如果你的 FastAPI/Uvicorn 服务出现以下症状:

  • 处理某些请求时服务器突然关闭
  • 日志中出现 asyncio.exceptions.CancelledError
  • 看到 INFO: Shutting down 但没有手动停止服务
  • 使用了 multiprocessing.Poolmultiprocessing.Process
  • CPU 密集型任务在异步函数中执行
  • 使用了 uvicorn --workers > 1

立即检查:

  1. 搜索代码中的 from multiprocessing import Pool
  2. 确认所有多进程操作都使用 spawn 模式
  3. 改用 concurrent.futures.ProcessPoolExecutor
  4. 确保多进程代码通过 run_in_executor 调用

延伸阅读

总结

在 FastAPI/Uvicorn 等异步服务中使用多进程时:

  1. 永远不要直接使用 multiprocessing.Pool
  2. 始终使用 concurrent.futures.ProcessPoolExecutor
  3. 强制指定 mp_context=multiprocessing.get_context('spawn')
  4. 通过 loop.run_in_executor 桥接异步和多进程
  5. 使用 with 语句管理进程池生命周期

这个问题的隐蔽性在于:只有在特定条件下(大文件、CPU 密集型任务)才会触发,而小规模测试往往无法发现。希望这篇文章能帮助你避免同样的坑。