使用Neo4j和LangGraph实现GraphReader

2024年09月24日 由 alex 发表 98 0

大型语言模型(LLM)在摘要和情感分析等传统 NLP 任务中表现出色,但更强大的模型也展现出了良好的推理能力。LLM 的推理能力通常被理解为通过制定计划、执行计划和评估每一步的进展来解决复杂问题的能力。根据评估结果,它们可以通过修改计划或采取替代行动来进行调整。在回答 RAG 应用中的复杂问题时,代理的崛起正日益成为一种引人注目的方法。


在本篇文章中,我们将探讨 GraphReader 代理的实现。该代理旨在从遵循预定义模式的结构化知识图中检索信息。与你可能在演示文稿中看到的典型图不同,这个图更接近于文档或词法图,包含文档、文档块以及以原子事实形式存在的相关元数据。


10


上图展示了一个知识图谱,从顶部开始是一个标记为圣女贞德的文档节点。该文档被分解为文本块,由编号为 0、1、2、3 的圆形节点表示,这些节点通过 NEXT 关系依次连接,表示文本块在文档中出现的顺序。在文本块下方,图表进一步细分为原子事实,表示有关内容的具体陈述。最后,在图的底层,我们看到了关键元素,它们以圆形节点表示,主题包括历史图标、丹麦人、法兰西民族和法国。这些元素就像元数据一样,将事实与文档相关的更广泛的主题和概念联系起来。


构建知识图谱后,我们将按照 GraphReader 论文中提供的方法进行实施。


11


代理探索过程包括用合理的计划初始化代理,并选择初始节点开始在图中搜索。代理在探索这些节点时,首先收集原子事实,然后阅读相关文本块,并更新其笔记本。代理可根据收集到的信息,决定探索更多文本块、邻近节点或终止搜索。当代理决定终止时,将执行答案推理步骤以生成最终答案。


我们将使用 Neo4j 作为存储层,并结合 LangChain 和 LangGraph 来定义代理及其流程,从而实现 GraphReader 论文。


环境设置

你需要设置一个 Neo4j 来学习本博文中的示例。最简单的方法是在 Neo4j Aura 上启动一个免费实例,它提供 Neo4j 数据库的云实例。另外,你也可以通过下载 Neo4j Desktop 应用程序并创建本地数据库实例来建立 Neo4j 数据库的本地实例。


以下代码将实例化一个 LangChain 封装器,以连接 Neo4j 数据库。


os.environ["NEO4J_URI"] = "bolt://localhost:7687"
os.environ["NEO4J_USERNAME"] = "neo4j"
os.environ["NEO4J_PASSWORD"] = "password"
graph = Neo4jGraph(refresh_schema=False)
graph.query("CREATE CONSTRAINT IF NOT EXISTS FOR (c:Chunk) REQUIRE c.id IS UNIQUE")
graph.query("CREATE CONSTRAINT IF NOT EXISTS FOR (c:AtomicFact) REQUIRE c.id IS UNIQUE")
graph.query("CREATE CONSTRAINT IF NOT EXISTS FOR (c:KeyElement) REQUIRE c.id IS UNIQUE")


此外,我们还为将要使用的节点类型添加了限制条件。这些约束可确保更快的导入和检索性能。


此外,你还需要在以下代码中传递一个 OpenAI api 密钥:


os.environ["OPENAI_API_KEY"] = getpass.getpass("OpenAI API Key:")


构建图表

在本例中,我们将使用圣女贞德维基百科页面。我们将使用 LangChain 内置工具来检索文本。


wikipedia = WikipediaQueryRun(
    api_wrapper=WikipediaAPIWrapper(doc_content_chars_max=10000)
)
text = wikipedia.run("Joan of Arc")


如前所述,GraphReader 代理期望知识图谱包含块、相关原子事实和关键元素。


12


首先,将文档分割成块。在论文中,他们在分块的同时保持了段落结构。然而,这很难用通用的方法做到。因此,我们将在这里使用天真分块法。


接下来,LLM 会对每个分块进行处理,以识别原子事实,即捕捉核心细节的最小、不可分割的信息单位。例如,从句子“Neo4j 的首席执行官是 Emil Eifrem,位于瑞典”中,原子事实可以分解为“Neo4j 的首席执行官是 Emil Eifrem。”和“Neo4j 在瑞典。”每个原子事实都集中在一个清晰、独立的信息上。


