使用LangGraph构建可靠的RAG代理

2024年05月07日 由 alex 发表 374 0

简介

在这里,我们将使用 LangGraph、Groq-Llama-3 和 Chroma 构建可靠的 RAG 代理。


  • 自适应 RAG。我们已经实现了本文中描述的概念,构建了一个路由器,用于将问题路由到不同的检索方法中。
  • 纠正型 RAG。我们已经实现了本文中描述的概念,开发了一种后备机制,以便在检索到的上下文与所提问题无关时继续前进。
  • 自我 RAG。我们利用本文中描述的概念开发了一种幻觉分级器,即修正出现幻觉或与所提问题无关的答案。


什么是代理?

代理背后的基本概念涉及使用语言模型来选择行动序列。在链式程序中,这个序列被硬编码在代码中。相反,代理利用语言模型作为推理引擎来决定要采取的行动及其顺序。


它由 3 个部分组成:


  • 规划:将任务分解为更小的子目标
  • 记忆:短期(聊天记录)/长期(向量存储)
  • 工具使用:它可以利用不同的工具来扩展其功能。


代理可以通过使用 Langchain 的 ReAct 概念或 LangGraph 来实现、


什么是 Langchain?

LangChain 是一个利用语言模型开发应用程序的框架。它可以实现以下应用:


  • 上下文感知:将语言模型与上下文来源(提示指令、少量示例、可作为其响应基础的内容等)连接起来。
  • 推理:依靠语言模型进行推理(根据提供的上下文如何回答、采取什么行动等)。


什么是 LangGraph?

LangGraph 是一个扩展 LangChain 的库,为 LLM 应用程序提供循环计算能力。LangChain 支持定义计算链(有向无环图或 DAG),而 LangGraph 则支持循环。这样就可以实现更复杂的代理式行为,在循环中调用 LLM,以确定下一步行动。


关键概念

  • 有状态图 LangGraph 围绕有状态图的概念展开,图中的每个节点都代表计算的一个步骤,图中的状态会随着计算的进行而传递和更新。
  • 节点: 节点是 LangGraph 的构件。每个节点代表一个函数或一个计算步骤。我们定义节点是为了执行特定任务,如处理输入、做出决策或与外部 API 交互。
  • 边: 边将图中的节点连接起来,定义计算流程。LangGraph 支持条件边,允许你根据图的当前状态动态决定下一个要执行的节点。


使用 LangGraph 创建图形的步骤:

  1. 定义图形状态(Graph State): 这表示图的状态。
  2. 创建图形。
  3. 定义节点: 在这里,我们定义了与每个工作流状态相关的不同功能。
  4. 向图表添加节点: 在此将节点添加到图形中,并使用边和条件边定义流程。
  5. 设置图的入口和终点。


什么是 Tavily 搜索 API?

Tavily Search API 是专为 LLM 优化的搜索引擎,旨在提供高效、快速和持久的搜索结果。与 Serp 或 Google 等其他搜索 API 不同,Tavily 专注于为人工智能开发人员和自主人工智能代理优化搜索。


什么是 Groq?

Groq 为开发人员提供高性能的人工智能模型和 API 访问,与竞争对手相比,推理速度更快,成本更低。


支持的模型


10


什么是 FastEmbed?

FastEmbed 是一个轻量级、快速的 Python 库,用于嵌入生成。


支持的嵌入模型


11


RAG 代理的工作流程

  1. 根据问题,路由器决定是引导问题从矢量存储检索上下文,还是执行网络搜索。
  2. 如果路由器决定引导问题从矢量存储中检索,则从矢量存储中检索匹配文档,否则使用 tavily-api 搜索进行网络搜索。
  3. 然后,文档分级器会对文档进行相关或不相关分级。
  4. 如果检索到的上下文被分级为相关,则使用幻觉分级器检查是否存在幻觉。如果分级器判定回复不存在幻觉,则将回复提交给用户。
  5. 如果上下文被分级为不相关,则执行网络搜索以检索内容。
  6. 检索后,文档分级器会对网络搜索生成的内容进行分级。如果发现相关,则会使用 LLM 对响应进行合成,然后将其呈现给用户。


