Azure AI Search和Langchain:构建高效RAG管道

2024年04月25日 由 alex 发表 479 0

介绍

使用 LLM 框架(Langchain、llamaindex 等)构建 RAG 管道的快速演示几乎不需要 5-6 行代码。尽管设计生产级 RAG 应用程序需要的不仅仅是对文档进行查询。


在本文中,我们将详细讨论如何设计端到端生产级 RAG Pipeline 以及我们需要构建哪些附加层。


使用的框架和工具的简要概述


5


Langchain:Langchain 是一种非常流行的框架,可以轻松构建和设计基于 LLM 的应用程序。它提供了出色的抽象和内置功能,可以从头开始开发基于 RAG 的应用程序


Azure AI 搜索:Azure AI 搜索是来自 Azure 堆栈的搜索引擎。它在传统和 GenAI 搜索应用程序中对用户拥有的内容提供大规模的安全信息检索


Langfuse:Langfuse 是开源监控和可观察性工具。这有助于在整个生命周期中构建和改进法学硕士申请。它提供跟踪、评估、提示管理和指标功能。可以轻松部署到本地系统或云服务


NeMo-Guardrails: NeMo-guardrails 是一个开源工具包,可轻松将可编程护栏添加到基于 LLM 的对话系统中。关键护栏包括输入护栏、输出护栏和对话护栏。


6


Ragas: Ragas 是一个框架,可帮助你评估检索增强生成 (RAG) 管道。这提供了有助于评估 RAG 的关键指标。


7


Azure OpenAI 搜索: 此服务来自 Azure 堆栈,提供来自 OpenAI 的模型套件作为 REST API 访问。这些型号包括 - GPT-4、GPT-4-turbo、GPT-3.5-turbo 和嵌入型号系列


代码实现

在本文中,我们将详细介绍每个管道组件,并尝试了解上述框架/工具的使用。


导入库并设置环境变量


from dotenv import load_dotenv
load_dotenv()
from langchain_community.document_loaders import TextLoader
from ragas.testset.generator import TestsetGenerator
from ragas.testset.evolutions import simple, reasoning, multi_context
from langchain_openai import AzureChatOpenAI, AzureOpenAIEmbeddings
from langchain.chains import RetrievalQA
from langchain_core.output_parsers import StrOutputParser
from langchain.vectorstores.azuresearch import AzureSearch
from langchain.text_splitter import CharacterTextSplitter
from langfuse.callback import CallbackHandler
from azure.search.documents.indexes.models import (
    FreshnessScoringFunction,
    FreshnessScoringParameters,
    ScoringProfile,
    SearchableField,
    SearchField,
    SearchFieldDataType,
    SimpleField,
    TextWeights,
)
from tqdm import tqdm
from langfuse import Langfuse
from datasets import Dataset  
from nemoguardrails import LLMRails, RailsConfig
from nemoguardrails.integrations.langchain.runnable_rails import RunnableRails
import os
from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy, context_precision,context_recall
from ragas.metrics.critique import SUPPORTED_ASPECTS, harmfulness
import pandas as pd
import nest_asyncio
nest_asyncio.apply()
import warnings
warnings.filterwarnings("ignore") 
vector_store_address = os.getenv("AZURE_SEARCH_ENDPOIND")
vector_store_password = os.getenv("AZURE_SEARCH_PASSWORD")


收集数据集

我们使用微软公司 24 财年第一季度和第二季度的收入电话记录。我们的想法是建立一个应用程序,允许对这些收入电话记录进行查询。


生成分块并添加元数据

为每个分块添加样本发布日期和文件名作为元数据。


def split_doc(filename_):split_doc(filename_):
    print(f'Reading - {filename_}')
    loader = TextLoader(filename_, encoding="utf-8")
    documents = loader.load()
    text_splitter = CharacterTextSplitter(chunk_size=2500, chunk_overlap=0)
    docs = text_splitter.split_documents(documents)
    return docs
def add_metadata(data,time):
    for chunk in data:
        chunk.metadata['last_update'] = time
    return data
