构建Agentic RAG工作流:使用Llama-Index和Qdrant实际实施

2024年08月07日 由 alex 发表 174 0

本文将介绍由 Llama 3.1 支持的尖端工作流程 "Terraform 助手"。该工作流程演示了如何按需生成 Terraform 脚本并将其保存为物理文件。该工作流的独特之处在于它能够使用 Qdrant 将这些脚本索引到向量存储中,从而为基础架构即代码(IaC)提供强大的 Q&A RAG 系统。想象一下,DevOps 工程师需要与多个客户和项目打交道;他们现在可以提出与客户、项目和基础架构相关的具体问题,而不是手动筛选 IaC 配置。相关信息会立即被检索出来,从而节省宝贵的时间和精力。此外,通过利用Qdrant的高级过滤功能,可对搜索进行精细调整,以提供高度准确的结果,从而改变DevOps工程师管理和排除基础设施故障的方式。本文通过实际操作演示了这一创新工作流程如何彻底改变DevOps实践。


6


流程

图片中描述的架构概述了由 LlamaIndex 支持的双流程系统,展示了 LLM 驱动的工作流与 Qdrant 的集成。第一个流程侧重于 Terraform 脚本的创建和管理。当触发工作流时,它会启动 "创建 Terraform 脚本 "事件,根据给定主题生成所需的 Terraform 脚本。然后,该脚本会经过验证过程,以确保其正确性并符合指定标准。验证完成后,脚本将被保存为物理文件,从而完成工作流程的第一部分。该流程旨在自动化和简化 Terraform 脚本的创建和验证过程,减少 DevOps 工程师所需的手工劳动。


第二个流程强调在 IaC(基础设施即代码)脚本上创建 RAG(检索增强生成)系统。工作流程启动后,该流程首先将生成的 Terraform 脚本导入 Qdrant,这是一个可实现高效检索和搜索功能的矢量存储系统。下一步是根据用户查询从 Qdrant 中检索脚本。检索过程之后是重新排序,对搜索结果进行细化,以确保相关性和准确性。最后一步是综合信息,为用户有关 IaC 配置的查询提供精确的答案。该流程利用Qdrant的高级功能(如有效载荷过滤)提供高度准确的结果,使其成为需要快速访问特定基础架构详细信息的DevOps工程师的宝贵工具。


Llama-Index 工作流的概念

事件驱动架构(EDA): llama-index 工作流引入了 EDA,作为基于图形的用户体验的灵活替代方案,用于协调复杂的人工智能工作流。在事件驱动系统中,组件对事件做出响应并决定后续行动,从而简化了协调代码。这种架构可以更自然地处理循环、纠错和可选输入。EDA 增强了模块化,并通过分散控制逻辑,使工作流更易于调试和管理。


实施:

首先,让我们创建一个名为 infrastructure_as_code_assistant.py 的工作流


from llama_index.core.workflow import (import (
    Workflow,
    Event,
    StartEvent,
    StopEvent,
    step
)
from llama_index.core import Settings
from llama_index.llms.ollama import Ollama
from llama_index.embeddings.ollama import OllamaEmbedding
import logging
logging.basicConfig(level=logging.INFO)
Settings.embed_model = OllamaEmbedding(model_name='mxbai-embed-large:latest', base_url='http://localhost:11434')

class TerraformScriptEvent(Event):
    script: str

class ValidationErrorEvent(Event):
    error: str
    wrong_output: str
    passage: str

class InfrastructureAsCodeAssistant(Workflow):
    llm = Ollama(model='llama3.1', base_url='http://localhost:11434', temperature=0.8, request_timeout=300)
    @step()
    async def create_script(self, ev: StartEvent) -> TerraformScriptEvent:
        try:
            topic = ev.get("topic")
            prompt = f"Write optimized terraform script for {topic}. No explanation is needed just give the script only."
            logging.info(f'create_script_prompt: {prompt}')
            response = await self.llm.acomplete(prompt)
            logging.info(f'generated script: {response}')
            return TerraformScriptEvent(script=str(response))
        except Exception as e:
            print("Creation failed,...")
            logging.error(str(e))
    @step()
    async def validate_script(self, ev: TerraformScriptEvent) -> TerraformScriptEvent:
        try:
            terraform_script = ev.script
            prompt = (f"Assume you are a senior devops engineer and given the terraform script: {terraform_script}, "
                      f"your job is to validate the script before user execute it in order to reduce the error time."
                      f"don't give any comments if no deviations found the script. comment if only script has errors")
            logging.info(f'validating the script: {prompt}')
            response = await self.llm.acomplete(prompt)
            logging.info(f'after validation: {response}')
            return TerraformScriptEvent(script=str(response))
        except Exception as e:
            print("Validation failed, ...")
            logging.error(str(e))
    @step()
    async def save_script(self, ev: TerraformScriptEvent) -> StopEvent:
        try:
            terraform_script = ev.script
            with open('main.tf', mode='w') as script:
                script.write(terraform_script)
            return StopEvent(result=str(terraform_script))
        except Exception as e:
            print("Script writing failed, ...")
            logging.error(str(e))


所提供的代码使用 LlamaIndex 定义了一个事件驱动的工作流程,用于生成、验证和保存 Terraform 脚本。


1. 设置: 导入必要的类并初始化日志和嵌入设置。

2. 自定义事件: 为脚本数据定义 TerraformScriptEvent,为错误处理定义 ValidationErrorEvent。

3. 工作流类: 创建 InfrastructureAsCodeAssistant 类,利用 Ollama 的 LLM 生成脚本。

4. 步骤:

  • create_script: 根据给定主题生成 Terraform 脚本。
  • validate_script: 验证生成的脚本以确保其正确性。
  • save_script: 将验证后的脚本保存到名为 main.tf 的文件中。


该架构展示了事件驱动工作流在 IaC 自动化中的实际应用,确保脚本在执行前得到优化和验证。


现在,让我们在名为 rag_oniac.py 的文件中创建 RAG 工作流


from llama_index.core import SimpleDirectoryReader, VectorStoreIndex, Settings, StorageContext
from llama_index.core.text_splitter import SentenceSplitter
from llama_index.core.response_synthesizers import CompactAndRefine
from llama_index.core.workflow import (
    Context,
    Workflow,
    StartEvent,
    StopEvent,
    step,
    Event,
    draw_most_recent_execution
)
from llama_index.core.schema import NodeWithScore
from llama_index.llms.ollama import Ollama
from llama_index.vector_stores.qdrant import QdrantVectorStore
from llama_index.embeddings.ollama import OllamaEmbedding
import qdrant_client

class RetrieverEvent(Event):
    """Result of running retrieval"""
    nodes: list[NodeWithScore]

class RAGWorkflow(Workflow):
    def __init__(self, timeout: int = 30, verbose: bool = False):
        super().__init__(timeout, verbose)
        text_parser = SentenceSplitter(chunk_size=512, chunk_overlap=100)
        client = qdrant_client.QdrantClient(url="http://localhost:6333/", api_key="th3s3cr3tk3y")
        self.vector_store = QdrantVectorStore(client=client, collection_name="iac")
        Settings.transformations = [text_parser]
    @step(pass_context=True)
    async def ingest(self, ctx: Context, ev: StartEvent) -> StopEvent | None:
        """Entry point to ingest a document, triggered by a StartEvent with `dirname`."""
        dirname = ev.get("dirname")
        if not dirname:
            return None
        documents = SimpleDirectoryReader(input_files=['main.tf']).load_data()
        storage_context = StorageContext.from_defaults(vector_store=self.vector_store)
        ctx.data["index"] = VectorStoreIndex.from_documents(documents=documents,
                                                            storage_context=storage_context,
                                                            transformations=Settings.transformations,
                                                            embed_model=OllamaEmbedding(
                                                                model_name='mxbai-embed-large:latest',
                                                                base_url='http://localhost:11434'),
                                                            show_progress=True)
        return StopEvent(result=f"Indexed {len(documents)} documents.")
    @step(pass_context=True)
    async def retrieve(
            self, ctx: Context, ev: StartEvent
    ) -> RetrieverEvent | None:
        "Entry point for RAG, triggered by a StartEvent with `query`."
        query = ev.get("query")
        if not query:
            return None
        print(f"Query the database with: {query}")
        # store the query in the global context
        ctx.data["query"] = query
        # get the index from the global context
        index = ctx.data.get("index")
        if index is None:
            print("Index is empty, load some documents before querying!")
            return None
        retriever = index.as_retriever(similarity_top_k=2)
        nodes = retriever.retrieve(query)
        print(f"Retrieved {len(nodes)} nodes.")
        return RetrieverEvent(nodes=nodes)
    @step(pass_context=True)
    async def synthesize(self, ctx: Context, ev: RetrieverEvent) -> StopEvent:
        """Return a streaming response using reranked nodes."""
        llm = Ollama(model='llama3.1', base_url='http://localhost:11434')
        summarizer = CompactAndRefine(llm=llm, streaming=True, verbose=True)
        query = ctx.data.get("query")
        response = await summarizer.asynthesize(query, nodes=ev.nodes)
        return StopEvent(result=response)


所提供的代码使用 LlamaIndex 和 Qdrant 定义了一个检索增强生成(RAG)工作流。工作流程包括三个主要步骤:摄取文档、检索信息和合成响应。


1. 摄取: 摄取方法是从指定目录中读取文档,使用句子分割法对文档进行处理,并使用 Ollama 嵌入模型在 Qdrant 向量存储中为文档建立索引。

2. 检索: 检索方法接收查询,在索引文档中搜索最相关的结果,并返回匹配度最高的节点。

3. 合成: 合成方法利用 Ollama LLM 和 CompactAndRefine 摘要器,使用检索到的节点生成一致的响应。


现在让我们依次调用这两个工作流,因为后面的工作流需要形成器输出。


from llama_index.core.workflow import (
    draw_most_recent_execution
)
from llama_index.core import Settings
from llama_index.embeddings.ollama import OllamaEmbedding
from infrastructure_as_code_assistant import InfrastructureAsCodeAssistant
import logging
logging.basicConfig(level=logging.INFO)
Settings.embed_model = OllamaEmbedding(model_name='mxbai-embed-large:latest', base_url='http://localhost:11434')

async def main():
    w = InfrastructureAsCodeAssistant(timeout=60, verbose=True)
    result = await w.run(topic="Azure Storage Account")
    print(str(result))
    draw_most_recent_execution(w, filename='flow.html')

if __name__ == '__main__':
    import asyncio
    asyncio.run(main=main())


1. 日志配置:

  • 配置日志记录以显示信息。


2. 设置嵌入模型:

  • 将嵌入模型设置为 OllamaEmbedding,模型名称为 mxbai-embed-large:latest,通过本地服务器访问。


3. 异步主函数:

  • 定义异步函数 main(),用于初始化和运行 InfrastructureAsCodeAssistant,超时 60 秒,并记录详细日志。
  • 执行与 "Azure 存储账户 "相关的任务并打印结果。
  • 生成最近执行情况的可视化表示并将其保存为 HTML 文件(flow.html)。


4. 脚本入口点:

  • 使用 asyncio.run() 执行 main() 函数,确保异步代码正确运行。


至此,TerraformAssistant 就完成了,它将创建脚本并充当机器人来查询任何特定的基础架构信息。


输出结果


7


8


9


结论:

总之,开发专为 DevOps 团队设计的助手解决了基础设施管理领域快节奏和不断变化的关键需求。通过利用该助手,DevOps 团队可以简化操作、提高生产率,即使在团队变动时也能确保无缝连续性。该助手具有生成脚本、解决实时查询、提供详细文档和指导故障排除的功能,是现代 DevOps 环境不可或缺的工具。该助手可与现有工具和平台集成,提供用户友好型界面,促进顺畅沟通和高效任务管理。随着企业努力提高敏捷性和弹性,采用此类创新解决方案无疑将增强 DevOps 团队提供优质服务、减少停机时间和维护稳健基础设施的能力,最终提高客户满意度,取得业务成功。

文章来源:https://medium.com/@manthapavankumar11/practical-implementation-of-agentic-rag-workflows-with-llama-index-and-qdrant-3b6622cd3124
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
写评论取消
回复取消