利用LlamaIndex工作流构建高效的知识图谱代理

2025年01月20日 由 alex 发表 1477 0

检索增强生成(RAG)技术已经到来,并且有其充分的理由。它是一个强大的框架,将先进的语言模型与有针对性的信息检索技术相结合,使得能够更快地访问相关数据,并生成更准确、更具上下文意识的响应。虽然RAG应用通常侧重于非结构化数据,但我非常赞成将结构化数据也纳入其中,这是一种至关重要却常被忽视的方法。我最喜欢的方式之一就是利用像Neo4j这样的图数据库。


从图中检索数据时,常用的方法是Text2Cypher,即将自然语言查询自动转换为Cypher语句来查询图数据库。这种技术依赖于一个语言模型(或基于规则的系统)来解释用户查询,推断其潜在意图,并将其转换为有效的Cypher查询,从而使RAG应用能够从知识图中检索到相关信息并生成准确答案。


8


Text2Cypher 提供了非凡的灵活性,因为它允许用户用自然语言提出问题,而无需了解底层的图模式或Cypher语法。


基准测试的最重要结果如下所示。


9


从高层视角来看,该基准测试比较了三组模型:

  • 针对Text2Cypher任务的微调模型
  • 开放基础模型
  • 封闭基础模型


该基准测试使用两个指标评估它们在生成正确Cypher查询方面的性能:Google BLEU(上图)和ExactMatch(下图)。


Google BLEU指标衡量生成查询与参考查询之间(以 n-gram 为单位)的重叠程度。分数越高通常表示与参考查询越接近,但并不一定保证查询在数据库上下文中能正确运行。


另一方面,ExactMatch是一种基于执行的指标。它表示生成查询中与正确查询文本完全匹配的百分比,这意味着它们在执行时会产生相同的结果。这使得ExactMatch成为正确性的更严格衡量标准,并且更直接地与查询在现实世界中的实际效用相关联。


尽管微调取得了一些有前景的结果,但总体准确率水平表明Text2Cypher仍然是一项不断发展中的技术。一些模型在某些情况下仍然难以生成完全正确的查询,这强调了进一步改进的必要性。


在这篇文章中,我们将尝试使用LlamaIndex Workflows为Text2Cypher实施更具主动性的策略。我们将尝试一种多步骤方法,允许重试或替代查询公式化,而不是依赖通常大多数基准测试所采用的一次性查询生成。通过纳入这些额外步骤和备用选项,我们的目标是提高总体准确率,并减少生成有缺陷Cypher查询的情况。


10


LlamaIndex 工作流

LlamaIndex 工作流是一种通过事件驱动系统将不同操作连接起来,从而组织多步骤AI过程的实用方法。它们有助于将复杂任务分解为更小、更易管理的部分,这些部分可以以结构化的方式相互通信。工作流中的每一步都处理特定事件并产生新事件,从而创建一系列操作,可以完成诸如文档处理、问题回答或内容生成等任务。系统自动处理步骤之间的协调,使得构建和维护复杂的AI应用程序更加容易。


简单的Text2Cypher流程

简单的Text2Cypher架构是一种将自然语言问题转换为Neo4j图数据库的Cypher查询的简化方法。它通过三阶段工作流进行操作:

  1. 它使用少样本学习与存储在向量数据库中的相似示例,从输入问题生成Cypher查询。
  2. 系统在图数据库上执行生成的Cypher查询。
  3. 它通过语言模型处理数据库结果,以生成直接回答原始问题的自然语言响应。


该架构保持了一个简单而有效的管道,利用向量相似性搜索,例如少样本检索和语言模型(LLM)用于Cypher查询生成和响应格式化。


下面展示了可视化的简单Text2Cypher工作流。


11


值得注意的是,大多数Neo4j模式生成方法在处理具有多个标签的节点时都会遇到困难。这一问题不仅由于增加的复杂性,还因为标签的组合爆炸,这可能会使提示信息变得过于复杂。为了缓解这一问题,我们在模式生成过程中排除了Actor和Director标签:


schema = graph_store.get_schema_str(exclude_types=["Actor", "Director"])


该管道从生成Cypher步骤开始:


@step
async def generate_cypher(self, ctx: Context, ev: StartEvent) -> ExecuteCypherEvent:
    question = ev.input
    # Cypher query generation using an LLM
    cypher_query = await generate_cypher_step(
        self.llm, question, self.few_shot_retriever
    )
    # Streaming event information to the web UI.
    ctx.write_event_to_stream(
        SseEvent(
            label="Cypher generation",
            message=f"Generated Cypher: {cypher_query}",
        )
    )
    # Return for the next step
    return ExecuteCypherEvent(question=question, cypher=cypher_query)