msft_q1 = split_doc('MSFT_q1_2024.txt')
msft_q2 = split_doc('MSFT_q2_2024.txt')
# Adding same data with different last_update 
from datetime import datetime, timedelta
q2_time = (datetime.utcnow() - timedelta(days=90)).strftime(
    "%Y-%m-%dT%H:%M:%S-00:00"
)
q1_time = (datetime.utcnow() - timedelta(days=180)).strftime(
    "%Y-%m-%dT%H:%M:%S-00:00"
)
msft_q1 = add_metadata(msft_q1,q1_time)
msft_q2 = add_metadata(msft_q2,q2_time)
documents = msft_q1 + msft_q2


在 Azure AI Search 中创建索引

要在 AI Search 中创建索引,我们需要定义字段及其类型。我们使用 OpenAI 嵌入模型生成向量。首先,我们定义一些常用字段:


  1. id: # 块的数量
  2. content:信息块的文本内容
  3. content_vector:生成的内容嵌入(可搜索字段)
  4. 元数据:存储数据块的元数据


embeddings = AzureOpenAIEmbeddings(
    azure_deployment="text-embedding-ada-002","text-embedding-ada-002",
    openai_api_version="2023-05-15",
)
embedding_function=embeddings.embed_query
fields = [
    SimpleField(
        name="id",
        type=SearchFieldDataType.String,
        key=True,
        filterable=True,
    ),
    SearchableField(
        name="content",
        type=SearchFieldDataType.String,
        searchable=True,
    ),
    SearchField(
        name="content_vector",
        type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
        searchable=True,
        vector_search_dimensions=len(embedding_function("Text")),
        vector_search_profile_name="myHnswProfile",
    ),
    SearchableField(
        name="metadata",
        type=SearchFieldDataType.String,
        searchable=True,
    ),
    # Additional field for filtering on document source
    SimpleField(
        name="source",
        type=SearchFieldDataType.String,
        filterable=True,
    ),
    # Additional data field for last doc update
    SimpleField(
        name="last_update",
        type=SearchFieldDataType.DateTimeOffset,
        searchable=True,
        filterable=True,
    ),
]
# Adding a custom scoring profile with a freshness function
sc_name = "scoring_profile"
sc = ScoringProfile(
    name=sc_name,
    text_weights=TextWeights(weights={"content": 5}),
    function_aggregation="sum",
    functions=[
        FreshnessScoringFunction(
            field_name="last_update",
            boost=100,
            parameters=FreshnessScoringParameters(boosting_duration="P2D"),
            interpolation="linear",
        )
    ],
)
index_name = "earning_call-scoring-profile"
vector_store: AzureSearch = AzureSearch(
    azure_search_endpoint=vector_store_address,
    azure_search_key=vector_store_password,
    index_name=index_name,
    embedding_function=embeddings.embed_query,
    fields=fields,
    scoring_profiles=[sc],
    default_scoring_profile=sc_name,
)
vector_store.add_documents(documents=documents)
azureai_retriever = vector_store.as_retriever()


与 Langfuse 集成

Langchain 提供了一个封装器,可轻松与 Langfuse 工具集成。我们可以通过以下方法初始化 langfuse 句柄。我们需要将 langfuse 公钥和私钥设置为环境变量


langfuse_handler = CallbackHandler()


构建 RAG 链

我们在检索链中使用 GPT-4 涡轮模型来构建 QA 链。要将其与 langfuse 集成,我们需要将回调处理程序与 invoke() 方法一起传递。


llm = AzureChatOpenAI(temperature=0,openai_api_version="2023-07-01-preview","2023-07-01-preview",
    azure_deployment="gpt-4-1106-preview")
chain_type = 'stuff'
chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type=chain_type,
    retriever=azureai_retriever,
    metadata={"application_type": "question_answering"},
)
query = "How is Windows OEM revenue growth?"
chain.invoke({"query": query}, config={"callbacks": [langfuse_handler]})


到目前为止,我们已经建立了一个 RAG 管道,可以对收入调用数据进行查询。现在,我们将添加工具/框架来构建 e2e 管道


将 Guardrails 添加到 RAG 链

到目前为止,我们的 RAG 链还没有任何机制来标记超出主题或不恰当的查询,并控制模型的响应是否存在偏见/陈规定型。我们将使用 nemo-guardrails,使我们的解决方案能够标记/避免回答超出范围的问题。


我们需要创建 “配置 ”文件,并定义要使用的轨道类型。在本例中,我们只使用 “输入栏”。Langchain 提供了封装器,可轻松与现有的 nemo-guardrails 链集成。


config = RailsConfig.from_path("./config")
guardrails = RunnableRails(config=config,llm=llm)
chain_with_guardrails = guardrails | chain
chain_with_guardrails.invoke({"query": query}, config={"callbacks": [langfuse_handler]})


建立评估 RAG 管道的 "黄金数据集

拥有定义明确的数据集是对 RAG 管道进行稳健评估的基础。生成此类数据集的方法主要有两种:


  1. 人工注释:- 更稳健,但速度较慢
  2. LLM 作为数据生成:- 速度更快,但可能产生幻觉


在理想情况下,我们可以混合使用这两种方法。例如,我们可以收集人工数据集,并生成不同的问题。这样我们就可以扩充我们的数据集。在本例中,我们将使用拉加类生成评估数据集。这样,我们就可以生成 3 种不同的问题(简单、推理和多跳)。


gpt4_llm = AzureChatOpenAI(temperature=0,openai_api_version="2024-02-01","2024-02-01",
    azure_deployment="gpt-4")
generator_llm = gpt4_llm
critic_llm = gpt4_llm
embeddings = embeddings
generator = TestsetGenerator.from_langchain(
    generator_llm,
    critic_llm,
    embeddings
)
%%time
testset = generator.generate_with_langchain_docs(documents, test_size=10, distributions={simple: 1},is_async=False )
testset.to_pandas().to_excel('Ground_Truth_Dataset.xlsx',index=False)#['question'].tolist()


使用 ragas 指标在 “黄金数据集 ”上进行评估

正确定义数据集和评估器(LLM)后,我们将使用 ragas 指标对 RAG 管道性能进行评分。我们将使用的关键指标包括


  1. 忠实度
  2. 答案相关性
  3. 上下文精确度
  4. 有害性


在计算这些分数的同时,我们会将这些指标和结果推送给 Langfuse 以进行监控和观察


metrics = [faithfulness, answer_relevancy, context_precision, harmfulness]
langfuse = Langfuse()
langfuse.auth_check()
test_dataset = pd.read_excel('Ground_Truth_Dataset.xlsx')'Ground_Truth_Dataset.xlsx')
test_dataset = test_dataset[~test_dataset.ground_truth.isna()]
def evaluate_rag(question,ground_truth):
    trace = langfuse.trace(name = "rag")
    contexts = azureai_retriever.invoke(question)
    trace.span(
        name = "retrieval", input={'question': question}, output={'contexts': contexts}
    )
    answer = chain.invoke(question)['result']
    trace.span(
        name = "generation", input={'question': question, 'contexts': contexts}, output={'answer': answer}
    )
    data = {
        'question': [question],
        'answer': [answer],
        'contexts' : [[context.page_content for context in contexts]],
        'ground_truth': [row['ground_truth']]
    }
    dataset = Dataset.from_dict(data)
    result = evaluate(
        dataset,
        metrics=[
            context_precision,
            faithfulness,
            answer_relevancy,
            context_recall,
            harmfulness
        ],
        llm=gpt4_llm, embeddings=embeddings
    )
    for m in metrics:
        print(metrics)
        trace.score(name=m.name, value=result[m.name])

%%time
for _,row in tqdm(test_dataset.iterrows()):
    evaluate_rag(row['question'],row['ground_truth'])


在 Langfuse 中显示性能/分数

现在,当我们将指标和查询推送到 Langfuse 时,我们可以使用 langfuse UI 来了解整个系统的性能(即平均指标分数、延迟、每个组件的延迟、查询成本等)。


这是 Langfuse UI 的示例


8


结论

现在,我们拥有了基于 RAG 的 e2e 监控管道,它配备了适当的防护栏和评估控制。本博客使用了每个工具的基本功能,请阅读每个框架/工具的文档以了解其整体功能。


文章来源:https://medium.com/codex/build-end-to-end-rag-pipeline-with-monitoring-and-evaluation-using-langchain-azure-ai-search-6f190fffab2a
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
热门职位
Maluuba
20000~40000/月
Cisco
25000~30000/月 深圳市
PilotAILabs
30000~60000/年 深圳市
写评论取消
回复取消