【教程】使用LangGraph AI代理构建动态订单管理系统

2025年01月21日 由 alex 发表 4762 0

在这篇文章中,我们将探索LangGraph(一个用于协调大型语言模型(LLM)复杂、多步骤工作流程的强大库),并将其应用于一个常见的电子商务问题:根据用户的查询决定是否下单或取消订单。


我们将逐步进行,详细解释每个概念,非常适合初学者以及那些希望使用LLM构建动态或循环工作流程的人,我还包含了数据集的链接供你尝试。


什么是LangGraph?

LangGraph是一个库,它为LangChain工作流程引入了基于图的方法。传统的管道通常从一个步骤线性地移动到另一个步骤,但现实世界中的任务经常需要分支、条件逻辑,甚至循环(重试失败的步骤、澄清用户输入等)。


LangGraph的关键特性:

  • 节点:单个任务或函数(例如,检查库存、计算运费)。
  • 边:定义节点之间的数据流和控制流。可以是条件性的。
  • 共享状态:每个节点可以返回数据,更新全局状态对象,避免手动传递数据。
  • 工具集成:轻松集成大型语言模型(LLM)可以调用的外部工具或函数。
  • 人工参与(可选):插入需要人工审核的节点。


问题陈述:订单管理

在这个场景中,用户的查询可以是关于下新订单或取消现有订单:

  • 下订单(PlaceOrder):检查商品可用性、计算运费并模拟支付。
  • 取消订单(CancelOrder):提取订单ID并将订单标记为已取消。


因为我们需要分支(决定是“下订单”还是“取消订单”),我们将使用LangGraph创建条件流:

  • 对查询进行分类。
  • 如果是下订单,则转到检查库存、运费和支付。
  • 如果是取消订单,则解析出订单ID并调用取消工具。


导入说明

以下是你提供的代码的确切第一部分,展示了导入和环境设置。我们在代码后添加了注释来解释每个部分。


# Import required libraries
import os
import pandas as pd
import random
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import ToolNode
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_core.runnables.graph import MermaidDrawMethod
from IPython.display import display, Image
from typing import Literal
from langchain_core.prompts import ChatPromptTemplate
from typing import Dict, TypedDict
# Load environment variables
os.environ["OPENAI_API_KEY"] = "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"


langchain_core.tools、langchain_openai、ToolNode等:

  • tool(装饰器)将 Python 函数转换为 LLM 可以调用的“工具”。
  • ChatOpenAI是我们的 LLM 客户,与 GPT 模型进行对话。
  • ToolNodelanggraph.prebuilt是一个处理工具执行的预构建节点。
  • StateGraph,,,来自——MessagesState它们对于定义我们的工作流程至关重要。STARTENDlanggraph.graph
  • MermaidDrawMethod帮助将工作流程可视化为Mermaid.js图表​​。


数据加载和状态定义

数据链接:数据

在下一段代码中,我们加载CSV文件(用于库存和客户)并将它们转换为字典。我们还定义了我们的类型化字典State。


# Load datasets
inventory_df = pd.read_csv("inventory.csv")
customers_df = pd.read_csv("customers.csv")
# Convert datasets to dictionaries
inventory = inventory_df.set_index("item_id").T.to_dict()
customers = customers_df.set_index("customer_id").T.to_dict()


class State(TypedDict):
    query: str
    category: str
    next_node: str
    item_id: str
    order_status: str
    cost: str
    payment_status: str
    location: str
    quantity: int


CSV转字典:

库存和客户是通过item_id或customer_id作为键的字典。这使得像inventory[item_51]这样的查找变得很容易。


状态(State):

  • 类型化字典,这样我们就知道应该有哪些字段。例如,query(查询)、category(类别)、item_id(商品ID)等。
  • category通常是“PlaceOrder”(下单)或“CancelOrder”(取消订单)。
  • next_node可以存储下一个节点的名称,尽管我们依赖于图的边来进行转换。
  • 这有助于在一个对象中跟踪所有内容——库存检查、支付状态等。


创建工具和LLM集成

现在我们定义我们的LLM和工具。这里的主要工具是cancel_order,它使用LLM从查询中提取order_id。


@tool
def cancel_order(query: str) -> dict:
    """Simulate order cancelling"""
    order_id = llm.with_structured_output(method='json_mode').invoke(f'Extract order_id from the following text in json format: {query}')['order_id']
    #amount = query.get("amount")
    if not order_id:
        return {"error": "Missing 'order_id'."}
    return {"order_status": "Order stands cancelled"}


# Initialize LLM and bind tools
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
tools_2 = [cancel_order]
llm_with_tools_2 = llm.bind_tools(tools_2)
tool_node_2 = ToolNode(tools_2)


@tool:

现在,如果LLM决定需要取消订单,cancel_order函数就是一个它可以调用的工具。


提取order_id:

我们调用llm.with_structured_output(method='json_mode')来指示LLM返回JSON。然后,我们解析出'order_id'。


LLM初始化:

选择的模型是model="gpt-4o-mini",温度参数temperature=0以实现确定性响应。


绑定和ToolNode:

  • llm.bind_tools(tools_2)将我们的LLM与cancel_order工具连接起来。
  • ToolNode是一个专门化的节点,可以自动处理这些绑定的工具。


定义工作流程节点

现在我们将开始逐个定义节点。


模型调用节点


这些节点可以调用模型。


def call_model_2(state: MessagesState):
    """Use the LLM to decide the next step."""
    messages = state["messages"]
    response = llm_with_tools_2.invoke(str(messages))
    return {"messages": [response]}

def call_tools_2(state: MessagesState) -> Literal["tools_2", END]:
    """Route workflow based on tool calls."""
    messages = state["messages"]
    last_message = messages[-1]
    if last_message.tool_calls:
        return "tools_2"
    return END


  • call_model_2:接收对话(消息)并将其传递给绑定了工具的LLM。如果LLM触发了工具调用,我们将在call_tools_2中检测到这一点。
  • call_tools_2:检查LLM是否请求了工具调用(tool_calls)。如果是,我们将路由到“tools_2”,即ToolNode;否则,我们结束工作流程。


查询分类

在这里,我们定义了一个用于分类查询的节点:


def categorize_query(state: MessagesState) -> MessagesState:
    """Categorize user query into PlaceOrder or CancelOrder"""
    prompt = ChatPromptTemplate.from_template(
        "Categorize user query into PlaceOrder or CancelOrder"
        "Respond with either 'PlaceOrder', 'CancelOrder' Query: {state}"
    )
    chain = prompt | ChatOpenAI(temperature=0)
    category = chain.invoke({"state": state}).content
    
    return {"query":state,"category": category}


这个节点使用LLM来对用户的查询进行分类。返回值会在状态中设置“category”。


检查库存


def check_inventory(state: MessagesState) -> MessagesState:
    """Check if the requested item is in stock."""
    item_id = llm.with_structured_output(method='json_mode').invoke(f'Extract item_id from the following text in json format: {state}')['item_id']
    quantity = llm.with_structured_output(method='json_mode').invoke(f'Extract quantity from the following text in json format: {state}')['quantity']
    if not item_id or not quantity:
        return {"error": "Missing 'item_id' or 'quantity'."}
    if inventory.get(item_id, {}).get("stock", 0) >= quantity:
        print("IN STOCK")
        return {"status": "In Stock"}
    return {"query":state,"order_status": "Out of Stock"}


  • 尝试从对话中解析出item_id(商品ID)和quantity(数量)。
  • 检查inventory[item_id]["stock"]以确认库存可用性。


计算运费

我们已经定义了一个节点,用于计算特定客户的运费。


def compute_shipping(state: MessagesState) -> MessagesState:
    """Calculate shipping costs."""
    item_id = llm.with_structured_output(method='json_mode').invoke(f'Extract item_id from the following text in json format: {state}')['item_id']
    quantity = llm.with_structured_output(method='json_mode').invoke(f'Extract quantity from the following text in json format: {state}')['quantity']
    customer_id = llm.with_structured_output(method='json_mode').invoke(f'Extract customer_id from the following text in json format: {state}')['customer_id']
    location = customers[customer_id]['location']

    if not item_id or not quantity or not location:
        return {"error": "Missing 'item_id', 'quantity', or 'location'."}
    weight_per_item = inventory[item_id]["weight"]
    total_weight = weight_per_item * quantity
    rates = {"local": 5, "domestic": 10, "international": 20}
    cost = total_weight * rates.get(location, 10)
    print(cost,location)
    return {"query":state,"cost": f"${cost:.2f}"}


  • 从用户的查询中检索customer_id(客户ID),然后在customers字典中查找他们的位置。
  • 根据商品的重量、数量以及用户的位置计算运费。


处理支付

我们将定义一个用于处理支付的节点:


def process_payment(state: State) -> State:
    """Simulate payment processing."""
    cost = llm.with_structured_output(method='json_mode').invoke(f'Extract cost from the following text in json format: {state}')
    if not cost:
        return {"error": "Missing 'amount'."}
    print(f"PAYMENT PROCESSED: {cost} and order successfully placed!")
    payment_outcome = random.choice(["Success", "Failed"])
    return {"payment_status": payment_outcome}


  • 使用random.choice来模拟成功或失败。
  • 在生产系统中,你会集成一个真实的支付网关。


路由函数

现在,我们定义一个用于路由查询的节点:


def route_query_1(state: State) -> str:
    """Route the query based on its category."""
    print(state)
    if state["category"] == "PlaceOrder":
        return "PlaceOrder"
    elif state["category"] == "CancelOrder":
        return "CancelOrder"


决定接下来走哪条路径:“PlaceOrder”(下单)或“CancelOrder”(取消订单)。在LangGraph中,我们将“PlaceOrder”映射到CheckInventory节点,将“CancelOrder”映射到CancelOrder节点。


构建工作流程图

下面,我们创建一个StateGraph,添加节点,并定义边和条件边。


# Create the workflow
workflow = StateGraph(MessagesState)
#Add nodes
workflow.add_node("RouteQuery", categorize_query)
workflow.add_node("CheckInventory", check_inventory)
workflow.add_node("ComputeShipping", compute_shipping)
workflow.add_node("ProcessPayment", process_payment)
workflow.add_conditional_edges(
    "RouteQuery",
    route_query_1,
    {
        "PlaceOrder": "CheckInventory",
        "CancelOrder": "CancelOrder"
    }
)
workflow.add_node("CancelOrder", call_model_2)
workflow.add_node("tools_2", tool_node_2)

# Define edges
workflow.add_edge(START, "RouteQuery")
workflow.add_edge("CheckInventory", "ComputeShipping")
workflow.add_edge("ComputeShipping", "ProcessPayment")
workflow.add_conditional_edges("CancelOrder", call_tools_2)
workflow.add_edge("tools_2", "CancelOrder")
workflow.add_edge("ProcessPayment", END)


StateGraph(MessagesState):

我们指定MessagesState来存储对话数据。


节点:

  • RouteQuery是入口节点,用于分类用户的意图。
  • “CheckInventory”(检查库存)、“ComputeShipping”(计算运费)和“ProcessPayment”(处理支付)处理PlaceOrder流程。
  • “CancelOrder”(取消订单)和“tools_2”处理CancelOrder流程。


条件边:

调用workflow.add_conditional_edges("RouteQuery", route_query_1, ...)确保如果是“PlaceOrder”则移动到CheckInventory,如果是“CancelOrder”则移动到CancelOrder。


循环:

当用户选择“CancelOrder”时,我们检查LLM是否触发了工具调用(call_tools_2)。如果触发了,我们转到tools_2(ToolNode);工具调用后,它返回到“CancelOrder”,给LLM机会产生进一步的操作或结束。


结束:

“ProcessPayment”通向END,结束“PlaceOrder”路径。


可视化和测试工作流程

下一段代码将工作流程编译成一个代理,将其呈现为Mermaid图,并使用示例查询进行测试。


# Compile the workflow
agent = workflow.compile()
# Visualize the workflow
mermaid_graph = agent.get_graph()
mermaid_png = mermaid_graph.draw_mermaid_png(draw_method=MermaidDrawMethod.API)
display(Image(mermaid_png))
# Query the workflow
user_query = "I wish to cancel order_id 223"
for chunk in agent.stream(
    {"messages": [("user", user_query)]},
    stream_mode="values",
):
    chunk["messages"][-1].pretty_print() 
auser_query = "customer_id: customer_14 : I wish to place order for item_51 with order quantity as 4 and domestic"
for chunk in agent.stream(
    {"messages": [("user", auser_query)]},
    stream_mode="values",
):
    chunk["messages"][-1].pretty_print()


编译:

agent = workflow.compile() 将我们的节点/边定义转换为一个可执行的代理。


可视化:

我们获得一个Mermaid图(mermaid_png),可以在Jupyter Notebook中显示,用于调试或演示。


13


测试查询:

第一个测试:“我希望取消订单号223”应该路由到CancelOrder。


14


第二个测试:“客户ID:customer_14:我希望订购商品_51…”应该路由到下单工作流程。


15


结论

通过利用LangGraph,我们构建了一个基于用户意图来下单或取消订单的动态、分支工作流程。我们展示了:

  • 如何使用LLM节点(categorize_query)对查询进行分类。
  • 如何绑定工具(cancel_order)并将其集成到工作流程中。
  • 如何通过单独的节点来检查库存、计算运费和处理支付。
  • 如何使用Mermaid.js可视化整个工作流程。


这种方法具有可扩展性:你可以添加更多步骤(例如,地址验证、促销代码)或额外分支(例如,更新现有订单),而无需重写单体脚本。如果你需要循环来重试失败的支付或验证用户确认,LangGraph也能处理。

文章来源:https://medium.com/ai-advances/langgraph-building-a-dynamic-order-management-system-a-step-by-step-tutorial-0be56854fc91
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
热门职位
Maluuba
20000~40000/月
Cisco
25000~30000/月 深圳市
PilotAILabs
30000~60000/年 深圳市
写评论取消
回复取消