生成Cypher步骤接收一个自然语言问题,并利用语言模型从向量存储中检索相似示例,将其转换为Cypher查询。该步骤还会实时将生成的Cypher查询流回用户界面,提供查询生成过程的即时反馈。


带重试功能的简单Text2Cypher流程

这个带重试功能的Text2Cypher增强版本在原始架构的基础上增加了一个自我修正机制。当生成的Cypher查询执行失败时,系统不会立即失败,而是通过在CorrectCypherEvent步骤中将错误信息反馈回语言模型来尝试修复查询。这使得系统更具韧性,能够处理初始错误,类似于人类在收到错误反馈后可能会调整其方法的方式。


下面展示了带重试功能的简单Text2Cypher工作流程的可视化图。


12


让我们来看看ExecuteCypherEvent:


@step
async def execute_query(
    self, ctx: Context, ev: ExecuteCypherEvent
) -> SummarizeEvent | CorrectCypherEvent:
    # Get global var
    retries = await ctx.get("retries")
    try:
        database_output = str(graph_store.structured_query(ev.cypher))
    except Exception as e:
        database_output = str(e)
        # Retry
        if retries < self.max_retries:
            await ctx.set("retries", retries + 1)
            return CorrectCypherEvent(
                question=ev.question, cypher=ev.cypher, error=database_output
            )
    return SummarizeEvent(
        question=ev.question, cypher=ev.cypher, context=database_output
    )


execute函数首先尝试运行查询,如果成功,则将结果传递以进行汇总。然而,如果出现问题,它不会立即放弃——相反,它会检查是否还有剩余的重试次数,如果有,则将查询连同出错信息一起发送回去进行修正。这创建了一个更具容错性的系统,能够从错误中学习,就像我们在收到反馈后可能会调整方法一样。


带重试和评估功能的简单Text2Cypher流程

在带重试功能的简单Text2Cypher流程的基础上,这个增强版本增加了一个评估阶段,用于检查查询结果是否足以回答用户的问题。如果结果被认为不充分,系统会将查询连同如何改进的信息一起发送回去进行修正。如果结果可接受,流程将继续进行到最终的汇总步骤。这一额外的验证层进一步增强了管道的韧性,确保用户最终获得尽可能准确和完整的答案。


13


额外的评估步骤是这样实现的:


@step
async def evaluate_context(
    self, ctx: Context, ev: EvaluateEvent
) -> SummarizeEvent | CorrectCypherEvent:
    # Get global var
    retries = await ctx.get("retries")
    evaluation = await evaluate_database_output_step(
        self.llm, ev.question, ev.cypher, ev.context
    )
    if retries < self.max_retries and not evaluation == "Ok":
        await ctx.set("retries", retries + 1)
        return CorrectCypherEvent(
            question=ev.question, cypher=ev.cypher, error=evaluation
        )
    return SummarizeEvent(
        question=ev.question, cypher=ev.cypher, context=ev.context
    )


evaluate_check函数是一个简单的检查,用于确定查询结果是否充分回答了用户的问题。如果评估结果显示结果不充分且还有剩余的重试次数,它会返回一个CorrectCypherEvent,以便对查询进行细化。否则,它会继续进行SummarizeEvent,表示结果适合进行最终汇总。


后来我意识到,捕获通过修正无效Cypher语句而成功自我修复的情况会是一个极好的主意。这些示例随后可以作为未来Cypher生成的动态少样本提示。这种方法将使代理不仅能够自我修复,还能随着时间的推移不断自我学习和改进。仅为此流程实现了存储这些少样本示例的示例代码(因为它提供了最佳的自我修复准确性)。


@step
async def summarize_answer(self, ctx: Context, ev: SummarizeEvent) -> StopEvent:
    retries = await ctx.get("retries")
    # If retry was successful:
    if retries > 0 and check_ok(ev.evaluation):
        # print(f"Learned new example: {ev.question}, {ev.cypher}")
        # Store success retries to be used as fewshots!
        store_fewshot_example(ev.question, ev.cypher, self.llm.model)


迭代规划器流程

最后的流程是最复杂的,我将其保留在代码中,以便您从我的探索中学习。


迭代规划器流程通过实现一个迭代规划系统,引入了一种更复杂的方法。它不是直接生成一个Cypher查询,而是首先创建一个子查询计划,在执行前验证每个子查询的Cypher语句,并且包含一个信息检查机制,如果初始结果不充分,可以修改计划。该系统可以进行最多三次信息收集迭代,每次根据之前的结果调整其方法。这创建了一个更彻底的问答系统,能够通过将复杂查询分解为可管理的步骤并在每个阶段验证信息来处理它们。