使用的技术堆栈

  • 嵌入模型:BAAI/bge-base-en-v1.5
  • LLM : Llam-3-8B
  • 矢量存储:Chroma
  • 图/代理: LangGraph


代码执行

安装所需的库


! pip install -U langchain-nomic langchain_community tiktoken langchainhub chromadb langchain langgraph tavily-python gpt4all fastembed langchain-groq 


导入所需程序库


from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings.fastembed import FastEmbedEmbeddings


实例化嵌入模型


embed_model = FastEmbedEmbeddings(model_name="BAAI/bge-base-en-v1.5")"BAAI/bge-base-en-v1.5")


实例化 LLM


from groq import Groq
from langchain_groq import ChatGroq
from google.colab import userdata
llm = ChatGroq(temperature=0,
                      model_name="Llama3-8b-8192",
                      api_key=userdata.get("GROQ_API_KEY"),)


下载数据


urls = [
    "https://lilianweng.github.io/posts/2023-06-23-agent/","https://lilianweng.github.io/posts/2023-06-23-agent/",
    "https://lilianweng.github.io/posts/2023-03-15-prompt-engineering/",
    "https://lilianweng.github.io/posts/2023-10-25-adv-attack-llm/",
]
docs = [WebBaseLoader(url).load() for url in urls]
docs_list = [item for sublist in docs for item in sublist]
print(f"len of documents :{len(docs_list)}")


将文档分块,以便与 LLM 的上下文窗口同步


text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=512, chunk_overlap=0512, chunk_overlap=0
)
doc_splits = text_splitter.split_documents(docs_list)
print(f"length of document chunks generated :{len(doc_splits)}")


将文档加载到向量存储


vectorstore = Chroma.from_documents(documents=doc_splits,
                                    embedding=embed_model,
                                    collection_name="local-rag")"local-rag")


实例化寻回器


retriever = vectorstore.as_retriever(search_kwargs={"k":2})"k":2})


实现路由器


import time
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.output_parsers import StrOutputParser
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are an expert at routing a 
    user question to a vectorstore or web search. Use the vectorstore for questions on LLM  agents, 
    prompt engineering, and adversarial attacks. You do not need to be stringent with the keywords 
    in the question related to these topics. Otherwise, use web-search. Give a binary choice 'web_search' 
    or 'vectorstore' based on the question. Return the a JSON with a single key 'datasource' and 
    no premable or explaination. Question to route: {question} <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["question"],
)
start = time.time()
question_router = prompt | llm | JsonOutputParser()
#
question = "llm agent memory"
print(question_router.invoke({"question": question}))
end = time.time()
print(f"The time required to generate response by Router Chain in seconds:{end - start}")
#############################RESPONSE ###############################
{'datasource': 'vectorstore'}
The time required to generate response by Router Chain in seconds:0.34175705909729004


实现生成链


prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are an assistant for question-answering tasks. """<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are an assistant for question-answering tasks. 
    Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. 
    Use three sentences maximum and keep the answer concise <|eot_id|><|start_header_id|>user<|end_header_id|>
    Question: {question} 
    Context: {context} 
    Answer: <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["question", "document"],
)
# Post-processing
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)
# Chain
start = time.time()
rag_chain = prompt | llm | StrOutputParser()
#############################RESPONSE##############################
The time required to generate response by the generation chain in seconds:1.0384225845336914
The agent memory in the context of LLM-powered autonomous agents refers to the ability of the agent to learn from its past experiences and adapt to new situations.


实现检索分级器


#
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing relevance 
    of a retrieved document to a user question. If the document contains keywords related to the user question, 
    grade it as relevant. It does not need to be a stringent test. The goal is to filter out erroneous retrievals. \n
    Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question. \n
    Provide the binary score as a JSON with a single key 'score' and no premable or explaination.
     <|eot_id|><|start_header_id|>user<|end_header_id|>
    Here is the retrieved document: \n\n {document} \n\n
    Here is the user question: {question} \n <|eot_id|><|start_header_id|>assistant<|end_header_id|>
    """,
    input_variables=["question", "document"],
)
start = time.time()
retrieval_grader = prompt | llm | JsonOutputParser()
question = "agent memory"
docs = retriever.invoke(question)
doc_txt = docs[1].page_content
print(retrieval_grader.invoke({"question": question, "document": doc_txt}))
end = time.time()
print(f"The time required to generate response by the retrieval grader in seconds:{end - start}")
############################RESPONSE ###############################
{'score': 'yes'}
The time required to generate response by the retrieval grader in seconds:0.8115921020507812


