在这篇文章中,我们将探索LangGraph(一个用于协调大型语言模型(LLM)复杂、多步骤工作流程的强大库),并将其应用于一个常见的电子商务问题:根据用户的查询决定是否下单或取消订单。
我们将逐步进行,详细解释每个概念,非常适合初学者以及那些希望使用LLM构建动态或循环工作流程的人,我还包含了数据集的链接供你尝试。
什么是LangGraph?
LangGraph是一个库,它为LangChain工作流程引入了基于图的方法。传统的管道通常从一个步骤线性地移动到另一个步骤,但现实世界中的任务经常需要分支、条件逻辑,甚至循环(重试失败的步骤、澄清用户输入等)。
LangGraph的关键特性:
问题陈述:订单管理
在这个场景中,用户的查询可以是关于下新订单或取消现有订单:
因为我们需要分支(决定是“下订单”还是“取消订单”),我们将使用LangGraph创建条件流:
导入说明
以下是你提供的代码的确切第一部分,展示了导入和环境设置。我们在代码后添加了注释来解释每个部分。
# Import required libraries
import os
import pandas as pd
import random
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import ToolNode
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_core.runnables.graph import MermaidDrawMethod
from IPython.display import display, Image
from typing import Literal
from langchain_core.prompts import ChatPromptTemplate
from typing import Dict, TypedDict
# Load environment variables
os.environ["OPENAI_API_KEY"] = "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"
langchain_core.tools、langchain_openai、ToolNode等:
数据加载和状态定义
数据链接:数据
在下一段代码中,我们加载CSV文件(用于库存和客户)并将它们转换为字典。我们还定义了我们的类型化字典State。
# Load datasets
inventory_df = pd.read_csv("inventory.csv")
customers_df = pd.read_csv("customers.csv")
# Convert datasets to dictionaries
inventory = inventory_df.set_index("item_id").T.to_dict()
customers = customers_df.set_index("customer_id").T.to_dict()
class State(TypedDict):
query: str
category: str
next_node: str
item_id: str
order_status: str
cost: str
payment_status: str
location: str
quantity: int
CSV转字典:
库存和客户是通过item_id或customer_id作为键的字典。这使得像inventory[item_51]这样的查找变得很容易。
状态(State):
创建工具和LLM集成
现在我们定义我们的LLM和工具。这里的主要工具是cancel_order,它使用LLM从查询中提取order_id。
@tool
def cancel_order(query: str) -> dict:
"""Simulate order cancelling"""
order_id = llm.with_structured_output(method='json_mode').invoke(f'Extract order_id from the following text in json format: {query}')['order_id']
#amount = query.get("amount")
if not order_id:
return {"error": "Missing 'order_id'."}
return {"order_status": "Order stands cancelled"}
# Initialize LLM and bind tools
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
tools_2 = [cancel_order]
llm_with_tools_2 = llm.bind_tools(tools_2)
tool_node_2 = ToolNode(tools_2)
@tool:
现在,如果LLM决定需要取消订单,cancel_order函数就是一个它可以调用的工具。
提取order_id:
我们调用llm.with_structured_output(method='json_mode')来指示LLM返回JSON。然后,我们解析出'order_id'。
LLM初始化:
选择的模型是model="gpt-4o-mini",温度参数temperature=0以实现确定性响应。
绑定和ToolNode:
定义工作流程节点
现在我们将开始逐个定义节点。
模型调用节点
这些节点可以调用模型。
def call_model_2(state: MessagesState):
"""Use the LLM to decide the next step."""
messages = state["messages"]
response = llm_with_tools_2.invoke(str(messages))
return {"messages": [response]}
def call_tools_2(state: MessagesState) -> Literal["tools_2", END]:
"""Route workflow based on tool calls."""
messages = state["messages"]
last_message = messages[-1]
if last_message.tool_calls:
return "tools_2"
return END
查询分类
在这里,我们定义了一个用于分类查询的节点:
def categorize_query(state: MessagesState) -> MessagesState:
"""Categorize user query into PlaceOrder or CancelOrder"""
prompt = ChatPromptTemplate.from_template(
"Categorize user query into PlaceOrder or CancelOrder"
"Respond with either 'PlaceOrder', 'CancelOrder' Query: {state}"
)
chain = prompt | ChatOpenAI(temperature=0)
category = chain.invoke({"state": state}).content
return {"query":state,"category": category}
这个节点使用LLM来对用户的查询进行分类。返回值会在状态中设置“category”。
检查库存
def check_inventory(state: MessagesState) -> MessagesState:
"""Check if the requested item is in stock."""
item_id = llm.with_structured_output(method='json_mode').invoke(f'Extract item_id from the following text in json format: {state}')['item_id']
quantity = llm.with_structured_output(method='json_mode').invoke(f'Extract quantity from the following text in json format: {state}')['quantity']
if not item_id or not quantity:
return {"error": "Missing 'item_id' or 'quantity'."}
if inventory.get(item_id, {}).get("stock", 0) >= quantity:
print("IN STOCK")
return {"status": "In Stock"}
return {"query":state,"order_status": "Out of Stock"}
计算运费
我们已经定义了一个节点,用于计算特定客户的运费。
def compute_shipping(state: MessagesState) -> MessagesState:
"""Calculate shipping costs."""
item_id = llm.with_structured_output(method='json_mode').invoke(f'Extract item_id from the following text in json format: {state}')['item_id']
quantity = llm.with_structured_output(method='json_mode').invoke(f'Extract quantity from the following text in json format: {state}')['quantity']
customer_id = llm.with_structured_output(method='json_mode').invoke(f'Extract customer_id from the following text in json format: {state}')['customer_id']
location = customers[customer_id]['location']
if not item_id or not quantity or not location:
return {"error": "Missing 'item_id', 'quantity', or 'location'."}
weight_per_item = inventory[item_id]["weight"]
total_weight = weight_per_item * quantity
rates = {"local": 5, "domestic": 10, "international": 20}
cost = total_weight * rates.get(location, 10)
print(cost,location)
return {"query":state,"cost": f"${cost:.2f}"}
处理支付
我们将定义一个用于处理支付的节点:
def process_payment(state: State) -> State:
"""Simulate payment processing."""
cost = llm.with_structured_output(method='json_mode').invoke(f'Extract cost from the following text in json format: {state}')
if not cost:
return {"error": "Missing 'amount'."}
print(f"PAYMENT PROCESSED: {cost} and order successfully placed!")
payment_outcome = random.choice(["Success", "Failed"])
return {"payment_status": payment_outcome}
路由函数
现在,我们定义一个用于路由查询的节点:
def route_query_1(state: State) -> str:
"""Route the query based on its category."""
print(state)
if state["category"] == "PlaceOrder":
return "PlaceOrder"
elif state["category"] == "CancelOrder":
return "CancelOrder"
决定接下来走哪条路径:“PlaceOrder”(下单)或“CancelOrder”(取消订单)。在LangGraph中,我们将“PlaceOrder”映射到CheckInventory节点,将“CancelOrder”映射到CancelOrder节点。
构建工作流程图
下面,我们创建一个StateGraph,添加节点,并定义边和条件边。
# Create the workflow
workflow = StateGraph(MessagesState)
#Add nodes
workflow.add_node("RouteQuery", categorize_query)
workflow.add_node("CheckInventory", check_inventory)
workflow.add_node("ComputeShipping", compute_shipping)
workflow.add_node("ProcessPayment", process_payment)
workflow.add_conditional_edges(
"RouteQuery",
route_query_1,
{
"PlaceOrder": "CheckInventory",
"CancelOrder": "CancelOrder"
}
)
workflow.add_node("CancelOrder", call_model_2)
workflow.add_node("tools_2", tool_node_2)
# Define edges
workflow.add_edge(START, "RouteQuery")
workflow.add_edge("CheckInventory", "ComputeShipping")
workflow.add_edge("ComputeShipping", "ProcessPayment")
workflow.add_conditional_edges("CancelOrder", call_tools_2)
workflow.add_edge("tools_2", "CancelOrder")
workflow.add_edge("ProcessPayment", END)
StateGraph(MessagesState):
我们指定MessagesState来存储对话数据。
节点:
条件边:
调用workflow.add_conditional_edges("RouteQuery", route_query_1, ...)确保如果是“PlaceOrder”则移动到CheckInventory,如果是“CancelOrder”则移动到CancelOrder。
循环:
当用户选择“CancelOrder”时,我们检查LLM是否触发了工具调用(call_tools_2)。如果触发了,我们转到tools_2(ToolNode);工具调用后,它返回到“CancelOrder”,给LLM机会产生进一步的操作或结束。
结束:
“ProcessPayment”通向END,结束“PlaceOrder”路径。
可视化和测试工作流程
下一段代码将工作流程编译成一个代理,将其呈现为Mermaid图,并使用示例查询进行测试。
# Compile the workflow
agent = workflow.compile()
# Visualize the workflow
mermaid_graph = agent.get_graph()
mermaid_png = mermaid_graph.draw_mermaid_png(draw_method=MermaidDrawMethod.API)
display(Image(mermaid_png))
# Query the workflow
user_query = "I wish to cancel order_id 223"
for chunk in agent.stream(
{"messages": [("user", user_query)]},
stream_mode="values",
):
chunk["messages"][-1].pretty_print()
auser_query = "customer_id: customer_14 : I wish to place order for item_51 with order quantity as 4 and domestic"
for chunk in agent.stream(
{"messages": [("user", auser_query)]},
stream_mode="values",
):
chunk["messages"][-1].pretty_print()
编译:
agent = workflow.compile() 将我们的节点/边定义转换为一个可执行的代理。
可视化:
我们获得一个Mermaid图(mermaid_png),可以在Jupyter Notebook中显示,用于调试或演示。
测试查询:
第一个测试:“我希望取消订单号223”应该路由到CancelOrder。
第二个测试:“客户ID:customer_14:我希望订购商品_51…”应该路由到下单工作流程。
结论
通过利用LangGraph,我们构建了一个基于用户意图来下单或取消订单的动态、分支工作流程。我们展示了:
这种方法具有可扩展性:你可以添加更多步骤(例如,地址验证、促销代码)或额外分支(例如,更新现有订单),而无需重写单体脚本。如果你需要循环来重试失败的支付或验证用户确认,LangGraph也能处理。