使用LlamaIndex Agents和Qdrant的混合搜索构建更智能的代理

2024年07月08日 由 alex 发表 362 0

在不断发展的人工智能领域,创建更智能的数据检索系统至关重要。本文将探讨如何使用利用最新功能(如 LlamaIndex-Agents 和 Qdrant 的混合搜索功能)构建的定制代理,结合稀疏和密集向量嵌入来实现精确的数据检索。我们将研究一种先进的架构,在这种架构中,来自 llamaindex-agents 和消息队列系统的协调器可以有效地管理各种代理和工具,并在它们之间进行通信。了解这些技术的集成如何增强查询处理能力,从而一窥人工智能驱动的数据解决方案的未来。


10


架构

该架构是LlamaIndex代理和Qdrant混合搜索功能的最佳组合,为高级数据检索和查询处理提供了复杂的解决方案。通过整合密集和稀疏嵌入模型,Qdrant可确保搜索既精确又全面,充分利用两种向量类型的优势。控制平面及其协调器和服务元数据可有效管理工作流程,确保将查询导向适当的工具,并高效生成响应。


当用户提交查询时,查询会通过协调器进行处理,协调器会与 RabbitMQ 协调,处理不同代理之间的通信。信息工具代理从 Qdrant 中检索必要的信息,利用混合搜索功能确保高质量的结果。然后,这些数据通过 RabbitMQ 传递给 summary_tool,由 summary_tool 为用户编译成连贯而有意义的响应。LlamaIndex 代理与 Qdrant 混合搜索的无缝集成不仅提高了数据检索的准确性和相关性,还确保了顺畅高效的查询处理工作流程。


实施

在深入实施之前,我们可以从这里获得本项目的数据。


在这个项目中,我们有两个核心模块,分别叫做 retriever 和 agents,为了实现这两个模块,首先让我们安装所需的 python libraires。


requirements.txt


llama-index-llms-ollama==0.1.5
llama-agents==0.0.5
qdrant-client==1.10.0
fastembed==0.3.1
aio_pika==9.4.2


/retrievers/qdrant_ops.py


import json
from qdrant_client import QdrantClient, models
from utils.decorator_utils import execution_time_decorator

class HybridQdrantOperations:
    def __init__(self):
        self.payload_path = "../data.json"
        self.collection_name = "hybrid-multi-stage-queries-collection"
        self.DENSE_MODEL_NAME = "snowflake/snowflake-arctic-embed-s"
        self.SPARSE_MODEL_NAME = "prithivida/Splade_PP_en_v1"
        # collect to our Qdrant Server
        self.client = QdrantClient(host="5496bdf1-fe1b-4e36-8715-aa5319aa1bf7.us-east4-0.gcp.cloud.qdrant.io",
                                   port=6333,
                                   api_key="<YOUR API KEY>")
        self.client.set_model(self.DENSE_MODEL_NAME)
        # comment this line to use dense vectors only
        self.client.set_sparse_model(self.SPARSE_MODEL_NAME)
        self.metadata = []
        self.documents = []
    def load_data(self):
        with open(self.payload_path) as fd:
            for line in fd:
                obj = json.loads(line)
                self.documents.append(obj.pop("description"))
                self.metadata.append(obj)
    def create_collection(self):
        if not self.client.collection_exists(collection_name=f"{self.collection_name}"):
            self.client.create_collection(
                collection_name=f"{self.collection_name}",
                vectors_config=self.client.get_fastembed_vector_params(),
                # comment this line to use dense vectors only
                sparse_vectors_config=self.client.get_fastembed_sparse_vector_params(on_disk=True),
                optimizers_config=models.OptimizersConfigDiff(
                    default_segment_number=5,
                    indexing_threshold=0,
                ),
                quantization_config=models.BinaryQuantization(
                    binary=models.BinaryQuantizationConfig(always_ram=True),
                ),
                shard_number=4
            )
    @execution_time_decorator
    def insert_documents(self):
        self.client.add(
            collection_name=self.collection_name,
            documents=self.documents,
            metadata=self.metadata,
            # parallel=0,  # Use all available CPU cores to encode data if the value is 0
        )
        # self._optimize_collection_after_insert()
    @execution_time_decorator
    def hybrid_search(self, text: str, top_k: int = 5):
        # self.client.query will have filters also if you want to do query on filter data.
        search_result = self.client.query(
            collection_name=self.collection_name,
            query_text=text,
            limit=top_k,  # 5 the closest results
        )
        # `search_result` contains found vector ids with similarity scores
        # along with the stored payload
        # Select and return metadata
        metadata = [hit.metadata for hit in search_result]
        return metadata
    def _optimize_collection_after_insert(self):
        self.client.update_collection(
            collection_name=f'{self.collection_name}',
            optimizer_config=models.OptimizersConfigDiff(indexing_threshold=30000)
        )
# only to be used as a driver code else just use the class from else where.
if __name__ == '__main__':
    ops = HybridQdrantOperations()
    # only run the below for the first time when you newly create collection and want to ingest the data.
    ops.load_data()
    ops.create_collection()
    ops.insert_documents()
    # results = ops.hybrid_search(text="What are the gaming companies in bangalore?", top_k=10000)
    # print(results)


HybridQdrantOperations 集成了密集和稀疏嵌入模型,可使用 Qdrant 向量数据库实现高级搜索功能。该类使用必要的配置进行初始化,包括有效载荷路径和模型名称。它使用特定凭证连接到 Qdrant 服务器,并同时设置密集和稀疏模型。load_data 方法从 JSON 文件中读取数据,分别存储文档描述和元数据。create_collection 方法会检查指定的集合是否存在,如果不存在则会创建它,并为快速嵌入向量和稀疏向量进行配置,同时启用二进制量化以优化性能。insert_documents 方法用于测量执行时间,它将文档和元数据添加到集合中。hybrid_search 方法使用文本查询执行混合搜索,返回最接近结果的元数据。一个优化函数会在插入后更新集合配置。


/agents/agent_with_rag_tool.py


from llama_index.core import Settings
from llama_agents import (
    AgentService,
    AgentOrchestrator,
    ControlPlaneServer
)
from llama_index.embeddings.ollama import OllamaEmbedding
from llama_agents.message_queues.rabbitmq import RabbitMQMessageQueue
from llama_index.core.agent import ReActAgent
from llama_index.core.tools import FunctionTool
from llama_index.llms.ollama import Ollama
from retrievers.qdrant_ops import HybridQdrantOperations
import json
qdrant_ops = HybridQdrantOperations()
is_logging_enabled = True
# better managed from env
RABBITMQ_CONNECTION_URL = "amqp://<USER>:<PASSWORD>@localhost/"
llm = Ollama(base_url='http://localhost:11434', model='mistral:latest', temperature=0.8, request_timeout=300,
             system_prompt="You are an agent who consider the context passed "
                           "in, to answer any questions dont consider your prior "
                           "knowledge to answer and if you dont find the answer "
                           "please respond that you dont know.")
Settings.embed_model = OllamaEmbedding(base_url='http://localhost:11434', model_name='mxbai-embed-large:latest ')

# create an agent
def get_startup_info(query: str) -> str:
    """Returns the information about startups."""
    if is_logging_enabled:
        print(f"Query from agent: {query}")
    resp = qdrant_ops.hybrid_search(text=query, top_k=5)
    if is_logging_enabled:
        print(f"Response from search: {resp}")
    return json.dumps(resp)

startup_info_tool = FunctionTool.from_defaults(fn=get_startup_info)
startup_tool_agent = ReActAgent.from_tools(tools=[startup_info_tool], llm=llm)
summarization_agent = ReActAgent.from_tools(tools=[], llm=llm)
message_queue = RabbitMQMessageQueue(url=RABBITMQ_CONNECTION_URL, exchange_name="agents")
control_plane = ControlPlaneServer(
    message_queue=message_queue,
    orchestrator=AgentOrchestrator(llm=llm),
    port=8001,
)
startup_service = AgentService(
    agent=startup_tool_agent,
    message_queue=message_queue,
    description="Useful for getting the information about startups.",
    service_name="info_extract_agent",
    port=8002,
)
summarization_service = AgentService(
    agent=summarization_agent,
    message_queue=message_queue,
    description="Useful for consolidating or summarizing the information.",
    service_name="info_summarization_agent",
    port=8003,
)


上述代码设置了一个高级定制人工智能代理系统,该系统集成了 LlamaIndex-agents 框架、Qdrant 混合搜索和用于消息队列的 RabbitMQ。它初始化了 HybridQdrantOperations 以处理混合搜索操作,并配置了 Ollama LLM 以生成响应和嵌入。使用指定的连接 URL 为代理通信建立 RabbitMQ 消息队列。get_startup_info 函数记录其操作,通过混合搜索检索启动信息,并以 JSON 格式返回结果。该函数被封装在一个 FunctionTool 中,并分配给一个配置了搜索工具和 LLM 的 ReAct 代理 startup_tool_agent。另一个 ReAct 代理(summarization_agent)在没有工具的情况下被设置为执行汇总任务。ControlPlaneServer 通过消息队列管理代理协调,监听端口为 8001。创建了两个代理服务:用于启动信息检索的启动服务(startup_service),监听端口为 8002;用于信息汇总的汇总服务(summarization_service),监听端口为 8003。这些组件协同工作,利用混合搜索功能和高效的消息队列,有效地处理查询。


/agents/agent_launcher.py


from llama_agents import LocalLauncher
from agent_with_rag_tool import startup_service, summarization_service, control_plane, message_queue
# launch it
launcher = LocalLauncher(
    [startup_service, summarization_service],
    control_plane,
    message_queue,
)
while True:
    input_query = input("Query (type 'bye' or 'exit' to quit the program ):")
    if input_query.lower() == 'bye' or input_query.lower() == 'exit':
        break
    result = launcher.launch_single(initial_task=input_query)
    print(f"Result: {result}")


上述代码使用 LlamaAgents 框架为人工智能代理设置了一个本地启动器。LocalLauncher 使用必要的组件进行初始化,以便在本地管理和运行指定的代理和控制平面。在一个循环中,程序会提示用户输入查询,使用 LocalLauncher 的 launch_single 方法进行处理,并打印结果。这个循环一直持续到用户输入 "再见 "或 "退出 "为止,从而为查询人工智能系统和实时获取处理结果提供了一个交互式界面。


结果


11


12


结论

总之,LlamaIndex 代理与 Qdrant 混合搜索功能的整合为高级数据检索和查询处理提供了强大而高效的解决方案。通过利用密集和稀疏嵌入模型,该系统可提供精确而全面的搜索结果。该架构以协调器、RabbitMQ 消息队列以及用于提取和汇总的专门代理为特色,确保了高效的查询处理和信息检索。这种设置凸显了将先进的人工智能模型与搜索技术相结合的潜力,从而提供反应灵敏的智能数据解决方案。各组件之间的无缝互动提高了搜索准确性和工作流程效率,展示了混合搜索技术在人工智能驱动应用中的实际优势。

文章来源:https://medium.com/@manthapavankumar11/building-smarter-agents-using-llamaindex-agents-and-qdrants-hybrid-search-50c0ecbbfb0d
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
写评论取消
回复取消