Most machine-learning serving tutorials focus on real-time synchronous serving, which responds to prediction requests immediately. However, this approach struggles under traffic spikes and is a poor fit for long-running tasks. It also requires more powerful machines to respond quickly, and if the client or server fails, the prediction result is usually lost.
In this post, we demonstrate how to run a machine-learning model as an asynchronous worker using Celery and Redis. We will use the Florence 2 base model, a vision-language model known for its impressive performance. This tutorial provides a minimal but functional example that you can adapt and extend for your own use case.
The core of our solution is Celery, a Python library that implements the client/worker logic for us. It lets us distribute computational work across multiple workers, which makes ML inference scale to high and unpredictable loads.
The flow works as follows: the client serializes a task and pushes it onto the Redis broker; an idle worker picks the task up, runs the computation, and writes the result to the Redis backend; the client then retrieves the result by task ID.
Let's start with a simplified example:
First, run Redis:
docker run -p 6379:6379 redis
Here is the worker code:
from celery import Celery

# Configure Celery to use Redis as the broker and backend
app = Celery(
    "tasks", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0"
)

# Define a simple task
@app.task
def add(x, y):
    return x + y

if __name__ == "__main__":
    app.worker_main(["worker", "--loglevel=info"])
And the client code:
from celery import Celery
app = Celery("tasks", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")
print(f"{app.control.inspect().active()=}")
task_name = "tasks.add"
add = app.signature(task_name)
print("Gotten Task")
# Send a task to the worker
result = add.delay(4, 6)
print("Waiting for Task")
result.wait()
# Get the result
print(f"Result: {result.result}")
This prints the result we expect: "Result: 10".
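Blocking on `result.wait()` ties up the client until the task finishes. Because the result lives in the Redis backend, the client can instead poll the task's state and do other work in between. A minimal sketch of such a helper (`poll_until_done` is our own name, not part of Celery; it works with any object exposing the `ready()`/`result` interface of Celery's `AsyncResult`):

```python
import time


def poll_until_done(async_result, interval=0.5, timeout=60):
    """Poll a Celery-style AsyncResult until it completes or times out."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if async_result.ready():  # True once the task succeeded or failed
            return async_result.result
        time.sleep(interval)      # idle briefly instead of blocking hard
    raise TimeoutError("task did not finish in time")
```

With the worker above running, `poll_until_done(add.delay(4, 6))` would return 10 while leaving the loop free to report progress or dispatch more tasks.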
Now, let's move on to the real use case: serving Florence 2.
We will build a multi-container image-captioning application that uses Redis for task queuing, Celery for task distribution, and either a local volume or Google Cloud Storage for image storage. The application's design includes several core components: model inference, task distribution, client interaction, and file storage.
Architecture overview:
Component breakdown
1. Model inference (model.py):
import os
from io import BytesIO

import requests
from google.cloud import storage
from loguru import logger
from modeling_florence2 import Florence2ForConditionalGeneration
from PIL import Image
from processing_florence2 import Florence2Processor

model = Florence2ForConditionalGeneration.from_pretrained(
    "microsoft/Florence-2-base-ft"
)
processor = Florence2Processor.from_pretrained("microsoft/Florence-2-base-ft")


def download_image(url):
    if url.startswith("http://") or url.startswith("https://"):
        # Handle HTTP/HTTPS URLs
        # ... (code to download image from URL) ...
    elif url.startswith("gs://"):
        # Handle Google Cloud Storage paths
        # ... (code to download image from GCS) ...
    else:
        # Handle local file paths
        # ... (code to open image from local path) ...


def run_inference(url, task_prompt):
    # ... (code to download image using download_image function) ...
    try:
        # ... (code to open and process the image) ...
        inputs = processor(text=task_prompt, images=image, return_tensors="pt")
    except ValueError:
        # ... (error handling) ...

    # ... (code to generate captions using the model) ...
    generated_ids = model.generate(
        input_ids=inputs["input_ids"],
        pixel_values=inputs["pixel_values"],
        # ... (model generation parameters) ...
    )

    # ... (code to decode generated captions) ...
    generated_text = processor.batch_decode(generated_ids, skip_special_tokens=False)[0]

    # ... (code to post-process generated captions) ...
    parsed_answer = processor.post_process_generation(
        generated_text, task=task_prompt, image_size=(image.width, image.height)
    )
    return parsed_answer
Together, these two functions orchestrate the image-captioning process.
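The bodies of `download_image`'s three branches are elided above. A sketch of how they might be filled in, assuming `requests`, Pillow, and the `google-cloud-storage` client (the exact GCS path parsing and timeout values are our choices, not the original author's):

```python
from io import BytesIO

from PIL import Image


def download_image(url):
    """Load a PIL image from an HTTP(S) URL, a gs:// path, or a local file."""
    if url.startswith("http://") or url.startswith("https://"):
        import requests  # lazy import: only needed for remote URLs

        resp = requests.get(url, timeout=30)
        resp.raise_for_status()
        return Image.open(BytesIO(resp.content))
    elif url.startswith("gs://"):
        from google.cloud import storage  # lazy import: only needed for GCS

        # Split "gs://bucket/path/to/img.png" into bucket and blob path
        bucket_name, _, blob_path = url[len("gs://"):].partition("/")
        blob = storage.Client().bucket(bucket_name).blob(blob_path)
        return Image.open(BytesIO(blob.download_as_bytes()))
    else:
        # Local file path
        return Image.open(url)
```

The lazy imports keep the local-file path usable even in environments where `requests` or the GCS client is not installed.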
2. Task distribution (worker.py):
import os

from celery import Celery
# ... other imports ...

# Get Redis URL from environment variable or use default
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")

# Configure Celery to use Redis as the broker and backend
app = Celery("tasks", broker=REDIS_URL, backend=REDIS_URL)
# ... (Celery configurations) ...


@app.task(bind=True, max_retries=3)
def inference_task(self, url, task_prompt):
    # ... (logging and error handling) ...
    return run_inference(url, task_prompt)


if __name__ == "__main__":
    app.worker_main(["worker", "--loglevel=info", "--pool=solo"])
3. Client interaction (client.py):
import os

from celery import Celery

# Get Redis URL from environment variable or use default
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")

# Configure Celery to use Redis as the broker and backend
app = Celery("tasks", broker=REDIS_URL, backend=REDIS_URL)


def send_inference_task(url, task_prompt):
    # Send the task by name, so the client does not need to import worker code
    task = app.send_task("tasks.inference_task", args=[url, task_prompt])
    print(f"Task sent with ID: {task.id}")
    # Wait for the result
    result = task.get(timeout=120)
    return result
Docker integration (docker-compose.yml):
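The compose file itself is not reproduced here, so below is a minimal sketch of what it might look like; the service names, the shared Dockerfile for worker and client, and the `./images` volume are assumptions rather than the original author's exact setup:

```yaml
version: "3.8"

services:
  redis:
    image: redis:7
    ports:
      - "6379:6379"

  worker:
    build: .                 # assumes one Dockerfile shared by worker and client
    command: python worker.py
    environment:
      - REDIS_URL=redis://redis:6379/0
    volumes:
      - ./images:/app/images # local volume for image storage
    depends_on:
      - redis

  client:
    build: .
    command: python client.py
    environment:
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - redis
      - worker
```

Note that inside the compose network the Redis host is `redis`, not `localhost`, which is why the code reads the broker address from the `REDIS_URL` environment variable.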
You can bring everything up with:
docker-compose up
That's it! We have just walked through building an asynchronous machine-learning inference system with Celery, Redis, and Florence 2. This tutorial showed how to use Celery for task distribution, Redis for message brokering, and Florence 2 for image captioning. By adopting an asynchronous workflow, you can absorb large volumes of requests, improve throughput, and increase the overall resilience of your ML inference application. And with the provided Docker Compose setup, you can run the entire system with a single command.