如何使用FastAPI部署运行深度学习模型

2025年02月19日 由 alex 发表 1888 0

图形处理单元(GPU)是人工智能工作流的支柱,驱动着诸如Pixel2Mesh和TripoSR等计算密集型深度学习模型。这些模型通常对GPU内存(VRAM)提出很高的要求,这使得在单GPU上同时运行多个实例变得具有挑战性。GPU过载会导致资源争用、崩溃和性能下降,这些问题会损害整个系统的效率和稳定性。


在这篇文章中,我们将探讨如何使用FastAPI和asyncio子进程来管理GPU工作负载,特别是针对单GPU设置并面向深度学习模型。通过将深度学习模型作为独立的子进程运行,我们确保视频随机存取内存(VRAM)的高效使用,并防止过载,从而实现更好的任务编排。


GPU工作负载的理想框架

FastAPI是一个高性能、异步框架,非常适合编排GPU工作负载。它对Python的asyncio的原生支持使其成为管理长时间运行任务(如深度学习推理)同时保持响应性的绝佳选择。其关键优势包括:

  • 异步架构:高效处理多个并发的I/O密集型任务,防止阻塞并避免性能瓶颈。
  • 可扩展的任务编排:允许非阻塞的任务编排,使你能够动态管理GPU工作负载并保持应用程序的响应性。
  • 生态系统集成:无缝支持用于子进程管理的asyncio,并利用现代Python特性,简化开发并提高可维护性。


将深度学习模型作为子进程运行

深度学习模型通常被打包为具有特定依赖项的独立脚本,设计为直接执行并对GPU提出显著需求。直接在FastAPI应用程序中运行这些脚本会带来几个挑战:

  • 依赖冲突:模型通常依赖于独特的包或库版本,当共享相同的Python环境时,这些可能会发生冲突。
  • GPU使用效率低下:在同一进程中运行多个模型会增加VRAM过度分配和资源争用的风险。
  • 复杂的重构:将独立脚本转换为可重用模块需要大量的重构、依赖管理和I/O机制的适应。


将模型作为单独的子进程运行可以通过隔离每个任务来解决这些问题,优化资源使用,确保容错能力,并实现并行执行。


6


为什么选择 asyncio.create_subprocess_exec?

在基于asyncio的应用程序中异步运行子进程时,asyncio.create_subprocess_exec() 是最佳选择。


  • 非阻塞执行:通过允许在后台运行任务的同时并发处理新请求,保持FastAPI的响应性。
  • 精细控制:能够精确管理执行参数,如参数、环境变量和GPU特定设置。
  • 故障隔离:确保一个任务中的故障不会波及系统的其他部分,简化调试和恢复过程。


并发应用的VRAM管理

在单GPU上运行模型时,管理GPU内存至关重要。不同的模型对VRAM的需求各不相同。轻量级模型可能只消耗GPU内存的一小部分,而大型模型则可能需要大量资源才能实现最佳性能。合理分配VRAM,以确保资源得到高效利用,防止过度分配和争用。


通过基于输入大小和批处理等因素来管理VRAM使用,我们可以主动分配内存。这些预测可以根据不同的业务规则和条件进行调整,例如图像分辨率或预处理复杂性。


示例解决方案

在本节中,我们展示了一种使用FastAPI和asyncio库在单GPU上以子进程形式运行深度学习模型的简单概念性方法。请注意,此代码仅用于演示目的。它概述了核心思想,而非提供一个完全可用于生产的解决方案。下面的图表中展示了该解决方案。


7


  1. HTTP请求→请求进入队列,而不是立即执行。
  2. 队列→任务在队列中等待,直到有可用的工作线程。
  3. 信号量限制执行→工作线程在处理任务前必须获取一个槽位,以确保受控的并发。
  4. 工作线程获取任务→一旦槽位空闲,工作线程就会获取下一个请求。
  5. GPU子进程→工作线程生成一个子进程来执行深度学习模型,等待其完成,然后释放其槽位。
  6. 响应→结果异步发送回客户端。


模型设置

在将深度学习模型部署为FastAPI服务之前,必须首先正确设置模型的环境。此过程因深度学习模型而异,通常在模型的仓库文档中概述,包括安装步骤、所需依赖项和GPU配置。


GPU资源初始化

在执行深度学习模型之前,我们必须分析可用的GPU内存,以确定在不超出VRAM限制的情况下,最大并发执行数量。例如,TriploSR每次运行需要6GB,因此启动过多实例存在内存不足的风险。为防止这种情况,我们使用NVIDIA的pynvml在启动时监控实时GPU内存。保留一个VRAM_BUFFER以避免内存完全耗尽,剩余的VRAM动态决定SEMAPHORE_SLOTS,将并发运行数量限制在安全范围内。


import asyncio
import pynvml
from pathlib import Path
QUEUE_MAX_SIZE = 5  
MODEL_EXEC_PATH = Path("/path/to/deep/learning/model/")
MODEL_VRAM_REQUIRED = 2048  
VRAM_BUFFER = 1024  
pynvml.nvmlInit()
GPU_HANDLE = pynvml.nvmlDeviceGetHandleByIndex(0)
MEM_INFO = pynvml.nvmlDeviceGetMemoryInfo(GPU_HANDLE)
AVAILABLE_VRAM_MB = MEM_INFO.free // (1024 ** 2)  
USABLE_VRAM_MB = max(AVAILABLE_VRAM_MB - VRAM_BUFFER, 0)
SEMAPHORE_SLOTS = max(USABLE_VRAM_MB // MODEL_VRAM_REQUIRED, 1)


管理应用程序生命周期

正确的启动和关闭可以防止孤儿进程和资源泄漏。FastAPI的生命周期事件在启动时初始化队列、信号量和工作线程,确保受控执行。在关闭时,它会优雅地停止工作线程并清理资源,防止未处理的任务。工作线程作为异步协程(asyncio.create_task())运行,保持FastAPI的非阻塞和高效性。


import asyncio
from contextlib import asynccontextmanager, suppress
from fastapi import FastAPI
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initializes workers on startup and ensures clean shutdown on exit."""
    if USABLE_VRAM_MB < MODEL_VRAM_REQUIRED:
        raise RuntimeError(
            "❌ Not enough VRAM available: %d MB usable, requires %d MB."
            % (USABLE_VRAM_MB, MODEL_VRAM_REQUIRED)
        )
    
    semaphore = asyncio.Semaphore(SEMAPHORE_SLOTS)
    jobs_queue = asyncio.Queue(maxsize=QUEUE_MAX_SIZE)
    stop_event = asyncio.Event()
    workers = [
        asyncio.create_task(worker_main(i, jobs_queue, semaphore, stop_event))
        for i in range(SEMAPHORE_SLOTS)
    ]
    logger.info(
        "? Server started with %d workers (Queue Size=%d)",
        SEMAPHORE_SLOTS, QUEUE_MAX_SIZE
    )
    app.state.jobs_queue = jobs_queue
    try:
        yield
    finally:
        logger.info("? Shutting down workers...")
        stop_event.set()
        for worker in workers:
            worker.cancel()
        with suppress(asyncio.CancelledError):
            await asyncio.gather(*workers, return_exceptions=True)
        logger.info("✅ All workers stopped.")


实现工作线程系统

为了有效管理VRAM限制,系统实现了一个基于工作线程的系统,其中请求进入一个先进先出(FIFO)队列(asyncio.Queue),并且仅在GPU资源可用时执行。工作线程按顺序处理任务,并且在执行前必须获取一个信号量槽位,以确保运行的模型数量不超过GPU的处理能力。


工作负载是I/O密集型的,因为工作线程启动GPU密集型任务并等待推理结果。asyncio.Queue和asyncio.Semaphore在单个事件循环内实现高效并发,保持FastAPI的响应性和非阻塞性。与增加内存开销和进程间通信成本的多进程不同,asyncio允许轻量级任务调度,以最小的资源使用最大化吞吐量。


import asyncio
async def worker_main(
    worker_id: int,
    jobs_queue: asyncio.Queue,
    semaphore: asyncio.Semaphore,
    stop_event: asyncio.Event
):
    """Processes model execution requests while ensuring VRAM availability."""
    while not stop_event.is_set():
        try:
            command, future = await jobs_queue.get()
            logger.info(
                "⚡️ Worker-%d processing: %s (Queue size=%d)",
                worker_id, command, jobs_queue.qsize()
            )
            async with semaphore:  
                try:
                    result = await execute_subprocess(command, MODEL_EXEC_PATH)
                    future.set_result(result)
                except Exception as e:
                    future.set_exception(e)
                    logger.error(
                        "❌ Worker-%d crashed! Shutting down application...", 
                        worker_id
                    )
                    stop_event.set()
                    break 
        except asyncio.CancelledError:
            break  
        finally:
            jobs_queue.task_done()


异步执行GPU应用程序

深度学习模型的执行可能需要数秒到数分钟的时间,因此如果同步运行,会阻塞FastAPI,使其无法处理新的请求。相反,工作线程将执行卸载到异步子进程中,确保模型执行是并行进行的,不会阻塞API的响应性。


import asyncio
from pathlib import Path
from typing import List

async def execute_subprocess(command: List[str], exec_path: Path):
    """Executes a command in a subprocess and returns its output."""
    process = await asyncio.create_subprocess_exec(
        *command,
        cwd=str(exec_path),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await process.communicate()
    if process.returncode != 0:
        raise RuntimeError(
            f"Subprocess exited with {process.returncode}: {stderr.decode()}"
        )
    return stdout.decode()


处理API请求

端点将请求排入队列,而不是立即执行,确保任务仅在GPU资源可用时运行。如果队列已满,新的请求将被拒绝并返回HTTP 429状态码,以防止系统过载。每个请求都被分配一个Future对象,作为其结果的占位符。当工作线程完成执行时,Future对象被解析,API异步返回响应。这保证了先进先出的公平性,防止请求丢失,并确保FastAPI即使在重负载下也能保持响应性。


import asyncio
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
app = FastAPI(lifespan=lifespan)
class CommandRequest(BaseModel):
    command: List[str]
@app.post("/run-script")
async def run_script(req: CommandRequest):
    """Handles script execution requests, ensuring VRAM availability."""
    jobs_queue: asyncio.Queue = app.state.jobs_queue
    if jobs_queue.full():
        raise HTTPException(
            status_code=429,
            detail="Queue is full. Try again later."
        )
    future = asyncio.get_running_loop().create_future()
    await jobs_queue.put((req.command, future))
    logger.info(
        "? Enqueued: %s (Queue size=%d)",
        req.command, jobs_queue.qsize()
    )
    try:
        return {"output": await future}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


最终思考与迈向生产的步骤

此解决方案通过结合VRAM感知执行、FIFO队列、信号量和异步子进程,优化了FastAPI中的单GPU内存管理。它确保了响应性,防止了内存溢出崩溃,并根据GPU可用性动态扩展。与固定工作线程模型不同,它高效平衡负载,强制执行公平性,并仅在资源允许时执行任务,从而最大化吞吐量,为深度学习推理提供了可扩展的基础。


模型特定适配:

  • 命令验证:FastAPI必须验证并限制请求,仅允许模型特定的命令,确保严格遵守每个模型的CLI结构。
  • 存储管理:由于模型以不同的方式存储输出,FastAPI应动态管理文件处理,确保结果根据模型的要求进行保存、组织和访问。
  • 响应处理:FastAPI必须确保输出一致地作为API响应、下载链接或存储引用返回,以适应每个模型的格式。


生产就绪的关键领域:

  • 用户上传图像支持:允许用户通过API上传图像进行自定义处理。
  • 多GPU支持:根据可用VRAM和负载均衡,在多个GPU之间动态分配任务,防止单个GPU成为瓶颈。
  • 错误韧性:实施任务重试、内存溢出处理和GPU故障恢复,以确保在高负载下的稳定性。
  • 状态持久性:存储任务队列和VRAM状态,以便在故障后启用系统恢复,防止任务丢失并提高可靠性。
  • 高级调度:引入基于优先级的调度、资源感知的任务分配和作业抢占,以优化执行。
  • 云扩展:扩展对AWS、GCP或本地集群的支持,以在多个GPU实例之间分布推理任务。
  • 实时监控:集成GPU监控工具,跟踪内存、温度和功耗,主动调整资源分配。


通过解决这些领域的问题,可以将这个初步框架转变为一个健壮、可扩展且生产就绪的解决方案,以最小的停机时间和最大的资源效率处理高需求任务。

文章来源:https://medium.com/runtime-revolution/running-deep-learning-models-as-applications-with-fastapi-ecac57239e64
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
热门职位
Maluuba
20000~40000/月
Cisco
25000~30000/月 深圳市
PilotAILabs
30000~60000/年 深圳市
写评论取消
回复取消