下面的图示展示了迭代规划器的工作流程。


14


让我们来看看查询规划器的提示。我期待大语言模型能给出以下回应:


class SubqueriesOutput(BaseModel):
    """Defines the output format for transforming a question into parallel-optimized retrieval steps."""
    plan: List[List[str]] = Field(
        description=(
            """A list of query groups where:
        - Each group (inner list) contains queries that can be executed in parallel
        - Groups are ordered by dependency (earlier groups must be executed before later ones)
        - Each query must be a specific information retrieval request
        - Split into multiple steps only if intermediate results return ≤25 values
        - No reasoning or comparison tasks, only data fetching queries"""
        )
    )


该输出表示一个结构化的计划,用于将复杂问题转化为顺序和并行的查询步骤。每个步骤由一组可以并行执行的查询组成,后续步骤依赖于先前步骤的结果。查询仅限于信息检索,避免执行推理任务,并且如果需要管理结果大小,会将查询拆分为更小的步骤。例如,以下计划首先并行列出两位演员的电影,然后是一个步骤,从第一步的结果中识别出票房最高的电影。


plan = [
# 2 steps in parallel
    [
        "List all movies made by Tom Hanks in the 2000s.",
        "List all movies made by Tom Cruise in the 2000s.",
    ],
# Second step
    ["Find the highest profiting movie among winner of step 1"],
]


这个想法无疑很酷。它是一种将复杂问题分解为更小、可操作的步骤的聪明方法,甚至利用并行性来优化检索。这听起来是一种可以真正加速的策略。但在实践中,期望大语言模型能够可靠地执行这一点有些过于雄心勃勃。并行性虽然在理论上高效,但引入了很多复杂性。依赖关系、中间结果以及保持并行步骤之间的逻辑一致性,即使是先进模型也容易出错。虽然顺序执行不那么吸引人,但目前它更可靠,并且显著减少了模型的认知开销。


此外,大语言模型在处理像列表的列表这样的结构化工具输出时常常遇到困难,尤其是在推理步骤之间的依赖关系时。在这里,我很好奇仅通过提示(没有工具输出)能在多大程度上提高模型在这些任务上的表现。


基准测试

为在LlamaIndex工作流架构中评估text2cypher代理创建基准数据集,这感觉像是向前迈出的令人兴奋的一步。


我们寻求传统单步Cypher执行指标(如ExactMatch)的替代方案,因为这些指标往往无法捕捉到迭代规划等工作流的全部潜力。在这些工作流中,采用多个步骤来细化查询并检索相关信息,使得单步执行指标不够充分。


因此,我们选择了Ragas的answer_relevancy——它感觉更符合我们想要衡量的内容。在这里,我们使用一个大语言模型来生成答案,然后用它作为评判标准来与真实值进行比较。我们准备了一个包含约50个样本的自定义数据集,精心设计以避免生成过多的输出(即过大或过详细的数据库结果)。这样的输出可能会使大语言模型评判者难以有效地评估相关性,因此保持结果简短可以确保对单步和多步工作流进行公平且集中的比较。


以下是结果。


15


在答案相关性方面,Claude 3.5 Sonnet、Deepseek-V3和GPT-4o脱颖而出,得分均超过0.80。NaiveText2CypherRetryCheckFlow往往整体相关性最高,而IterativePlanningFlow则始终排名较低(最低降至0.163)。


尽管OpenAI的o1模型相当准确,但由于多次超时(设置为90秒),它可能并未名列前茅。Deepseek-V3尤其令人瞩目,因为其得分很高,同时延迟相对较低。总体而言,这些结果强调了在实际部署场景中,不仅原始准确性重要,稳定性和速度也同样重要。


请查看另一张表格,其中可以轻松比较不同流程之间的提升。


16


Sonnet 3.5在NaiveText2CypherFlow中的得分从0.596稳步提升至NaiveText2CypherRetryFlow中的0.616,然后在NaiveText2CypherRetryCheckFlow中实现了更大的飞跃,达到0.843。GPT-4o整体上呈现出类似的模式,从NaiveText2CypherFlow中的0.622略微下降至NaiveText2CypherRetryFlow中的0.603,但在NaiveText2CypherRetryCheckFlow中显著提升至0.837。这些改进表明,添加重试机制和最终验证步骤可以显著提高答案的相关性。


学习心得与投入生产

这是一个为期两个月的项目,在这个过程中我学到了很多。其中一个亮点是在测试基准中实现了84%的相关性,这是一个显著的成就。然而,这是否意味着在生产环境中也能达到84%的准确性呢?可能并非如此。


生产环境带来了一系列独特的挑战——真实世界的数据往往比基准数据集更嘈杂、更多样化且结构更松散。我们尚未讨论,但你在实际应用和用户实践中会看到的一点是,需要采取适合生产的步骤。这意味着不仅要关注在受控基准测试中实现高精度,还要确保系统可靠、适应性强,并在真实世界条件下提供一致的结果。


在这些环境中,你需要实施某种防护措施,以防止无关问题进入Text2Cypher管道。


17


我们有一个示例防护措施的实现。除了简单地重定向无关问题外,初始的防护措施步骤还可以通过引导用户了解他们可以提出的问题类型、展示可用工具以及演示如何有效使用这些工具来教育用户。


在以下示例中,我们强调了添加一个过程来将用户输入的值映射到数据库中的重要性。这一步骤对于确保用户提供的信息与数据库模式一致至关重要,从而能够准确执行查询并最大程度地减少由不匹配或模糊数据引起的错误。


18


这是一个用户询问“科幻”电影的例子。问题的出现是因为数据库中该类型被存储为“Sci-Fi”,导致查询没有返回任何结果。


人们常常忽视的是空值的存在。在现实世界的数据中,空值很常见,必须加以考虑,尤其是在执行排序等类似操作时。如果未能妥善处理空值,可能会导致意外结果或错误。


19


在这个例子中,我们随机获取了一部评分字段为空的电影。为了解决这个问题,查询需要增加一个条件:WHERE m.imdbRating IS NOT NULL。


还有些情况下,缺失的信息不仅仅是数据问题,而是模式(schema)的限制。例如,如果我们询问奥斯卡获奖电影,但模式中不包含任何关于奖项的信息,那么查询根本无法返回期望的结果。


20


由于大型语言模型(LLM)被训练成要取悦用户,所以即使模式不匹配,LLM仍然会生成一些符合用户提问意图但无效的内容。我还不知道如何最好地处理这样的例子。


最后,我想提到的是查询规划部分。我使用了以下查询计划来回答这个问题:


“21世纪谁拍的电影更多,汤姆·汉克斯还是汤姆·克鲁斯?并找出票房最高的那位的一部最赚钱的电影。”


查询计划是:


plan = [
# 2 steps in parallel
    [
        "List all movies made by Tom Hanks in the 2000s.",
        "List all movies made by Tom Cruise in the 2000s.",
    ],
# Second step
    ["Find the highest profiting movie among winner of step 1"],
]


它看起来很复杂,但实际上Cypher语言非常灵活,GPT-4o可以在一个查询中处理这个问题。


21


我会说在这种情况下使用并行处理是大材小用。如果你处理的是真正需要它的复杂问题类型,可以包含一个查询规划器,但要记住,许多多步问题可以通过一个Cypher语句高效地处理。


这个例子突出了另一个问题:最终答案含糊不清,因为大型语言模型(LLM)只获得了有限的信息。具体来说,获胜者是《世界大战》的汤姆·克鲁斯。在这种情况下,推理已经在数据库内完成,因此不需要LLM来处理这个逻辑。然而,LLM默认倾向于以这种方式运作,这强调了为LLM提供完整上下文以确保准确和无歧义回答的重要性。


最后,你还必须考虑如何处理返回大量结果的问题。


22


在我们的实现中,我们对结果强制执行了100条记录的硬限制。虽然这有助于管理数据量,但在某些情况下仍然可能过多,甚至在LLM(大型语言模型)的推理过程中产生误导。


此外,本文中介绍的所有代理并不都具备对话能力。你可能需要在开始时添加一个问题重写步骤,使其具备对话性,或者这可以作为防护措施步骤的一部分。如果你有一个庞大的图模式,无法全部放入提示中,那么你需要设计一个系统来动态获取相关的图模式。


总结

代理可以非常有用,但最好从简单的开始,避免一开始就陷入过于复杂的实现。重点在于建立一个坚实的基准,以便有效地评估和比较不同的架构。关于工具输出,考虑尽量减少其使用,或者坚持使用尽可能简单的工具,因为许多代理在处理工具输出时都遇到困难,往往需要手动解析。

文章来源:https://medium.com/neo4j/building-knowledge-graph-agents-with-llamaindex-workflows-b81f8d372f19
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
热门职位
Maluuba
20000~40000/月
Cisco
25000~30000/月 深圳市
PilotAILabs
30000~60000/年 深圳市
写评论取消
回复取消