使用Llama-Agents、Qdrant 和 Kafka进行高级实时RAG

2024年07月26日 由 alex 发表 166 0

在当今数据驱动的世界中,实时处理和检索信息的能力至关重要。本文将利用 Llama-Agents、Qdrant 和 Kafka 深入探讨高级实时检索增强生成(RAG)。通过集成这些功能强大的工具,我们创建了一个代理系统,可以有效地管理数据摄取、处理和检索。了解该架构如何动态处理用户查询、平衡工作负载并确保无缝访问有价值的数据,从而改变企业管理信息工作流的方式。


3


架构

该流程从一个大型 JSON 对象开始,该对象是初始数据源。该 JSON 对象被导入以灵活性和可扩展性著称的 NoSQL 数据库 MongoDB。MongoDB 可以高效地存储和检索大型 JSON 对象,为进一步处理做好准备。


然后,从 MongoDB 输入的数据通过 Confluent 的 Kafka 源连接器传输。这些连接器可将数据流导入 Kafka,确保数据流顺畅高效。


数据一旦进入Kafka,就会发布到Kafka消息队列。Kafka 在此架构中充当中心枢纽,管理和路由系统各组件之间的数据流。Qdrant 的 Kafka 连接器为集成提供了便利,可确保无缝的数据索引和检索。


来自用户的数据被移交给信息检索代理。该代理负责执行语义搜索操作。它使用汇总的数据,并与 Qdrant 的实时数据(为语义搜索而优化的矢量数据库)进行交互。


下一个组件是汇总代理,负责处理来自信息检索代理的数据。该代理利用大型语言模型 Mistral-7b 对输入的信息进行总结。


数据导入:

下面的代码使用指定参数创建了一个 Qdrant 集合。然后从 “data.json ”文件中加载数据。使用 MongoClient 与 MongoDB 建立连接后,脚本会选择 “startups ”数据库和其中的 “docs ”集合。


对于加载的 JSON 数据中的每个对象,脚本都会提取名称、图片、alt 文本、描述、链接和城市等属性。它使用指定的嵌入模型为描述创建文本嵌入,并用集合名称、唯一 ID、嵌入向量和包含提取属性的有效载荷构建文档。然后将该文档插入 MongoDB 集合。


from pymongo import MongoClient
from utils.app_utils import create_qdrant_collection
from fastembed import TextEmbedding
import json
collection_name: str = 'startups'
embed_model_name: str = 'snowflake/snowflake-arctic-embed-s'
# Ste 0: create qdrant_collection
create_qdrant_collection(collection_name=collection_name, embed_model=embed_model_name)
with open(file='data.json', mode='r') as file:
    data = json.load(file)
# Step 1: Connect to MongoDB
client = MongoClient('mongodb://127.0.0.1:27017/?replicaSet=rs0&directConnection=true')
# Step 2: Select Database
db = client['startups']
# Step 3: Select Collection
collection = db['docs']
for index, obj in enumerate(data):
    # Extract properties
    name: str = obj.get('name')
    images: str = obj.get('images')
    alt: str = obj.get('alt')
    description: str = obj.get('description')
    link: str = obj.get('link')
    city: str = obj.get('city')
    # Step 4: Create a Document to Insert
    embedding_model = TextEmbedding(model_name=embed_model_name)
    vector = next(embedding_model.embed(documents=description)).tolist()
    document = {
        "collection_name": collection_name,
        "id": index+1,
        "vector": vector,
        "payload": {
            "name": name,
            "city": city,
            "description": description,
            "images": images,
            "url": link
        }
    }
    # Step 5: Insert the Document into the Collection
    result = collection.insert_one(document)
    # Step 6: Print the Inserted Document's ID
    print("Inserted document ID:", result.inserted_id)


一旦数据被摄取到Mongodb,我们的CDC就会启动,最终在Qdrant中看到数据。


实时数据上的 RAG:

下面的代码定义了一个类--HybridQdrantOperations,用于管理Qdrant服务器上的混合搜索操作。该类使用密集和稀疏嵌入模型进行初始化,连接本地 Qdrant 服务器,并加载 JSON 有效载荷。混合搜索(hybrid_search)方法以执行时间为衡量标准,使用 Qdrant 客户端执行搜索,返回给定查询文本的顶级相似结果的元数据。


import json
from qdrant_client import QdrantClient, models
from utils.decorator_utils import execution_time_decorator

class HybridQdrantOperations:
    def __init__(self):
        self.payload_path = "../data.json"
        self.collection_name = "startups"
        self.DENSE_MODEL_NAME = "snowflake/snowflake-arctic-embed-s"
        self.SPARSE_MODEL_NAME = "prithivida/Splade_PP_en_v1"
        # collect to our Qdrant Server
        self.client = QdrantClient(host="localhost", port=6333, api_key="th3s3cr3tk3y")
        self.client.set_model(self.DENSE_MODEL_NAME)
        # comment this line to use dense vectors only
        self.client.set_sparse_model(self.SPARSE_MODEL_NAME)
        self.metadata = []
        self.documents = []
    @execution_time_decorator
    def hybrid_search(self, text: str, top_k: int = 5):
        # self.client.query will have filters also if you want to do query on filter data.
        search_result = self.client.query(
            collection_name=self.collection_name,
            query_text=text,
            limit=top_k,  # 5 the closest results
        )
        # `search_result` contains found vector ids with similarity scores
        # along with the stored payload
        # Select and return metadata
        metadata = [hit.metadata for hit in search_result]
        return metadata


下面的代码使用 llama-agents 和 kafka 建立了一个基于代理的系统,用于管理和检索有关初创公司的信息。首先,它初始化了一个用于混合搜索的 HybridQdrantOperations 实例,并配置了日志和 Kafka 连接细节。使用特定参数创建 Ollama 模型实例,并使用 OllamaEmbedding 设置嵌入模型。


定义了一个函数 get_startup_info,用于通过 Qdrant 执行混合搜索,并以 JSON 格式返回结果。该函数使用 FunctionTool 封装成一个工具,并使用 ReActAgent 类创建了一个配备该工具的代理 startup_tool_agent。另一个代理(summarization_agent)也被实例化,但没有任何工具。


系统采用 KafkaMessageQueue 进行消息处理,并连接到指定的 Kafka URL。控制平面服务器(ControlPlaneServer)通过消息队列和 AgentOrchestrator 进行初始化,将控制平面的服务器端口设置为 8001。创建了两个 AgentService 实例:startup_service 用于检索启动信息,summarization_service 用于汇总信息,每个实例都与各自的代理和 Kafka 消息队列相关联,并分配了唯一的服务端口(8002 和 8003)。这种设置有助于通过代理交互实现启动信息的无缝检索和汇总。


from llama_index.core import Settings
from llama_agents import (
    AgentService,
    AgentOrchestrator,
    ControlPlaneServer
)
from llama_index.embeddings.ollama import OllamaEmbedding
from llama_agents.message_queues.apache_kafka import KafkaMessageQueue
from llama_index.core.agent import ReActAgent
from llama_index.core.tools import FunctionTool
from llama_index.llms.ollama import Ollama
from retrievers.qdrant_ops import HybridQdrantOperations
import json
qdrant_ops = HybridQdrantOperations()
is_logging_enabled = True
# better managed from env
KAFKA_CONNECTION_URL = "localhost:9092"
llm = Ollama(base_url='http://localhost:11434', model='mistral:latest', temperature=0.8, request_timeout=300,
             system_prompt="You are an agent who consider the context passed "
                           "in, to answer any questions dont consider your prior "
                           "knowledge to answer and if you dont find the answer "
                           "please respond that you dont know.")
Settings.embed_model = OllamaEmbedding(base_url='http://localhost:11434', model_name='snowflake-arctic-embed:33m')

# create an agent
def get_startup_info(query: str) -> str:
    """Returns the information about startups."""
    if is_logging_enabled:
        print(f"Query from agent: {query}")
    resp = qdrant_ops.hybrid_search(text=query, top_k=5)
    if is_logging_enabled:
        print(f"Response from search: {resp}")
    return json.dumps(resp)

startup_info_tool = FunctionTool.from_defaults(fn=get_startup_info)
startup_tool_agent = ReActAgent.from_tools(tools=[startup_info_tool], llm=llm)
summarization_agent = ReActAgent.from_tools(tools=[], llm=llm)
message_queue = KafkaMessageQueue(url=KAFKA_CONNECTION_URL)
control_plane = ControlPlaneServer(
    message_queue=message_queue,
    orchestrator=AgentOrchestrator(llm=llm),
    port=8001,
)
startup_service = AgentService(
    agent=startup_tool_agent,
    message_queue=message_queue,
    description="Useful for getting the information about startups.",
    service_name="info_extract_agent",
    port=8002,
)
summarization_service = AgentService(
    agent=summarization_agent,
    message_queue=message_queue,
    description="Useful for consolidating or summarizing the information.",
    service_name="info_summarization_agent",
    port=8003,
)


下面的代码使用之前定义的 startup_service、summarization_service、control_plane 和 message_queue 设置了一个 LocalLauncher 来管理和执行代理服务。然后进入一个无限循环,提示用户输入查询。如果用户输入 “bye ”或 “exit”,循环就会中断,程序结束。对于其他任何输入,启动器都会使用 launch_single 方法执行查询并打印结果。通过这种设置,可以对基于代理的系统进行交互式查询,以进行信息检索和汇总。


from llama_agents import LocalLauncher
from agent_with_rag_tool import startup_service, summarization_service, control_plane, message_queue
# launch it
launcher = LocalLauncher(
    [startup_service, summarization_service],
    control_plane,
    message_queue,
)
while True:
    input_query = input("Query (type 'bye' or 'exit' to quit the program ):")
    if input_query.lower() == 'bye' or input_query.lower() == 'exit':
        break
    result = launcher.launch_single(initial_task=input_query)
    print(f"Result: {result}")


一切运行正常后,输出结果如下。


4


后台发生了什么?

系统采用 llama-agents 框架通过 Kafka 平台管理查询,确保高效的主题创建、平衡和删除。当用户提交查询时,框架会动态创建必要的 Kafka 主题,并为这些主题注册消费者。然后启动代理来处理查询。控制平面负责协调工作流程,发布消息以启动任务并处理任务的完成。在处理完查询并提供响应后,框架会停止消费者并删除不再需要的主题,从而确保高效的资源管理和系统性能。


5


未来(高度可扩展概念)


6


上述架构是一个代理系统(Agentic System),各团队可与专用代理互动,从 Salesforce、Jira 和 AWS CloudWatch 等多个数据源检索相关信息。这些数据源分别提供客户数据、项目管理信息和应用程序日志。以下是架构的详细分解:


数据源:

  • Salesforce: 管理客户和销售相关数据,包括线索和机会。
  • Jira:处理项目管理任务和问题。
  • AWS CloudWatch: 收集和监控应用程序日志。


源连接器: 使用 Confluent 的 Kafka 源连接器将这些源的数据摄取到系统中,从而实现向 Kafka 的无缝数据流。


Kafka 消息队列: Kafka 充当中心枢纽,管理数据流并将其路由到系统内的各种代理和组件。


代理:

  • 信息检索代理: 从摄取的数据流中检索特定数据。
  • 汇总代理: 处理和汇总检索到的数据,以方便解释。


Qdrant 使用 Kafka 的 Qdrant 汇连接器来存储数据并编制索引,从而实现高效的语义搜索功能。


控制平面:

  • 协调器: 管理工作流程和数据处理顺序。
  • 服务元数据: 存储与系统内服务和操作相关的元数据。


总结

在对强大的数据处理和检索系统的探索中,我们深入研究了与 Kafka 和 Qdrant 集成的 llama-agents 框架如何高效地管理用户查询和数据操作。通过利用 Salesforce、Jira 和 AWS CloudWatch 等组件,该系统可确保无缝摄取、处理和检索有价值的数据。该代理系统可动态处理 Kafka 主题,使不同团队能够与专门的代理互动,根据他们的需求提供精确的信息。高效的资源管理凸显了系统平衡工作负载和保持性能的能力。最终,这一架构体现了管理复杂数据工作流的先进方法,使其成为希望增强数据处理生成式人工智能能力的组织的宝贵蓝图。

文章来源:https://medium.com/@manthapavankumar11/advanced-realtime-rag-with-llama-agents-qdrant-and-kafka-3cd9163faa62
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
热门职位
Maluuba
20000~40000/月
Cisco
25000~30000/月 深圳市
PilotAILabs
30000~60000/年 深圳市
写评论取消
回复取消