从这些原子事实中,可以识别出关键要素。对于第一个事实 “Neo4j 的首席执行官是 Emil Eifrem”,关键要素是 “首席执行官”、“Neo4j ”和 “Emil Eifrem”。第二个事实 “Neo4j 位于瑞典 ”的关键要素是 “Neo4j ”和 “瑞典”。这些关键要素是基本名词和专有名词,它们捕捉了每个原子事实的核心含义。


本文附录中提供了用于提取图表的提示。


13


作者使用了基于提示的提取方法,即指示 LLM 应该输出什么,然后实现一个函数,以结构化的方式解析信息。在提取结构化信息时,我更倾向于使用 LangChain 中的 with_structured_output 方法,它可以利用工具功能提取结构化信息。这样,我们就可以跳过定义自定义解析函数。


下面是我们可以用于提取的提示。


construction_system = """
You are now an intelligent assistant tasked with meticulously extracting both key elements and
atomic facts from a long text.
1. Key Elements: The essential nouns (e.g., characters, times, events, places, numbers), verbs (e.g.,
actions), and adjectives (e.g., states, feelings) that are pivotal to the text’s narrative.
2. Atomic Facts: The smallest, indivisible facts, presented as concise sentences. These include
propositions, theories, existences, concepts, and implicit elements like logic, causality, event
sequences, interpersonal relationships, timelines, etc.
Requirements:
#####
1. Ensure that all identified key elements are reflected within the corresponding atomic facts.
2. You should extract key elements and atomic facts comprehensively, especially those that are
important and potentially query-worthy and do not leave out details.
3. Whenever applicable, replace pronouns with their specific noun counterparts (e.g., change I, He,
She to actual names).
4. Ensure that the key elements and atomic facts you extract are presented in the same language as
the original text (e.g., English or Chinese).
"""
construction_human = """Use the given format to extract information from the 
following input: {input}"""
construction_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            construction_system,
        ),
        (
            "human",
            (
                "Use the given format to extract information from the "
                "following input: {input}"
            ),
        ),
    ]
)


我们将指令放在系统提示中,然后在用户信息中提供需要处理的相关文本块。


为了定义所需的输出,我们可以使用 Pydantic 对象定义。


class AtomicFact(BaseModel):
    key_elements: List[str] = Field(description="""The essential nouns (e.g., characters, times, events, places, numbers), verbs (e.g.,
actions), and adjectives (e.g., states, feelings) that are pivotal to the atomic fact's narrative.""")
    atomic_fact: str = Field(description="""The smallest, indivisible facts, presented as concise sentences. These include
propositions, theories, existences, concepts, and implicit elements like logic, causality, event
sequences, interpersonal relationships, timelines, etc.""")
class Extraction(BaseModel):
    atomic_facts: List[AtomicFact] = Field(description="List of atomic facts")


我们希望提取一个原子事实列表,其中每个原子事实都包含一个包含事实的字符串字段和一个当前关键元素列表。为获得最佳结果,必须为每个元素添加描述。


现在,我们可以将它们组合成一个链。


model = ChatOpenAI(model="gpt-4o-2024-08-06", temperature=0.1)
structured_llm = model.with_structured_output(Extraction)
construction_chain = construction_prompt | structured_llm


为了将这一切整合在一起,我们将创建一个函数,获取单个文档,对其进行分块,提取原子事实和关键元素,并将结果存储到 Neo4j 中。


async def process_document(text, document_name, chunk_size=2000, chunk_overlap=200):
    start = datetime.now()
    print(f"Started extraction at: {start}")
    text_splitter = TokenTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    texts = text_splitter.split_text(text)
    print(f"Total text chunks: {len(texts)}")
    tasks = [
        asyncio.create_task(construction_chain.ainvoke({"input":chunk_text}))
        for index, chunk_text in enumerate(texts)
    ]
    results = await asyncio.gather(*tasks)
    print(f"Finished LLM extraction after: {datetime.now() - start}")
    docs = [el.dict() for el in results]
    for index, doc in enumerate(docs):
        doc['chunk_id'] = encode_md5(texts[index])
        doc['chunk_text'] = texts[index]
        doc['index'] = index
        for af in doc["atomic_facts"]:
            af["id"] = encode_md5(af["atomic_fact"])
    # Import chunks/atomic facts/key elements
    graph.query(import_query, 
            params={"data": docs, "document_name": document_name})
    # Create next relationships between chunks
    graph.query("""MATCH (c:Chunk) WHERE c.document_name = $document_name
WITH c ORDER BY c.index WITH collect(c) AS nodes
UNWIND range(0, size(nodes) -2) AS index
WITH nodes[index] AS start, nodes[index + 1] AS end
MERGE (start)-[:NEXT]->(end)
""",
           params={"document_name":document_name})
    print(f"Finished import at: {datetime.now() - start}")