实施幻觉分级


# Prompt
prompt = PromptTemplate(
    template=""" <|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing whether 
    an answer is grounded in / supported by a set of facts. Give a binary 'yes' or 'no' score to indicate 
    whether the answer is grounded in / supported by a set of facts. Provide the binary score as a JSON with a 
    single key 'score' and no preamble or explanation. <|eot_id|><|start_header_id|>user<|end_header_id|>
    Here are the facts:
    \n ------- \n
    {documents} 
    \n ------- \n
    Here is the answer: {generation}  <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["generation", "documents"],
)
start = time.time()
hallucination_grader = prompt | llm | JsonOutputParser()
hallucination_grader_response = hallucination_grader.invoke({"documents": docs, "generation": generation})
end = time.time()
print(f"The time required to generate response by the generation chain in seconds:{end - start}")
print(hallucination_grader_response)
####################################RESPONSE#################################
The time required to generate response by the generation chain in seconds:1.020448923110962
{'score': 'yes'}


实施答案评分器


# Prompt
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a grader assessing whether an 
    answer is useful to resolve a question. Give a binary score 'yes' or 'no' to indicate whether the answer is 
    useful to resolve a question. Provide the binary score as a JSON with a single key 'score' and no preamble or explanation.
     <|eot_id|><|start_header_id|>user<|end_header_id|> Here is the answer:
    \n ------- \n
    {generation} 
    \n ------- \n
    Here is the question: {question} <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
    input_variables=["generation", "question"],
)
start = time.time()
answer_grader = prompt | llm | JsonOutputParser()
answer_grader_response = answer_grader.invoke({"question": question,"generation": generation})
end = time.time()
print(f"The time required to generate response by the answer grader in seconds:{end - start}")
print(answer_grader_response)
##############################RESPONSE###############################
The time required to generate response by the answer grader in seconds:0.2455885410308838
{'score': 'yes'}


使用网络搜索工具


import os
from langchain_community.tools.tavily_search import TavilySearchResults
os.environ['TAVILY_API_KEY'] = "YOUR API KEY"
web_search_tool = TavilySearchResults(k=3)


定义图形状态:表示图形的状态。


定义以下属性:


  • 问题
  • 生成 : LLM 生成
  • web_search:是否添加搜索
  • documents:文件列表


from typing_extensions import TypedDict
from typing import List
### State
class GraphState(TypedDict):
    question : str
    generation : str
    web_search : str
    documents : List[str]


12


定义节点


from langchain.schema import Document
def retrieve(state):
    """
    Retrieve documents from vectorstore
    Args:
        state (dict): The current graph state
    Returns:
        state (dict): New key added to state, documents, that contains retrieved documents
    """
    print("---RETRIEVE---")
    question = state["question"]
    # Retrieval
    documents = retriever.invoke(question)
    return {"documents": documents, "question": question}
#
def generate(state):
    """
    Generate answer using RAG on retrieved documents
    Args:
        state (dict): The current graph state
    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]
    
    # RAG generation
    generation = rag_chain.invoke({"context": documents, "question": question})
    return {"documents": documents, "question": question, "generation": generation}
#
def grade_documents(state):
    """
    Determines whether the retrieved documents are relevant to the question
    If any document is not relevant, we will set a flag to run web search
    Args:
        state (dict): The current graph state
    Returns:
        state (dict): Filtered out irrelevant documents and updated web_search state
    """
    print("---CHECK DOCUMENT RELEVANCE TO QUESTION---")
    question = state["question"]
    documents = state["documents"]
    
    # Score each doc
    filtered_docs = []
    web_search = "No"
    for d in documents:
        score = retrieval_grader.invoke({"question": question, "document": d.page_content})
        grade = score['score']
        # Document relevant
        if grade.lower() == "yes":
            print("---GRADE: DOCUMENT RELEVANT---")
            filtered_docs.append(d)
        # Document not relevant
        else:
            print("---GRADE: DOCUMENT NOT RELEVANT---")
            # We do not include the document in filtered_docs
            # We set a flag to indicate that we want to run web search
            web_search = "Yes"
            continue
    return {"documents": filtered_docs, "question": question, "web_search": web_search}
