LangChain 是利用 Lardge 语言模型构建应用程序的领先框架之一。有了 LangChain 表达式语言(LCEL),定义和执行分步行动序列(也称为链)就变得简单多了。用更专业的术语来说,LangChain 允许我们创建 DAG。
随着 LLM 应用程序,尤其是 LLM 代理的发展,我们已经开始使用 LLM,不仅用于执行,还用作推理引擎。这种转变引入了经常涉及重复(循环)和复杂条件的交互。在这种情况下,LCEL 是不够的,因此 LangChain 实现了一个新模块--LangGraph。
LangGraph(顾名思义)将所有交互建模为循环图。通过这些图,可以开发高级工作流和具有多个循环和 if 语句的交互,从而使其成为创建代理和多代理工作流的便捷工具。
在本文中,我将探讨 LangGraph 的主要特性和功能,包括多代理应用。我们将构建一个能回答不同类型问题的系统,并深入探讨如何实现人在环中的设置。
LangGraph 基础知识
LangGraph 是 LangChain 生态系统的一部分,因此我们将继续使用众所周知的概念,如提示模板、工具等。不过,LangGraph 还带来了一些额外的概念。让我们来讨论一下。
创建 LangGraph 是为了定义循环图。图形由以下元素组成:
另一个重要概念是图的状态。状态是图形组件间协作的基础元素。它代表了图的快照,任何部分(无论是节点还是边)都可以在执行过程中访问和修改,以检索或更新信息。
此外,状态在持久性方面起着至关重要的作用。它在每一步后都会自动保存,允许你在任何时候暂停和恢复执行。这一功能支持开发更复杂的应用,例如需要纠错或包含人机交互的应用。
单一代理工作流程
从零开始构建代理
让我们从简单开始,尝试使用 LangGraph 来处理一个基本用例--带有工具的代理。
在这个示例中,让我们创建一个可以根据数据库中的表格自动生成文档的应用程序。在为数据源创建文档时,它可以为我们节省大量时间。
我们首先要为代理定义工具。由于在本例中我将使用 ClickHouse 数据库,因此我定义了一个函数来执行任何查询。如果你愿意,也可以使用其他数据库,因为我们不会依赖于任何特定的数据库功能。
CH_HOST = 'http://localhost:8123' # default address
import requests
def get_clickhouse_data(query, host = CH_HOST, connection_timeout = 1500):
r = requests.post(host, params = {'query': query},
timeout = connection_timeout)
if r.status_code == 200:
return r.text
else:
return 'Database returned the following error:\n' + r.text
让 LLM 工具既可靠又不易出错至关重要。如果数据库返回错误,我会向 LLM 提供反馈,而不是抛出异常并停止执行。这样,LLM 代理就有机会修复错误并再次调用函数。
让我们定义一个名为 execute_sql 的工具,它可以执行任何 SQL 查询。我们使用 pydantic 来指定工具的结构,确保 LLM 代理掌握有效使用工具所需的全部信息。
from langchain_core.tools import tool
from pydantic.v1 import BaseModel, Field
from typing import Optional
class SQLQuery(BaseModel):
query: str = Field(description="SQL query to execute")
@tool(args_schema = SQLQuery)
def execute_sql(query: str) -> str:
"""Returns the result of SQL query execution"""
return get_clickhouse_data(query)
我们可以打印创建工具的参数,查看传递给 LLM 的信息。
print(f'''
name: {execute_sql.name}
description: {execute_sql.description}
arguments: {execute_sql.args}
''')
# name: execute_sql
# description: Returns the result of SQL query execution
# arguments: {'query': {'title': 'Query', 'description':
# 'SQL query to execute', 'type': 'string'}}
我们已经建立了必要的工具,现在可以继续定义 LLM 代理了。如上文所述,LangGraph 中代理的基石是其状态,它可以在图的不同部分之间共享信息。
我们当前的示例相对简单。因此,我们只需要存储消息的历史记录。让我们来定义代理状态。
# useful imports
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
# defining agent state
class AgentState(TypedDict):
messages: Annotated[list[AnyMessage], operator.add]
我们在 AgentState 中定义了一个参数--messages,这是一个 AnyMessage 类对象的列表。此外,我们还使用 operator.add (reducer) 对其进行了注解。该注解确保每次节点返回消息时,都会将其添加到状态中的现有列表中。如果没有这个操作符,每条新消息都会替换之前的值,而不是添加到列表中。
下一步是定义代理本身。让我们从 __init__ 函数开始。我们将为代理指定三个参数:模型、工具列表和系统提示。
class SQLAgent:
# initialising the object
def __init__(self, model, tools, system_prompt = ""):
self.system_prompt = system_prompt
# initialising graph with a state
graph = StateGraph(AgentState)
# adding nodes
graph.add_node("llm", self.call_llm)
graph.add_node("function", self.execute_function)
graph.add_conditional_edges(
"llm",
self.exists_function_calling,
{True: "function", False: END}
)
graph.add_edge("function", "llm")
# setting starting point
graph.set_entry_point("llm")
self.graph = graph.compile()
self.tools = {t.name: t for t in tools}
self.model = model.bind_tools(tools)
在初始化函数中,我们概述了我们的图形结构,其中包括两个节点:LLM 和 Action。节点是实际操作,因此我们有与之相关的函数。我们稍后将定义函数。
此外,我们还有一条条件边,它决定我们是否需要执行函数或生成最终答案。对于这条边,我们需要指定前一个节点(在我们的例子中是 llm)、一个决定下一步的函数,以及基于函数输出(格式化为字典)的后续步骤的映射。如果 exists_function_calling 返回 True,我们将继续执行到函数节点。否则,执行将在特殊的 END 节点结束,该节点标志着流程的结束。
我们在 function 和 llm 之间添加了一条边。它只是连接这两个步骤,执行时不附加任何条件。
定义了主结构后,就可以创建上述所有函数了。第一个函数是 call_llm。该函数将执行 LLM 并返回结果。
代理状态将自动传递给函数,因此我们可以使用保存的系统提示并从中建模。
class SQLAgent:
<...>
def call_llm(self, state: AgentState):
messages = state['messages']
# adding system prompt if it's defined
if self.system_prompt:
messages = [SystemMessage(content=self.system_prompt)] + messages
# calling LLM
message = self.model.invoke(messages)
return {'messages': [message]}
因此,我们的函数会返回一个用于更新代理状态的字典。由于我们使用 operator.add 作为状态的还原器,返回的消息将被追加到存储在状态中的消息列表中。
我们需要的下一个函数是 execute_function,它将运行我们的工具。如果 LLM 代理决定调用某个工具,我们将在 themessage.tool_calls 参数中看到它。
class SQLAgent:
<...>
def execute_function(self, state: AgentState):
tool_calls = state['messages'][-1].tool_calls
results = []
for tool in tool_calls:
# checking whether tool name is correct
if not t['name'] in self.tools:
# returning error to the agent
result = "Error: There's no such tool, please, try again"
else:
# getting result from the tool
result = self.tools[t['name']].invoke(t['args'])
results.append(
ToolMessage(
tool_call_id=t['id'],
name=t['name'],
content=str(result)
)
)
return {'messages': results}
在该函数中,我们遍历 LLM 返回的工具调用,并调用这些工具或返回错误信息。最后,我们的函数将返回包含单个关键信息的字典,用于更新图状态。
现在只剩下一个函数--条件边的函数,它定义了我们是否需要执行工具或提供最终结果。这个函数非常简单。我们只需检查最后一条信息是否包含任何工具调用。
class SQLAgent:
<...>
def exists_function_calling(self, state: AgentState):
result = state['messages'][-1]
return len(result.tool_calls) > 0
是时候为它创建一个代理和 LLM 模型了。我将使用新的 OpenAI GPT 4o mini 模型(doc),因为它比 GPT 3.5 更便宜,性能也更好。
import os
# setting up credentioals
os.environ["OPENAI_MODEL_NAME"]='gpt-4o-mini'
os.environ["OPENAI_API_KEY"] = '<your_api_key>'
# system prompt
prompt = '''You are a senior expert in SQL and data analysis.
So, you can help the team to gather needed data to power their decisions.
You are very accurate and take into account all the nuances in data.
Your goal is to provide the detailed documentation for the table in database
that will help users.'''
model = ChatOpenAI(model="gpt-4o-mini")
doc_agent = SQLAgent(model, [execute_sql], system=prompt)
LangGraph 为我们提供了相当方便的图形可视化功能。要使用它,你需要安装 pygraphviz 。
对于使用 M1/M2 芯片的 Mac 来说,这有点麻烦,所以这里为你提供了一些小窍门:
! brew install graphviz
! python3 -m pip install -U --no-cache-dir \
--config-settings="--global-option=build_ext" \
--config-settings="--global-option=-I$(brew --prefix graphviz)/include/" \
--config-settings="--global-option=-L$(brew --prefix graphviz)/lib/" \
pygraphviz
在弄清安装方法后,下面是我们的图表。
from IPython.display import Image
Image(doc_agent.graph.get_graph().draw_png())
正如你所看到的,我们的图形具有周期性。用 LCEL 来实现这样的功能将非常具有挑战性。
最后,是时候执行我们的代理了。我们需要以 HumanMessage 的形式传递包含问题的初始信息集。
messages = [HumanMessage(content="What info do we have in ecommerce_db.users table?")]
result = doc_agent.graph.invoke({"messages": messages})
在结果变量中,我们可以看到执行过程中生成的所有信息。该过程按预期运行:
代理决定通过查询 describe ecommerce.db_users 调用函数。
然后,LLM 处理了来自工具的信息,并提供了用户友好的答案。
result['messages']
# [
# HumanMessage(content='What info do we have in ecommerce_db.users table?'),
# AIMessage(content='', tool_calls=[{'name': 'execute_sql', 'args': {'query': 'DESCRIBE ecommerce_db.users;'}, 'id': 'call_qZbDU9Coa2tMjUARcX36h0ax', 'type': 'tool_call'}]),
# ToolMessage(content='user_id\tUInt64\t\t\t\t\t\ncountry\tString\t\t\t\t\t\nis_active\tUInt8\t\t\t\t\t\nage\tUInt64\t\t\t\t\t\n', name='execute_sql', tool_call_id='call_qZbDU9Coa2tMjUARcX36h0ax'),
# AIMessage(content='The `ecommerce_db.users` table contains the following columns: <...>')
# ]
这是最终结果。看起来还不错。
print(result['messages'][-1].content)
# The `ecommerce_db.users` table contains the following columns:
# 1. **user_id**: `UInt64` - A unique identifier for each user.
# 2. **country**: `String` - The country where the user is located.
# 3. **is_active**: `UInt8` - Indicates whether the user is active (1) or inactive (0).
# 4. **age**: `UInt64` - The age of the user.
使用预制代理
我们已经学会了如何从头开始构建代理。不过,我们也可以利用 LangGraph 的内置功能来完成类似的简单任务。
我们可以使用预构建的 ReAct 代理来获得类似的结果:一个可以使用工具的代理。
from langgraph.prebuilt import create_react_agent
prebuilt_doc_agent = create_react_agent(model, [execute_sql],
state_modifier = system_prompt)
它与我们之前构建的是同一个代理。我们稍后将试用它,但首先,我们需要了解另外两个重要概念:持久性和流。
持久性和流式传输
持久性是指在不同交互中保持上下文的能力。当应用程序可以从用户那里获得额外输入时,这对于代理用例来说至关重要。
LangGraph会在每一步之后自动保存状态,允许你暂停或继续执行。这一功能支持高级业务逻辑的实现,如错误恢复或人机交互。
添加持久性的最简单方法是使用内存 SQLite 数据库。
from langgraph.checkpoint.sqlite import SqliteSaver
memory = SqliteSaver.from_conn_string(":memory:")
对于现成的代理,我们可以在创建代理时将内存作为参数传递。
prebuilt_doc_agent = create_react_agent(model, [execute_sql],
checkpointer=memory)
如果使用自定义代理,则需要在编译图形时将内存作为校验指针传递。
class SQLAgent:
def __init__(self, model, tools, system_prompt = ""):
<...>
self.graph = graph.compile(checkpointer=memory)
<...>
让我们执行代理,探索 LangGraph 的另一项功能:流式传输。通过流式处理,我们可以将每一步的执行结果作为流中的单独事件接收。当需要同时处理多个对话(或线程)时,这一功能对于生产应用程序来说至关重要。
LangGraph 不仅支持事件流,还支持令牌级流。我脑海中令牌流的唯一用例是逐字实时显示答案(类似于 ChatGPT 的实现)。
让我们尝试在新的预构建代理中使用流。我还将对消息使用 pretty_print 函数,以使结果更具可读性。
# defining thread
thread = {"configurable": {"thread_id": "1"}}
messages = [HumanMessage(content="What info do we have in ecommerce_db.users table?")]
for event in prebuilt_doc_agent.stream({"messages": messages}, thread):
for v in event.values():
v['messages'][-1].pretty_print()
# ================================== Ai Message ==================================
# Tool Calls:
# execute_sql (call_YieWiChbFuOlxBg8G1jDJitR)
# Call ID: call_YieWiChbFuOlxBg8G1jDJitR
# Args:
# query: SELECT * FROM ecommerce_db.users LIMIT 1;
# ================================= Tool Message =================================
# Name: execute_sql
# 1000001 United Kingdom 0 70
#
# ================================== Ai Message ==================================
#
# The `ecommerce_db.users` table contains at least the following information for users:
#
# - **User ID** (e.g., `1000001`)
# - **Country** (e.g., `United Kingdom`)
# - **Some numerical value** (e.g., `0`)
# - **Another numerical value** (e.g., `70`)
#
# The specific meaning of the numerical values and additional columns
# is not clear from the single row retrieved. Would you like more details
# or a broader query?
有趣的是,代理并不能提供足够好的结果。由于代理没有查找表格模式,它很难猜出所有列的含义。我们可以通过在同一线程中使用后续问题来改进结果。
followup_messages = [HumanMessage(content="I would like to know the column names and types. Maybe you could look it up in database using describe.")]
for event in prebuilt_doc_agent.stream({"messages": followup_messages}, thread):
for v in event.values():
v['messages'][-1].pretty_print()
# ================================== Ai Message ==================================
# Tool Calls:
# execute_sql (call_sQKRWtG6aEB38rtOpZszxTVs)
# Call ID: call_sQKRWtG6aEB38rtOpZszxTVs
# Args:
# query: DESCRIBE ecommerce_db.users;
# ================================= Tool Message =================================
# Name: execute_sql
#
# user_id UInt64
# country String
# is_active UInt8
# age UInt64
#
# ================================== Ai Message ==================================
#
# The `ecommerce_db.users` table has the following columns along with their data types:
#
# | Column Name | Data Type |
# |-------------|-----------|
# | user_id | UInt64 |
# | country | String |
# | is_active | UInt8 |
# | age | UInt64 |
#
# If you need further information or assistance, feel free to ask!
这一次,我们从代理那里得到了完整的答案。因为我们提供了相同的线程,所以代理能够从之前的讨论中获得上下文。这就是持久性的工作原理。
让我们换一个线程,提出同样的后续问题。
new_thread = {"configurable": {"thread_id": "42"}}
followup_messages = [HumanMessage(content="I would like to know the column names and types. Maybe you could look it up in database using describe.")]
for event in prebuilt_doc_agent.stream({"messages": followup_messages}, new_thread):
for v in event.values():
v['messages'][-1].pretty_print()
# ================================== Ai Message ==================================
# Tool Calls:
# execute_sql (call_LrmsOGzzusaLEZLP9hGTBGgo)
# Call ID: call_LrmsOGzzusaLEZLP9hGTBGgo
# Args:
# query: DESCRIBE your_table_name;
# ================================= Tool Message =================================
# Name: execute_sql
#
# Database returned the following error:
# Code: 60. DB::Exception: Table default.your_table_name does not exist. (UNKNOWN_TABLE) (version 23.12.1.414 (official build))
#
# ================================== Ai Message ==================================
#
# It seems that the table `your_table_name` does not exist in the database.
# Could you please provide the actual name of the table you want to describe?
该代理缺乏回答我们问题所需的上下文,这并不奇怪。线程的设计目的是隔离不同的对话,确保每个线程保持自己的上下文。
在实际应用中,内存管理至关重要。对话可能会变得非常冗长,在某些时候,每次都将整个历史记录传给 LLM 是不现实的。因此,值得对信息进行修剪或过滤。
多代理系统
作为多代理工作流程的一个示例,我想建立一个可以处理来自不同领域问题的应用程序。我们将有一组专家代理,每个代理都擅长处理不同类型的问题,还有一个路由器代理,它将为每个查询找到最合适的专家。这样的应用有许多潜在用例:从自动客户支持到回答同事在内部聊天中提出的问题。
首先,我们需要创建代理状态--帮助代理共同解决问题的信息。我将使用以下字段:
class MultiAgentState(TypedDict):
question: str
question_type: str
answer: str
feedback: str
我没有使用任何还原器,因此我们的状态将只存储每个字段的最新版本。
然后,让我们创建一个路由器节点。它将是一个简单的 LLM 模型,用于定义问题的类别(数据库、LangChain 或一般问题)。
question_category_prompt = '''You are a senior specialist of analytical support. Your task is to classify the incoming questions.
Depending on your answer, question will be routed to the right team, so your task is crucial for our team.
There are 3 possible question types:
- DATABASE - questions related to our database (tables or fields)
- LANGCHAIN- questions related to LangGraph or LangChain libraries
- GENERAL - general questions
Return in the output only one word (DATABASE, LANGCHAIN or GENERAL).
'''
def router_node(state: MultiAgentState):
messages = [
SystemMessage(content=question_category_prompt),
HumanMessage(content=state['question'])
]
model = ChatOpenAI(model="gpt-4o-mini")
response = model.invoke(messages)
return {"question_type": response.content}
现在我们有了第一个节点--路由器,让我们构建一个简单的图来测试工作流。
memory = SqliteSaver.from_conn_string(":memory:")
builder = StateGraph(MultiAgentState)
builder.add_node("router", router_node)
builder.set_entry_point("router")
builder.add_edge('router', END)
graph = builder.compile(checkpointer=memory)
让我们用不同类型的问题来测试我们的工作流程,看看它在实际操作中的表现如何。这将帮助我们评估路由器代理是否能正确地将问题分配给相应的专家代理。
thread = {"configurable": {"thread_id": "1"}}
for s in graph.stream({
'question': "Does LangChain support Ollama?",
}, thread):
print(s)
# {'router': {'question_type': 'LANGCHAIN'}}
thread = {"configurable": {"thread_id": "2"}}
for s in graph.stream({
'question': "What info do we have in ecommerce_db.users table?",
}, thread):
print(s)
# {'router': {'question_type': 'DATABASE'}}
thread = {"configurable": {"thread_id": "3"}}
for s in graph.stream({
'question': "How are you?",
}, thread):
print(s)
# {'router': {'question_type': 'GENERAL'}}
运行良好。我建议你逐步构建复杂图形,并独立测试每一步。采用这种方法,可以确保每次迭代都能按预期运行,并节省大量调试时间。
接下来,让我们为专家代理创建节点。我们将使用 ReAct 代理和之前创建的 SQL 工具作为数据库代理。
# database expert
sql_expert_system_prompt = '''
You are an expert in SQL, so you can help the team
to gather needed data to power their decisions.
You are very accurate and take into account all the nuances in data.
You use SQL to get the data before answering the question.
'''
def sql_expert_node(state: MultiAgentState):
model = ChatOpenAI(model="gpt-4o-mini")
sql_agent = create_react_agent(model, [execute_sql],
state_modifier = sql_expert_system_prompt)
messages = [HumanMessage(content=state['question'])]
result = sql_agent.invoke({"messages": messages})
return {'answer': result['messages'][-1].content}
对于与 LangChain 相关的问题,我们将使用 ReAct 代理。为了让代理能够回答有关图书馆的问题,我们将为其配备一个搜索引擎工具。为此,我选择了 Tavily,因为它能提供针对法律硕士应用优化的搜索结果。
如果你没有账户,可以免费注册使用 Tavily(每月最多 1K 个请求)。要开始使用,你需要在环境变量中指定 Tavily API 密钥。
# search expert
from langchain_community.tools.tavily_search import TavilySearchResults
os.environ["TAVILY_API_KEY"] = 'tvly-...'
tavily_tool = TavilySearchResults(max_results=5)
search_expert_system_prompt = '''
You are an expert in LangChain and other technologies.
Your goal is to answer questions based on results provided by search.
You don't add anything yourself and provide only information baked by other sources.
'''
def search_expert_node(state: MultiAgentState):
model = ChatOpenAI(model="gpt-4o-mini")
sql_agent = create_react_agent(model, [tavily_tool],
state_modifier = search_expert_system_prompt)
messages = [HumanMessage(content=state['question'])]
result = sql_agent.invoke({"messages": messages})
return {'answer': result['messages'][-1].content}
对于一般问题,我们将利用简单的 LLM 模型,而不使用特定工具。
# general model
general_prompt = '''You're a friendly assistant and your goal is to answer general questions.
Please, don't provide any unchecked information and just tell that you don't know if you don't have enough info.
'''
def general_assistant_node(state: MultiAgentState):
messages = [
SystemMessage(content=general_prompt),
HumanMessage(content=state['question'])
]
model = ChatOpenAI(model="gpt-4o-mini")
response = model.invoke(messages)
return {"answer": response.content}
最后一点是路由的条件函数。这一点非常简单,我们只需从路由器节点定义的状态中传播问题类型即可。
def route_question(state: MultiAgentState):
return state['question_type']
现在,是时候创建我们的图表了。
builder = StateGraph(MultiAgentState)
builder.add_node("router", router_node)
builder.add_node('database_expert', sql_expert_node)
builder.add_node('langchain_expert', search_expert_node)
builder.add_node('general_assistant', general_assistant_node)
builder.add_conditional_edges(
"router",
route_question,
{'DATABASE': 'database_expert',
'LANGCHAIN': 'langchain_expert',
'GENERAL': 'general_assistant'}
)
builder.set_entry_point("router")
builder.add_edge('database_expert', END)
builder.add_edge('langchain_expert', END)
builder.add_edge('general_assistant', END)
graph = builder.compile(checkpointer=memory)
现在,我们可以在几个问题上对设置进行测试,看看它的性能如何。
thread = {"configurable": {"thread_id": "2"}}
results = []
for s in graph.stream({
'question': "What info do we have in ecommerce_db.users table?",
}, thread):
print(s)
results.append(s)
print(results[-1]['database_expert']['answer'])
# The `ecommerce_db.users` table contains the following columns:
# 1. **User ID**: A unique identifier for each user.
# 2. **Country**: The country where the user is located.
# 3. **Is Active**: A flag indicating whether the user is active (1 for active, 0 for inactive).
# 4. **Age**: The age of the user.
# Here are some sample entries from the table:
#
# | User ID | Country | Is Active | Age |
# |---------|----------------|-----------|-----|
# | 1000001 | United Kingdom | 0 | 70 |
# | 1000002 | France | 1 | 87 |
# | 1000003 | France | 1 | 88 |
# | 1000004 | Germany | 1 | 25 |
# | 1000005 | Germany | 1 | 48 |
#
# This gives an overview of the user data available in the table.
它给出了数据库相关问题的相关结果。让我们试着问一下 LangChain。
thread = {"configurable": {"thread_id": "42"}}
results = []
for s in graph.stream({
'question': "Does LangChain support Ollama?",
}, thread):
print(s)
results.append(s)
print(results[-1]['langchain_expert']['answer'])
# Yes, LangChain supports Ollama. Ollama allows you to run open-source
# large language models, such as Llama 2, locally, and LangChain provides
# a flexible framework for integrating these models into applications.
# You can interact with models run by Ollama using LangChain, and there are
# specific wrappers and tools available for this integration.
#
# For more detailed information, you can visit the following resources:
# - [LangChain and Ollama Integration](https://js.langchain.com/v0.1/docs/integrations/llms/ollama/)
# - [ChatOllama Documentation](https://js.langchain.com/v0.2/docs/integrations/chat/ollama/)
# - [Medium Article on Ollama and LangChain](https://medium.com/@abonia/ollama-and-langchain-run-llms-locally-900931914a46)
很明显,Tavily 的搜索对法律硕士申请很有效。
增加人机互动
我们已经出色地创建了一个回答问题的工具。不过,在很多情况下,让人工参与到环路中来批准建议的操作或提供额外的反馈是有益的。让我们添加一个步骤,在将最终结果返回给用户之前收集人工反馈。
最简单的方法是增加两个节点:
让我们创建这些节点:
def human_feedback_node(state: MultiAgentState):
pass
editor_prompt = '''You're an editor and your goal is to provide the final answer to the customer, taking into account the feedback.
You don't add any information on your own. You use friendly and professional tone.
In the output please provide the final answer to the customer without additional comments.
Here's all the information you need.
Question from customer:
----
{question}
----
Draft answer:
----
{answer}
----
Feedback:
----
{feedback}
----
'''
def editor_node(state: MultiAgentState):
messages = [
SystemMessage(content=editor_prompt.format(question = state['question'], answer = state['answer'], feedback = state['feedback']))
]
model = ChatOpenAI(model="gpt-4o-mini")
response = model.invoke(messages)
return {"answer": response.content}
让我们将这些节点添加到图中。此外,我们还需要在人类节点前引入一个中断,以确保流程暂停以听取人类的反馈。
builder = StateGraph(MultiAgentState)
builder.add_node("router", router_node)
builder.add_node('database_expert', sql_expert_node)
builder.add_node('langchain_expert', search_expert_node)
builder.add_node('general_assistant', general_assistant_node)
builder.add_node('human', human_feedback_node)
builder.add_node('editor', editor_node)
builder.add_conditional_edges(
"router",
route_question,
{'DATABASE': 'database_expert',
'LANGCHAIN': 'langchain_expert',
'GENERAL': 'general_assistant'}
)
builder.set_entry_point("router")
builder.add_edge('database_expert', 'human')
builder.add_edge('langchain_expert', 'human')
builder.add_edge('general_assistant', 'human')
builder.add_edge('human', 'editor')
builder.add_edge('editor', END)
graph = builder.compile(checkpointer=memory, interrupt_before = ['human'])
现在,当我们运行图形时,执行将在人类节点之前停止。
thread = {"configurable": {"thread_id": "2"}}
for event in graph.stream({
'question': "What are the types of fields in ecommerce_db.users table?",
}, thread):
print(event)
# {'question_type': 'DATABASE', 'question': 'What are the types of fields in ecommerce_db.users table?'}
# {'router': {'question_type': 'DATABASE'}}
# {'database_expert': {'answer': 'The `ecommerce_db.users` table has the following fields:\n\n1. **user_id**: UInt64\n2. **country**: String\n3. **is_active**: UInt8\n4. **age**: UInt64'}}
让我们了解客户的意见,并根据反馈更新状态。
user_input = input("Do I need to change anything in the answer?")
# Do I need to change anything in the answer?
# It looks wonderful. Could you only make it a bit friendlier please?
graph.update_state(thread, {"feedback": user_input}, as_node="human")
我们可以检查该状态,以确认反馈已被填充,并且序列中的下一个节点是编辑器。
print(graph.get_state(thread).values['feedback'])
# It looks wonderful. Could you only make it a bit friendlier please?
print(graph.get_state(thread).next)
# ('editor',)
我们可以继续执行。如果输入 “None”,进程将从暂停的位置继续执行。
for event in graph.stream(None, thread, stream_mode="values"):
print(event)
print(event['answer'])
# Hello! The `ecommerce_db.users` table has the following fields:
# 1. **user_id**: UInt64
# 2. **country**: String
# 3. **is_active**: UInt8
# 4. **age**: UInt64
# Have a nice day!
编辑考虑了我们的反馈意见,在我们的最终信息中添加了一些礼貌用语。这真是一个神奇的结果!
我们可以通过为编辑器配备 “人性化 ”工具,以更具代理性的方式实现 “人环互动”。
让我们调整一下编辑器。我稍微改动了一下提示,并为代理添加了工具。
from langchain_community.tools import HumanInputRun
human_tool = HumanInputRun()
editor_agent_prompt = '''You're an editor and your goal is to provide the final answer to the customer, taking into the initial question.
If you need any clarifications or need feedback, please, use human. Always reach out to human to get the feedback before final answer.
You don't add any information on your own. You use friendly and professional tone.
In the output please provide the final answer to the customer without additional comments.
Here's all the information you need.
Question from customer:
----
{question}
----
Draft answer:
----
{answer}
----
'''
model = ChatOpenAI(model="gpt-4o-mini")
editor_agent = create_react_agent(model, [human_tool])
messages = [SystemMessage(content=editor_agent_prompt.format(question = state['question'], answer = state['answer']))]
editor_result = editor_agent.invoke({"messages": messages})
# Is the draft answer complete and accurate for the customer's question about the types of fields in the ecommerce_db.users table?
# Yes, but could you please make it friendlier.
print(editor_result['messages'][-1].content)
# The `ecommerce_db.users` table has the following fields:
# 1. **user_id**: UInt64
# 2. **country**: String
# 3. **is_active**: UInt8
# 4. **age**: UInt64
#
# If you have any more questions, feel free to ask!
于是,编辑向用户提出了这样一个问题:"对于客户提出的关于 ecommerce_db.users 表中字段类型的问题,该回答草案是否完整、准确?收到反馈后,编辑改进了答案,使其更方便用户使用。
让我们更新主图,加入新的代理,而不是使用两个独立的节点。有了这种方法,我们就不再需要中断了。
def editor_agent_node(state: MultiAgentState):
model = ChatOpenAI(model="gpt-4o-mini")
editor_agent = create_react_agent(model, [human_tool])
messages = [SystemMessage(content=editor_agent_prompt.format(question = state['question'], answer = state['answer']))]
result = editor_agent.invoke({"messages": messages})
return {'answer': result['messages'][-1].content}
builder = StateGraph(MultiAgentState)
builder.add_node("router", router_node)
builder.add_node('database_expert', sql_expert_node)
builder.add_node('langchain_expert', search_expert_node)
builder.add_node('general_assistant', general_assistant_node)
builder.add_node('editor', editor_agent_node)
builder.add_conditional_edges(
"router",
route_question,
{'DATABASE': 'database_expert',
'LANGCHAIN': 'langchain_expert',
'GENERAL': 'general_assistant'}
)
builder.set_entry_point("router")
builder.add_edge('database_expert', 'editor')
builder.add_edge('langchain_expert', 'editor')
builder.add_edge('general_assistant', 'editor')
builder.add_edge('editor', END)
graph = builder.compile(checkpointer=memory)
thread = {"configurable": {"thread_id": "42"}}
results = []
for event in graph.stream({
'question': "What are the types of fields in ecommerce_db.users table?",
}, thread):
print(event)
results.append(event)
该图的工作原理与前一个类似。我个人更喜欢这种方法,因为它利用了工具,使解决方案更加灵活。例如,代理可以多次联系人类,并根据需要完善问题。
就是这样。我们已经构建了一个多代理系统,可以回答来自不同领域的问题,并考虑到人类的反馈。
总结
在本文中,我们探讨了 LangGraph 库及其在构建单个和多个代理工作流中的应用。我们研究了它的一系列功能,现在是总结其优缺点的时候了。此外,将 LangGraph 与 CrewAI 进行比较也很有用。
总的来说,我认为 LangGraph 是构建复杂 LLM 应用程序的强大框架: