Building RAG Research Multi-Agent with LangGraph

January 16, 2025 · by alex

In this article, we present a hands-on project featuring a RAG research multi-agent tool developed with LangGraph. The tool is designed to address complex questions that require multiple sources and iterative steps to reach a final answer. It uses hybrid search and a Cohere re-ranking step to retrieve documents, and it incorporates a self-correction mechanism, including a hallucination-check process, to improve the reliability of responses, making it well suited to enterprise applications.


Introduction - Naive vs. Agentic RAG

For the purposes of this project, a naive RAG (Retrieval-Augmented Generation) approach falls short in the following ways:

  • No understanding of complex queries: it cannot break a complex query down into multiple manageable sub-steps; instead it processes the query at a single level, rather than analyzing each step and arriving at a unified conclusion.
  • No hallucination or error handling: a naive RAG pipeline lacks a response-validation step and a mechanism for handling hallucinations, so it cannot correct mistakes by generating a new response.
  • No dynamic tool use: a naive RAG system cannot use tools, call external APIs, or interact with databases based on workflow conditions.


To address these issues, a multi-agent RAG research system was implemented. An agent-based framework makes it possible to:

  • Route and use tools: a routing agent can classify the user's query and direct the flow to the appropriate node or tool. This enables context-driven decisions, such as determining whether a document needs a full summary, whether more detail is required, or whether the question is out of scope.
  • Plan sub-steps: complex queries often need to be broken down into smaller, manageable steps. Starting from a query, a sequence of steps can be generated to reach a conclusion while exploring different aspects of the query. For example, if a query requires comparing two different sections of a document, the agent-based approach can recognize this comparison need, retrieve the two sources separately, and merge them into a comparative analysis in the final response.
  • Reflect and correct errors: beyond simple response generation, an agent-based approach allows adding a validation step to address potential hallucinations, errors, or responses that fail to answer the user's query accurately. It also enables integrating a human-in-the-loop self-correction mechanism that incorporates human input into the automated process. These capabilities make agentic RAG a more robust and reliable solution for enterprise applications, where reliability is the top priority.
  • Share global state: an agent workflow shares a global state, which simplifies state management across multiple steps. This shared state is essential for maintaining consistency across the different stages of a multi-agent process. (A minimal sketch of routing over a shared state follows this list.)
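
To make these concepts concrete, the following is a minimal, self-contained LangGraph sketch of routing over a shared state. All names here are illustrative and not taken from the project; a real router node would call an LLM rather than the keyword check used below.


# Minimal illustrative LangGraph example (names are not from the project):
# a shared state flows through the graph, and a conditional edge routes
# the query based on a classification step.
from typing import Literal, TypedDict
from langgraph.graph import END, START, StateGraph

class State(TypedDict):
    query: str
    category: str
    answer: str

def classify(state: State) -> dict:
    # Stand-in for an LLM-based router: a simple keyword check.
    is_env = "environment" in state["query"].lower()
    return {"category": "environmental" if is_env else "general"}

def route(state: State) -> Literal["research", "respond_general"]:
    return "research" if state["category"] == "environmental" else "respond_general"

def research(state: State) -> dict:
    return {"answer": f"Researching: {state['query']}"}

def respond_general(state: State) -> dict:
    return {"answer": "This question is out of scope."}

builder = StateGraph(State)
builder.add_node("classify", classify)
builder.add_node("research", research)
builder.add_node("respond_general", respond_general)
builder.add_edge(START, "classify")
builder.add_conditional_edges("classify", route)
builder.add_edge("research", END)
builder.add_edge("respond_general", END)
graph = builder.compile()

print(graph.invoke({"query": "What is the company's environmental impact?"}))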


Project Overview


[Figure: overview of the agent workflow graph]


Graph steps:

  1. Analyze and route the query (adaptive RAG): the user's query is classified and routed to the appropriate node. From there, the system can proceed to the next step ("research plan generation"), ask the user for more information, or respond immediately if the query is out of scope.
  2. Research plan generation: the system generates a step-by-step research plan with one or more steps, depending on the complexity of the request. It then returns the list of specific steps needed to answer the user's question.
  3. Research subgraph: for each step defined in the research plan, a subgraph is invoked. Specifically, the subgraph first generates two queries via an LLM. Next, the system retrieves documents relevant to these generated queries using an ensemble retriever (combining similarity search, BM25, and MMR). A re-ranking step then applies Cohere-based contextual compression, ultimately producing the top-k relevant documents, with relevance scores, across all steps.
  4. Generation step: based on the relevant documents, the tool generates the answer via an LLM.
  5. Hallucination check (self-corrective RAG with human-in-the-loop): a reflection step in which the system analyzes the generated answer to determine whether it is supported by the provided context and covers all aspects. If the check fails, the graph interrupts the workflow, and the user is prompted to either generate a revised answer or end the process.


To create the vector store, a paragraph-based chunking approach was implemented using Docling and LangChain, and the vector database was built with ChromaDB.


Building the Vector Database


Document Parsing

For PDFs with complex structure, including tables with intricate layouts, the choice of parsing tool is crucial. Many libraries lack precision when handling PDFs with complex page layouts or table structures.


To address this, Docling, an open-source library, was used. It enables simple, efficient document parsing and export to the desired format. It can read from a range of common document formats (including PDF, DOCX, PPTX, XLSX, images, HTML, AsciiDoc, and Markdown) and export to Markdown and JSON. Docling offers a comprehensive understanding of PDF documents, including table structure, reading order, and page layout. It also supports OCR for scanned PDFs.




The text in the PDF is then converted to Markdown, which is required for the subsequent paragraph-based chunking.


import logging
from docling.document_converter import DocumentConverter

logger = logging.getLogger(__name__)
logger.info("Starting document processing.")
source = "environmental_report.pdf"  # path or URL of the PDF to parse (illustrative name)
converter = DocumentConverter()
markdown_document = converter.convert(source).document.export_to_markdown()


The extracted text has a structure similar to the image below. As can be seen, the text extracted from the PDF and its tables preserves the original formatting.


[Figure: sample of the Markdown extracted by Docling, with table structure preserved]
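
The figure itself is not reproduced here. As a purely hypothetical illustration of the format (the values below are invented), Docling's Markdown export preserves headings and renders tables as pipe-delimited Markdown, roughly like this:


## Data center performance

| Facility   | PUE 2022 | PUE 2023 |
|------------|----------|----------|
| Facility A | 1.10     | 1.08     |
| Facility B | 1.21     | 1.19     |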


The output text was then split into chunks based on headers using the MarkdownHeaderTextSplitter tool, ultimately producing a list of 332 Document objects (LangChain Documents).


from langchain_text_splitters import MarkdownHeaderTextSplitter
headers_to_split_on = [
    ("#", "Header 1"),
    ("##", "Header 2")
]
markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on)
docs_list = markdown_splitter.split_text(markdown_document)
docs_list


# Output example
[Document(metadata={'Header 2': 'A letter from our Chief Sustainability Officer and our Senior Vice President of Learning and Sustainability'}, page_content="...."),
...]
# len(docs_list):
332


Vector Store Construction

We build a vector database that stores sentences as vector embeddings and can be searched. In this case, we use Chroma and store a persistent database in the local directory "db_vector".


from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
embd = OpenAIEmbeddings()
vectorstore_from_documents = Chroma.from_documents(
    documents=docs_list,
    collection_name="rag-chroma-google-v1",
    embedding=embd,
    persist_directory='db_vector'
)


Main Graph Construction

The implemented system consists of two graphs:

  • A researcher subgraph, responsible for generating the different queries that will be used to retrieve and re-rank the top-k documents from the vector database.
  • A main graph, containing the main workflow: analyzing the user's query, generating the steps required to complete the task, generating the response, and checking for hallucinations with a human-in-the-loop mechanism.


Main Graph Structure


[Figure: main graph structure]


A core concept in LangGraph is state. Each graph execution creates a state that is passed between the nodes in the graph as they execute, and each node updates this internal state with its return value after it runs.


Let's start the project by building the graph state. To do that, we define two classes:

  • Router: contains the result of classifying the user's query into one of the categories "more-info", "environmental", or "general".
  • GradeHallucinations: contains a binary score indicating whether the response contains hallucinations.


from typing import Literal, TypedDict

from pydantic import BaseModel, Field

class Router(TypedDict):
    """Classify user query."""
    logic: str
    type: Literal["more-info", "environmental", "general"]

class GradeHallucinations(BaseModel):
    """Binary score for hallucination present in generation answer."""
    binary_score: str = Field(
        description="Answer is grounded in the facts, '1' or '0'"
    )


The graph states we define are:

  • InputState: contains the list of messages exchanged between the user and the agent.
  • AgentState: contains the Router's classification of the user's query, the list of steps to execute in the research plan, the list of retrieved documents the agent can reference, and the GradeHallucinations binary score.


from dataclasses import dataclass, field
from typing import Annotated, Literal, TypedDict
from langchain_core.documents import Document
from langchain_core.messages import AnyMessage
from langgraph.graph import add_messages
from utils.utils import reduce_docs
@dataclass(kw_only=True)
class InputState:
    """Represents the input state for the agent.
    This class defines the structure of the input state, which includes
    the messages exchanged between the user and the agent. It serves as
    a restricted version of the full State, providing a narrower interface
    to the outside world compared to what is maintained internally.
    """
    messages: Annotated[list[AnyMessage], add_messages]
    
    """Messages track the primary execution state of the agent.
    Typically accumulates a pattern of Human/AI/Human/AI messages.
    Returns:
        A new list of messages with the messages from `right` merged into `left`.
        If a message in `right` has the same ID as a message in `left`, the
        message from `right` will replace the message from `left`."""


# Primary agent state
@dataclass(kw_only=True)
class AgentState(InputState):
    """State of the retrieval graph / agent."""
    router: Router = field(default_factory=lambda: Router(type="general", logic=""))
    """The router's classification of the user's query."""
    steps: list[str] = field(default_factory=list)
    """A list of steps in the research plan."""
    documents: Annotated[list[Document], reduce_docs] = field(default_factory=list)
    """Populated by the retriever. This is a list of documents that the agent can reference."""
    hallucination: GradeHallucinations = field(default_factory=lambda: GradeHallucinations(binary_score="0"))
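
The reduce_docs function annotated on the documents field is imported from the project's utils module and is not listed in the article. Based on how it is used (merging retrieved documents across steps, and accepting the "delete" sentinel that create_research_plan passes later to clear them), a plausible minimal sketch, not the project's actual code, might be:


# Hypothetical sketch of utils.reduce_docs; the real project helper may differ.
# It acts as a reducer for the `documents` state channel: new documents are
# appended with deduplication, and the "delete" sentinel clears the list.
from langchain_core.documents import Document

def reduce_docs(
    existing: list[Document] | None,
    new: list[Document] | str,
) -> list[Document]:
    if new == "delete":
        return []
    merged = list(existing or [])
    seen = {doc.page_content for doc in merged}
    for doc in new:
        if doc.page_content not in seen:
            seen.add(doc.page_content)
            merged.append(doc)
    return merged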


Step 1: Analyze and Route the Query

The function analyze_and_route_query returns and updates the router variable in the AgentState. The function route_query determines the next step based on the previous query classification.


Specifically, this step updates the state with a Router object whose type variable contains one of the following values: "more-info", "environmental", or "general". Based on this information, the workflow is routed to the appropriate node (one of "create_research_plan", "ask_for_more_info", or "respond_to_general_query").


async def analyze_and_route_query(
    state: AgentState, *, config: RunnableConfig
) -> dict[str, Router]:
    """Analyze the user's query and determine the appropriate routing.
    This function uses a language model to classify the user's query and decide how to route it
    within the conversation flow.
    Args:
        state (AgentState): The current state of the agent, including conversation history.
        config (RunnableConfig): Configuration with the model used for query analysis.
    Returns:
        dict[str, Router]: A dictionary containing the 'router' key with the classification result (classification type and logic).
    """
    model = ChatOpenAI(model=GPT_4o, temperature=TEMPERATURE, streaming=True)
    messages = [
        {"role": "system", "content": ROUTER_SYSTEM_PROMPT}
    ] + state.messages
    logging.info("---ANALYZE AND ROUTE QUERY---")
    response = cast(
        Router, await model.with_structured_output(Router).ainvoke(messages)
    )
    return {"router": response}

def route_query(
    state: AgentState,
) -> Literal["create_research_plan", "ask_for_more_info", "respond_to_general_query"]:
    """Determine the next step based on the query classification.
    Args:
        state (AgentState): The current state of the agent, including the router's classification.
    Returns:
        Literal["create_research_plan", "ask_for_more_info", "respond_to_general_query"]: The next step to take.
    Raises:
        ValueError: If an unknown router type is encountered.
    """
    _type = state.router["type"]
    if _type == "environmental":
        return "create_research_plan"
    elif _type == "more-info":
        return "ask_for_more_info"
    elif _type == "general":
        return "respond_to_general_query"
    else:
        raise ValueError(f"Unknown router type {_type}")


Example output for the question "Retrieve the data center PUE efficiency value in Dublin in 2019":


{
  "logic":"This is a specific question about the environmental efficiency of a data center in Dublin in 2019, which relates to the Environmental Report.",
  "type":"environmental"
}


Step 1.1: Out of Scope / Need More Information

Next, we define the functions ask_for_more_info and respond_to_general_query, which generate a response for the user directly by calling an LLM: the first runs when the router determines that more information is needed from the user, while the second generates a response to general queries unrelated to our topic. In both cases, the generated response must be appended to the message list, updating the messages variable in the state.


async def ask_for_more_info(
    state: AgentState, *, config: RunnableConfig
) -> dict[str, list[BaseMessage]]:
    """Generate a response asking the user for more information.
    This node is called when the router determines that more information is needed from the user.
    Args:
        state (AgentState): The current state of the agent, including conversation history and router logic.
        config (RunnableConfig): Configuration with the model used to respond.
    Returns:
        dict[str, list[str]]: A dictionary with a 'messages' key containing the generated response.
    """
    model = ChatOpenAI(model=GPT_4o_MINI, temperature=TEMPERATURE, streaming=True)
    system_prompt = MORE_INFO_SYSTEM_PROMPT.format(
        logic=state.router["logic"]
    )
    messages = [{"role": "system", "content": system_prompt}] + state.messages
    response = await model.ainvoke(messages)
    return {"messages": [response]}


async def respond_to_general_query(
    state: AgentState, *, config: RunnableConfig
) -> dict[str, list[BaseMessage]]:
    """Generate a response to a general query not related to environmental.
    This node is called when the router classifies the query as a general question.
    Args:
        state (AgentState): The current state of the agent, including conversation history and router logic.
        config (RunnableConfig): Configuration with the model used to respond.
    Returns:
        dict[str, list[str]]: A dictionary with a 'messages' key containing the generated response.
    """
    model = ChatOpenAI(model=GPT_4o_MINI, temperature=TEMPERATURE, streaming=True)
    system_prompt = GENERAL_SYSTEM_PROMPT.format(
        logic=state.router["logic"]
    )
    logging.info("---RESPONSE GENERATION---")
    messages = [{"role": "system", "content": system_prompt}] + state.messages
    response = await model.ainvoke(messages)
    return {"messages": [response]}


Example output for "What's the weather like in Altamura?":


{
  "logic":"What's the weather like in Altamura?",
  "type":"general"
}


# ---RESPONSE GENERATION---
"I appreciate your question, but I'm unable to provide information about the weather. My focus is on Environmental Reports. If you have any questions related to that topic, please let me know, and I'll be happy to help!"


Step 2: Create the Research Plan

If the query classification returns "environmental", the user's request is within the scope of the document, and the workflow reaches the create_research_plan node, whose function creates a step-by-step research plan for answering the environment-related query.


async def create_research_plan(
    state: AgentState, *, config: RunnableConfig
) -> dict[str, list[str] | str]:
    """Create a step-by-step research plan for answering a environmental-related query.
    Args:
        state (AgentState): The current state of the agent, including conversation history.
        config (RunnableConfig): Configuration with the model used to generate the plan.
    Returns:
        dict[str, list[str]]: A dictionary with a 'steps' key containing the list of research steps.
    """
    class Plan(TypedDict):
        """Generate research plan."""
        steps: list[str]
    model = ChatOpenAI(model=GPT_4o_MINI, temperature=TEMPERATURE, streaming=True)
    messages = [
        {"role": "system", "content": RESEARCH_PLAN_SYSTEM_PROMPT}
    ] + state.messages
    logging.info("---PLAN GENERATION---")
    response = cast(Plan, await model.with_structured_output(Plan).ainvoke(messages))
    return {"steps": response["steps"], "documents": "delete"}


Example output for the question "Retrieve the data center PUE efficiency value in Dublin in 2019":


{
  "steps":
    ["Look up the PUE (Power Usage Effectiveness) efficiency value for data centers specifically in Dublin for the year 2019 using statistical data sources."
  ]
}


In this case, the user's request requires only one step to retrieve the information.


Step 3: Conduct Research

This function takes the first step from the research plan and uses it to conduct research. For the research, it invokes the subgraph researcher_graph, which returns a list of chunks that we will explore in the next section. Finally, we update the steps variable in the state, removing the step that was just executed.


async def conduct_research(state: AgentState) -> dict[str, Any]:
    """Execute the first step of the research plan.
    This function takes the first step from the research plan and uses it to conduct research.
    Args:
        state (AgentState): The current state of the agent, including the research plan steps.
    Returns:
        dict[str, list[str]]: A dictionary with 'documents' containing the research results and
                              'steps' containing the remaining research steps.
    Behavior:
        - Invokes the researcher_graph with the first step of the research plan.
        - Updates the state with the retrieved documents and removes the completed step.
    """
    result = await researcher_graph.ainvoke({"question": state.steps[0]})  # invoke the researcher subgraph directly
    docs = result["documents"]
    step = state.steps[0]
    logging.info(f"\n{len(docs)} documents retrieved in total for the step: {step}.")
    return {"documents": result["documents"], "steps": state.steps[1:]}


Step 4: Build the Researcher Subgraph


[Figure: researcher subgraph structure]


As shown in the figure above, the graph consists of a query-generation step, which starts from the step passed in by the main graph, and a step that retrieves the relevant chunks. As we did for the main graph, let's define the QueryState (the private state of the retrieve_documents node in the researcher graph) and the ResearcherState (the state of the researcher graph).


"""States for the researcher subgraph.
This module defines the state structures used in the researcher subgraph.
"""
from dataclasses import dataclass, field
from typing import Annotated
from langchain_core.documents import Document
from utils.utils import reduce_docs
@dataclass(kw_only=True)
class QueryState:
    """Private state for the retrieve_documents node in the researcher graph."""
    query: str
@dataclass(kw_only=True)
class ResearcherState:
    """State of the researcher graph / agent."""
    question: str
    """A step in the research plan generated by the retriever agent."""
    queries: list[str] = field(default_factory=list)
    """A list of search queries based on the question that the researcher generates."""
    documents: Annotated[list[Document], reduce_docs] = field(default_factory=list)
    """Populated by the retriever. This is a list of documents that the agent can reference."""


Step 4.1: Generate Queries

This step generates search queries based on the question (a step in the research plan). The function uses an LLM to generate diverse search queries to help answer the question.


async def generate_queries(
    state: ResearcherState, *, config: RunnableConfig
) -> dict[str, list[str]]:
    """Generate search queries based on the question (a step in the research plan).
    This function uses a language model to generate diverse search queries to help answer the question.
    Args:
        state (ResearcherState): The current state of the researcher, including the user's question.
        config (RunnableConfig): Configuration with the model used to generate queries.
    Returns:
        dict[str, list[str]]: A dictionary with a 'queries' key containing the list of generated search queries.
    """
    class Response(TypedDict):
        queries: list[str]
    logger.info("---GENERATE QUERIES---")
    model = ChatOpenAI(model="gpt-4o-mini-2024-07-18", temperature=0)
    messages = [
        {"role": "system", "content": GENERATE_QUERIES_SYSTEM_PROMPT},
        {"role": "human", "content": state.question},
    ]
    response = cast(Response, await model.with_structured_output(Response).ainvoke(messages))
    queries = response["queries"]
    queries.append(state.question)
    logger.info(f"Queries: {queries}")
    return {"queries": response["queries"]}


Example output for the question "Retrieve the data center PUE efficiency value in Dublin in 2019":


{
  "queries":[
    "Look up the PUE (Power Usage Effectiveness) efficiency value for data centers specifically in Dublin for the year 2019 using statistical data sources."
    "PUE efficiency value data centers Dublin 2019",
    "Power Usage Effectiveness statistics data centers Dublin 2019"
  ]
}


Once the queries are generated, we can define the vector store using the persistent database defined earlier.


# Collection and directory created in the vector store construction step above
VECTORSTORE_COLLECTION = "rag-chroma-google-v1"
VECTORSTORE_DIRECTORY = "db_vector"

def _setup_vectorstore() -> Chroma:
    """
    Set up and return the Chroma vector store instance.
    """
    embeddings = OpenAIEmbeddings()
    return Chroma(
        collection_name=VECTORSTORE_COLLECTION,
        embedding_function=embeddings,
        persist_directory=VECTORSTORE_DIRECTORY
    )


In a RAG system, the most critical part is the document-retrieval process. Accordingly, a great deal of attention was paid to the techniques used: in particular, hybrid search was chosen via an ensemble retriever, together with re-ranking using Cohere.


Hybrid search combines "keyword-style" search with "vector-style" search: it has the advantages of keyword search as well as those of semantic search obtained through embeddings and vector search. An ensemble retriever is a retrieval algorithm designed to improve information-retrieval performance by combining the strengths of several individual retrievers. This approach, known as "ensemble retrieval", uses a method called Reciprocal Rank Fusion to re-rank and merge the results of the different retrievers, yielding more accurate and relevant results than any single retriever.
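
Before looking at the retriever setup, here is a minimal sketch of the Reciprocal Rank Fusion idea itself. This is illustrative code, not LangChain's internal implementation; k is the usual smoothing constant.


# Illustrative Reciprocal Rank Fusion (RRF), not LangChain's internal code.
# Each ranking contributes 1 / (k + rank) to a document's fused score.
def reciprocal_rank_fusion(rankings: list[list[str]], k: int = 60) -> list[str]:
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)

# Example: three retrievers return different orderings of the same corpus.
print(reciprocal_rank_fusion([
    ["doc_a", "doc_b", "doc_c"],   # similarity search
    ["doc_b", "doc_a", "doc_d"],   # MMR
    ["doc_b", "doc_c", "doc_a"],   # BM25
]))  # doc_b ranks first: it scores highly in all three lists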


from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever

TOP_K = 4                           # documents per base retriever (illustrative value)
ENSEMBLE_WEIGHTS = [0.4, 0.3, 0.3]  # illustrative weights for the three retrievers

# Create base retrievers
retriever_bm25 = BM25Retriever.from_documents(documents, k=TOP_K)  # BM25Retriever takes `k` directly
retriever_vanilla = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": TOP_K})
retriever_mmr = vectorstore.as_retriever(search_type="mmr", search_kwargs={"k": TOP_K})
ensemble_retriever = EnsembleRetriever(
    retrievers=[retriever_vanilla, retriever_mmr, retriever_bm25],
    weights=ENSEMBLE_WEIGHTS,
)


Re-ranking is a technique that can be used to improve the performance of RAG pipelines. It is a very powerful method that can significantly boost a search system. In short, re-ranking takes a query and a response and outputs a relevance score between them. In this way, any search system can be used to surface a number of documents that may contain the answer to a query, and the re-ranking endpoint can then be used to order those documents.


But why do we need a re-ranking step at all?


To address accuracy challenges, two-stage retrieval was adopted to improve search quality. In such two-stage systems, a first-stage model (the ensemble retriever) retrieves a set of candidate documents from the larger dataset. A second-stage model (the re-ranker) then re-ranks the documents retrieved by the first stage. A re-ranking model such as Cohere Rerank outputs a similarity score when given a query-document pair, and this score is used to reorder the documents most relevant to the search query. Among re-ranking approaches, the Cohere Rerank model stands out for its ability to substantially improve search accuracy. Unlike traditional embedding models, it uses deep learning to evaluate the alignment between each document and the query directly: it processes the query and document jointly to output a relevance score, which enables a more nuanced document-selection process.


In this case, the retrieved documents are re-ranked and the two most relevant ones are returned.


from langchain.retrievers.contextual_compression import ContextualCompressionRetriever
from langchain_cohere import CohereRerank
# Set up Cohere re-ranking
compressor = CohereRerank(top_n=2, model="rerank-english-v3.0")
# Build compression retriever
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=ensemble_retriever,
)
compression_retriever.invoke(
    "Retrieve the data center PUE efficiency in Dublin in 2019"
)


Example output for the question "Retrieve the data center PUE efficiency value in Dublin in 2019":


[Document(metadata={'Header 2': 'Endnotes', 'relevance_score': 0.27009502}, page_content="- 1 This calculation is based on..."),
 Document(metadata={'Header 2': 'DATA CENTER GRID REGION CFE', 'relevance_score': 0.20593424}, page_content="2023  \n| Country..." )]


Step 4.2: Retrieve and Re-rank Documents


async def retrieve_and_rerank_documents(
    state: QueryState, *, config: RunnableConfig
) -> dict[str, list[Document]]:
    """Retrieve documents based on a given query.
    This function uses a retriever to fetch relevant documents for a given query.
    Args:
        state (QueryState): The current state containing the query string.
        config (RunnableConfig): Configuration with the retriever used to fetch documents.
    Returns:
        dict[str, list[Document]]: A dictionary with a 'documents' key containing the list of retrieved documents.
    """
    logger.info("---RETRIEVING DOCUMENTS---")
    logger.info(f"Query for the retrieval process: {state.query}")
    response = compression_retriever.invoke(state.query)
    return {"documents": response}


Step 4.3: Build the Subgraph


from langgraph.graph import END, START, StateGraph

builder = StateGraph(ResearcherState)
builder.add_node(generate_queries)
builder.add_node(retrieve_and_rerank_documents)
builder.add_edge(START, "generate_queries")
builder.add_conditional_edges(
    "generate_queries",
    retrieve_in_parallel,  # type: ignore
    path_map=["retrieve_and_rerank_documents"],
)
builder.add_edge("retrieve_and_rerank_documents", END)
researcher_graph = builder.compile()
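
The retrieve_in_parallel function used in the conditional edge above is not shown in the article. Given its role (fanning each generated query out to the retrieval node as a private QueryState), it plausibly uses LangGraph's Send API, roughly as follows; this is an assumption, not the project's verified code.


# Plausible sketch of retrieve_in_parallel (not listed in the article):
# dispatch one retrieve_and_rerank_documents task per generated query so the
# retrieval steps run as parallel branches of the subgraph.
from langgraph.types import Send

def retrieve_in_parallel(state: ResearcherState) -> list[Send]:
    return [
        Send("retrieve_and_rerank_documents", QueryState(query=query))
        for query in state.queries
    ]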


Step 5: Check If Finished

Using a conditional_edge, we build a loop whose exit condition is determined by the value returned by check_finished. This function checks whether there are more steps to process in the list of steps created by the create_research_plan node. Once all steps are complete, the flow proceeds to the respond node.


def check_finished(state: AgentState) -> Literal["respond", "conduct_research"]:
    """Determine if the research process is complete or if more research is needed.
    This function checks if there are any remaining steps in the research plan:
        - If there are, route back to the `conduct_research` node
        - Otherwise, route to the `respond` node
    Args:
        state (AgentState): The current state of the agent, including the remaining research steps.
    Returns:
        Literal["respond", "conduct_research"]: The next step to take based on whether research is complete.
    """
    if len(state.steps or []) > 0:
        return "conduct_research"
    else:
        return "respond"


Step 6: Respond

A final response to the user's query is generated based on the research conducted. This function uses the conversation history and the documents retrieved by the researcher agent to formulate a comprehensive answer.


async def respond(
    state: AgentState, *, config: RunnableConfig
) -> dict[str, list[BaseMessage]]:
    """Generate a final response to the user's query based on the conducted research.
    This function formulates a comprehensive answer using the conversation history and the documents retrieved by the researcher.
    Args:
        state (AgentState): The current state of the agent, including retrieved documents and conversation history.
        config (RunnableConfig): Configuration with the model used to respond.
    Returns:
        dict[str, list[str]]: A dictionary with a 'messages' key containing the generated response.
    """
    print("--- RESPONSE GENERATION STEP ---")
    model = ChatOpenAI(model="gpt-4o-2024-08-06", temperature=0)
    context = format_docs(state.documents)
    prompt = RESPONSE_SYSTEM_PROMPT.format(context=context)
    messages = [{"role": "system", "content": prompt}] + state.messages
    response = await model.ainvoke(messages)
    return {"messages": [response]}


Step 7: Check for Hallucinations

This step checks whether the response generated by the LLM in the previous step is supported by the set of facts based on the retrieved documents, returning a binary score.


async def check_hallucinations(
    state: AgentState, *, config: RunnableConfig
) -> dict[str, Any]:
    """Analyze the user's query and checks if the response is supported by the set of facts based on the document retrieved,
    providing a binary score result.
    This function uses a language model to analyze the user's query and gives a binary score result.
    Args:
        state (AgentState): The current state of the agent, including conversation history.
        config (RunnableConfig): Configuration with the model used for query analysis.
    Returns:
        dict[str, Any]: A dictionary with the 'hallucination' key containing the GradeHallucinations binary score.
    """
    model = ChatOpenAI(model=GPT_4o_MINI, temperature=TEMPERATURE, streaming=True)
    system_prompt = CHECK_HALLUCINATIONS.format(
        documents=state.documents,
        generation=state.messages[-1]
    )
    messages = [
        {"role": "system", "content": system_prompt}
    ] + state.messages
    logging.info("---CHECK HALLUCINATIONS---")
    response = cast(GradeHallucinations, await model.with_structured_output(GradeHallucinations).ainvoke(messages))
    
    return {"hallucination": response}


Step 8: Human Approval (Human-in-the-Loop)

If the LLM's response is not supported by the set of facts, it likely contains hallucinations. In that case, the graph is interrupted, and the user takes control of the next action: retry only the last generation step, without restarting the entire workflow, or end the process. This human-in-the-loop step ensures user control while avoiding unintended loops or undesired actions.


The interrupt function in LangGraph enables human-in-the-loop workflows by pausing the graph at a specific node, presenting information to a human, and resuming the graph with their input. It is useful for tasks such as approval, editing, or gathering additional input. interrupt is used together with the Command object to resume the graph with a value supplied by the human.


def human_approval(
    state: AgentState,
):
    _binary_score = state.hallucination.binary_score
    if _binary_score == "1":if _binary_score == "1":
        return "END"
    else:
        retry_generation = interrupt(
        {
            "question": "Is this correct?",
            "llm_output": state.messages[-1]
        })
        if retry_generation == "y":
            print("voglio continuare")
            return "respond"
        else:
            return "END"


Building the Main Graph


from langgraph.graph import END, START, StateGraph
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()
builder = StateGraph(AgentState, input=InputState)
builder.add_node(analyze_and_route_query)
builder.add_edge(START, "analyze_and_route_query")
builder.add_conditional_edges("analyze_and_route_query", route_query)
builder.add_node(create_research_plan)
builder.add_node(ask_for_more_info)
builder.add_node(respond_to_general_query)
builder.add_node(conduct_research)
builder.add_node("respond", respond)
builder.add_node(check_hallucinations)
builder.add_conditional_edges("check_hallucinations", human_approval, {"END": END, "respond": "respond"})
builder.add_edge("create_research_plan", "conduct_research")
builder.add_conditional_edges("conduct_research", check_finished)
builder.add_edge("respond", "check_hallucinations")
graph = builder.compile(checkpointer=checkpointer)


Building the Main Function (app.py)

Each function is defined as an async function to enable streaming behavior during the generation steps.


from subgraph.graph_states import ResearcherState
from main_graph.graph_states import AgentState
from utils.utils import config, new_uuid
from subgraph.graph_builder import researcher_graph
from main_graph.graph_builder import InputState, graph
from langgraph.types import Command
import asyncio
import time
import builtins

thread = {"configurable": {"thread_id": new_uuid()}}
async def process_query(query):
    inputState = InputState(messages=query)
    async for c, metadata in graph.astream(input=inputState, stream_mode="messages", config=thread):
        if c.additional_kwargs.get("tool_calls"):
            print(c.additional_kwargs.get("tool_calls")[0]["function"].get("arguments"), end="", flush=True)
        if c.content:
            time.sleep(0.05)
            print(c.content, end="", flush=True)
    if len(graph.get_state(thread)[-1]) > 0:
        if len(graph.get_state(thread)[-1][0].interrupts) > 0:
            response = input("\nThe response may contain uncertain information. Retry the generation? If yes, press 'y': ")
            if response.lower() == 'y':
                async for c, metadata in graph.astream(Command(resume=response), stream_mode="messages", config=thread):
                    if c.additional_kwargs.get("tool_calls"):
                        print(c.additional_kwargs.get("tool_calls")[0]["function"].get("arguments"), end="")
                    if c.content:
                        time.sleep(0.05)
                        print(c.content, end="", flush=True)

async def main():
    input = builtins.input
    print("Enter your query (type '-q' to quit):")
    while True:
        query = input("> ")
        if query.strip().lower() == "-q":
            print("Exiting...")
            break
        await process_query(query)

if __name__ == "__main__":
    asyncio.run(main())


After the first invocation, the graph state is checked for interrupts. If any are found, the graph can be invoked again with:


graph.astream(Command(resume=response), stream_mode="messages", config=thread)


In this way, the workflow resumes from the interrupted step without re-executing the previous steps, using the same thread_id.


Results

For the following tests, we used an annual report on Google's environmental sustainability strategy.


Live Test

As a first test, we ran the following query, which extracts different values from different tables, combining the capabilities of the multi-step approach with the parsing abilities of the Docling library.


Complex question: "Retrieve the data center PUE efficiency values in Singapore 2nd facility in 2019 and 2022. Also retrieve regional average CFE in Asia Pacific in 2023."


[Figures: streamed execution of the workflow for the complex question]


The full answer is correct, and the hallucination check passed successfully.


Steps generated by the chatbot:

  • "Look up the PUE efficiency values for the Singapore 2nd facility for the years 2019 and 2022."
  • "Find the regional average CFE for the Asia Pacific region in 2023."


Generated text: "- The Power Usage Effectiveness (PUE) for the Singapore 2nd facility in 2019 is not available, as the data for that year is not provided. However, the PUE for 2022 is 1.21.


- The regional average Carbon-Free Energy (CFE) in Asia Pacific for 2023 is 12%."


Full output:


Enter your query (type '-q' to quit):
> Retrieve the data center PUE efficiency values in Singapore 2nd facility in 2019 and 2022. Also retrieve regional average CFE in Asia pacific in 2023 
2025-01-10 20:39:53,381 - INFO - ---ANALYZE AND ROUTE QUERY---
2025-01-10 20:39:53,381 - INFO - MESSAGES: [HumanMessage(content='Retrieve the data center PUE efficiency values in Singapore 2nd facility in 2019 and 2022. Also retrieve regional average CFE in Asia pacific in 2023 ', additional_kwargs={}, response_metadata={}, id='351a00e9-ecda-49e2-b069-19196348a82a')]
{"logic":"Retrieve the data center PUE efficiency values in Singapore 2nd facility in 2019 and 2022. Also retrieve regional average CFE in Asia pacific in 2023","type":"environmental"}2025-01-10 20:39:55,586 - INFO - ---PLAN GENERATION---
{"steps":["Look up the PUE efficiency values for the Singapore 2nd facility for the years 2019 and 2022.","Find the regional average CFE for the Asia Pacific region in 2023."]}2025-01-10 20:39:57,323 - INFO - ---GENERATE QUERIES---
{"queries":["PUE efficiency values Singapore 2nd facility 2019","PUE efficiency values Singapore 2nd facility 2022"]}2025-01-10 20:39:58,285 - INFO - Queries: ['PUE efficiency values Singapore 2nd facility 2019', 'PUE efficiency values Singapore 2nd facility 2022', 'Look up the PUE efficiency values for the Singapore 2nd facility for the years 2019 and 2022.']
2025-01-10 20:39:58,288 - INFO - ---RETRIEVING DOCUMENTS---
2025-01-10 20:39:58,288 - INFO - Query for the retrieval process: PUE efficiency values Singapore 2nd facility 2019
2025-01-10 20:39:59,568 - INFO - ---RETRIEVING DOCUMENTS---
2025-01-10 20:39:59,568 - INFO - Query for the retrieval process: PUE efficiency values Singapore 2nd facility 2022
2025-01-10 20:40:00,891 - INFO - ---RETRIEVING DOCUMENTS---
2025-01-10 20:40:00,891 - INFO - Query for the retrieval process: Look up the PUE efficiency values for the Singapore 2nd facility for the years 2019 and 2022.
2025-01-10 20:40:01,820 - INFO - 
4 documents retrieved in total for the step: Look up the PUE efficiency values for the Singapore 2nd facility for the years 2019 and 2022..
2025-01-10 20:40:01,825 - INFO - ---GENERATE QUERIES---
{"queries":["Asia Pacific regional average CFE 2023","CFE statistics Asia Pacific 2023"]}2025-01-10 20:40:02,778 - INFO - Queries: ['Asia Pacific regional average CFE 2023', 'CFE statistics Asia Pacific 2023', 'Find the regional average CFE for the Asia Pacific region in 2023.']
2025-01-10 20:40:02,780 - INFO - ---RETRIEVING DOCUMENTS---
2025-01-10 20:40:02,780 - INFO - Query for the retrieval process: Asia Pacific regional average CFE 2023
2025-01-10 20:40:03,757 - INFO - ---RETRIEVING DOCUMENTS---
2025-01-10 20:40:03,757 - INFO - Query for the retrieval process: CFE statistics Asia Pacific 2023
2025-01-10 20:40:04,885 - INFO - ---RETRIEVING DOCUMENTS---
2025-01-10 20:40:04,885 - INFO - Query for the retrieval process: Find the regional average CFE for the Asia Pacific region in 2023.
2025-01-10 20:40:06,526 - INFO - 
4 documents retrieved in total for the step: Find the regional average CFE for the Asia Pacific region in 2023..
2025-01-10 20:40:06,530 - INFO - --- RESPONSE GENERATION STEP ---
- The Power Usage Effectiveness (PUE) for the Singapore 2nd facility in 2019 is not available, as the data for that year is not provided. However, the PUE for 2022 is 1.21 [e048d08a-4ef6-77b5-20d3-352dcec590b7].
- The regional average Carbon-Free Energy (CFE) in the Asia Pacific for 2023 is 12% [9c489d2f-f16f-572b-abed-ee1d5d0ed379].2025-01-10 20:40:14,918 - INFO - ---CHECK HALLUCINATIONS---
{"binary_score":"1"}> 


Now let's try the same query with ChatGPT. After uploading the PDF to the web app, we asked the same question.


As the figure shows, the values returned by ChatGPT are incorrect and the model hallucinated. In this case, a hallucination-check step would have allowed the response to be regenerated (Self-Reflective RAG).


[Figure: ChatGPT's incorrect answer to the same query]


Conclusion

Agentic RAG marks a significant advance in artificial intelligence. By combining the capabilities of large language models with autonomous reasoning and information retrieval, agentic RAG introduces a new standard of intelligence and flexibility. As AI continues to evolve, agentic RAG will play a foundational role across industries, transforming the way we use technology.




Source: https://medium.com/@nicoladisabato_19197/building-rag-research-multi-agent-with-langgraph-1bd47acac69f