在高层次上,该代码处理文档的方法是将文档分成若干块,使用人工智能模型从每个块中提取信息,并将结果存储在图形数据库中。摘要如下:

  1. 它将文档文本分割成指定大小的语块,并允许一定程度的重叠。作者在论文中使用的是 2000 个词块的大小。
  2. 对于每个块,它都会异步将文本发送到 LLM,以提取原子事实和关键元素。
  3. 使用 md5 编码函数,每个数据块和事实都有一个唯一的标识符。
  4. 处理后的数据被导入图数据库,并在连续的数据块之间建立关系。


现在,我们可以在弧形琼文本上运行此功能。


await process_document(text, "Joan of Arc", chunk_size=500, chunk_overlap=100)


我们使用了较小的块大小,因为这是一个小文档,我们希望有几个块用于演示目的。如果你在 Neo4j 浏览器中浏览图表,应该会看到类似的可视化效果。


14


该结构的中心是文档节点(蓝色),它的分支是大块节点(粉红色)。这些块节点又与原子事实(橙色)相连,每个原子事实又与关键元素(绿色)相连。


让我们来检查一下构建的图。我们先来看看原子事实的标记数分布。


def num_tokens_from_string(string: str) -> int:
    """Returns the number of tokens in a text string."""
    encoding = tiktoken.encoding_for_model("gpt-4")
    num_tokens = len(encoding.encode(string))
    return num_tokens

atomic_facts = graph.query("MATCH (a:AtomicFact) RETURN a.text AS text")
df = pd.DataFrame.from_records(
    [{"tokens": num_tokens_from_string(el["text"])} for el in atomic_facts]
)
sns.histplot(df["tokens"])


结果


15


原子事实相对较短,最长的也只有约 50 个代币。让我们研究几个原子事实,以便更好地了解它们。


graph.query("""MATCH (a:AtomicFact) 
RETURN a.text AS text
ORDER BY size(text) ASC LIMIT 3
UNION ALL
MATCH (a:AtomicFact) 
RETURN a.text AS text
ORDER BY size(text) DESC LIMIT 3""")


结果


16


一些最简短的事实缺乏背景。例如,原始配乐和剧本并没有直接提到哪个。因此,如果我们处理多个文档,这些原子事实可能就不那么有用了。这种缺乏上下文的情况可以通过额外的提示工程来解决。


让我们再看看最常见的关键词。


data = graph.query("""
MATCH (a:KeyElement) 
RETURN a.id AS key, 
       count{(a)<-[:HAS_KEY_ELEMENT]-()} AS connections
ORDER BY connections DESC LIMIT 5""")
df = pd.DataFrame.from_records(data)
sns.barplot(df, x='key', y='connections')


结果


17


不出所料,“圣女贞德 ”是被提及最多的关键词或元素。紧随其后的是电影、英语和法国等宽泛的关键词。我猜想,如果我们解析了很多文档,广义关键词最终会有很多关联,这可能会导致一些下游问题,而这些问题在最初的实现中并没有处理。另一个小问题是提取的非确定性,因为每次运行的结果都会略有不同。


图形阅读器代理

我们已经准备好实现基于图的代理系统 GraphReader。该代理从几个预定义的步骤开始,然后是它可以自主遍历图的步骤,这意味着代理可以决定接下来的步骤以及如何遍历图。


下面是我们将要实现的代理的 LangGraph 可视化图。


18


这一过程始于合理规划阶段,之后代理会初步选择要处理的节点(关键要素)。接下来,代理会检查与所选关键要素相关联的原子事实。由于所有这些步骤都是预定义的,因此它们都用一条完整的线表示出来。


根据原子事实检查的结果,流程会继续阅读相关的文本块或探索初始关键元素的邻近区域,以寻找更多相关信息。在这里,下一步是有条件的,以 LLM 的结果为基础,因此用虚线表示。