#
def web_search(state):
    """
    Web search based based on the question
    Args:
        state (dict): The current graph state
    Returns:
        state (dict): Appended web results to documents
    """
    print("---WEB SEARCH---")
    question = state["question"]
    documents = state["documents"]
    # Web search
    docs = web_search_tool.invoke({"query": question})
    web_results = "\n".join([d["content"] for d in docs])
    web_results = Document(page_content=web_results)
    if documents is not None:
        documents.append(web_results)
    else:
        documents = [web_results]
    return {"documents": documents, "question": question}
#


定义条件边


13


def route_question(state):
    """
    Route question to web search or RAG.
    Args:
        state (dict): The current graph state
    Returns:
        str: Next node to call
    """
    print("---ROUTE QUESTION---")
    question = state["question"]
    print(question)
    source = question_router.invoke({"question": question})  
    print(source)
    print(source['datasource'])
    if source['datasource'] == 'web_search':
        print("---ROUTE QUESTION TO WEB SEARCH---")
        return "websearch"
    elif source['datasource'] == 'vectorstore':
        print("---ROUTE QUESTION TO RAG---")
        return "vectorstore"
def decide_to_generate(state):
    """
    Determines whether to generate an answer, or add web search
    Args:
        state (dict): The current graph state
    Returns:
        str: Binary decision for next node to call
    """
    print("---ASSESS GRADED DOCUMENTS---")
    question = state["question"]
    web_search = state["web_search"]
    filtered_documents = state["documents"]
    if web_search == "Yes":
        # All documents have been filtered check_relevance
        # We will re-generate a new query
        print("---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, INCLUDE WEB SEARCH---")
        return "websearch"
    else:
        # We have relevant documents, so generate answer
        print("---DECISION: GENERATE---")
        return "generate"


14


def grade_generation_v_documents_and_question(state):
    """
    Determines whether the generation is grounded in the document and answers question.
    Args:
        state (dict): The current graph state
    Returns:
        str: Decision for next node to call
    """
    print("---CHECK HALLUCINATIONS---")
    question = state["question"]
    documents = state["documents"]
    generation = state["generation"]
    score = hallucination_grader.invoke({"documents": documents, "generation": generation})
    grade = score['score']
    # Check hallucination
    if grade == "yes":
        print("---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---")
        # Check question-answering
        print("---GRADE GENERATION vs QUESTION---")
        score = answer_grader.invoke({"question": question,"generation": generation})
        grade = score['score']
        if grade == "yes":
            print("---DECISION: GENERATION ADDRESSES QUESTION---")
            return "useful"
        else:
            print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
            return "not useful"
    else:
        pprint("---DECISION: GENERATION IS NOT GROUNDED IN DOCUMENTS, RE-TRY---")
        return "not supported"


添加节点


from langgraph.graph import END, StateGraph
workflow = StateGraph(GraphState)
# Define the nodes
workflow.add_node("websearch", web_search) # web search
workflow.add_node("retrieve", retrieve) # retrieve
workflow.add_node("grade_documents", grade_documents) # grade documents
workflow.add_node("generate", generate) # generatae


设置入口点和终点


workflow.set_conditional_entry_point(
    route_question,
    {
        "websearch": "websearch","websearch": "websearch",
        "vectorstore": "retrieve",
    },
)
workflow.add_edge("retrieve", "grade_documents")
workflow.add_conditional_edges(
    "grade_documents",
    decide_to_generate,
    {
        "websearch": "websearch",
        "generate": "generate",
    },
)
workflow.add_edge("websearch", "generate")
workflow.add_conditional_edges(
    "generate",
    grade_generation_v_documents_and_question,
    {
        "not supported": "generate",
        "useful": END,
        "not useful": "websearch",
    },
)


编译工作流程


app = workflow.compile()compile()


测试工作流程


from pprint import pprint
inputs = {"question": "What is prompt engineering?"}
for output in app.stream(inputs):
    for key, value in output.items():
        pprint(f"Finished running: {key}:")
