在我之前我探讨过如何使用具有非常大上下文规模的新模型(如Gemini Flash 2.0)进行语义分块,可以显著提高从非结构化数据(如PDF)中的整体检索准确性。
在探索这一过程中,我开始寻找其他策略,以进一步提高响应的准确性,因为在大多数大型企业中,对不准确性的容忍度几乎为零,也应该如此。为此,我尝试了许多不同的方法。在这篇文章中,让我们来看看最终帮助提高准确性的整体步骤。
不过,在我们深入这些步骤之前,让我们从稍高的层面来看看整个过程,并理解为了获得更准确的结果,我们需要显著做好两件事:
现在,让我们来看看反映项目当前状态的具体步骤。我在每个步骤中插入了一些伪代码,以使整篇文章更易于理解。
从PDF中提取知识
流程
当PDF进入系统时,需要发生几件事:存储、处理、分块、嵌入和用结构化知识丰富。以下是整个过程:
第一步:上传和记录创建
# Pseudo-codecode
save_file_to_disk(pdf)
db_insert(document_record, status=”started”)
queue_processing_task(pdf)
第二步:解析和分块PDF
# Pseudo-code
validate_pdf(pdf)
text = extract_text(pdf)
chunks = semantic_chunking(text) or fallback_chunking(text)
add_overlaps(chunks)
第三步:生成嵌入
每个块都使用一个嵌入模型转换成一个高维向量。我使用了1536维,因为我选用的是OpenAI的大型ada模型。
接下来,将块及其嵌入都存储在数据库中。在SingleStore中,我们将块和文本存储在同一个表中的两个不同列中,以便于维护和检索。
# Pseudo-code
for chunk in chunks:
vector = generate_embedding(chunk.text)
db_insert(embedding_record, vector)
第四步:使用大型语言模型(LLM)提取实体和关系
这一步对整体准确性有着至关重要的影响。在这一步中,我将语义上组织好的块发送给OpenAI,并通过一些特定的提示,要求它从每个块中返回实体和关系。结果包括关键实体(名称、类型、描述、别名)。
实体之间的关系被绘制出来。在这里,如果我们发现多个实体,我们会用丰富的数据每次更新类别,而不是添加重复项。
提取出的“知识”现在被存储在结构化表中。
# Pseudo-code
for chunk in chunks:
entities, relationships = extract_knowledge(chunk.text)
db_insert(entities)
db_insert(relationships)
第五步:最终处理状态
如果一切处理正确,状态将更新为“已完成”。这样,前端可以持续轮询,并随时显示正确的状态。
如果某个环节失败,状态将标记为“失败”,并且任何临时数据都将被清理。
# Pseudo-code
if success:
update_status(“completed”)
else:
update_status(“failed”)
cleanup_partial_data()
当这些步骤完成后,我们现在拥有了语义块、它们对应的嵌入,以及在文档中找到的实体和关系,这些都被存储在相互引用的表中。
检索知识(检索增强生成(RAG)管道)
流程
现在数据已经被结构化并存储起来,当用户提出问题时,我们需要有效地检索它。系统处理查询,找到相关信息,并生成响应。
第一步:用户查询
用户向系统提交一个查询。
# Pseudo-code
query = get_user_query()
第二步:预处理和扩展查询
系统对查询进行规范化处理(去除标点符号、规范空白字符、使用同义词进行扩展)。在这里,我再次使用了一个大型语言模型(Groq,以加快处理速度)。
# Pseudo-code
query = preprocess_query(query)
expanded_query = expand_query(query)
第三步:嵌入查询并搜索向量
查询被嵌入到一个高维向量中。我使用的是之前用于提取时相同的ada模型。
系统使用语义搜索在文档嵌入数据库中搜索最接近的匹配项。我在SingleStore中使用dot_product来实现这一点。
# Pseudo-code
query_vector = generate_embedding(expanded_query)
top_chunks = vector_search(query_vector)
第四步:全文搜索
为了补充向量搜索,同时进行全文搜索。在SingleStore中,我们通过使用MATCH语句来实现这一点。
# Pseudo-code
text_results = full_text_search(query)
第五步:合并并排序结果
将向量搜索和文本搜索的结果合并,并根据相关性重新排序。在这里,我们可以调整的一个配置是前k个结果。当k值设为10或更高时,我得到了更好的结果。
低置信度的结果被过滤掉。
# Pseudo-code
merged_results = merge_and_rank(top_chunks, text_results)
filtered_results = filter_low_confidence(merged_results)
第六步:检索实体和关系
接下来,如果检索到的块中存在实体和关系,则将它们包含在响应中。
# Pseudo-code
for result in filtered_results:
entities, relationships = fetch_knowledge(result)
enrich_result(result, entities, relationships)
第七步:生成最终答案
现在,我们考虑整体上下文,并通过提示来丰富上下文,然后将相关数据发送给大型语言模型(我使用的是gpt3o-mini)以生成最终响应。
# Pseudo-code
final_answer = generate_llm_response(filtered_results)
第八步:将答案返回给用户
系统将响应作为结构化JSON数据包发送回用户,同时附带原始的数据库搜索结果,以便在需要时识别来源进行进一步的调试和调整。
# Pseudo-code
return_response(final_answer)
现在,关键来了。总的来说,检索过程对我来说大约需要8秒,这是不可接受的。
在追踪调用时,我发现最大响应时间来自大型语言模型的调用(大约1.5到2秒)。SingleStore数据库查询的响应时间始终在600毫秒或更少。在将几个大型语言模型的调用切换到Groq后,整体响应时间降到了3.5秒。我认为,如果我们进行并行调用而不是串行调用,这可以进一步改进,但这是另一个项目的任务了。
最后,关键来了。
鉴于我们使用的是SingleStore,我想看看是否可以通过一个查询来完成整个检索过程,这样不仅便于将来管理、更新和改进,而且因为我还希望数据库能提供更快的响应时间。这里的假设是,大型语言模型在不久的将来会变得更好、更快,而我对这些无法控制(当然,如果你真的对延迟非常在意,可以在同一网络中部署一个本地大型语言模型)。
最后,这里是代码(为方便起见,放在一个文件中),它现在可以通过一次查询完成检索。
import os
import json
import mysql.connector
from openai import OpenAI
# Define database connection parameters (assumed from env vars)
DB_CONFIG = {
"host": os.getenv("SINGLESTORE_HOST", "localhost"),
"port": int(os.getenv("SINGLESTORE_PORT", "3306")),
"user": os.getenv("SINGLESTORE_USER", "root"),
"password": os.getenv("SINGLESTORE_PASSWORD", ""),
"database": os.getenv("SINGLESTORE_DATABASE", "knowledge_graph")
}
def get_query_embedding(query: str) -> list:
"""
Generate a 1536-dimensional embedding for the query using OpenAI embeddings API.
"""
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
response = client.embeddings.create(
model="text-embedding-ada-002",
input=query
)
return response.data[0].embedding # Extract embedding vector
def retrieve_rag_results(query: str) -> list:
"""
Execute the hybrid search SQL query in SingleStore and return the top-ranked results.
"""
conn = mysql.connector.connect(**DB_CONFIG)
cursor = conn.cursor(dictionary=True)
# Generate query embedding
query_embedding = get_query_embedding(query)
embedding_str = json.dumps(query_embedding) # Convert to JSON for SQL compatibility
# Set the query embedding session variable
cursor.execute("SET @qvec = %s", (embedding_str,))
# Hybrid Search SQL Query (same as provided earlier)
sql_query = """
SELECT
d.doc_id,
d.content,
(d.embedding <*> @qvec) AS vector_score,
MATCH(TABLE Document_Embeddings) AGAINST(%s) AS text_score,
(0.7 * (d.embedding <*> @qvec) + 0.3 * MATCH(TABLE Document_Embeddings) AGAINST(%s)) AS combined_score,
JSON_AGG(DISTINCT JSON_OBJECT(
'entity_id', e.entity_id,
'name', e.name,
'description', e.description,
'category', e.category
)) AS entities,
JSON_AGG(DISTINCT JSON_OBJECT(
'relationship_id', r.relationship_id,
'source_entity_id', r.source_entity_id,
'target_entity_id', r.target_entity_id,
'relation_type', r.relation_type
)) AS relationships
FROM Document_Embeddings d
LEFT JOIN Relationships r ON r.doc_id = d.doc_id
LEFT JOIN Entities e ON e.entity_id IN (r.source_entity_id, r.target_entity_id)
WHERE MATCH(TABLE Document_Embeddings) AGAINST(%s)
GROUP BY d.doc_id, d.content, d.embedding
ORDER BY combined_score DESC
LIMIT 10;
"""
# Execute the query
cursor.execute(sql_query, (query, query, query))
results = cursor.fetchall()
cursor.close()
conn.close()
return results # Return list of retrieved documents with entities and relationships
总结
在很多方面,我记录这些是为了提醒自己,在考虑企业需求的情况下,今天构建一个企业级检索增强生成(RAG)或知识增强生成(KAG)系统需要哪些工作。