介绍
使用 LLM 框架(Langchain、llamaindex 等)构建 RAG 管道的快速演示几乎不需要 5-6 行代码。尽管设计生产级 RAG 应用程序需要的不仅仅是对文档进行查询。
在本文中,我们将详细讨论如何设计端到端生产级 RAG Pipeline 以及我们需要构建哪些附加层。
使用的框架和工具的简要概述
Langchain:Langchain 是一种非常流行的框架,可以轻松构建和设计基于 LLM 的应用程序。它提供了出色的抽象和内置功能,可以从头开始开发基于 RAG 的应用程序
Azure AI 搜索:Azure AI 搜索是来自 Azure 堆栈的搜索引擎。它在传统和 GenAI 搜索应用程序中对用户拥有的内容提供大规模的安全信息检索
Langfuse:Langfuse 是开源监控和可观察性工具。这有助于在整个生命周期中构建和改进法学硕士申请。它提供跟踪、评估、提示管理和指标功能。可以轻松部署到本地系统或云服务
NeMo-Guardrails: NeMo-guardrails 是一个开源工具包,可轻松将可编程护栏添加到基于 LLM 的对话系统中。关键护栏包括输入护栏、输出护栏和对话护栏。
Ragas: Ragas 是一个框架,可帮助你评估检索增强生成 (RAG) 管道。这提供了有助于评估 RAG 的关键指标。
Azure OpenAI 搜索: 此服务来自 Azure 堆栈,提供来自 OpenAI 的模型套件作为 REST API 访问。这些型号包括 - GPT-4、GPT-4-turbo、GPT-3.5-turbo 和嵌入型号系列
代码实现
在本文中,我们将详细介绍每个管道组件,并尝试了解上述框架/工具的使用。
导入库并设置环境变量
from dotenv import load_dotenv
load_dotenv()
from langchain_community.document_loaders import TextLoader
from ragas.testset.generator import TestsetGenerator
from ragas.testset.evolutions import simple, reasoning, multi_context
from langchain_openai import AzureChatOpenAI, AzureOpenAIEmbeddings
from langchain.chains import RetrievalQA
from langchain_core.output_parsers import StrOutputParser
from langchain.vectorstores.azuresearch import AzureSearch
from langchain.text_splitter import CharacterTextSplitter
from langfuse.callback import CallbackHandler
from azure.search.documents.indexes.models import (
FreshnessScoringFunction,
FreshnessScoringParameters,
ScoringProfile,
SearchableField,
SearchField,
SearchFieldDataType,
SimpleField,
TextWeights,
)
from tqdm import tqdm
from langfuse import Langfuse
from datasets import Dataset
from nemoguardrails import LLMRails, RailsConfig
from nemoguardrails.integrations.langchain.runnable_rails import RunnableRails
import os
from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy, context_precision,context_recall
from ragas.metrics.critique import SUPPORTED_ASPECTS, harmfulness
import pandas as pd
import nest_asyncio
nest_asyncio.apply()
import warnings
warnings.filterwarnings("ignore")
vector_store_address = os.getenv("AZURE_SEARCH_ENDPOIND")
vector_store_password = os.getenv("AZURE_SEARCH_PASSWORD")
收集数据集
我们使用微软公司 24 财年第一季度和第二季度的收入电话记录。我们的想法是建立一个应用程序,允许对这些收入电话记录进行查询。
生成分块并添加元数据
为每个分块添加样本发布日期和文件名作为元数据。
def split_doc(filename_):split_doc(filename_):
print(f'Reading - {filename_}')
loader = TextLoader(filename_, encoding="utf-8")
documents = loader.load()
text_splitter = CharacterTextSplitter(chunk_size=2500, chunk_overlap=0)
docs = text_splitter.split_documents(documents)
return docs
def add_metadata(data,time):
for chunk in data:
chunk.metadata['last_update'] = time
return data
msft_q1 = split_doc('MSFT_q1_2024.txt')
msft_q2 = split_doc('MSFT_q2_2024.txt')
# Adding same data with different last_update
from datetime import datetime, timedelta
q2_time = (datetime.utcnow() - timedelta(days=90)).strftime(
"%Y-%m-%dT%H:%M:%S-00:00"
)
q1_time = (datetime.utcnow() - timedelta(days=180)).strftime(
"%Y-%m-%dT%H:%M:%S-00:00"
)
msft_q1 = add_metadata(msft_q1,q1_time)
msft_q2 = add_metadata(msft_q2,q2_time)
documents = msft_q1 + msft_q2
在 Azure AI Search 中创建索引
要在 AI Search 中创建索引,我们需要定义字段及其类型。我们使用 OpenAI 嵌入模型生成向量。首先,我们定义一些常用字段:
embeddings = AzureOpenAIEmbeddings(
azure_deployment="text-embedding-ada-002","text-embedding-ada-002",
openai_api_version="2023-05-15",
)
embedding_function=embeddings.embed_query
fields = [
SimpleField(
name="id",
type=SearchFieldDataType.String,
key=True,
filterable=True,
),
SearchableField(
name="content",
type=SearchFieldDataType.String,
searchable=True,
),
SearchField(
name="content_vector",
type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
searchable=True,
vector_search_dimensions=len(embedding_function("Text")),
vector_search_profile_name="myHnswProfile",
),
SearchableField(
name="metadata",
type=SearchFieldDataType.String,
searchable=True,
),
# Additional field for filtering on document source
SimpleField(
name="source",
type=SearchFieldDataType.String,
filterable=True,
),
# Additional data field for last doc update
SimpleField(
name="last_update",
type=SearchFieldDataType.DateTimeOffset,
searchable=True,
filterable=True,
),
]
# Adding a custom scoring profile with a freshness function
sc_name = "scoring_profile"
sc = ScoringProfile(
name=sc_name,
text_weights=TextWeights(weights={"content": 5}),
function_aggregation="sum",
functions=[
FreshnessScoringFunction(
field_name="last_update",
boost=100,
parameters=FreshnessScoringParameters(boosting_duration="P2D"),
interpolation="linear",
)
],
)
index_name = "earning_call-scoring-profile"
vector_store: AzureSearch = AzureSearch(
azure_search_endpoint=vector_store_address,
azure_search_key=vector_store_password,
index_name=index_name,
embedding_function=embeddings.embed_query,
fields=fields,
scoring_profiles=[sc],
default_scoring_profile=sc_name,
)
vector_store.add_documents(documents=documents)
azureai_retriever = vector_store.as_retriever()
与 Langfuse 集成
Langchain 提供了一个封装器,可轻松与 Langfuse 工具集成。我们可以通过以下方法初始化 langfuse 句柄。我们需要将 langfuse 公钥和私钥设置为环境变量
langfuse_handler = CallbackHandler()
构建 RAG 链
我们在检索链中使用 GPT-4 涡轮模型来构建 QA 链。要将其与 langfuse 集成,我们需要将回调处理程序与 invoke() 方法一起传递。
llm = AzureChatOpenAI(temperature=0,openai_api_version="2023-07-01-preview","2023-07-01-preview",
azure_deployment="gpt-4-1106-preview")
chain_type = 'stuff'
chain = RetrievalQA.from_chain_type(
llm=llm,
chain_type=chain_type,
retriever=azureai_retriever,
metadata={"application_type": "question_answering"},
)
query = "How is Windows OEM revenue growth?"
chain.invoke({"query": query}, config={"callbacks": [langfuse_handler]})
到目前为止,我们已经建立了一个 RAG 管道,可以对收入调用数据进行查询。现在,我们将添加工具/框架来构建 e2e 管道
到目前为止,我们的 RAG 链还没有任何机制来标记超出主题或不恰当的查询,并控制模型的响应是否存在偏见/陈规定型。我们将使用 nemo-guardrails,使我们的解决方案能够标记/避免回答超出范围的问题。
我们需要创建 “配置 ”文件,并定义要使用的轨道类型。在本例中,我们只使用 “输入栏”。Langchain 提供了封装器,可轻松与现有的 nemo-guardrails 链集成。
config = RailsConfig.from_path("./config")
guardrails = RunnableRails(config=config,llm=llm)
chain_with_guardrails = guardrails | chain
chain_with_guardrails.invoke({"query": query}, config={"callbacks": [langfuse_handler]})
建立评估 RAG 管道的 "黄金数据集
拥有定义明确的数据集是对 RAG 管道进行稳健评估的基础。生成此类数据集的方法主要有两种:
在理想情况下,我们可以混合使用这两种方法。例如,我们可以收集人工数据集,并生成不同的问题。这样我们就可以扩充我们的数据集。在本例中,我们将使用拉加类生成评估数据集。这样,我们就可以生成 3 种不同的问题(简单、推理和多跳)。
gpt4_llm = AzureChatOpenAI(temperature=0,openai_api_version="2024-02-01","2024-02-01",
azure_deployment="gpt-4")
generator_llm = gpt4_llm
critic_llm = gpt4_llm
embeddings = embeddings
generator = TestsetGenerator.from_langchain(
generator_llm,
critic_llm,
embeddings
)
%%time
testset = generator.generate_with_langchain_docs(documents, test_size=10, distributions={simple: 1},is_async=False )
testset.to_pandas().to_excel('Ground_Truth_Dataset.xlsx',index=False)#['question'].tolist()
使用 ragas 指标在 “黄金数据集 ”上进行评估
正确定义数据集和评估器(LLM)后,我们将使用 ragas 指标对 RAG 管道性能进行评分。我们将使用的关键指标包括
在计算这些分数的同时,我们会将这些指标和结果推送给 Langfuse 以进行监控和观察
metrics = [faithfulness, answer_relevancy, context_precision, harmfulness]
langfuse = Langfuse()
langfuse.auth_check()
test_dataset = pd.read_excel('Ground_Truth_Dataset.xlsx')'Ground_Truth_Dataset.xlsx')
test_dataset = test_dataset[~test_dataset.ground_truth.isna()]
def evaluate_rag(question,ground_truth):
trace = langfuse.trace(name = "rag")
contexts = azureai_retriever.invoke(question)
trace.span(
name = "retrieval", input={'question': question}, output={'contexts': contexts}
)
answer = chain.invoke(question)['result']
trace.span(
name = "generation", input={'question': question, 'contexts': contexts}, output={'answer': answer}
)
data = {
'question': [question],
'answer': [answer],
'contexts' : [[context.page_content for context in contexts]],
'ground_truth': [row['ground_truth']]
}
dataset = Dataset.from_dict(data)
result = evaluate(
dataset,
metrics=[
context_precision,
faithfulness,
answer_relevancy,
context_recall,
harmfulness
],
llm=gpt4_llm, embeddings=embeddings
)
for m in metrics:
print(metrics)
trace.score(name=m.name, value=result[m.name])
%%time
for _,row in tqdm(test_dataset.iterrows()):
evaluate_rag(row['question'],row['ground_truth'])
在 Langfuse 中显示性能/分数
现在,当我们将指标和查询推送到 Langfuse 时,我们可以使用 langfuse UI 来了解整个系统的性能(即平均指标分数、延迟、每个组件的延迟、查询成本等)。
这是 Langfuse UI 的示例
结论
现在,我们拥有了基于 RAG 的 e2e 监控管道,它配备了适当的防护栏和评估控制。本博客使用了每个工具的基本功能,请阅读每个框架/工具的文档以了解其整体功能。