【指南】利用Haystack和Hypster实现模块化RAG

2024年10月23日 由 alex 发表 266 0

7


模块化 RAG

本文提供了一种结构化方法,将 RAG 系统分解为一个统一的框架,该框架可包含各种解决方案和方法。他们提出了六个主要组成部分:

  • 索引: 组织数据以实现高效搜索。
  • 预检索: 在搜索前处理用户的查询。
  • 检索: 查找最相关的信息。
  • 检索后: 完善检索到的信息。
  • 生成: 使用 LLM 生成响应。
  • 协调: 控制系统的整体流程。


本文的关键见解是,可以使用这些组件以类似乐高的方式描述各种现有的 RAG 解决方案。这种模块化提供了一个框架,可以更灵活、更清晰地理解、设计和指导构建 RAG 系统的过程。


在本文中,以现有的 RAG 解决方案为例,展示了如何使用相同的构件来表达这些解决方案。例如:


8


9


实施模块化 RAG

那么,我们如何才能真正实施这个 “模块化 RAG ”框架呢?


因为它更像是一个元框架--这在实践中意味着什么?这是否意味着我们需要实现所有可能的组件组合?还是说我们只需构建单个组件,然后让开发人员自己想办法把它们组合在一起?


我认为,在大多数情况下,没有必要试图涵盖所有可能的 RAG 配置,而应根据每个项目的要求和限制缩小相关配置的范围。


在本文中,我将向你展示一个具体示例,说明如何使用少量选项集构建可配置系统。希望这能为你提供正确的视角和工具,帮助你创建自己版本的模块化 RAG,其中包含针对特定用例的相关配置集。


让我们继续探索我们将要使用的两个主要工具:


Haystack - 主要组件库

haystack 是一个开源框架,用于构建生产就绪的 LLM 应用程序、检索增强生成管道和先进的搜索系统,可在大型文档集合上智能运行。


优点:

  • 出色的组件设计
  • 管道非常灵活,可进行动态配置
  • 文档极其完善
  • 该框架包含许多现有的实施方案,并与生成式人工智能提供商集成。


缺点:

  • 管道界面可能有点冗长
  • 在管道外使用组件不太符合人体工程学。


Hypster - 管理配置空间

hypster是一个适用于人工智能和机器学习项目的轻量级 pythonic 配置系统。它提供极简、直观的 pythonic 语法,支持分层和可交换配置。


Hypster 允许你定义一系列可能的配置,并在它们之间轻松切换,以进行实验和优化。这样就可以轻松添加和定制自己的配置空间,用不同的设置将其实例化,最终为生产环境选择最佳配置。


LLM

让我们从 LLM 配置空间的定义开始:


from hypster import config, HP


@config
def llm_config(hp: HP):
  anthropic_models = {"haiku": "claude-3-haiku-20240307", 
                      "sonnet": "claude-3-5-sonnet-20240620"}
  openai_models = {"gpt-4o-mini": "gpt-4o-mini", 
                   "gpt-4o": "gpt-4o", 
                   "gpt-4o-latest": "gpt-4o-2024-08-06"}
  
  model_options = {**anthropic_models, **openai_models}
  model = hp.select(model_options, default="gpt-4o-mini")
  temperature = hp.number_input(0.0)
  
  if model in openai_models.values():
    from haystack.components.generators import OpenAIGenerator
    
    llm = OpenAIGenerator(model=model, 
                          generation_kwargs={"temperature": temperature})
  else: #anthropic
    from haystack_integrations.components.generators.anthropic import AnthropicGenerator
    
    llm = AnthropicGenerator(model=model, 
                             generation_kwargs={"temperature": temperature})


本代码片段演示了 Hypster 和 Haystack 的一个基本示例。我们使用@config装饰器定义了一个名为llm_config的函数,该函数封装了 LLM 的配置空间。该空间包括用于选择不同 LLM 提供商(Anthropic 或 OpenAI)及其相应模型的选项,以及用于控制温度的参数。