在文本块检查阶段,LLM 会读取并评估从当前文本块中收集到的信息是否足够。根据评估结果,LLM 有几种选择。如果信息看起来不完整或不清楚,它可以决定阅读更多的文本块。或者,LLM 可以选择探索邻近的关键要素,寻找更多最初选择可能没有捕捉到的上下文或相关信息。不过,如果 LLM 确定已经收集到足够的相关信息,它就会直接进入答案推理步骤。此时,LLM 会根据收集到的信息生成最终答案。


在整个过程中,代理会根据条件检查的结果动态浏览流程,并根据具体情况决定是重复步骤还是继续前进。这样既能灵活处理不同的输入,又能保持各步骤的结构化进展。


现在,我们将使用 LangGraph 抽象来复习和实现这些步骤。你可以通过 LangChain 学院课程了解有关 LangGraph 的更多信息。


LangGraph 状态

要构建 LangGraph 实现,我们首先要定义沿流程步骤传递的状态。


class InputState(TypedDict):
    question: str
class OutputState(TypedDict):
    answer: str
    analysis: str
    previous_actions: List[str]
class OverallState(TypedDict):
    question: str
    rational_plan: str
    notebook: str
    previous_actions: Annotated[List[str], add]
    check_atomic_facts_queue: List[str]
    check_chunks_queue: List[str]
    neighbor_check_queue: List[str]
    chosen_action: str


对于更高级的用例,可以使用多个独立的状态。在我们的实现中,我们有独立的输入和输出状态(定义了 LangGraph 的输入和输出),以及独立的总体状态(在各步骤之间传递)。


默认情况下,从节点返回的状态会被覆盖。不过,你也可以定义其他操作。例如,通过 previous_actions,我们可以定义附加或添加状态,而不是覆盖状态。


代理一开始会维护一个笔记本来记录支持性事实,这些事实最终会被用来推导出最终答案。其他状态将在我们继续讨论时进行解释。


下面我们来定义 LangGraph 中的节点。


合理计划

在合理计划步骤中,代理将问题分解成更小的步骤,确定所需的关键信息,并创建一个逻辑计划。逻辑计划允许代理处理复杂的多步骤问题。


虽然代码不可用,但所有提示都在附录中,我们可以轻松复制。


19


作者没有明确说明是在系统信息还是用户信息中提供提示。在大多数情况下,我决定将提示作为系统信息。


下面的代码展示了如何将上述合理计划作为系统消息构建链。


rational_plan_system = """As an intelligent assistant, your primary objective is to answer the question by gathering
supporting facts from a given article. To facilitate this objective, the first step is to make
a rational plan based on the question. This plan should outline the step-by-step process to
resolve the question and specify the key information required to formulate a comprehensive answer.
Example:
#####
User: Who had a longer tennis career, Danny or Alice?
Assistant: In order to answer this question, we first need to find the length of Danny’s
and Alice’s tennis careers, such as the start and retirement of their careers, and then compare the
two.
#####
Please strictly follow the above format. Let’s begin."""
rational_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            rational_plan_system,
        ),
        (
            "human",
            (
                "{question}"
            ),
        ),
    ]
)
rational_chain = rational_prompt | model | StrOutputParser()


现在,我们可以用这条链来定义一个合理计划节点。LangGraph 中的节点是一个函数,它将状态作为输入,并将更新状态作为输出。


def rational_plan_node(state: InputState) -> OverallState:
    rational_plan = rational_chain.invoke({"question": state.get("question")})
    print("-" * 20)
    print(f"Step: rational_plan")
    print(f"Rational plan: {rational_plan}")
    return {
        "rational_plan": rational_plan,
        "previous_actions": ["rational_plan"],
    }


函数首先调用 LLM 链,生成合理计划。我们先进行少量打印以便调试,然后更新状态作为函数的输出。我喜欢这种简单的方法。


初始节点选择

下一步,我们根据问题和合理计划选择初始节点。提示如下:


20


提示开始时,LLM 会了解整个代理系统的一些背景情况,然后是任务指令。这样做的目的是让 LLM 挑选出前 10 个最相关的节点并为它们打分。作者只是简单地将数据库中的所有关键要素放在提示中,供 LLM 选择。不过,我认为这种方法并不适合大规模使用。因此,我们将创建并使用一个矢量索引来检索提示的输入节点列表。


