人工智能与企业管理领域的游戏规则改变者是将人工智能代理与广泛使用的通信工具相结合。想象一下,通过一个熟悉的聊天界面,您可以直接通过WhatsApp与您的企业管理或个人助理AI进行实时数据请求、更新和任务自动化交互。
在我们关于创建AI驱动的商业管理器的文章中,我将引导您完成将AI代理连接到WhatsApp的步骤,以增强其功能和覆盖范围。我们的目标是实现一个能够与所有相关数据库表交互,甚至能够自行创建表和所有必要工具的AI助理。作为主要展示,我将专注于商业用例,如跟踪费用、发票等。然而,您可以轻松地应用相同的逻辑来创建,例如,一个跟踪您的任务、项目和想法的个人助理。
在前两篇文章中使用的一些解决方法在当时对于可靠的工具调用是必要的,但由于性能更好的模型(如GPT-4o和GPT-4o-mini)的出现,它们现在已不再需要。我发现,在依赖像LangChain或更具体地说,像LangGraph这样用于深度定制代理工作流程的框架之前,了解如何从零开始构建某些东西是很有用的。
现在,我们必须退一步,首先关注基础设施。我认为在大多数项目中,尤其是在AI软件项目中,在陷入功能蔓延之前,首先创建一个可工作的端到端产品是一个好习惯。
本文范围
按照惯例,让我们首先定义本文的范围:
1、使用MetaAPI将AI代理与WhatsApp集成
设置和配置MetaAPI以实现WhatsApp Business集成。
确保AI代理可以通过WhatsApp发送和接收消息。
2、设置Webhooks并使用Ngrok在本地运行
创建一个FastAPI应用程序来处理来自WhatsApp的webhook事件。
使用ngrok将本地FastAPI服务器暴露给互联网。
配置MetaAPI以将webhook事件发送到ngrok URL
由于我们正在向可部署的服务器迈进,因此我们还需要调整项目架构。我们基本上是在实现一个FastAPI服务器,因此,我首选的仓库结构是领域驱动设计(DDD),或者更倾向于DDD。
设置WhatsApp Cloud API
首先,您需要熟悉Meta提供的Cloud API。您可以使用像Twilio这样的SaaS产品来实现相同的结果,这些产品提供了更用户友好的集成。然而,由于最近的数据泄露事件以及出于成本效益的考虑,我更喜欢使用Meta提供的根API。
先决条件
在开始之前,您需要按照以下步骤注册一个Meta开发者帐户:如何开设Meta开发者帐户。在此过程中,您需要使用电话号码进行验证。请注意,这不会是您最终WhatsApp客户端的电话号码。相反,平台将为您分配一个测试电话号码,以后可以更改为另一个电话号码。
注册后,转到您的仪表板并创建一个应用程序。
此外,您将需要一个与您的真实帐户相关联的Meta商业帐户(MBA),或者您可以创建一个新的帐户以链接到您的MBA。您也可以跳过此步骤,因为在下一步中,系统将自动提示您链接或创建一个MBA。
向您的应用程序添加WhatsApp产品
在您的Meta开发者帐户中创建应用程序后,系统将提示您向其中添加产品。在这里,您要选择WhatsApp并按照设置过程进行操作。如果您还没有这样做,请在此处创建一个Meta商业帐户。完成后,您将拥有一个测试WhatsApp商业帐户和一个测试电话号码。
添加接收者号码
在应用仪表板的左侧菜单中,导航至WhatsApp > API设置,在下面的发送和接收消息部分中,选择“至”字段并选择“管理电话号码列表”。在这里,您可以添加一个允许从您的测试电话号码发送和接收消息的电话号码。这理想上应该是您自己的电话号码,因为您想要测试您的应用程序。在将此WhatsApp API帐户链接到真实号码之前,您最多只能添加5个接收者号码。
在WhatsApp > API设置中,您现在可以通过在“从”字段中填写您的测试电话号码,在“至”字段中填写您的接收者号码(您自己的电话号码)来发送测试消息。
生成访问令牌。这是您的WHATSAPP_API_TOKEN,我们将在后面需要它。
我们已经按照要求成功设置了Cloud API。下一步是创建一个Webhook,以便与我们的AI助手应用程序进行通信。
为了实现这一点,我们需要在后端应用程序中创建并提供一个端点。这意味着我们的Python后端必须通过一个URL进行访问。这个URL将作为Webhook端点,AI助手可以通过它发送和接收数据。
创建FastAPI端点
为了被Webhook接受,我们的根端点必须验证Webhook在添加我们的URL时发送的特定GET请求。Webhook将发送三个查询参数:
hub.mode、hub.challenge、hub.verify.token。
验证令牌是在Cloud API中创建Webhook时定义的。您的后端应该验证这个令牌是否与您定义的一致,并返回hub.challenge对象作为响应。请确保首先使用pip install fastapi uvicorn安装FastAPI和Uvicorn。
创建main.py文件
创建一个名为main.py的文件,内容如下:
from fastapi import FastAPI, Query, HTTPException
VERIFICATION_TOKEN = "abcdefg12345"
app = FastAPI()
@app.get("/")
def verify_whatsapp(
hub_mode: str = Query("subscribe", description="The mode of the webhook", alias="hub.mode"),
hub_challenge: int = Query(..., description="The challenge to verify the webhook", alias="hub.challenge"),
hub_verify_token: str = Query(..., description="The verification token", alias="hub.verify_token"),
):
if hub_mode == "subscribe" and hub_verify_token == VERIFICATION_TOKEN:
return hub_challenge
raise HTTPException(status_code=403, detail="Invalid verification token")
@app.get("/health")
def health():
return {"status": "healthy"}
@app.get("/readiness")
def readiness():
return {"status": "ready"}
在第三行中,您可以定义一个VERIFICATION_TOKEN,该令牌稍后将由Webhook用于验证后端是否由您控制。在这个例子中,我们将其定义为"abcdefg12345",但您可以自定义自己的令牌。
运行应用程序
使用Uvicorn运行应用程序:
uvicorn main:app --reload
本地提供API服务
您的后端现在本地运行在http://localhost:8000和/或http://127.0.0.1:8000上。
我们现在提供以下端点:
您可以使用健康检查端点来检查您的应用程序是否正在运行。在浏览器中打开http://127.0.0.1:8000/health,您应该能看到:{"status": "healthy"}
使用Ngrok运行代理服务器
由于我们的服务器是本地运行的,WhatsApp Webhook无法调用端点进行验证。我们需要一个可以被Webhook使用的公共URL。有两种选择:将应用程序部署到云服务器或创建一个代理服务器隧道。由于我们仍处于开发过程中,我们将选择第二种方案。
> ngrok config add-authtoken $YOUR-AUTHENTICATION_TOKEN
> ngrok http http://localhost:8000
Forwarding https://<random-string>.ngrok.io -> http://localhost:8000
通过Ngrok提供的公共URL,您现在可以访问您的本地服务器。您应该会看到类似这样的内容:
Forwarding https://<random-string>.ngrok.io -> http://localhost:8000
使用Ngrok提供的HTTPS URL来配置Webhook。
实现Webhook
现在,让我们回到Meta的Cloud API来实现所需的Webhook。
您现在应该能够在您的Python后端服务器上接收WhatsApp消息了。
理解Webhook和对话
Webhook是HTTP回调,当发生某些事件(如新消息或状态更改)时,它使程序能够接收实时更新。Webhook通过向预先配置的URL(在我们的情况下是Ngrok代理服务器URL)发送包含事件数据的HTTP请求,使系统集成和自动化成为可能。
在WhatsApp API上,“对话”在以下情况下开始:
用户发送消息:这会打开一个24小时窗口,在此期间,您可以回复包括文本、图像或其他媒体的消息,而无需额外费用。
企业主动联系:如果最近没有收到用户消息(没有打开的24小时窗口),您的AI助手必须使用预先批准的模板消息来开始对话。您可以添加自定义模板,但它们需要得到Meta的批准。
只要用户继续回复,24小时窗口就会随着每条新消息而重置。这使得无需额外费用即可进行连续交互成为可能。
构建接收消息端点
现在,我们能够在后端接收消息。由于我们订阅了消息对象,因此每次向您的测试号码发送消息时,Webhook都会向您在上一步中定义的回调URL创建一个POST请求。接下来,我们需要在FastAPI应用程序中为POST请求构建一个端点。
首先,让我们定义要求:
定义模型和架构
我们将从Webhook接收一个负载。
我更喜欢使用Pydantic编写代码,以为我的Python代码添加类型安全性。此外,类型注释和Pydantic是FastAPI应用程序的最佳匹配。因此,让我们首先定义在端点中使用的模型:
# app/schema.py
from typing import List, Optional
from pydantic import BaseModel, Field
class Profile(BaseModel):
name: str
class Contact(BaseModel):
profile: Profile
wa_id: str
class Text(BaseModel):
body: str
class Image(BaseModel):
mime_type: str
sha256: str
id: str
class Audio(BaseModel):
mime_type: str
sha256: str
id: str
voice: bool
class Message(BaseModel):
from_: str = Field(..., alias="from")
id: str
timestamp: str
text: Text | None = None
image: Image | None = None
audio: Audio | None = None
type: str
class Metadata(BaseModel):
display_phone_number: str
phone_number_id: str
class Value(BaseModel):
messaging_product: str
metadata: Metadata
contacts: List[Contact] | None = None
messages: List[Message] | None = None
class Change(BaseModel):
value: Value
field: str
statuses: List[dict] | None = None
class Entry(BaseModel):
id: str
changes: List[Change]
class Payload(BaseModel):
object: str
entry: List[Entry]
class User(BaseModel):
id: int
first_name: str
last_name: str
phone: str
role: str
class UserMessage(BaseModel):
user: User
message: str | None = None
image: Image | None = None
audio: Audio | None = None
解析传入消息
接下来,我们将为在FastAPI中使用依赖注入创建一些辅助函数:
# app/main.py
from app.domain import message_service
def parse_message(payload: Payload) -> Message | None:
if not payload.entry[0].changes[0].value.messages:
return None
return payload.entry[0].changes[0].value.messages[0]
def get_current_user(message: Annotated[Message, Depends(parse_message)]) -> User | None:
if not message:
return None
return message_service.authenticate_user_by_phone_number(message.from_)
def parse_audio_file(message: Annotated[Message, Depends(parse_message)]) -> Audio | None:
if message and message.type == "audio":
return message.audio
return None
def parse_image_file(message: Annotated[Message, Depends(parse_message)]) -> Image | None:
if message and message.type == "image":
return message.image
return None
def message_extractor(
message: Annotated[Message, Depends(parse_message)],
audio: Annotated[Audio, Depends(parse_audio_file)],
):
if audio:
return message_service.transcribe_audio(audio)
if message and message.text:
return message.text.body
return None
此处我们从域层导入了一个模块。整个 message_service 脚本是我们放置此实现中所有特定于域的代码的地方,例如 authenticate_user_by_phone_number 和 transcribe_audio。
实现 POST 端点
# app/main.py
import threading
from typing_extensions import Annotated
from fastapi import APIRouter, Query, HTTPException, Depends
from app.domain import message_service
from app.schema import Payload, Message, Audio, Image, User
# ... rest of the code ...
@app.post("/", status_code=200)
def receive_whatsapp(
user: Annotated[User, Depends(get_current_user)],
user_message: Annotated[str, Depends(message_extractor)],
image: Annotated[Image, Depends(parse_image_file)],
):
if not user and not user_message and not image:
return {"status": "ok"}
if not user:
raise HTTPException(status_code=401, detail="Unauthorized")
if image:
return print("Image received")
if user_message:
thread = threading.Thread(
target=message_service.respond_and_send_message,
args=(user_message, user)
)
thread.daemon = True
thread.start()
return {"status": "ok"}
POST 端点实现:这个端点处理传入的 POST 请求。它会检查用户、消息或图像是否有效。如果都不有效,它会简单地向 CloudAPI 返回一个状态消息。如果用户未通过身份验证,它会引发一个带有 401 状态码的 HTTPException。
处理图像和消息:如果接收到图像,我们会简单地使用 stdout 打印一条消息作为未来图像处理功能的占位符。如果接收到文本消息,则会使用单独的线程异步处理该消息,以避免阻塞主应用程序线程。会调用 message_service.respond_and_send_message 函数来根据 LLM-Agent 工作流处理消息。
使用线程池处理 Webhook 的解释:WhatsApp 会在收到 200 响应之前不断重新发送 webhook,因此使用线程池可以确保消息处理不会阻塞 webhook 响应。
消息服务
在我们之前定义端点的表示层中,我们使用了一些 message_service 函数,接下来需要定义这些函数。具体来说,我们需要实现处理音频负载、转录音频、验证用户身份,以及最终调用我们的代理并发送响应的功能。我们将所有这些功能放置在 domain/message_service.py 中。在生产环境中,随着应用程序的扩展,我建议将它们进一步拆分为例如 transcription_service.py、message_service.py 和 authentication_service.py。
在本节的多个函数中,我们将向 Meta API(如 "https://graph.facebook.com/...")发送请求。在这些请求中,我们都需要包含带有我们在步骤 1.3 中创建的 WHATSAPP_API_KEY 的授权头,作为 Bearer 令牌。我通常将 API 密钥和令牌存储在 .env 文件中,并使用 Python 的 dotenv 库来访问它们。我们还会使用带有您的 OPENAI_API_KEY 的 OpenAI 客户端,该密钥也可以存储在 .env 文件中。
但为了简化,让我们只是在 message_service.py 脚本的顶部放置并初始化它们,如下所示:
import os
import json
import requests
from typing import BinaryIO
WHATSAPP_API_KEY = "YOUR_ACCESS_TOKEN"
llm = OpenAI(api_key="YOUR_OPENAI_API_KEY")
将“YOUR_ACCESS_TOKEN”替换为您在步骤1.3中实际创建的访问令牌。
处理和转录音频文件
处理来自WhatsApp webhook的语音记录并不像看起来那么简单。首先,重要的是要知道传入的webhook只告诉我们数据类型和一个对象ID。因此,它不包含二进制音频文件。我们必须首先使用Meta的Graph API下载音频文件。为了下载我们接收到的音频,我们需要进行两个顺序请求。第一个是带有object_id的GET请求,用于获取下载URL。这个下载URL是我们第二个GET请求的目标。
def download_file_from_facebook(file_id: str, file_type: str, mime_type: str) -> str | None:
# First GET request to retrieve the download URL
url = f"https://graph.facebook.com/v19.0/{file_id}"
headers = {"Authorization": f"Bearer {WHATSAPP_API_KEY}"}
response = requests.get(url, headers=headers)
if response.status_code == 200:
download_url = response.json().get('url')
# Second GET request to download the file
response = requests.get(download_url, headers=headers)
if response.status_code == 200:
# Extract file extension from mime_type
file_extension = mime_type.split('/')[-1].split(';')[0]
# Create file_path with extension
file_path = f"{file_id}.{file_extension}"
with open(file_path, 'wb') as file:
file.write(response.content)
if file_type == "image" or file_type == "audio":
return file_path
raise ValueError(f"Failed to download file. Status code: {response.status_code}")
raise ValueError(f"Failed to retrieve download URL. Status code: {response.status_code}")
在这里,我们基本上是通过对象ID和文件扩展名作为文件路径来获取下载URL,并将文件下载到文件系统中。如果某个步骤出错,我们会抛出一个ValueError,指出错误发生的位置。
接下来,我们只需定义一个函数,该函数接收音频二进制数据并使用Whisper进行转录:
def transcribe_audio_file(audio_file: BinaryIO) -> str:
if not audio_file:
return "No audio file provided"
try:
transcription = llm.audio.transcriptions.create(
file=audio_file,
model="whisper-1",
response_format="text"
)
return transcription
except Exception as e:
raise ValueError("Error transcribing audio") from e
最后,让我们将下载和转录功能结合起来:
def transcribe_audio(audio: Audio) -> str:
file_path = download_file_from_facebook(audio.id, "audio", audio.mime_type)
with open(file_path, 'rb') as audio_binary:
transcription = transcribe_audio_file(audio_binary)
try:
os.remove(file_path)
except Exception as e:
print(f"Failed to delete file: {e}")
return transcription
用户认证
在使用Meta提供的测试号码时,我们必须预先定义我们的聊天机器人可以向哪些号码发送消息。我不太确定,也没有测试过是否任何号码都可以向我们的聊天机器人发送消息。但无论如何,一旦我们切换到自定义号码,我们就不希望任何人都能执行我们的代理聊天机器人。因此,我们需要一种方法来验证用户身份。我们有几种方法可以做到这一点。首先,我们必须考虑在哪里存储用户信息。例如,我们可以使用像PostgreSQL这样的关系型数据库,或者使用像Firestore这样的非关系型数据库。我们也可以在文件系统中预定义用户,比如在JSON文件或.env文件中。在本文中,我将采用最简单的方式,在我们的认证函数中硬编码用户列表。
列表项具有步骤5.1中定义的User模型的结构。因此,一个用户由ID、名字、姓氏和电话号码组成。在我们的代理工作流中,我们还没有实现角色系统。但在大多数有不同用户的使用场景中,比如在小企业助理的示例案例中,不同的用户将拥有不同的权限和访问范围。现在,我们只是将“default”作为占位符角色传递。
def authenticate_user_by_phone_number(phone_number: str) -> User | None:
allowed_users = [
{"id": 1, "phone": "+1234567890", "first_name": "John", "last_name": "Doe", "role": "default"},
{"id": 2, "phone": "+0987654321", "first_name": "Jane", "last_name": "Smith", "role": "default"}
]
for user in allowed_users:
if user["phone"] == phone_number:
return User(**user)
return None
所以,只需验证电话号码是否在我们的allowed_users列表中,如果在,就返回该用户。否则,我们返回None。如果你查看步骤5.3中的端点,你会看到如果用户为None,我们会抛出一个错误,以防止进一步处理未经授权的用户消息。
发送消息
现在,在我们实际调用代理之前,最后一个辅助函数是send_whatsapp_message。由于Meta特定的WhatsApp API逻辑,我在这个函数中包含了两种模式。
基本上,你不被允许向用户发送自定义消息作为对话的开始。这意味着如果用户先开始对话并向聊天机器人发送消息,你可以回复一条个性化的文本消息。否则,如果你想让聊天机器人发起对话,你只能使用经批准的模板,比如“Hello World”模板。
同样重要的是,当我们谈论Meta的逻辑时,对话开始后,会打开一个24小时的对话窗口,在这个窗口内你可以向该用户发送消息。这个对话窗口也是被计费的,而不是单独的消息。根据对话的类型(如营销、支持等),情况会变得更复杂一些。
你也可以自己定义一个模板并让Meta批准。我目前还没有这样做,所以为了测试我们是否可以从后端向用户发送消息,我使用了“Hello World”模板。如果你添加了一些自定义的经批准的模板,你也可以使用这个函数将它们发送给用户。
回到代码上。为了发送消息,我们发起一个POST请求,并定义一个包含文本正文或模板的负载:
def send_whatsapp_message(to, message, template=True):
url = f"https://graph.facebook.com/v18.0/289534840903017/messages"
headers = {
"Authorization": f"Bearer " + WHATSAPP_API_KEY,
"Content-Type": "application/json"
}
if not template:
data = {
"messaging_product": "whatsapp",
"preview_url": False,
"recipient_type": "individual",
"to": to,
"type": "text",
"text": {
"body": message
}
}
else:
data = {
"messaging_product": "whatsapp",
"to": to,
"type": "template",
"template": {
"name": "hello_world",
"language": {
"code": "en_US"
}
}
}
response = requests.post(url, headers=headers, data=json.dumps(data))
return response.json()
调用我们的代理
最后,我们可以将我们之前示例中的代理集成进来。在这一阶段,你也可以集成你的自定义代理,比如Langchain AgentExecutor、Langgraph AgentWorkflow等。
因此,我们将在每条传入消息上调用的主要函数是respond_and_send_message,它接收user_message字符串并将其作为输入对象传递给我们的代理工作流。
# app/domain/message_service.py
import json
import requests
from app.domain.agents.routing_agent import RoutingAgent
from app.schema import User
def respond_and_send_message(user_message: str, user: User):
agent = RoutingAgent()
response = agent.run(user_message, user.id)
send_whatsapp_message(user.phone, response, template=False)
调用我们的代理后,我们会得到一个响应消息,我们想使用send_whatsapp_message函数将其发送回给用户。
总结
将 WhatsApp 的覆盖范围和可用性与 LLM 相结合,对于企业和个人用例来说都是一个巨大的胜利。无论您是想要个人助理还是成熟的商业工具,本文都会为您提供实现目标的途径。不断改进、不断突破界限 — 这只是您可以构建的开始。
相关文章: