在当今数据驱动的世界中,实时处理和检索信息的能力至关重要。本文将利用 Llama-Agents、Qdrant 和 Kafka 深入探讨高级实时检索增强生成(RAG)。通过集成这些功能强大的工具,我们创建了一个代理系统,可以有效地管理数据摄取、处理和检索。了解该架构如何动态处理用户查询、平衡工作负载并确保无缝访问有价值的数据,从而改变企业管理信息工作流的方式。
架构
该流程从一个大型 JSON 对象开始,该对象是初始数据源。该 JSON 对象被导入以灵活性和可扩展性著称的 NoSQL 数据库 MongoDB。MongoDB 可以高效地存储和检索大型 JSON 对象,为进一步处理做好准备。
然后,从 MongoDB 输入的数据通过 Confluent 的 Kafka 源连接器传输。这些连接器可将数据流导入 Kafka,确保数据流顺畅高效。
数据一旦进入Kafka,就会发布到Kafka消息队列。Kafka 在此架构中充当中心枢纽,管理和路由系统各组件之间的数据流。Qdrant 的 Kafka 连接器为集成提供了便利,可确保无缝的数据索引和检索。
来自用户的数据被移交给信息检索代理。该代理负责执行语义搜索操作。它使用汇总的数据,并与 Qdrant 的实时数据(为语义搜索而优化的矢量数据库)进行交互。
下一个组件是汇总代理,负责处理来自信息检索代理的数据。该代理利用大型语言模型 Mistral-7b 对输入的信息进行总结。
数据导入:
下面的代码使用指定参数创建了一个 Qdrant 集合。然后从 “data.json ”文件中加载数据。使用 MongoClient 与 MongoDB 建立连接后,脚本会选择 “startups ”数据库和其中的 “docs ”集合。
对于加载的 JSON 数据中的每个对象,脚本都会提取名称、图片、alt 文本、描述、链接和城市等属性。它使用指定的嵌入模型为描述创建文本嵌入,并用集合名称、唯一 ID、嵌入向量和包含提取属性的有效载荷构建文档。然后将该文档插入 MongoDB 集合。
from pymongo import MongoClient
from utils.app_utils import create_qdrant_collection
from fastembed import TextEmbedding
import json
collection_name: str = 'startups'
embed_model_name: str = 'snowflake/snowflake-arctic-embed-s'
# Ste 0: create qdrant_collection
create_qdrant_collection(collection_name=collection_name, embed_model=embed_model_name)
with open(file='data.json', mode='r') as file:
data = json.load(file)
# Step 1: Connect to MongoDB
client = MongoClient('mongodb://127.0.0.1:27017/?replicaSet=rs0&directConnection=true')
# Step 2: Select Database
db = client['startups']
# Step 3: Select Collection
collection = db['docs']
for index, obj in enumerate(data):
# Extract properties
name: str = obj.get('name')
images: str = obj.get('images')
alt: str = obj.get('alt')
description: str = obj.get('description')
link: str = obj.get('link')
city: str = obj.get('city')
# Step 4: Create a Document to Insert
embedding_model = TextEmbedding(model_name=embed_model_name)
vector = next(embedding_model.embed(documents=description)).tolist()
document = {
"collection_name": collection_name,
"id": index+1,
"vector": vector,
"payload": {
"name": name,
"city": city,
"description": description,
"images": images,
"url": link
}
}
# Step 5: Insert the Document into the Collection
result = collection.insert_one(document)
# Step 6: Print the Inserted Document's ID
print("Inserted document ID:", result.inserted_id)
一旦数据被摄取到Mongodb,我们的CDC就会启动,最终在Qdrant中看到数据。
实时数据上的 RAG:
下面的代码定义了一个类--HybridQdrantOperations,用于管理Qdrant服务器上的混合搜索操作。该类使用密集和稀疏嵌入模型进行初始化,连接本地 Qdrant 服务器,并加载 JSON 有效载荷。混合搜索(hybrid_search)方法以执行时间为衡量标准,使用 Qdrant 客户端执行搜索,返回给定查询文本的顶级相似结果的元数据。
import json
from qdrant_client import QdrantClient, models
from utils.decorator_utils import execution_time_decorator
class HybridQdrantOperations:
def __init__(self):
self.payload_path = "../data.json"
self.collection_name = "startups"
self.DENSE_MODEL_NAME = "snowflake/snowflake-arctic-embed-s"
self.SPARSE_MODEL_NAME = "prithivida/Splade_PP_en_v1"
# collect to our Qdrant Server
self.client = QdrantClient(host="localhost", port=6333, api_key="th3s3cr3tk3y")
self.client.set_model(self.DENSE_MODEL_NAME)
# comment this line to use dense vectors only
self.client.set_sparse_model(self.SPARSE_MODEL_NAME)
self.metadata = []
self.documents = []
@execution_time_decorator
def hybrid_search(self, text: str, top_k: int = 5):
# self.client.query will have filters also if you want to do query on filter data.
search_result = self.client.query(
collection_name=self.collection_name,
query_text=text,
limit=top_k, # 5 the closest results
)
# `search_result` contains found vector ids with similarity scores
# along with the stored payload
# Select and return metadata
metadata = [hit.metadata for hit in search_result]
return metadata
下面的代码使用 llama-agents 和 kafka 建立了一个基于代理的系统,用于管理和检索有关初创公司的信息。首先,它初始化了一个用于混合搜索的 HybridQdrantOperations 实例,并配置了日志和 Kafka 连接细节。使用特定参数创建 Ollama 模型实例,并使用 OllamaEmbedding 设置嵌入模型。
定义了一个函数 get_startup_info,用于通过 Qdrant 执行混合搜索,并以 JSON 格式返回结果。该函数使用 FunctionTool 封装成一个工具,并使用 ReActAgent 类创建了一个配备该工具的代理 startup_tool_agent。另一个代理(summarization_agent)也被实例化,但没有任何工具。
系统采用 KafkaMessageQueue 进行消息处理,并连接到指定的 Kafka URL。控制平面服务器(ControlPlaneServer)通过消息队列和 AgentOrchestrator 进行初始化,将控制平面的服务器端口设置为 8001。创建了两个 AgentService 实例:startup_service 用于检索启动信息,summarization_service 用于汇总信息,每个实例都与各自的代理和 Kafka 消息队列相关联,并分配了唯一的服务端口(8002 和 8003)。这种设置有助于通过代理交互实现启动信息的无缝检索和汇总。
from llama_index.core import Settings
from llama_agents import (
AgentService,
AgentOrchestrator,
ControlPlaneServer
)
from llama_index.embeddings.ollama import OllamaEmbedding
from llama_agents.message_queues.apache_kafka import KafkaMessageQueue
from llama_index.core.agent import ReActAgent
from llama_index.core.tools import FunctionTool
from llama_index.llms.ollama import Ollama
from retrievers.qdrant_ops import HybridQdrantOperations
import json
qdrant_ops = HybridQdrantOperations()
is_logging_enabled = True
# better managed from env
KAFKA_CONNECTION_URL = "localhost:9092"
llm = Ollama(base_url='http://localhost:11434', model='mistral:latest', temperature=0.8, request_timeout=300,
system_prompt="You are an agent who consider the context passed "
"in, to answer any questions dont consider your prior "
"knowledge to answer and if you dont find the answer "
"please respond that you dont know.")
Settings.embed_model = OllamaEmbedding(base_url='http://localhost:11434', model_name='snowflake-arctic-embed:33m')
# create an agent
def get_startup_info(query: str) -> str:
"""Returns the information about startups."""
if is_logging_enabled:
print(f"Query from agent: {query}")
resp = qdrant_ops.hybrid_search(text=query, top_k=5)
if is_logging_enabled:
print(f"Response from search: {resp}")
return json.dumps(resp)
startup_info_tool = FunctionTool.from_defaults(fn=get_startup_info)
startup_tool_agent = ReActAgent.from_tools(tools=[startup_info_tool], llm=llm)
summarization_agent = ReActAgent.from_tools(tools=[], llm=llm)
message_queue = KafkaMessageQueue(url=KAFKA_CONNECTION_URL)
control_plane = ControlPlaneServer(
message_queue=message_queue,
orchestrator=AgentOrchestrator(llm=llm),
port=8001,
)
startup_service = AgentService(
agent=startup_tool_agent,
message_queue=message_queue,
description="Useful for getting the information about startups.",
service_name="info_extract_agent",
port=8002,
)
summarization_service = AgentService(
agent=summarization_agent,
message_queue=message_queue,
description="Useful for consolidating or summarizing the information.",
service_name="info_summarization_agent",
port=8003,
)
下面的代码使用之前定义的 startup_service、summarization_service、control_plane 和 message_queue 设置了一个 LocalLauncher 来管理和执行代理服务。然后进入一个无限循环,提示用户输入查询。如果用户输入 “bye ”或 “exit”,循环就会中断,程序结束。对于其他任何输入,启动器都会使用 launch_single 方法执行查询并打印结果。通过这种设置,可以对基于代理的系统进行交互式查询,以进行信息检索和汇总。
from llama_agents import LocalLauncher
from agent_with_rag_tool import startup_service, summarization_service, control_plane, message_queue
# launch it
launcher = LocalLauncher(
[startup_service, summarization_service],
control_plane,
message_queue,
)
while True:
input_query = input("Query (type 'bye' or 'exit' to quit the program ):")
if input_query.lower() == 'bye' or input_query.lower() == 'exit':
break
result = launcher.launch_single(initial_task=input_query)
print(f"Result: {result}")
一切运行正常后,输出结果如下。
后台发生了什么?
系统采用 llama-agents 框架通过 Kafka 平台管理查询,确保高效的主题创建、平衡和删除。当用户提交查询时,框架会动态创建必要的 Kafka 主题,并为这些主题注册消费者。然后启动代理来处理查询。控制平面负责协调工作流程,发布消息以启动任务并处理任务的完成。在处理完查询并提供响应后,框架会停止消费者并删除不再需要的主题,从而确保高效的资源管理和系统性能。
未来(高度可扩展概念)
上述架构是一个代理系统(Agentic System),各团队可与专用代理互动,从 Salesforce、Jira 和 AWS CloudWatch 等多个数据源检索相关信息。这些数据源分别提供客户数据、项目管理信息和应用程序日志。以下是架构的详细分解:
数据源:
源连接器: 使用 Confluent 的 Kafka 源连接器将这些源的数据摄取到系统中,从而实现向 Kafka 的无缝数据流。
Kafka 消息队列: Kafka 充当中心枢纽,管理数据流并将其路由到系统内的各种代理和组件。
代理:
Qdrant 使用 Kafka 的 Qdrant 汇连接器来存储数据并编制索引,从而实现高效的语义搜索功能。
控制平面:
总结
在对强大的数据处理和检索系统的探索中,我们深入研究了与 Kafka 和 Qdrant 集成的 llama-agents 框架如何高效地管理用户查询和数据操作。通过利用 Salesforce、Jira 和 AWS CloudWatch 等组件,该系统可确保无缝摄取、处理和检索有价值的数据。该代理系统可动态处理 Kafka 主题,使不同团队能够与专门的代理互动,根据他们的需求提供精确的信息。高效的资源管理凸显了系统平衡工作负载和保持性能的能力。最终,这一架构体现了管理复杂数据工作流的先进方法,使其成为希望增强数据处理生成式人工智能能力的组织的宝贵蓝图。