在llm_config函数中,我们使用条件逻辑来根据所选模型实例化适当的 Haystack 组件。这样,我们就可以通过选择在不同的 LLM 之间无缝切换,而无需修改代码结构。


例如,要创建一个使用 “俳句 ”模型和 0.5 温度的人类学发生器,我们可以按如下方式实例化配置:


result = llm_config(final_vars=["llm"], 
                    selections={"model" : "haiku"}, 
                    overrides={"temperature" : 0.5})


索引管道

让我们继续创建索引管道,定义如何处理输入文件。在我们的例子中就是 PDF 文件。


@config
def indexing_config(hp: HP):
    from haystack import Pipeline
    from haystack.components.converters import PyPDFToDocument
    pipeline = Pipeline()
    pipeline.add_component("loader", PyPDFToDocument())


接下来,我们将添加一个可选功能--根据文档的前 1000 个字符,用 LLM 摘要来丰富文档。


这是一个很好的技巧,我们使用文档的前n 个字符,然后将文档分割成块,每个块 “继承 ”这些丰富的信息,用于嵌入和生成响应。


  enrich_doc_w_llm = hp.select([True, False], default=True)
  if enrich_doc_w_llm:
    from textwrap import dedent
    from haystack.components.builders import PromptBuilder
    from src.haystack_utils import AddLLMMetadata
    
    template = dedent("""
        Summarize the document's main topic in one sentence (15 words max). 
        Then list 3-5 keywords or acronyms that best \
        represent its content for search purposes.
        Context:
        {{ documents[0].content[:1000] }}
        
        ============================
        
        Output format:
        Summary:
        Keywords:
    """)
    
    llm = hp.propagate("configs/llm.py")
    pipeline.add_component("prompt_builder", PromptBuilder(template=template))
    pipeline.add_component("llm", llm["llm"])
    pipeline.add_component("document_enricher", AddLLMMetadata())
    
    pipeline.connect("loader", "prompt_builder")
    pipeline.connect("prompt_builder", "llm")
    pipeline.connect("llm", "document_enricher")
    pipeline.connect("loader", "document_enricher")
    splitter_source = "document_enricher"
  else:
    splitter_source = "loader"
  
  split_by = hp.select(["sentence", "word", "passage", "page"], 
                       default="sentence")
  splitter = DocumentSplitter(split_by=split_by, 
                              split_length=hp.int_input(10), 
                              split_overlap=hp.int_input(2))
  pipeline.add_component("splitter", splitter)
  pipeline.connect(splitter_source, "splitter")


在这里,我们可以看到 Haystack 的管道正在运行。如果用户选择enrich_doc_w_llm==True,我们就会继续添加组件和连接,以实现这种丰富化。在我们的例子中 PromptBuilder → LLM → AddLLMMetadata。


正如你所看到的,它非常灵活,我们可以使用条件逻辑即时构建它。这一点非常强大。

现在,我们可以通过几种方式实例化配置对象。例如:


results = indexing_config(selections={"enrich_doc_w_llm": False, 
                                      "split_by" : "page"}, 
                          overrides={"split_length" : 1})


下面是一个简单的管道,包含一个装载机和一个分割器,以及所选的分割器配置


10


否则,我们可以选择用 LLM 摘要来充实文件:


results = indexing_config(selections={"enrich_doc_w_llm": True})


请注意,Hypster 采用了在每个参数中定义的默认值,因此无需每次都指定所有参数选项。下面是最终管道的示意图:


11


注意我们是如何使用hp.propagte(“configs/llm_config.py”) 将llm_config插入索引管道的。通过这种传播能力,我们可以分层创建嵌套配置。我们可以使用点符号在嵌套的llm_config中选择和覆盖参数。例如:


results = indexing_config(selections={"llm.model" : "gpt-4o-latest"})


这将导致使用 OpenAI gpt-4o-2024-08 模型实例化带有 LLM 富集任务的索引管道。


检索

Haystack 自带一个内存文档存储空间,用于快速实验。它包括一个嵌入式检索器和一个 BM25 检索器。在本节中,我们将建立一个配置空间,以便使用 BM25、嵌入式检索器或两者。


@config
def in_memory_retrieval(hp: HP):
  from haystack import Pipeline
  from haystack.document_stores.in_memory import InMemoryDocumentStore
  from src.haystack_utils import PassThroughDocuments, PassThroughText
  pipeline = Pipeline()
  # utility components for the first and last parts of the pipline  
  pipeline.add_component("query", PassThroughText())
  pipeline.add_component("retrieved_documents", PassThroughDocuments())
  
  retrieval_types = hp.multi_select(["bm25", "embeddings"], 
                                    default=["bm25", "embeddings"])
  if len(retrieval_types) == 0:
      raise ValueError("At least one retrieval type must be selected.")
  
  document_store = InMemoryDocumentStore()
  
  if "embedding" in retrieval_types:
    from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
    embedding_similarity_function = hp.select(["cosine", "dot_product"], default="cosine")
    document_store.embedding_similarity_function = embedding_similarity_function
    pipeline.add_component("embedding_retriever", InMemoryEmbeddingRetriever(document_store=document_store))
  if "bm25" in retrieval_types:
    from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
    bm25_algorithm = hp.select(["BM25Okapi", "BM25L", "BM25Plus"], default="BM25L")
    document_store.bm25_algorithm = bm25_algorithm
    pipeline.add_component("bm25_retriever", InMemoryBM25Retriever(document_store=document_store))
    pipeline.connect("query", "bm25_retriever")
  if len(retrieval_types) == 2:  # both bm25 and embeddings
    from haystack.components.joiners.document_joiner import DocumentJoiner
    bm25_weight = hp.number_input(0.5)
    join_mode = hp.select(["distribution_based_rank_fusion", 
                          "concatenate", "merge", 
                          "reciprocal_rank_fusion"],
                          default="distribution_based_rank_fusion")
    joiner = DocumentJoiner(join_mode=join_mode, top_k=hp.int_input(10),
                            weights=[bm25_weight, 1-bm25_weight])
    pipeline.add_component("document_joiner", joiner)
    pipeline.connect("bm25_retriever", "document_joiner")
    pipeline.connect("embedding_retriever", "document_joiner")
    pipeline.connect("document_joiner", "retrieved_documents")
  elif "embeddings" in retrieval_types: #only embeddings retriever
    pipeline.connect("embedding_retriever", "retrieved_documents")
  else:  # only bm25
    pipeline.connect("bm25_retriever", "retrieved_documents")


在这里,我们使用了几个 “小技巧 ”来实现它。首先,我们使用hp.multi_select,它允许我们从选项中选择多个选项。其次,我们在管道的起点和终点添加了 “辅助 ”组件(PassThroughText、PassThroughDocuments),以确保任何选择都以查询开始,以检索到的文档结束,剩下的就相对简单了。


几个实例如下:


in_memory_retrieval(selections={"retrieval_types": ["bm25"], 
                                "bm25_algorithm": "BM25Okapi"})


12



in_memory_retrieval(selections={"join_mode": "reciprocal_rank_fusion"})


13


在完整的实现中,我添加了一个 Qdrant 向量存储、一个可选的重排步骤和一个最终生成管道。这些都是示例,旨在展示在这些管道中添加和定制不同组件的可能性,你也可以在完整版本库中找到它们。


最后,我们还有将所有这些设置绑定在一起的主配置:


@config
def rag_config(hp: HP):
  indexing = hp.propagate("configs/indexing.py")
  indexing_pipeline = indexing["pipeline"]
  
  embedder_type = hp.select(["fastembed", "jina"], default="fastembed")
  match embedder_type:
    case "fastembed":
      embedder = hp.propagate("configs/fast_embed.py")
    case "jina":
      embedder = hp.propagate("configs/jina_embed.py")
  
  indexing_pipeline.add_component("doc_embedder", embedder["doc_embedder"])
  document_store_type = hp.select(["in_memory", "qdrant"], 
                                  default="in_memory")
  match document_store_type:
    case "in_memory":
      retrieval = hp.propagate("configs/in_memory_retrieval.py")
    case "qdrant":
      retrieval = hp.propagate("configs/qdrant_retrieval.py", 
                  overrides={"embedding_dim": embedder["embedding_dim"]})
  
  from haystack.components.writers import DocumentWriter
  from haystack.document_stores.types import DuplicatePolicy
  
  document_writer = DocumentWriter(retrieval["document_store"], 
                                   policy=DuplicatePolicy.OVERWRITE)
  indexing_pipeline.add_component("document_writer", document_writer)
  indexing_pipeline.connect("splitter", "doc_embedder")
  indexing_pipeline.connect("doc_embedder", "document_writer")
  
  # Retrieval + Generation Pipeline
  pipeline = retrieval["pipeline"]
  pipeline.add_component("text_embedder", embedder["text_embedder"])
  pipeline.connect("query", "text_embedder")
  pipeline.connect("text_embedder", "embedding_retriever.query_embedding")
  
  from src.haystack_utils import PassThroughDocuments
  pipeline.add_component("docs_for_generation", PassThroughDocuments())
  
  use_reranker = hp.select([True, False], default=True)
  if use_reranker:
      reranker = hp.propagate("configs/reranker.py")
      pipeline.add_component("reranker", reranker["reranker"])
      pipeline.connect("retrieved_documents", "reranker")
      pipeline.connect("reranker", "docs_for_generation")
      pipeline.connect("query", "reranker")
  else:
      pipeline.connect("retrieved_documents", "docs_for_generation")
  
  response = hp.propagate("configs/response.py")
  from haystack.components.builders import PromptBuilder
  pipeline.add_component("prompt_builder", PromptBuilder(template=response["template"]))
  pipeline.add_component("llm", response["llm"])
  pipeline.connect("prompt_builder", "llm")
  pipeline.connect("query.text", "prompt_builder.query")
  pipeline.connect("docs_for_generation", "prompt_builder")


在这里,我们可以在任何子组件中定义任何我们想要的东西。例如:


results = rag_config(selections={"indexing.enrich_doc_w_llm": True,
                                 "indexing.llm.model": "gpt-4o-mini",
                                 "document_store": "qdrant",
                                 "embedder_type": "fastembed",
                                 "reranker.model": "tiny-bert-v2",
                                 "response.llm.model": "sonnet"},
    overrides={"indexing.splitter.split_length": 6, 
               "reranker.top_k": 3})


我们已经实例化了一套具体的工作管道:


14


现在我们可以按顺序执行它们:


indexing_pipeline = results["indexing_pipeline"]
indexing_pipeline.warm_up()
file_paths = ["data/raw/modular_rag.pdf", "data/raw/enhancing_rag.pdf"]
for file_path in file_paths:  # this can be parallelized
    indexing_pipeline.run({"loader": {"sources": [file_path]}})
query = "What are the 6 main modules of the modular RAG framework?"
pipeline = results["pipeline"]
pipeline.warm_up()
response = pipeline.run({"query": {"text": query}})
print("Response: ", response["llm"]["replies"][0])


Response: The six main modules of the modular RAG framework are 
Indexing, Pre-retrieval, Retrieval, Post-retrieval, Generation, 
and Orchestration.
Supporting quote from Document 1: "Based on the current stage of RAG 
development, we have established six main modules: Indexing, 
Pre-retrieval, Retrieval, Post-retrieval, Generation, and Orchestration."




文章来源:https://towardsdatascience.com/implementing-modular-rag-with-haystack-and-hypster-d2f0ecc88b8f
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
热门职位
Maluuba
20000~40000/月
Cisco
25000~30000/月 深圳市
PilotAILabs
30000~60000/年 深圳市
写评论取消
回复取消