pprint(value["generation"])
########################RESPONSE##############################
---ROUTE QUESTION---
What is prompt engineering?
{'datasource': 'vectorstore'}
vectorstore
---ROUTE QUESTION TO RAG---
---RETRIEVE---
'Finished running: retrieve:'
---CHECK DOCUMENT RELEVANCE TO QUESTION---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT RELEVANT---
---ASSESS GRADED DOCUMENTS---
---DECISION: GENERATE---
'Finished running: grade_documents:'
---GENERATE---
---CHECK HALLUCINATIONS---
---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---
---GRADE GENERATION vs QUESTION---
---DECISION: GENERATION ADDRESSES QUESTION---
'Finished running: generate:'
('Prompt engineering refers to methods for communicating with large language '
 'models to steer their behavior for desired outcomes without updating the '
 'model weights. It is an empirical science that requires heavy '
 'experimentation and heuristics.')


针对不同问题测试工作流程


app = workflow.compile()compile()
# Test
from pprint import pprint
inputs = {"question": "Who are the Bears expected to draft first in the NFL draft?"}
for output in app.stream(inputs):
    for key, value in output.items():
        pprint(f"Finished running: {key}:")
pprint(value["generation"])
#############################RESPONSE##############################
---ROUTE QUESTION---
Who are the Bears expected to draft first in the NFL draft?
{'datasource': 'web_search'}
web_search
---ROUTE QUESTION TO WEB SEARCH---
---WEB SEARCH---
'Finished running: websearch:'
---GENERATE---
---CHECK HALLUCINATIONS---
---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---
---GRADE GENERATION vs QUESTION---
---DECISION: GENERATION ADDRESSES QUESTION---
'Finished running: generate:'
('According to the provided context, the Chicago Bears are expected to take '
 'USC quarterback Caleb Williams with the first overall pick in the NFL draft.')


测试不同问题的工作流程


app = workflow.compile()compile()
#
inputs = {"question": "What are the types of agent memory?"}
for output in app.stream(inputs):
    for key, value in output.items():
        pprint(f"Finished running: {key}:")
pprint(value["generation"])
###########################RESPONSE############################
---ROUTE QUESTION---
What are the types of agent memory?
{'datasource': 'vectorstore'}
vectorstore
---ROUTE QUESTION TO RAG---
---RETRIEVE---
'Finished running: retrieve:'
---CHECK DOCUMENT RELEVANCE TO QUESTION---
---GRADE: DOCUMENT RELEVANT---
---GRADE: DOCUMENT NOT RELEVANT---
---ASSESS GRADED DOCUMENTS---
---DECISION: ALL DOCUMENTS ARE NOT RELEVANT TO QUESTION, INCLUDE WEB SEARCH---
'Finished running: grade_documents:'
---WEB SEARCH---
'Finished running: websearch:'
---GENERATE---
---CHECK HALLUCINATIONS---
---DECISION: GENERATION IS GROUNDED IN DOCUMENTS---
---GRADE GENERATION vs QUESTION---
---DECISION: GENERATION ADDRESSES QUESTION---
'Finished running: generate:'
('The text mentions the following types of agent memory:\n'
 '\n'
 '1. Short-Term Memory (STM) or Working Memory: It stores information that the '
 'agent is currently aware of and needed to carry out complex cognitive '
 'tasks.\n'
 '2. Long-Term Memory (LTM): It can store information for a remarkably long '
 'time, ranging from a few days to decades, with an essentially unlimited '
 'storage capacity.')


可视化代理/图表


!apt-get install python3-dev graphviz libgraphviz-dev pkg-config
!pip install pygraphviz


from IPython.display import Image
Image(app.get_graph().draw_png())


15


结论

LangGraph 是一款灵活的工具,专为使用 LLM 构建复杂的有状态应用程序而设计。初学者可以通过掌握其基本原理和基本示例来利用它的功能来完成自己的项目。重要的是要专注于管理状态、处理条件边并确保图中没有死结。


在我看来,与 ReAct 代理相比,它更有优势,因为在这里我们可以完全控制工作流,而不是由代理做出决定。



文章来源:https://medium.com/the-ai-forum/build-a-reliable-rag-agent-using-langgraph-2694d55995cd
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
写评论取消
回复取消