neo4j_vector = Neo4jVector.from_existing_graph(
    embedding=embeddings,
    index_name="keyelements",
    node_label="KeyElement",
    text_node_properties=["id"],
    embedding_node_property="embedding",
    retrieval_query="RETURN node.id AS text, score, {} AS metadata"
)
def get_potential_nodes(question: str) -> List[str]:
    data = neo4j_vector.similarity_search(question, k=50)
    return [el.page_content for el in data]


from_existing_graph 方法会从图中提取已定义的文本节点属性,并计算缺失的嵌入。在这里,我们只需嵌入 KeyElement 节点的 id 属性。


现在我们来定义链。我们首先复制提示。


initial_node_system = """
As an intelligent assistant, your primary objective is to answer questions based on information
contained within a text. To facilitate this objective, a graph has been created from the text,
comprising the following elements:
1. Text Chunks: Chunks of the original text.
2. Atomic Facts: Smallest, indivisible truths extracted from text chunks.
3. Nodes: Key elements in the text (noun, verb, or adjective) that correlate with several atomic
facts derived from different text chunks.
Your current task is to check a list of nodes, with the objective of selecting the most relevant initial nodes from the graph to efficiently answer the question. You are given the question, the
rational plan, and a list of node key elements. These initial nodes are crucial because they are the
starting point for searching for relevant information.
Requirements:
#####
1. Once you have selected a starting node, assess its relevance to the potential answer by assigning
a score between 0 and 100. A score of 100 implies a high likelihood of relevance to the answer,
whereas a score of 0 suggests minimal relevance.
2. Present each chosen starting node in a separate line, accompanied by its relevance score. Format
each line as follows: Node: [Key Element of Node], Score: [Relevance Score].
3. Please select at least 10 starting nodes, ensuring they are non-repetitive and diverse.
4. In the user’s input, each line constitutes a node. When selecting the starting node, please make
your choice from those provided, and refrain from fabricating your own. The nodes you output
must correspond exactly to the nodes given by the user, with identical wording.
Finally, I emphasize again that you need to select the starting node from the given Nodes, and
it must be consistent with the words of the node you selected. Please strictly follow the above
format. Let’s begin.
"""
initial_node_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            initial_node_system,
        ),
        (
            "human",
            (
                """Question: {question}
Plan: {rational_plan}
Nodes: {nodes}"""
            ),
        ),
    ]
)


同样,我们将大部分指令作为系统信息。由于我们有多个输入,因此可以在人类信息中定义它们。不过,这次我们需要一个更有条理的输出。我们可以使用 use_structured_outputmethod 来定义所需的输出结构,而不是编写一个接收文本并输出 JSON 的解析函数。


class Node(BaseModel):
    key_element: str = Field(description="""Key element or name of a relevant node""")
    score: int = Field(description="""Relevance to the potential answer by assigning
a score between 0 and 100. A score of 100 implies a high likelihood of relevance to the answer,
whereas a score of 0 suggests minimal relevance.""")
class InitialNodes(BaseModel):
    initial_nodes: List[Node] = Field(description="List of relevant nodes to the question and plan")
initial_nodes_chain = initial_node_prompt | model.with_structured_output(InitialNodes)


我们希望输出一个包含关键元素和得分的节点列表。我们可以使用 Pydantic 模型轻松定义输出。此外,为每个字段添加描述也很重要,这样我们就能尽可能地引导 LLM。


这一步的最后一件事是将节点定义为函数。


def initial_node_selection(state: OverallState) -> OverallState:
    potential_nodes = get_potential_nodes(state.get("question"))
    initial_nodes = initial_nodes_chain.invoke(
        {
            "question": state.get("question"),
            "rational_plan": state.get("rational_plan"),
            "nodes": potential_nodes,
        }
    )
    # paper uses 5 initial nodes
    check_atomic_facts_queue = [
        el.key_element
        for el in sorted(
            initial_nodes.initial_nodes,
            key=lambda node: node.score,
            reverse=True,
        )
    ][:5]
    return {
        "check_atomic_facts_queue": check_atomic_facts_queue,
        "previous_actions": ["initial_node_selection"],
    }


在初始节点选择中,我们首先根据输入使用向量相似性搜索获得潜在节点列表。也可以选择使用合理计划。LLM 会被提示输出 10 个最相关的节点。不过,作者说我们应该只使用 5 个初始节点。因此,我们只需按照节点的得分排序,然后取前 5 个节点。然后,我们用选定的初始关键元素更新 check_atomic_facts_queue 。


