We set out to build a truly multilingual, multimodal Retrieval-Augmented Generation (RAG) system: one that could ingest, embed, and retrieve not only English text but also Spanish, Chinese, and even domain-specific documents. We wanted all of this to happen fast, with minimal friction for the end user. After experimenting with various frameworks, we settled on an architecture that pairs LlamaIndex's flexible agent-based approach with Qdrant's high-performance vector database. Suddenly the lights came on: we had a pipeline that handles everything from ingestion to retrieval, with seamless cross-lingual and multimodal support.
The anatomy of the architecture:
User
Every great adventure starts with someone asking a question or making a request. In our setup, the user is in the driver's seat. They interact with the system, whether they want to add new documents, retrieve information, or run a specific workflow.
Workflow Agent
At the heart of the architecture sits the workflow agent, the conductor of the orchestra. Built on LlamaIndex, its job is to listen to the user's request and decide which path to take next: should it kick off ingestion of brand-new documents, or launch a retrieval routine? This agent calls the shots and orchestrates the overall flow.
Documents (Docs)
These are the knowledge treasures we want to index. Think of them as the raw material fed into the system, whether text files, PDFs, or any other form of data you later need to embed and query.
Embedding Agent
Once documents arrive, the embedding agent (also powered by LlamaIndex) processes each one and turns it into the mathematical magic known as embeddings. You can think of an embedding as a fingerprint of the text: it captures the contextual essence so that, when a user later searches for something, the system can quickly surface the most relevant documents.
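To make the "fingerprint" idea concrete, here is a minimal, standalone sketch (not part of the project code) that embeds two sentences with the same vdr-2b-multi-v1 model used later and compares their vectors with cosine similarity; the sample sentences and the use of numpy are assumptions for illustration only:

from llama_index.embeddings.huggingface import HuggingFaceEmbedding
import numpy as np

embed_model = HuggingFaceEmbedding(
    model_name="llamaindex/vdr-2b-multi-v1",
    device="cpu",  # "mps" for mac, "cuda" for nvidia GPUs
    trust_remote_code=True,
)

# Two sentences in different languages expressing the same idea
vec_en = np.array(embed_model.get_text_embedding("A vector database stores embeddings."))
vec_es = np.array(embed_model.get_text_embedding("Una base de datos vectorial almacena embeddings."))

# Cosine similarity: the closer to 1.0, the more alike the two "fingerprints" are
similarity = float(vec_en @ vec_es / (np.linalg.norm(vec_en) * np.linalg.norm(vec_es)))
print(f"cosine similarity: {similarity:.3f}")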
Ingestion Agent
Once the documents are nicely embedded, the ingestion agent takes over and does the heavy lifting of storing them in our vector database, Qdrant in this case. Picture this agent as the librarian who catalogs every new arrival and makes sure it lands on the right shelf for easy retrieval later.
Qdrant
Where do we actually store those embeddings? In Qdrant, a purpose-built vector database. Qdrant is like our library, engineered for fast and accurate similarity search. When the system says, "find me all documents related to X," Qdrant springs into action and returns the best matches.
Retrieval Agent
When a user's question calls for looking up existing data, the retrieval agent is summoned. It runs queries against Qdrant, leverages the embedding "fingerprints," and fetches the relevant snippets. This agent's job is to deliver the right data at the right time, with no more manual digging through thousands of documents.
vdr-2b-multi-v1 (Model)
This is the underlying model, the "brain," used to generate embeddings. Text and images are processed here into dense vectors. Think of it as the specialized engine powering your embedding agent's skills.
Langfuse
Finally, we have Langfuse keeping an eye on everything. It acts like a control tower, providing observability and analytics. Whenever an agent performs an action, such as ingesting a batch of documents or executing a retrieval, Langfuse logs the activity, measures performance, and helps you track what is happening under the hood.
Implementation
Throughout the article we use the LlamaIndex stack, including AgentWorkflow and their latest multilingual embedding model, vdr-2b-multi-v1.
The whole project has the following structure.
.
├── LICENSE
├── README.md
├── advanced-rag.py
├── architecture.png
├── documents.json
├── images
│ ├── image-1.png
│ ├── image-2.png
│ ├── image-3.png
│ ├── image-4.png
│ ├── image-5.png
│ └── image-6.png
├── requirements.txt
├── text
│ ├── image-1-description.txt
│ ├── image-2-description.txt
│ ├── image-3-description.txt
│ ├── image-4-description.txt
│ ├── image-5-description.txt
│ └── image-6-description.txt
└── tools
├── __init__.py
└── rag_tools.py
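The agent workflows import their tool functions from the tools package. The repository's actual tools/__init__.py is not reproduced in this article, but a plausible version of it (an assumption, shown only so the later imports make sense) simply re-exports the four functions defined in rag_tools.py:

# tools/__init__.py (assumed contents, for illustration only)
from .rag_tools import (
    create_embeddings,
    ingest_to_vec_db,
    retrieve_image_from_store,
    retrieve_text_from_store,
)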
The requirements.txt file containing the project dependencies looks like this.
# data orchestration
llama-index
llama-index-vector-stores-qdrant
llama-index-llms-ollama
llama-index-llms-openai
llama-index-llms-anthropic
llama-index-embeddings-huggingface
llama-index-embeddings-ollama
#
python-dotenv
# observability
langfuse==2.58.2
The .env file that holds all of the application's secrets looks like this:
QDRANT_URL=http://localhost:6333/
QDRANT_API_KEY=****
OPENAI_API_KEY=sk-****
ANTHROPIC_API_KEY=sk-****
HF_TOKEN=hf_****
TOKENIZERS_PARALLELISM=false
# Langfuse Observability Details
LANGFUSE_PUBLIC_KEY='pk-lf-****'
LANGFUSE_SECRET_KEY='sk-lf-****'
LANGFUSE_HOST='http://localhost:3000'
Ingestion
The ingestion phase is a critical part of any Retrieval-Augmented Generation (RAG) workflow, and several agents cooperate to make it succeed. Two agents, the embedding agent and the ingestion agent, coordinate with each other to pick up the documents, embed them into the predefined format, and finally ingest them into the Qdrant vector store.
async def invoke_ingest_workflow(self):
    embed_agent = FunctionAgent(
        name="embeddings creator",
        description="Performs embeddings creation task",
        system_prompt="You are an assistant to create embeddings for the given data",
        tools=[
            FunctionTool.from_defaults(fn=create_embeddings),
        ],
        llm=self.llm,
        can_handoff_to=["data ingestor"]
    )

    ingest_agent = FunctionAgent(
        name="data ingestor",
        description="Performs data ingestion task",
        system_prompt="You are an assistant to ingest data into vector database",
        tools=[
            FunctionTool.from_defaults(fn=ingest_to_vec_db),
        ],
        llm=self.llm
    )

    # Create and run the workflow
    workflow = AgentWorkflow(
        agents=[embed_agent, ingest_agent], root_agent="embeddings creator", timeout=300
    )

    await workflow.run(user_msg="embed the data and then ingest it to vector database")
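The method above (and the retrieval methods that follow) references self.llm, which belongs to a class whose constructor is not shown in this article. The sketch below is only an assumption of what that surrounding class might look like, using an OpenAI model as the LLM (any LlamaIndex-compatible LLM, such as Ollama or Anthropic, would work equally well); import paths follow recent llama-index releases:

import os
from dotenv import load_dotenv, find_dotenv
from llama_index.core.agent.workflow import AgentWorkflow, FunctionAgent
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI
from tools import create_embeddings, ingest_to_vec_db, retrieve_image_from_store, retrieve_text_from_store

load_dotenv(find_dotenv())

class MultilingualMultimodalRAG:
    def __init__(self):
        # gpt-4o-mini is an arbitrary choice; swap in any supported LLM
        self.llm = OpenAI(model="gpt-4o-mini", api_key=os.environ.get("OPENAI_API_KEY"))

    # invoke_ingest_workflow, invoke_text2img_rag_workflow and
    # invoke_img2text_rag_workflow from this article live here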
Retrieval (Text and Images)
The next step in any Retrieval-Augmented Generation (RAG) pipeline is retrieving the right context for the user's query. In our scenario, retrieval means returning an image when the user submits a text query, and vice versa (returning text when the user sends the path to an image).
async def invoke_text2img_rag_workflow(self):
    retrieval_agent = FunctionAgent(
        name="retrieval agent",
        description="Performs retrieval for the given user query",
        system_prompt="You are an assistant to perform retrieval for the given user query",
        tools=[
            FunctionTool.from_defaults(fn=retrieve_image_from_store)
        ],
        llm=self.llm
    )

    # Create and run the workflow
    workflow = AgentWorkflow(
        agents=[retrieval_agent], root_agent="retrieval agent", timeout=300
    )

    await workflow.run(user_msg="user interacting with Spring AI system")
async def invoke_img2text_rag_workflow(self):
    retrieval_agent = FunctionAgent(
        name="retrieval agent",
        description="Performs retrieval for the given user query",
        system_prompt="You are an assistant to perform retrieval for the given user query",
        tools=[
            FunctionTool.from_defaults(fn=retrieve_text_from_store)
        ],
        llm=self.llm
    )

    # Create and run the workflow
    workflow = AgentWorkflow(
        agents=[retrieval_agent], root_agent="retrieval agent", timeout=300
    )

    await workflow.run(user_msg="images/image-2.png")
Tools
At the core of the entire agentic workflow is deciding which tool to call, and when. We designed four tools, one dedicated to each task:
from qdrant_client import QdrantClient, models
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from PIL import Image
from dotenv import load_dotenv, find_dotenv
import os
import json
load_dotenv(find_dotenv())
vec_db_client = QdrantClient(url=os.environ.get("QDRANT_URL"), api_key=os.environ.get("QDRANT_API_KEY"))
collection_name = "architectures"
# Initialize the documents list
documents = []
# Initialize the embedding model (llamaindex/vdr-2b-multi-v1 from Hugging Face)
model: HuggingFaceEmbedding = HuggingFaceEmbedding(
    model_name="llamaindex/vdr-2b-multi-v1",
    device="cpu",  # "mps" for mac, "cuda" for nvidia GPUs
    trust_remote_code=True,
)
def create_embeddings():
    """Useful for creating embeddings for images and text"""
    print("downloading the llamaindex/vdr-2b-multi-v1 model from huggingface")
    global model
    global documents

    # Specify the paths to the folders containing the images and descriptions
    images_folder = "images"
    descriptions_folder = "text"

    print("creating document structure")
    # Pair each image file with its corresponding description file (sorted so they line up)
    image_files = sorted(os.listdir(images_folder))
    description_files = sorted(os.listdir(descriptions_folder))

    # Generate the documents structure
    for image_file, description_file in zip(image_files, description_files):
        # Read the description content
        with open(os.path.join(descriptions_folder, description_file), "r") as f:
            description_content = f.read().strip()

        # Add the entry to the documents list
        documents.append({
            "architecture_description": description_content,
            "architecture_image": os.path.join(images_folder, image_file)
        })

    # Save the documents structure to a JSON file (optional)
    output_file = "documents.json"
    with open(output_file, "w") as f:
        json.dump(documents, f, indent=4)

    print("Generated documents structure:")
    print(json.dumps(documents, indent=4))
def ingest_to_vec_db():
    """Useful for ingesting the data to vector database"""
    print("starting ingestion...")
    text_embeddings = model.get_text_embedding_batch(
        [doc["architecture_description"] for doc in documents],
        show_progress=True
    )
    image_embeddings = []
    for doc in documents:
        image_embeddings.append(model.get_image_embedding(doc["architecture_image"]))

    print("creating collection in qdrant...")
    if not vec_db_client.collection_exists(collection_name=collection_name):
        vec_db_client.create_collection(
            collection_name=collection_name,
            vectors_config={
                "image": models.VectorParams(size=len(image_embeddings[0]), distance=models.Distance.COSINE),
                "text": models.VectorParams(size=len(text_embeddings[0]), distance=models.Distance.COSINE),
            }
        )

    print("inserting points into qdrant...")
    vec_db_client.upload_points(
        collection_name=collection_name,
        points=[
            models.PointStruct(
                id=idx,
                vector={
                    "text": text_embeddings[idx],
                    "image": image_embeddings[idx],
                },
                payload=doc
            )
            for idx, doc in enumerate(documents)
        ]
    )
    print("indexing completed.")
def retrieve_image_from_store(text_query: str):
    """
    Useful for retrieval of the data from vector database for a given text query
    :param text_query: (str) input query
    :return: None
    """
    print(f"started search...query: {text_query}")
    find_image = model.get_query_embedding(query=text_query)
    try:
        response = vec_db_client.query_points(
            collection_name=collection_name,
            query=find_image,
            using="image",
            with_payload=["architecture_image"],
            limit=1
        ).points[0].payload['architecture_image']
        print(response)
        image = Image.open(response)
        image.show()
    except Exception as e:
        print(e)
def retrieve_text_from_store(image_path: str):
    """
    Useful for retrieval of the data from vector database for a given image path
    :param image_path: (str) input image path to search
    :return:
    """
    response = vec_db_client.query_points(
        collection_name=collection_name,
        query=model.get_image_embedding(image_path),
        # Now we are searching only among text vectors with our image query
        using="text",
        with_payload=["architecture_description"],
        limit=1
    ).points[0].payload['architecture_description']
    print(response)
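As an optional sanity check (not one of the four tools), you can ask Qdrant how many points actually landed in the collection after ingest_to_vec_db has run; this reuses the vec_db_client and collection_name defined above:

# Optional: confirm that one point per document was ingested
count_result = vec_db_client.count(collection_name=collection_name, exact=True)
print(f"points stored in '{collection_name}': {count_result.count}")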
Observability
With Langfuse in place, we can monitor both workflows, ingestion and retrieval.
from langfuse.llama_index import LlamaIndexInstrumentor
# instrumenting observability (should be the first statements)
instrumentor = LlamaIndexInstrumentor()
instrumentor.start()
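Because traces are buffered and sent asynchronously, it is a good idea to flush the instrumentor before a short-lived script exits, as the Langfuse LlamaIndex integration suggests; assuming the same instrumentor instance, that is a one-liner at the end of the run:

# After the workflows have finished, push any buffered traces to Langfuse
instrumentor.flush()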
Results
Run the following code to invoke the ingestion workflow, which involves the embedding agent and the ingestion agent.
if __name__ == "__main__":
    import asyncio

    adv_mm_rag = MultilingualMultimodalRAG()

    # first time to ingest the data
    asyncio.run(adv_mm_rag.invoke_ingest_workflow())
======= output ========
downloading the llamaindex/vdr-2b-multi-v1 model from huggingface
creating document structure
Generated documents structure:
[
    {
        "architecture_description": "This image depicts a hybrid search architecture using both dense and sparse embedding models integrated with Qdrant for vector search. It includes an orchestrator and message queue (RabbitMQ) to handle user queries, retrieve context, and process responses via tool agents.",
        "architecture_image": "images/image-1.png"
    },
    {
        "architecture_description": "This image illustrates a pipeline for a RAG (Retrieval-Augmented Generation) system. Data engineers process web documents, database records, and internal documents through a CDC (Change Data Capture) pipeline into the LlamaIndex-Qdrant Indexer, which saves indexes into MLflow for tracking and evaluation. Users interact with the RAG-Bot that uses the LlamaIndex-Qdrant retriever to fetch data from the indexed information for their queries.",
        "architecture_image": "images/image-2.png"
    },
    {
        "architecture_description": "This diagram illustrates a workflow where users interact with a Spring AI system via two endpoints: /load to upload complex survey reports (PDFs) and /ask to query the data. The system uses Ollama for embedding generation and inference, with Qdrant serving as the vector store to store and retrieve document embeddings efficiently.",
        "architecture_image": "images/image-3.png"
    },
    {
        "architecture_description": "This diagram showcases a document-based question-answering workflow. Input documents are processed to generate vector embeddings using Hugging Face's bge-small-en-v1.5 model, which are stored in Qdrant. When a user asks a question, the most relevant context is retrieved, reranked, and used by the Hugging Face Meta-Llama-3-8B model to synthesize a response for the user.",
        "architecture_image": "images/image-4.png"
    },
    {
        "architecture_description": "This image illustrates a document retrieval and query-response system. Documents are split into smaller chunks and converted into embeddings, which are stored in Qdrant as a vector store. Queries are processed using Meta Llama-3 and HyDE, generating query embeddings to perform similarity searches in Qdrant, retrieving relevant document embeddings for response synthesis.",
        "architecture_image": "images/image-5.png"
    },
    {
        "architecture_description": "This diagram outlines a system where financial documents (e.g., from Honeywell and GE) are parsed and indexed into Qdrant as a vector store. The LlamaIndex-based RAG (Retrieval-Augmented Generation) system uses query engine tools to retrieve relevant data from Qdrant in response to user queries, delivering context-aware responses via an agent interface.",
        "architecture_image": "images/image-6.png"
    }
]
starting ingestion...
creating collection in qdrant...
inserting points into qdrant...
indexing completed.
if __name__ == "__main__":
    import asyncio

    adv_mm_rag = MultilingualMultimodalRAG()

    # keep it enabled if you want to ask queries - text2img
    asyncio.run(adv_mm_rag.invoke_text2img_rag_workflow())
Similarly, we can test the reverse direction: as a user, I can provide an image name or path and search for the architecture description associated with that image (the rest is left as an exercise; a minimal invocation is sketched below for reference).
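The call mirrors the text-to-image one and reuses the invoke_img2text_rag_workflow method defined earlier:

if __name__ == "__main__":
    import asyncio

    adv_mm_rag = MultilingualMultimodalRAG()

    # keep it enabled if you want to ask queries - img2text
    asyncio.run(adv_mm_rag.invoke_img2text_rag_workflow())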
Conclusion
With its clearly defined embedding, ingestion, and retrieval agents, this architecture fundamentally changes how organizations store, search, and leverage their internal knowledge. Teams now have a powerful, unified system that turns raw text into actionable insight, all accessible through natural language. Whether you are a multinational corporation, a healthcare provider, or a law firm, the impact is the same: faster discovery of relevant information, more accurate decision-making, and a significant reduction in duplicated effort.
By breaking the process into specialized agents, each focused on embedding, ingestion, or retrieval, you can evolve or swap out components without breaking the entire pipeline. Meanwhile, tools like Qdrant and Langfuse complete the picture, providing robust storage, similarity search, and analytics that keep everything transparent and scalable.