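A major challenge in information retrieval is the lack of efficient agents that can intelligently process and route queries. Traditional systems often struggle to handle complex queries dynamically. This is especially true when the data is large and intricate and is stored in Neo4j as both vector indexes and graph structures. The limitation can lead to suboptimal retrieval results, which keeps AI applications from fully realizing their potential to synthesize information and generate well-informed responses.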
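The solution integrates the LangChain templates for Neo4j chains, specifically the vector and the graph Cypher templates, and turns these chains into tools that an agent can use to make informed decisions. The Neo4j vector chain template uses advanced retrieval strategies to balance precise embeddings against preserving context. The Neo4j Cypher chain template does three things:

- it converts a natural-language question into a Cypher query that fetches data from the Neo4j database;
- it executes that query;
- it produces a natural-language answer from the query results.

With this approach, the agent decides per query which chain, that is, which tool, to use. By combining Neo4j's vector and graph capabilities, the system both optimizes query handling and improves the overall quality of retrieval.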
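To make the routing idea concrete, here is a deliberately crude sketch. In this project the decision is made by an LLM through OpenAI function calling. The sketch swaps that for a word-overlap score between the query and each tool's description, purely to show that the choice hinges on the descriptions. The `Tool` class and the `words` and `route` helpers are hypothetical. The two descriptions are the ones the project's tools use.

```python
import re
from dataclasses import dataclass
from typing import Callable, List, Set


@dataclass
class Tool:
    name: str
    description: str
    func: Callable[[str], str]


def words(text: str) -> Set[str]:
    # Lowercased word set: a crude stand-in for an LLM's understanding
    return set(re.findall(r"[a-z]+", text.lower()))


def route(query: str, tools: List[Tool]) -> Tool:
    # Choose the tool whose description shares the most words with the query
    q = words(query)
    return max(tools, key=lambda t: len(q & words(t.description)))


tools = [
    Tool("vector_tool", "Useful Tool for retrieving specific context about Dune",
         lambda q: f"[vector chain] {q}"),
    Tool("graph_tool",
         "Useful Tool for retrieving structural, interconnected and relational knowledge related to Dune",
         lambda q: f"[graph chain] {q}"),
]

query = "How are Paul and Jessica related?"
chosen = route(query, tools)
print(chosen.name, "->", chosen.func(query))
```

Vague tool descriptions make any router, LLM or not, pick badly. That is why the descriptions matter so much in the agent further down.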
LangChain
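LangChain agents use large language models to dynamically choose and sequence actions, acting as decision-makers inside AI applications. Backed by the model's reasoning, an agent evaluates the input and context to determine the most effective course of action. This makes AI-driven processes more capable and adaptable.

Chains, in turn, are sequences of calls to components and serve as modular, configurable building blocks of a workflow. They connect LLMs with other components, which makes them versatile and efficient in complex applications.

In this project, combining the two to enhance RAG was an interesting challenge. Concretely, the agent evaluates the user's question and decides on the best course of action. The chains, each a sequence of actions and functions, are the tools the agent can use.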
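The chains in this project are written with LangChain's pipe syntax and `RunnablePassthrough.assign`. The toy below is not LangChain's implementation. It only mimics the idea in plain Python:

- `|` composes steps from left to right;
- the hypothetical `assign` helper keeps the incoming dictionary and adds newly computed keys.

This is how each step of the later chains gets to see the results of the earlier ones.

```python
from typing import Any, Callable, Dict


class Step:
    """Toy runnable: wraps a function and composes left to right with |."""

    def __init__(self, fn: Callable[[Any], Any]):
        self.fn = fn

    def __or__(self, other: "Step") -> "Step":
        # (a | b).invoke(x) == b.invoke(a.invoke(x))
        return Step(lambda x: other.fn(self.fn(x)))

    def invoke(self, x: Any) -> Any:
        return self.fn(x)


def assign(**computations: Callable[[Dict[str, Any]], Any]) -> Step:
    # Keep every incoming key and add the computed ones; all computations
    # in one call see the same input dictionary
    return Step(lambda d: {**d, **{k: f(d) for k, f in computations.items()}})


chain = (
    assign(rephrased=lambda d: d["question"].lower())
    | assign(context=lambda d: [f"doc about {d['rephrased']}"])
    | Step(lambda d: f"Answer to {d['rephrased']!r} using {len(d['context'])} document(s)")
)
print(chain.invoke({"question": "Who is Paul?"}))
```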
Advanced retrieval strategies
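The main weakness of naive vector similarity search in traditional RAG is that it cannot reliably pinpoint context and specific concepts within large documents. As a result, it often retrieves generic or irrelevant passages, which makes the responses less useful. Advanced RAG strategies address this by splitting the data into more meaningful units. Retrieval can then target those units in a context-aware, conceptually precise way.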
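Below is a minimal sketch of one such strategy, parent-document retrieval, in plain Python. It rests on two assumptions:

- a bag-of-words vector replaces a real embedding model;
- the hypothetical helpers `split_into_children` and `parent_retrieve` stand in for the `HAS_CHILD` structure that the project stores in Neo4j.

Small child chunks are matched precisely, but the whole parent is returned. In the usage below, the best-matching chunk ("Paul Atreides is the son") does not mention Jessica, but the returned parent does.

```python
import math
import re
from collections import Counter
from typing import Dict, List, Tuple


def embed(text: str) -> Counter:
    # Bag-of-words counts, standing in for a real embedding model
    return Counter(re.findall(r"[a-z]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    dot = sum(count * b[word] for word, count in a.items())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def split_into_children(parent_id: str, text: str, size: int) -> List[Tuple[str, str]]:
    # Small chunks of `size` words, each remembering its parent (like HAS_CHILD)
    tokens = text.split()
    return [(parent_id, " ".join(tokens[i:i + size])) for i in range(0, len(tokens), size)]


def parent_retrieve(query: str, parents: Dict[str, str], size: int = 5) -> str:
    # Match the query against the small, focused child chunks...
    children = [c for pid, text in parents.items() for c in split_into_children(pid, text, size)]
    q = embed(query)
    best_parent, _ = max(children, key=lambda c: cosine(q, embed(c[1])))
    # ...but return the whole parent, which carries the surrounding context
    return parents[best_parent]


parents = {
    "p1": "Arrakis is a desert planet. Spice melange is harvested there by the Harkonnens.",
    "p2": "Paul Atreides is the son of Duke Leto and Lady Jessica of House Atreides.",
}
print(parent_retrieve("Who is the mother of Paul Atreides?", parents))
```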
LangChain templates
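LangChain templates are a collection of deployable reference architectures that simplify creating and customizing chains and agents. They follow a standard format and integrate easily with LangServe, so production-ready APIs can be deployed quickly. This speeds up development and makes it easy for a broad range of users to share, maintain, and improve AI functionality.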
Implementation
Retrievers.py
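This file contains the code that retrieves data from the Neo4j vector indexes using different retrieval strategies. It is used inside the chain. The question is first rephrased from the conversation history and the user's latest message and then passed to the retriever. When invoked, the retriever fetches data from the index that belongs to the selected strategy.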
```python
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Neo4jVector

# Typical RAG retriever: plain similarity search over the "typical_rag" index
typical_rag = Neo4jVector.from_existing_index(
    OpenAIEmbeddings(), index_name="typical_rag"
)

# Parent retriever: the index holds small child chunks; return their parent.
# elementId(parent) goes into the metadata so the chat history can later
# link each question to the nodes it retrieved.
parent_query = """
MATCH (node)<-[:HAS_CHILD]-(parent)
WITH parent, max(score) AS score // deduplicate parents
RETURN parent.text AS text, score, {id: elementId(parent)} AS metadata LIMIT 1
"""
parent_vectorstore = Neo4jVector.from_existing_index(
    OpenAIEmbeddings(),
    index_name="parent_document",
    retrieval_query=parent_query,
)

# Hypothetical questions retriever: match questions about each parent, return the parent
hypothetic_question_query = """
MATCH (node)<-[:HAS_QUESTION]-(parent)
WITH parent, max(score) AS score // deduplicate parents
RETURN parent.text AS text, score, {id: elementId(parent)} AS metadata
"""
hypothetic_question_vectorstore = Neo4jVector.from_existing_index(
    OpenAIEmbeddings(),
    index_name="hypothetical_questions",
    retrieval_query=hypothetic_question_query,
)

# Summary retriever: match summaries, return the parent
summary_query = """
MATCH (node)<-[:HAS_SUMMARY]-(parent)
WITH parent, max(score) AS score // deduplicate parents
RETURN parent.text AS text, score, {id: elementId(parent)} AS metadata
"""
summary_vectorstore = Neo4jVector.from_existing_index(
    OpenAIEmbeddings(),
    index_name="summary",
    retrieval_query=summary_query,
)
```
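Each custom retrieval query collapses several matching children into one row per parent with `WITH parent, max(score) AS score`. The children can be chunks, hypothetical questions or summaries. Below is a plain-Python sketch of that deduplication step. The function name and the sample hits are hypothetical. Unlike the Cypher above, the sketch also sorts parents by score before applying a limit.

```python
from typing import Dict, List, Optional, Tuple


def dedupe_parents(
    hits: List[Tuple[str, str, float]], limit: Optional[int] = None
) -> List[Tuple[str, float]]:
    """Collapse (child_id, parent_id, score) vector hits into one row per parent.

    A parent reached through several children keeps only its best score,
    as with `WITH parent, max(score) AS score`.
    """
    best: Dict[str, float] = {}
    for _child_id, parent_id, score in hits:
        best[parent_id] = max(score, best.get(parent_id, float("-inf")))
    ranked = sorted(best.items(), key=lambda item: item[1], reverse=True)
    return ranked if limit is None else ranked[:limit]


hits = [
    ("question-1", "chunk-A", 0.91),
    ("question-2", "chunk-A", 0.84),
    ("question-3", "chunk-B", 0.88),
]
print(dedupe_parents(hits))
```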
History.py
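This file contains a conversation memory module that stores the conversation history in the Neo4j graph database. Memory is kept separately for each user session, which keeps interactions personalized. I tried to merge the vector and graph versions of these functions into one. That got complicated, because the two chains pass in inputs that differ in their details.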
```python
from typing import Any, Dict, List, Union

from langchain.graphs import Neo4jGraph
from langchain.memory import ChatMessageHistory
from langchain.schema import AIMessage, HumanMessage

# Connection details are read from the NEO4J_URI, NEO4J_USERNAME and NEO4J_PASSWORD env vars
graph = Neo4jGraph()


def convert_messages(input: List[Dict[str, Any]]) -> ChatMessageHistory:
    # Turn [{"result": {"question": ..., "answer": ...}}, ...] into chat messages
    history = ChatMessageHistory()
    for item in input:
        history.add_user_message(item["result"]["question"])
        history.add_ai_message(item["result"]["answer"])
    return history


def get_vector_history(input: Dict[str, Any]) -> List[Union[HumanMessage, AIMessage]]:
    # Lookback window: NEXT*0..3 returns up to 4 question/answer pairs, oldest first
    window = 3
    data = graph.query(
        """
        MATCH (u:User {id:$user_id})-[:HAS_SESSION]->(s:Session {id:$session_id}),
              (s)-[:LAST_MESSAGE]->(last_message)
        MATCH p=(last_message)<-[:NEXT*0.."""
        + str(window)
        + """]-()
        WITH p, length(p) AS length
        ORDER BY length DESC LIMIT 1
        UNWIND reverse(nodes(p)) AS node
        MATCH (node)-[:HAS_ANSWER]->(answer)
        RETURN {question:node.text, answer:answer.text} AS result
        """,
        params={"user_id": input["user_id"], "session_id": input["session_id"]},
    )
    history = convert_messages(data)
    return history.messages


def save_vector_history(input: Dict[str, Any]) -> str:
    # Keep the elementIds of the retrieved nodes so the Cypher below can link them
    # via elementId(n) = c; documents without an "id" in their metadata are skipped
    input["context"] = [el.metadata["id"] for el in input["context"] if el.metadata.get("id")]
    has_history = bool(input.pop("chat_history"))
    if has_history:
        # Existing session: append the new question after the last message
        graph.query(
            """
            MATCH (u:User {id: $user_id})-[:HAS_SESSION]->(s:Session{id: $session_id}),
                  (s)-[l:LAST_MESSAGE]->(last_message)
            CREATE (last_message)-[:NEXT]->(q:Question
                    {text:$question, rephrased:$rephrased_question, date:datetime()}),
                   (q)-[:HAS_ANSWER]->(:Answer {text:$output}),
                   (s)-[:LAST_MESSAGE]->(q)
            DELETE l
            WITH q
            UNWIND $context AS c
            MATCH (n) WHERE elementId(n) = c
            MERGE (q)-[:RETRIEVED]->(n)
            """,
            params=input,
        )
    else:
        # First message: create the user if needed, the session and the first question
        graph.query(
            """
            MERGE (u:User {id: $user_id})
            CREATE (u)-[:HAS_SESSION]->(s1:Session {id:$session_id}),
                   (s1)-[:LAST_MESSAGE]->(q:Question
                    {text:$question, rephrased:$rephrased_question, date:datetime()}),
                   (q)-[:HAS_ANSWER]->(:Answer {text:$output})
            WITH q
            UNWIND $context AS c
            MATCH (n) WHERE elementId(n) = c
            MERGE (q)-[:RETRIEVED]->(n)
            """,
            params=input,
        )
    # Return LLM response to the chain
    return input["output"]


def get_graph_history(input: Dict[str, Any]) -> List[Union[HumanMessage, AIMessage]]:
    # Build the parameters explicitly instead of popping keys from the shared
    # input dict, which later steps (the Cypher prompt) still need
    window = 3
    data = graph.query(
        """
        MATCH (u:User {id:$user_id})-[:HAS_SESSION]->(s:Session {id:$session_id}),
              (s)-[:LAST_MESSAGE]->(last_message)
        MATCH p=(last_message)<-[:NEXT*0.."""
        + str(window)
        + """]-()
        WITH p, length(p) AS length
        ORDER BY length DESC LIMIT 1
        UNWIND reverse(nodes(p)) AS node
        MATCH (node)-[:HAS_ANSWER]->(answer)
        RETURN {question:node.text, answer:answer.text} AS result
        """,
        params={"user_id": input["user_id"], "session_id": input["session_id"]},
    )
    history = convert_messages(data)
    return history.messages


def save_graph_history(input: Dict[str, Any]) -> str:
    # The raw database result is not stored; only question, Cypher and answer are
    input.pop("response")
    # Create the session on first use, otherwise append after the last message
    graph.query(
        """
        MERGE (u:User {id: $user_id})
        WITH u
        OPTIONAL MATCH (u)-[:HAS_SESSION]->(s:Session{id: $session_id}),
                       (s)-[l:LAST_MESSAGE]->(last_message)
        FOREACH (_ IN CASE WHEN last_message IS NULL THEN [1] ELSE [] END |
            CREATE (u)-[:HAS_SESSION]->(s1:Session {id:$session_id}),
                   (s1)-[:LAST_MESSAGE]->(q:Question {text:$question, cypher:$query, date:datetime()}),
                   (q)-[:HAS_ANSWER]->(:Answer {text:$output}))
        FOREACH (_ IN CASE WHEN last_message IS NOT NULL THEN [1] ELSE [] END |
            CREATE (last_message)-[:NEXT]->(q:Question
                    {text:$question, cypher:$query, date:datetime()}),
                   (q)-[:HAS_ANSWER]->(:Answer {text:$output}),
                   (s)-[:LAST_MESSAGE]->(q)
            DELETE l)
        """,
        params=input,
    )
    # Return LLM response to the chain
    return input["output"]
```
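For readers without a Neo4j instance at hand, the sketch below models the same memory layout in plain Python. It is an illustration only: the `SessionMemory` class is hypothetical and not part of the project. It mirrors the graph version in three ways:

- history is keyed by `(user_id, session_id)`, so sessions never mix;
- appending a turn plays the role of creating a `Question` node linked by `NEXT` and moving `LAST_MESSAGE`;
- `recent` returns the same slice as the Cypher lookback.

Note that `NEXT*0..3` matches paths of zero to three hops. A window of 3 therefore yields up to four question/answer pairs, oldest first.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


class SessionMemory:
    """In-memory stand-in for User-[:HAS_SESSION]->Session-[:LAST_MESSAGE]->Question."""

    def __init__(self, window: int = 3):
        # NEXT*0..window covers the last message plus `window` earlier ones
        self.window = window
        self._turns: Dict[Tuple[str, str], List[Tuple[str, str]]] = defaultdict(list)

    def save(self, user_id: str, session_id: str, question: str, answer: str) -> None:
        # Like CREATE (last_message)-[:NEXT]->(q) and moving LAST_MESSAGE to q
        self._turns[(user_id, session_id)].append((question, answer))

    def recent(self, user_id: str, session_id: str) -> List[Tuple[str, str]]:
        # Oldest first, like reverse(nodes(p)) in the Cypher query
        turns = self._turns.get((user_id, session_id), [])
        return turns[-(self.window + 1):]


mem = SessionMemory(window=3)
for i in range(6):
    mem.save("u1", "s1", f"q{i}", f"a{i}")
print(mem.recent("u1", "s1"))
```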
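The next piece of code is, on its own, a standalone LangChain template that performs RAG (retrieval-augmented generation) over a Neo4j vector index. In my project I treat this chain as a tool that is ultimately handed to an agent for decision-making. The tool's description is deliberately simple: "Useful Tool for retrieving specific context about Dune". The chain runs in three steps:

1. It loads the conversation history from memory and, together with the user's question, uses the CONDENSE_QUESTION_PROMPT template to rephrase the question as a standalone one.
2. It retrieves data from the Neo4j vector index with the rephrased question.
3. It passes that data as context to ANSWER_PROMPT, which combines the system instruction, the context, the conversation history and the user's question to produce an answer grounded in the context.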
```python
from operator import itemgetter
from typing import Type

from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder, PromptTemplate
from langchain.pydantic_v1 import BaseModel
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import ConfigurableField, RunnablePassthrough
from langchain.tools import BaseTool

from neo4j_advanced_rag.history import get_vector_history, save_vector_history
from neo4j_advanced_rag.retrievers import (
    hypothetic_question_vectorstore,
    parent_vectorstore,
    summary_vectorstore,
    typical_rag,
)


# Tool arguments; BaseTool expects a pydantic v1 model, hence langchain.pydantic_v1
class Question(BaseModel):
    question: str
    user_id: str
    session_id: str


class VectorTool(BaseTool):
    name: str = "vector_tool"
    description: str = "Useful Tool for retrieving specific context about Dune"
    args_schema: Type[Question] = Question

    def _run(self, question: str, user_id: str, session_id: str) -> str:
        # One retriever per strategy, selectable at runtime via the "strategy" config key
        retriever = typical_rag.as_retriever().configurable_alternatives(
            ConfigurableField(id="strategy"),
            default_key="typical_rag",
            parent_strategy=parent_vectorstore.as_retriever(),
            hypothetical_questions=hypothetic_question_vectorstore.as_retriever(),
            summary_strategy=summary_vectorstore.as_retriever(),
        )

        # Define LLM
        llm = ChatOpenAI()

        # Condense a chat history and follow-up question into a standalone question
        condense_template = """Given the following conversation and a follow up question, rephrase the follow up question to be a standalone question, in its original language.
Make sure to include all the relevant information.
Chat History:
{chat_history}
Follow Up Input: {question}
Standalone question:"""  # noqa: E501
        CONDENSE_QUESTION_PROMPT = PromptTemplate.from_template(condense_template)

        # RAG answer synthesis prompt
        answer_template = """Answer the question based only on the following context:
<context>
{context}
</context>"""
        ANSWER_PROMPT = ChatPromptTemplate.from_messages(
            [
                ("system", answer_template),
                MessagesPlaceholder(variable_name="chat_history"),
                ("user", "{question}"),
            ]
        )

        # history -> standalone question -> retrieval -> answer -> save to Neo4j
        chain = (
            RunnablePassthrough.assign(chat_history=get_vector_history)
            | RunnablePassthrough.assign(
                rephrased_question=CONDENSE_QUESTION_PROMPT | llm | StrOutputParser()
            )
            | RunnablePassthrough.assign(context=itemgetter("rephrased_question") | retriever)
            | RunnablePassthrough.assign(output=ANSWER_PROMPT | llm | StrOutputParser())
            | save_vector_history
        ).with_types(input_type=Question)

        return chain.invoke(
            {"question": question, "user_id": user_id, "session_id": session_id},
            {"configurable": {"strategy": "typical_rag"}},  # TODO: let the caller pick the strategy
        )
```
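The sketch below shows the data flow of the chain above with LangChain stripped away. Every model and database call is replaced by an injected function, and all names are hypothetical. It makes the order of the keys explicit: `chat_history`, then `rephrased_question`, then `context`, then `output`. That is the set of keys `save_vector_history` expects to find. Retrieval uses the rephrased question, while the answer prompt still sees the original one.

```python
from typing import Any, Callable, Dict, List


def run_vector_chain(
    inputs: Dict[str, Any],
    load_history: Callable[[Dict[str, Any]], List[str]],
    condense: Callable[[List[str], str], str],
    retrieve: Callable[[str], List[str]],
    answer: Callable[[List[str], List[str], str], str],
    save: Callable[[Dict[str, Any]], None],
) -> str:
    state = dict(inputs)
    # 1. Load recent turns of this user's session
    state["chat_history"] = load_history(state)
    # 2. Rewrite the follow-up question as a standalone question
    state["rephrased_question"] = condense(state["chat_history"], state["question"])
    # 3. Retrieve with the rephrased question, not the raw one
    state["context"] = retrieve(state["rephrased_question"])
    # 4. Answer from context, history and the original question
    state["output"] = answer(state["context"], state["chat_history"], state["question"])
    # 5. Persist the turn, then hand only the answer text back
    save(state)
    return state["output"]


saved: List[Dict[str, Any]] = []
out = run_vector_chain(
    {"question": "And his mother?", "user_id": "u1", "session_id": "s1"},
    load_history=lambda s: ["Who is Paul Atreides?", "Paul is the son of Duke Leto."],
    condense=lambda h, q: "Who is the mother of Paul Atreides?",
    retrieve=lambda q: [f"doc for: {q}"],
    answer=lambda ctx, h, q: f"{len(ctx)} doc(s), {len(h)} history message(s)",
    save=saved.append,
)
print(out)
```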
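The graph tool is structured much like the vector tool. It converts a natural-language question into a Cypher query that fetches data from the Neo4j database, executes the query, and produces a natural-language answer from the results. Its description is "Useful Tool for retrieving structural, interconnected and relational knowledge related to Dune". One particularly interesting component of this chain is CypherQueryCorrector. It validates the relationship directions in a generated Cypher statement against the graph schema and corrects them.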
```python
from typing import Any, Dict, List, Type

from langchain.chains.graph_qa.cypher_utils import CypherQueryCorrector, Schema
from langchain.chat_models import ChatOpenAI
from langchain.graphs import Neo4jGraph
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.pydantic_v1 import BaseModel
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough
from langchain.tools import BaseTool

from neo4j_advanced_rag.history import get_graph_history, save_graph_history


class Question(BaseModel):
    question: str
    user_id: str
    session_id: str


class GraphTool(BaseTool):
    name: str = "graph_tool"
    description: str = "Useful Tool for retrieving structural, interconnected and relational knowledge related to Dune"
    args_schema: Type[Question] = Question

    def _run(self, question: str, user_id: str, session_id: str) -> str:
        # Connection to Neo4j
        graph = Neo4jGraph()

        # Cypher validation tool for relationship directions, built from the graph schema
        corrector_schema = [
            Schema(el["start"], el["type"], el["end"])
            for el in graph.structured_schema.get("relationships")
        ]
        cypher_validation = CypherQueryCorrector(corrector_schema)

        def run_corrected_query(x: Dict[str, Any]) -> List[Dict[str, Any]]:
            # The corrector returns "" when a relationship fits the schema in
            # neither direction; skip execution instead of sending an empty query
            corrected = cypher_validation(x["query"])
            return graph.query(corrected) if corrected else []

        # LLMs: GPT-4 writes the Cypher, GPT-3.5 phrases the final answer
        cypher_llm = ChatOpenAI(model_name="gpt-4", temperature=0.0)
        qa_llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0.0)

        # Generate Cypher statement based on natural language input
        cypher_template = """This is important for my career.
Based on the Neo4j graph schema below, write a Cypher query that would answer the user's question:
{schema}
Question: {question}
Cypher query:"""  # noqa: E501
        cypher_prompt = ChatPromptTemplate.from_messages(
            [
                ("system", "Given an input question, convert it to a Cypher query. No pre-amble."),
                MessagesPlaceholder(variable_name="history"),
                ("human", cypher_template),
            ]
        )
        cypher_response = (
            RunnablePassthrough.assign(schema=lambda _: graph.get_schema, history=get_graph_history)
            | cypher_prompt
            | cypher_llm.bind(stop=["\nCypherResult:"])
            | StrOutputParser()
        )

        # Generate natural language response based on database results
        response_template = """Based on the question, Cypher query, and Cypher response, write a natural language response:
Question: {question}
Cypher query: {query}
Cypher Response: {response}"""  # noqa: E501
        response_prompt = ChatPromptTemplate.from_messages(
            [
                (
                    "system",
                    "Given an input question and Cypher response, convert it to a "
                    "natural language answer. No pre-amble.",
                ),
                ("human", response_template),
            ]
        )

        # question -> Cypher -> corrected Cypher results -> answer -> save to Neo4j
        chain = (
            RunnablePassthrough.assign(query=cypher_response)
            | RunnablePassthrough.assign(response=run_corrected_query)
            | RunnablePassthrough.assign(output=response_prompt | qa_llm | StrOutputParser())
            | save_graph_history
        ).with_types(input_type=Question)

        return chain.invoke(
            {"question": question, "user_id": user_id, "session_id": session_id}
        )
```
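To show what `CypherQueryCorrector` is for, here is a much-simplified toy version of the idea. It is not the library's algorithm. It only understands single-hop patterns in which both nodes carry one label and the relationship has one type, such as `(p:Person)-[:MEMBER_OF]->(h:House)`. The toy does three things:

- it keeps patterns that match the schema;
- it flips arrows that only match when reversed;
- it gives up with `""` when neither direction exists, mirroring the empty string the LangChain corrector returns in that case.

The labels and relationship types used in the tests are hypothetical.

```python
import re
from typing import Set, Tuple

# (start label, relationship type, end label), like the corrector's Schema entries
SchemaTriples = Set[Tuple[str, str, str]]

# Single hop, one label per node, one relationship type, e.g. (p:Person)-[r:MEMBER_OF]->(h:House)
HOP = re.compile(r"\((\w*):(\w+)\)(<?)-\[(\w*):(\w+)\]-(>?)\((\w*):(\w+)\)")


def correct_directions(query: str, schema: SchemaTriples) -> str:
    invalid = False

    def fix(m: "re.Match") -> str:
        nonlocal invalid
        lvar, llabel, back, relvar, reltype, fwd, rvar, rlabel = m.groups()
        left, rel, right = f"({lvar}:{llabel})", f"[{relvar}:{reltype}]", f"({rvar}:{rlabel})"
        if fwd and not back:
            start, end = llabel, rlabel
        elif back and not fwd:
            start, end = rlabel, llabel
        else:
            return m.group(0)  # undirected or malformed: leave untouched
        if (start, reltype, end) in schema:
            return m.group(0)  # already matches the schema
        if (end, reltype, start) in schema:
            # Flip the arrow so the pattern follows the schema
            return f"{left}<-{rel}-{right}" if fwd else f"{left}-{rel}->{right}"
        invalid = True  # fits in neither direction
        return m.group(0)

    corrected = HOP.sub(fix, query)
    return "" if invalid else corrected
```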
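With the vector and graph tools defined, defining the agent is straightforward. We convert both tools into OpenAI functions so the agent can use OpenAI's function calling. Function calling is useful because it takes a Pydantic class, here each tool's argument schema, and returns structured output, which is much easier to process further than free text.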
```python
from langchain.agents import AgentExecutor
from langchain.agents.format_scratchpad import format_to_openai_function_messages
from langchain.agents.output_parsers import OpenAIFunctionsAgentOutputParser
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.pydantic_v1 import BaseModel
from langchain.tools.render import format_tool_to_openai_function

from neo4j_advanced_rag.neo4j_cypher import GraphTool
from neo4j_advanced_rag.neo4j_vector import VectorTool


class AgentInput(BaseModel):
    input: str
    user_id: str
    session_id: str


llm = ChatOpenAI()
vector_tool = VectorTool()
graph_tool = GraphTool()
tools = [vector_tool, graph_tool]
# Expose each tool's name, description and args schema to the model as an OpenAI function
llm_with_tools = llm.bind(functions=[format_tool_to_openai_function(t) for t in tools])

# Both tools take user_id and session_id as arguments, so the model must see
# them in the prompt; otherwise it has to invent values when calling a tool
assistant_system_message = """You are a helpful assistant. \
Use one of the tools provided to you if necessary.
When you call a tool, pass user_id "{user_id}" and session_id "{session_id}" unchanged."""

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", assistant_system_message),
        ("user", "{input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ]
)

agent = (
    {
        "input": lambda x: x["input"],
        "user_id": lambda x: x["user_id"],
        "session_id": lambda x: x["session_id"],
        # Previous tool calls and their results, formatted as function messages
        "agent_scratchpad": lambda x: format_to_openai_function_messages(
            x["intermediate_steps"]
        ),
    }
    | prompt
    | llm_with_tools
    | OpenAIFunctionsAgentOutputParser()
)

agent_executor = AgentExecutor(agent=agent, tools=tools).with_types(input_type=AgentInput)
# Return only the final answer string
agent_executor = agent_executor | (lambda x: x["output"])
```
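`format_tool_to_openai_function` sends the model a function definition built from a tool's name, description and argument schema. The sketch below builds a comparable definition by hand from a dataclass, in plain Python. It then shows the other half: parsing the JSON arguments the model returns back into a typed object. The helpers `to_openai_function` and `parse_call` are hypothetical, and only a few scalar types are mapped. Real Pydantic-generated schemas carry extra fields such as titles, so treat this as an approximation of the shape.

```python
import json
from dataclasses import dataclass, fields
from typing import Any, Dict, Type

# JSON Schema type names for the Python field types handled here
JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}


def to_openai_function(name: str, description: str, schema: Type[Any]) -> Dict[str, Any]:
    """Describe a dataclass as an OpenAI function definition."""
    props = {f.name: {"type": JSON_TYPES[f.type]} for f in fields(schema)}
    return {
        "name": name,
        "description": description,
        "parameters": {"type": "object", "properties": props, "required": list(props)},
    }


def parse_call(arguments: str, schema: Type[Any]) -> Any:
    # The model returns its arguments as a JSON string; loading them into the
    # typed object is what makes the output "structured"
    return schema(**json.loads(arguments))


@dataclass
class Question:
    question: str
    user_id: str
    session_id: str


fn = to_openai_function("vector_tool", "Useful Tool for retrieving specific context about Dune", Question)
print(json.dumps(fn, indent=2))
```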
Results
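To run the application and see it in action on LangServe, run the `langchain serve` command from the root of the project directory. This approach works particularly well when the vector and graph indexes hold different data, or when the tools have descriptions clear enough for the agent to choose between them reliably. My own goal, though, was to understand the whole project end to end, so that I can quickly connect the pieces the next time I work on something similar.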
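Once `langchain serve` is running, a LangServe route can be called over HTTP by POSTing `{"input": ...}` to its `/invoke` endpoint. The sketch below only builds such a request with the standard library. The base URL `http://localhost:8000` and the route path `neo4j-advanced-rag` are assumptions, so check the `add_routes(...)` call in your own `server.py`. The input keys follow the `AgentInput` model defined for the agent.

```python
import json
import urllib.request
from typing import Any, Dict


def build_invoke_request(base_url: str, path: str, payload: Dict[str, Any]) -> urllib.request.Request:
    """Build a POST request for a LangServe route's /invoke endpoint."""
    body = json.dumps({"input": payload}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url.rstrip('/')}/{path.strip('/')}/invoke",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_invoke_request(
    "http://localhost:8000",  # assumed default address of `langchain serve`
    "neo4j-advanced-rag",     # hypothetical route path; check your server.py
    {"input": "What is the plot of Dune?", "user_id": "user-1", "session_id": "session-1"},
)
# With the server running, the answer is in the "output" field of the response:
# with urllib.request.urlopen(req) as resp:
#     print(json.load(resp)["output"])
```

Reusing the same `session_id` across calls is what lets the history module pick up earlier turns.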
For demonstration, I gave the agent executor two questions so that it would call one of the tools. Here is a sample exchange:
How many tools have you been provided?
Can you please use this tool and answer what is the plot of Dune?
Summary
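The project integrates the Neo4j graph database with a LangChain agent, using the vector and Cypher chains as tools to handle queries. Advanced retrieval strategies improve the precision and relevance of what is pulled from the vector index and the graph. A conversation memory module keeps every user interaction in context. Equipped with these tools, the agent decides per query which retrieval method to use, balancing retrieval of specific facts against preserving the overall context. The implementation is deliberately simple and focuses on practicality and on adapting to different types of data. The aim is to make AI-driven data retrieval and processing more efficient and accurate.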