原子事实检查

在这一步中,我们将获取初始关键元素并检查链接的原子事实。提示是:


21


所有提示都先给 LLM 提供一些上下文,然后是任务指示。LLM 收到的指令是阅读原子事实,然后决定是否阅读链接文本块,或者如果原子事实无关,则通过探索邻近文本块来搜索更多信息。提示的最后一点是输出指令。我们将再次使用结构化输出方法,以避免手动解析和结构化输出。


由于链的实现非常相似,只是提示不同,我们将避免在这篇博文中展示每一个定义。不过,我们将查看 LangGraph 节点定义,以便更好地理解流程。


def atomic_fact_check(state: OverallState) -> OverallState:
    atomic_facts = get_atomic_facts(state.get("check_atomic_facts_queue"))
    print("-" * 20)
    print(f"Step: atomic_fact_check")
    print(
        f"Reading atomic facts about: {state.get('check_atomic_facts_queue')}"
    )
    atomic_facts_results = atomic_fact_chain.invoke(
        {
            "question": state.get("question"),
            "rational_plan": state.get("rational_plan"),
            "notebook": state.get("notebook"),
            "previous_actions": state.get("previous_actions"),
            "atomic_facts": atomic_facts,
        }
    )
    notebook = atomic_facts_results.updated_notebook
    print(
        f"Rational for next action after atomic check: {atomic_facts_results.rational_next_action}"
    )
    chosen_action = parse_function(atomic_facts_results.chosen_action)
    print(f"Chosen action: {chosen_action}")
    response = {
        "notebook": notebook,
        "chosen_action": chosen_action.get("function_name"),
        "check_atomic_facts_queue": [],
        "previous_actions": [
            f"atomic_fact_check({state.get('check_atomic_facts_queue')})"
        ],
    }
    if chosen_action.get("function_name") == "stop_and_read_neighbor":
        neighbors = get_neighbors_by_key_element(
            state.get("check_atomic_facts_queue")
        )
        response["neighbor_check_queue"] = neighbors
    elif chosen_action.get("function_name") == "read_chunk":
        response["check_chunks_queue"] = chosen_action.get("arguments")[0]
    return response


原子事实检查节点首先会调用 LLM 来评估所选节点的原子事实。由于我们使用的是 use_structured_output(结构化输出),因此我们可以直接解析更新后的笔记本和所选操作输出。如果所选操作是通过检查邻接节点来获取额外信息,我们就会使用一个函数来查找这些邻接节点,并将它们追加到 check_atomic_facts_queue 中。否则,我们会将选中的数据块追加到 check_chunks_queue 中。我们通过更新笔记本、队列和选择的操作来更新整体状态。


文本块检查

正如 LangGraph 节点的名称所示,在这一步中,LLM 会读取选定的文本块,并根据所提供的信息决定下一步的最佳操作。提示如下:


22


LLM被指示阅读文本块并决定最佳方法。我的直觉是,有时相关信息位于文本块的开头或结尾,而部分信息可能会因为分块过程而丢失。因此,作者决定让 LLM 选择阅读上一个或下一个文本块。如果 LLM 认为自己已经掌握了足够的信息,它就可以跳转到最后一步。否则,它可以选择使用 search_more 函数搜索更多细节。


我们还是只看 LangGraph 节点函数。


def chunk_check(state: OverallState) -> OverallState:
    check_chunks_queue = state.get("check_chunks_queue")
    chunk_id = check_chunks_queue.pop()
    print("-" * 20)
    print(f"Step: read chunk({chunk_id})")
    chunks_text = get_chunk(chunk_id)
    read_chunk_results = chunk_read_chain.invoke(
        {
            "question": state.get("question"),
            "rational_plan": state.get("rational_plan"),
            "notebook": state.get("notebook"),
            "previous_actions": state.get("previous_actions"),
            "chunk": chunks_text,
        }
    )
    notebook = read_chunk_results.updated_notebook
    print(
        f"Rational for next action after reading chunks: {read_chunk_results.rational_next_move}"
    )
    chosen_action = parse_function(read_chunk_results.chosen_action)
    print(f"Chosen action: {chosen_action}")
    response = {
        "notebook": notebook,
        "chosen_action": chosen_action.get("function_name"),
        "previous_actions": [f"read_chunks({chunk_id})"],
    }
    if chosen_action.get("function_name") == "read_subsequent_chunk":
        subsequent_id = get_subsequent_chunk_id(chunk_id)
        check_chunks_queue.append(subsequent_id)
    elif chosen_action.get("function_name") == "read_previous_chunk":
        previous_id = get_previous_chunk_id(chunk_id)
        check_chunks_queue.append(previous_id)
    elif chosen_action.get("function_name") == "search_more":
        # Go over to next chunk
        # Else explore neighbors
        if not check_chunks_queue:
            response["chosen_action"] = "search_neighbor"
            # Get neighbors/use vector similarity
            print(f"Neighbor rational: {read_chunk_results.rational_next_move}")
            neighbors = get_potential_nodes(
                read_chunk_results.rational_next_move
            )
            response["neighbor_check_queue"] = neighbors
    response["check_chunks_queue"] = check_chunks_queue
    return response


