在这个数据和生成智能的新时代,数据基础设施团队正在努力以方便、高效且符合法规的方式提供公司数据。这对于大型语言模型 (LLM) 和代理检索增强生成 (RAG) 的开发尤为重要,这两个模型已经席卷了分析界。在本文中,我们将介绍如何从头开始构建数据代理并使用开源数据目录与其交互。
什麼是 LLM Agent?
在开始之前,我们应该先回顾一下代理在 RAG 管道中的作用。虽然 LLM 本身缺乏高级推理能力,并且只提供理解和生成语言的一般能力,但代理可以更进一步,能够接受指令执行更复杂、特定领域的推理,然后将其反馈给 LLM。
代理可用于不同的目的和不同的领域,例如数学问题解决、检索增强聊天、个人助理等。数据代理通常设计用于提取目标,通过直接与数据本身交互。通过帮助完成辅助推理任务,一般应用程序的性能可以大大提高,响应也更准确。
以下是資料代理程式的一般架構。
如你所见,代理将接受 LLM 的指令,并根据设计,通过一组应用程序接口或其他代理与用户或 LLM 接口。然后,它通过规划将大型任务分解为较小的任务,并具有一定的反思和改进能力。与记忆相结合,代理将通过使用矢量存储和检索方法,在较长的上下文窗口中保留和调用信息。代理还可以调用外部应用程序接口,从其他数据源填补缺失的信息,这一点非常有用。
RAG 开发中的生产问题
已经有很多演示、POC 和教程介绍了如何构建一个简单的数据代理,但当我们转向生产应用时,仍然面临着一些挑战。
数据质量和完整性
无论使用哪种 LLM,数据质量和完整性都会直接影响答案的准确性。结构化数据元数据的质量将直接影响生成的 SQL 语句的准确性,这可能会产生意想不到的效果。无论采用哪种分块策略,糟糕的源数据和文档都会污染矢量嵌入的质量,并导致糟糕的检索结果,这些结果可能是无意义的或令人产生幻觉的。在生成式人工智能时代,“垃圾进,垃圾出 ”这句话比以往任何时候都更加重要。
从各种来源检索信息
在任何组织或公司中,都可能从各种来源获取数据。除了特定的各种格式和存储解决方案外,数据还可能需要从一个数据中心/区域穿越到跨区域、跨云分布。如果我们不能成功地从整个组织的广泛来源中高效地连接和检索数据,我们就会因为遗漏关键数据和关系而面临巨大的劣势,从而难以实施知识图谱或绘制出相似性地图,以便我们的 LLM 提供准确的答案。同时,传统的 ETL 集中数据方式也会降低答案的有效性,因为通常需要 T+1 才能准备好数据。
数据隐私、安全性和合规性
在构建任何生产级数据系统(包括数据代理和应用程序接口)时,数据隐私、安全性和合规性都是最重要的。在实施 LLM 时,这个问题变得更具挑战性,因为 LLM 在规模上往往具有令人难以置信的高维性和复杂性,因此需要从源头追踪其输出。对此类系统进行故障诊断,尤其是在多次调用外部工具和应用程序接口的情况下,是很难做到的,更不用说在保证隐私和安全的前提下了。设计我们的数据基础设施和端到端系统,使其具有高可见性、可观察性、可测量性和持续稳健性,这一点非常重要。
什么是 Apache Gravitino(孵化中)?
Apache Gravitino(孵化中)是一个高性能、地理分布式和联合元数据湖。通过使用技术数据目录和元数据湖,你可以管理所有数据源(包括文件库、关系数据库和事件流)的访问并执行数据治理,同时在不同云提供商的多种格式上安全地使用多个引擎(如 Spark、Trino 或 Flink)。当我们试图让 LlamaIndex 同时在众多数据源上快速启动和运行时,这对我们的数据架构非常有用。
使用 Gravitino,你可以实现以下目标:
如果没有 Gravitino,典型的代理 RAG 系统将是这样的:
用户需要使用不同的阅读器逐一连接各种数据源,当数据分布在具有不同安全策略的云中时,困难将成倍增加。
有了 Gravitino,新架构变得更加精简:
使用 Gravitino 和 LlamaIndex 构建通用数据代理
环境设置
下面我们提取了你自己重现此过程所需的代码。请记住,要运行此演示,你需要一个 OpenAI API 密钥。
git clone git@github.com:apache/gravitino-playground.gitclone git@github.com:apache/gravitino-playground.git
cd gravitino-playground
./launch-playground.sh
本地游乐场中包含的演示的整体架构如下所示:
使用 Gravitino 管理数据集
首先,我们需要建立第一个目录,并将其连接到我们的文件集。在我们的例子中,数据源是 Hadoop。然后,我们需要定义模式并提供存储位置。
demo_catalog = NoneNone
try:
demo_catalog = gravitino_client.load_catalog(name=catalog_name)
except Exception as e:
demo_catalog = gravitino_client.create_catalog(name=catalog_name,
catalog_type=Catalog.Type.FILESET,
comment="demo",
provider="hadoop",
properties={})
# Create schema and fileset
schema_countries = None
try:
schema_countries = demo_catalog.as_schemas().load_schema(ident=schema_ident)
except Exception as e:
schema_countries = demo_catalog.as_schemas().create_schema(ident=schema_ident,
comment="countries",
properties={})
fileset_cities = None
try:
fileset_cities = demo_catalog.as_fileset_catalog().load_fileset(ident=fileset_ident)
except Exception as e:
fileset_cities = demo_catalog.as_fileset_catalog().create_fileset(ident=fileset_ident,
fileset_type=Fileset.Type.EXTERNAL,
comment="cities",
storage_location="/tmp/gravitino/data/pdfs",
properties={})
构建 Gravitino 结构化数据阅读器
连接数据源后,我们需要以某种方式进行查询。在本例中,我们决定使用通过 sqlalchemy 连接的 Trino 来帮助我们。不过,如果你已经使用 PySpark,你也可以使用它。
from sqlalchemy import create_engine
from trino.sqlalchemy import URL
from sqlalchemy.sql.expression import select, text
trino_engine = create_engine('trino://admin@trino:8080/catalog_mysql/demo_llamaindex')
connection = trino_engine.connect();
with trino_engine.connect() as connection:
cursor = connection.exec_driver_sql("SELECT * FROM catalog_mysql.demo_llamaindex.city_stats")
print(cursor.fetchall())
构建 Gravitino 非结构化数据阅读器
一旦我们建立了基本的数据基础架构,现在就可以直接将其读取到 LlamaIndex 中。Gravitino 将使用虚拟文件系统将数据作为目录提供给 LlamaIndex 作为输入。
from llama_index.core import SimpleDirectoryReader
from gravitino import gvfs
fs = gvfs.GravitinoVirtualFileSystem(
server_uri=gravitino_url,
metalake_name=metalake_name
)
fileset_virtual_location = "fileset/catalog_fileset/countries/cities"
reader = SimpleDirectoryReader(
input_dir=fileset_virtual_location,
fs=fs,
recursive=True)
wiki_docs = reader.load_data()
从结构化数据连接建立 SQL 元数据索引
建立索引后,我们就可以开始仅从元数据建立索引和向量存储了。
from llama_index.core import SQLDatabase
sql_database = SQLDatabase(trino_engine, include_tables=["city_stats"])
从非结构化数据建立矢量索引
from llama_index.core import VectorStoreIndex
from llama_index.core import Settings
from llama_index.llms.openai import OpenAI
# Insert documents into vector index
# Each document has metadata of the city attached
vector_indices = {}
vector_query_engines = {}
for city, wiki_doc in zip(cities, wiki_docs):
vector_index = VectorStoreIndex.from_documents([wiki_doc])
query_engine = vector_index.as_query_engine(
similarity_top_k=2, llm=OpenAI(model="gpt-3.5-turbo")
)
vector_indices[city] = vector_index
vector_query_engines[city] = query_engine
定义查询引擎并提出问题
要使其成为一个功能完善的聊天应用程序,我们需要提供一个文本到 SQL 的接口,将所有功能整合在一起。在这种情况下,我们将使用 LlamaIndex 的本地函数来直接连接我们在前面步骤中定义的索引。
from llama_index.core.query_engine import NLSQLTableQueryEngine
from llama_index.core.query_engine import SQLJoinQueryEngine
# Define the NL to SQL engine
sql_query_engine = NLSQLTableQueryEngine(
sql_database=sql_database,
tables=["city_stats"],
)
# Define the vector query engines for each city
from llama_index.core.tools import QueryEngineTool
from llama_index.core.tools import ToolMetadata
from llama_index.core.query_engine import SubQuestionQueryEngine
query_engine_tools = []
for city in cities:
query_engine = vector_query_engines[city]
query_engine_tool = QueryEngineTool(
query_engine=query_engine,
metadata=ToolMetadata(
name=city, description=f"Provides information about {city}"
),
)
query_engine_tools.append(query_engine_tool)
s_engine = SubQuestionQueryEngine.from_defaults(
query_engine_tools=query_engine_tools, llm=OpenAI(model="gpt-3.5-turbo")
)
# Convert engines to tools and combine them together
sql_tool = QueryEngineTool.from_defaults(
query_engine=sql_query_engine,
description=(
"Useful for translating a natural language query into a SQL query over"
" a table containing: city_stats, containing the population/country of"
" each city"
),
)
s_engine_tool = QueryEngineTool.from_defaults(
query_engine=s_engine,
description=(
f"Useful for answering semantic questions about different cities"
),
)
query_engine = SQLJoinQueryEngine(
sql_tool, s_engine_tool, llm=OpenAI(model="gpt-4")
)
# Issue query
response = query_engine.query(
"Tell me about the arts and culture of the city with the highest"
" population"
)
最终答案由两部分组成:
一个是来自 SQL 引擎的答案,数据代理从自然语言中生成一条 SQL 语句 “SELECT city_name、population、country FROM city_stats ORDER BY population DESC LIMIT 1”,并从结构化数据中得到东京人口最多的答案。
根据第一个答案,数据代理生成了三个子问题:"你能提供有关东京博物馆、剧院和演出场所的更多详细信息吗?
最终答案综合了这两部分内容,如下所示:
最终答案: 人口最多的城市是日本东京。东京以其充满活力的艺术和文化氛围而闻名,融合了传统和现代的影响。从古老的寺庙和传统茶道到尖端技术和当代艺术画廊,东京为游客和居民提供了多种多样的文化体验。这座城市还有众多博物馆、剧院和表演场所,展示着日本丰富的历史和创造力。遗憾的是,根据所提供的背景信息,我无法提供有关东京博物馆、剧院和表演场所的更多详细信息。
结论
这里的演示展示了如何使用 Gravitino 进行数据摄取和使用 LlamaIndex 进行高效数据检索。借助 Gravitino 的生产就绪功能,用户可以轻松构建通用数据代理。