我们首先从队列中弹出一个块 ID,然后从图中检索其文本。利用检索到的文本和来自 LangGraph 系统整体状态的附加信息,我们调用 LLM 链。如果 LLM 决定要读取之前或之后的块,我们就会将它们的 ID 附加到队列中。另一方面,如果 LLM 选择搜索更多信息,我们有两种选择。如果队列中还有其他要读取的数据块,我们就开始读取它们。否则,我们可以使用向量搜索来获取更多相关的关键元素,然后通过读取它们的原子事实来重复这一过程,以此类推。


本文对 search_more 功能略有怀疑。一方面,它指出 search_more 函数只能读取队列中的其他块。另一方面,在附录中的示例中,该函数显然是在探索邻居。


23


为了澄清,我给作者发了电子邮件,他们确认 search_more 函数首先会尝试查看队列中的其他数据块。如果没有,它就会继续探索相邻块。由于没有明确定义如何探索邻居,我们再次使用向量相似性搜索来寻找潜在节点。


邻居选择

当 LLM 决定探索邻居时,我们会使用辅助函数来寻找潜在的关键元素。但是,我们不会探索所有的节点。相反,LLM 会决定其中哪些值得探索(如果有的话)。提示如下:


24


根据所提供的潜在邻居,LLM 可以决定探索哪些邻居。如果没有值得探索的,它就可以决定终止流程,进入答案推理步骤。


代码如下:


def neighbor_select(state: OverallState) -> OverallState:
    print("-" * 20)
    print(f"Step: neighbor select")
    print(f"Possible candidates: {state.get('neighbor_check_queue')}")
    neighbor_select_results = neighbor_select_chain.invoke(
        {
            "question": state.get("question"),
            "rational_plan": state.get("rational_plan"),
            "notebook": state.get("notebook"),
            "nodes": state.get("neighbor_check_queue"),
            "previous_actions": state.get("previous_actions"),
        }
    )
    print(
        f"Rational for next action after selecting neighbor: {neighbor_select_results.rational_next_move}"
    )
    chosen_action = parse_function(neighbor_select_results.chosen_action)
    print(f"Chosen action: {chosen_action}")
    # Empty neighbor select queue
    response = {
        "chosen_action": chosen_action.get("function_name"),
        "neighbor_check_queue": [],
        "previous_actions": [
            f"neighbor_select({chosen_action.get('arguments', [''])[0] if chosen_action.get('arguments', ['']) else ''})"
        ],
    }
    if chosen_action.get("function_name") == "read_neighbor_node":
        response["check_atomic_facts_queue"] = [
            chosen_action.get("arguments")[0]
        ]
    return response


在这里,我们执行 LLM 链并解析结果。如果选择的操作是探索任何邻居,我们会将它们添加到 check_atomic_facts_queue 中。


答案推理

流程的最后一步是要求 LLM 根据笔记本中收集的信息构建最终答案。提示如下:


25


从代码中可以看出,这个节点的实现相当简单:


def answer_reasoning(state: OverallState) -> OutputState:
    print("-" * 20)
    print("Step: Answer Reasoning")
    final_answer = answer_reasoning_chain.invoke(
        {"question": state.get("question"), "notebook": state.get("notebook")}
    )
    return {
        "answer": final_answer.final_answer,
        "analysis": final_answer.analyze,
        "previous_actions": ["answer_reasoning"],
    }


我们只需将原始问题和包含所收集信息的笔记本输入到该链中,然后要求它在分析部分制定最终答案并提供解释。


LangGraph 流程定义

剩下的唯一工作就是定义 LangGraph 流程,以及它应该如何在节点之间进行遍历。我非常喜欢 LangChain 团队选择的这种简单方法。


langgraph = StateGraph(OverallState, input=InputState, output=OutputState)
langgraph.add_node(rational_plan_node)
langgraph.add_node(initial_node_selection)
langgraph.add_node(atomic_fact_check)
langgraph.add_node(chunk_check)
langgraph.add_node(answer_reasoning)
langgraph.add_node(neighbor_select)
langgraph.add_edge(START, "rational_plan_node")
langgraph.add_edge("rational_plan_node", "initial_node_selection")
langgraph.add_edge("initial_node_selection", "atomic_fact_check")
langgraph.add_conditional_edges(
    "atomic_fact_check",
    atomic_fact_condition,
)
langgraph.add_conditional_edges(
    "chunk_check",
    chunk_condition,
)
langgraph.add_conditional_edges(
    "neighbor_select",
    neighbor_condition,
)
langgraph.add_edge("answer_reasoning", END)
langgraph = langgraph.compile()


我们首先要定义状态图对象,在此可以定义 LangGraph 中传递的信息。每个节点都可以用 add_node 方法简单添加。可以使用 add_edge 方法添加正常的边,即一个步骤总是紧接着另一个步骤。另一方面,如果遍历依赖于之前的操作,我们可以使用 add_conditional_edge 并传递选择下一个节点的函数。例如,atomic_fact_condition 看起来像这样:


def atomic_fact_condition(
    state: OverallState,
) -> Literal["neighbor_select", "chunk_check"]:
    if state.get("chosen_action") == "stop_and_read_neighbor":
        return "neighbor_select"
    elif state.get("chosen_action") == "read_chunk":
        return "chunk_check"


如你所见,定义条件边非常简单。


评估

最后,我们可以用几个问题来测试我们的实现。让我们从一个简单的问题开始。


langgraph.invoke({"question":"Did Joan of Arc lose any battles?"})


结果


26


代理首先制定了一个合理的计划,以确定圣女贞德在其军事生涯中参加过的战役,并确定是否有任何战役失败。制定完计划后,代理开始对奥尔良围城战、巴黎围城战和夏里特战役等关键战役进行原子事实检查。代理不会扩展图表,而是直接确认所需的事实。它阅读的文本块提供了有关圣女贞德失败战役的更多细节,尤其是失败的巴黎围攻战和拉夏里特战役。由于这些信息回答了琼是否在任何战役中失败的问题,因此代理到此为止,不再继续探索。最后,程序给出了最终答案,根据收集到的证据,确认琼确实输掉了一些战役,尤其是在巴黎和拉夏里特的战役。


现在,让我们给它一个曲线球。


langgraph.invoke({"question":"What is the weather in Spain?"})


结果


27


经过合理规划后,代理选择了最初要探索的关键要素。但问题是,这些关键元素在数据库中都不存在,而 LLM 只是幻觉。也许一些提示工程可以解决幻觉问题,但我还没有尝试过。需要注意的一点是,这并不可怕,因为数据库中不存在这些关键要素,所以我们无法提取任何相关信息。由于代理没有获得任何相关数据,所以它搜索了更多信息。然而,没有一个邻居是相关的,因此进程停止,让用户知道信息不可用。


现在我们来试试多跳问题。


langgraph.invoke(
  {"question":"Did Joan of Arc visit any cities in early life where she won battles later?"})


结果


28


复制整个流程有点太多,所以我只复制了答案部分。这个问题的流程非常不确定,而且非常依赖于所使用的模型。有趣的是,在我的测试中,模型越新,性能越差。因此,GPT-4 是最好的(本例中也使用了),其次是 GPT-4-turbo,最后是 GPT-4o。


总结

我对 GraphReader 和类似方法感到非常兴奋,特别是因为我认为这种(图形)RAG 方法非常通用,可以应用于任何领域。此外,由于图模式是静态的,因此可以避免整个图建模部分,允许图代理使用预定义函数对其进行遍历。


文章来源:https://towardsdatascience.com/implementing-graphreader-with-neo4j-and-langgraph-e4c73826a8b7
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
热门职位
Maluuba
20000~40000/月
Cisco
25000~30000/月 深圳市
PilotAILabs
30000~60000/年 深圳市
写评论取消
回复取消