这篇文章我们将深入研究如何使用 Indexify 构建数据提取管道,然后使用 OpenAI 基于提取的数据创建检索增强生成 (RAG) 应用程序。我们还将使用 Llama-index 和 Mistral 开发 Streamlit RAG 应用程序,以演示 Indexify 如何无缝集成到你现有的 LLM 和编排堆栈中,包括 Llama-index 和 LangChain 等工具。让我们开始吧!
什么是 Indexify
Indexify 是一个开源数据框架,包括一个实时提取引擎和现成的提取适配器。它为所有类型的非结构化数据(包括文档、演示文稿、视频和音频)提供可靠的开箱即用提取功能。
项目概述
在本文中,我们将介绍如何构建一个在线数据摄取管道,该管道可以完成以下几件事:
为了更易于理解,请看下图:
下载 Indexify 服务器
我们要做的第一件事就是下载 Indexify 服务器。在终端运行以下代码,确保系统中已下载了 curl。
在你需要的位置创建一个项目文件夹,将文件夹命名为 indexfiy_basics 。完成后,导航进入该文件夹。
进入该文件夹后,运行以下命令。
$ curl https://getindexify.ai | sh
这将在本地计算机上下载 Indexify 服务器,并在计算机上创建一个名为 indexify 的文件。请使用下面的命令来执行。
$ ./indexify server -d
运行该文件后,我们就可以访问以下端点:用户界面端点、用于上传内容和从索引(某些情况下是 SQL 表)检索内容的 API 端点,我们还将重点关注索引。
你可以在浏览器中访问 http://localhost:8900/ui,查看使用 Indexify 后的用户界面。
安装依赖项
为了与服务器交互、创建嵌入和基本构建管道,我们必须创建一个虚拟环境并安装所有依赖项。为此,请使用以下命令:
$ python3 -m venv venv
这将在你的项目目录中创建一个名为 venv 的文件夹。现在,我们要激活这个虚拟环境,安装使用 Indexify 所需的所有软件包。使用下面的命令:
$ source venv/bin/activate
在我的终端中,你可以注意到行首的虚拟环境文件夹名称 venv。这表明虚拟环境已激活,我们可以在其中安装东西了。
为此,我们将使用下面的命令安装依赖项:
$ pip3 install indexify-extractor-sdk indexify wikipedia openai langchain_community
安装 Indexify 提取器
提取器有助于将非结构化数据转化为结构化数据或嵌入式数据,使我们能够使用矢量数据库或简单的 SQL 查询信息。在我们的案例中,我们不会使用 SQL 表,我们只会专注于创建嵌入式数据,这些嵌入式数据将在稍后构建 RAG 代理时使用。
要安装这些提取器,请确保虚拟环境已激活。我们已经介绍过如何做到这一点,以及如何知道虚拟环境是否已激活。
$ indexify-extractor download tensorlake/openai
$ indexify-extractor download tensorlake/minilm-l6
$ indexify-extractor download tensorlake/chunk-extractor
在上述命令中,第一个命令是安装 OpenAI 提取器,我们将用它从维基百科获取的数据中提取命名实体(NER 管道的一部分)。你可能知道,OpenAI 如今更像是 ClosedAI,因此我们需要一个 OpenAI 密钥,并确保你已经支付了足够的令牌,才能继续阅读本文。
你可以从这里获取 OpenAI API 密钥。有了 API 密钥后,我们需要将其添加到系统变量中:
$ export OPENAI_API_KEY=xxxxxx
我已将我的变量添加到系统变量中,完成后我们就开始吧。
还记得我们之前在一个终端中运行 indexify 服务器吗?
现在继续运行,不要终止服务器,而是打开另一个终端,运行以下命令启动一些 Worker。这些 Worker 将用于运行提取器。确保在新打开的终端中激活了虚拟环境。
$ indexify-extractor join-server
创建数据提取管道
基本上,我们将在管道中并行运行两个进程,首先是管道中的命名实体识别部分,以及由两个主要节点(我称之为分块节点和嵌入节点)组成的管道。
这两个管道将并行运行,因为命名实体识别管道不依赖于其他任何管道(分块和嵌入管道)的输出。来看看这张图片:
基本术语
提取图表
提取图是一种多阶段数据管道,可从任何类型的数据中转换或提取信息。它们之所以被称为提取图,是因为可以在单个管道中创建分支,因此它们可以像图而不是线性阶段序列一样运行。
提取策略
提取策略指的是提取器并将其绑定到提取图中。它们还用于对提取器进行参数化,根据不同的使用情况,提取器的配置也会略有不同。
打开另一个终端会话,确保虚拟环境也已激活。完成后,打开你最喜欢的文本编辑器,本文将使用 VS-code。
完成后创建一个名为 pipeline_graph.yaml 的文件,并在文件中添加以下内容:
name: "wiki_extraction_pipeline"
extraction_policies:
- extractor: "tensorlake/openai"
name: "entity-extractor"
input_params:
system_prompt: "Extract entities from text, and return the output in JSON format."
- extractor: "tensorlake/chunk-extractor"
name: "chunker"
input_params:
chunk_size: 1000
overlap: 100
- extractor: "tensorlake/minilm-l6"
name: "wikiembedding"
content_source: "chunker"
通过上面的代码,我们创建了一个名为 wiki_extractor_pipeline 的提取图。在这个提取图下,我们有提取策略,你可以把它们看作是提取图的主要流程。在本例中,我们有三个主要提取策略,具体如下
我们还了解了这些提取策略如何并行运行
你还可以注意到,这些提取策略的名称与我们安装的提取策略包相似。
还记得这个吗?
$ indexify-extractor download tensorlake/openai
$ indexify-extractor download tensorlake/minilm-l6
$ indexify-extractor download tensorlake/chunk-extractor
设置完成后,我们还需要设置 OpenAI API 密钥:
$ export OPENAI_API_KEY=xxxxxxxxxxxx
好了,现在我们可以开始创建管道了,创建一个名为 create_pipeline.py 的文件,在该文件中添加以下代码:
from indexify import IndexifyClient, ExtractionGraph
client = IndexifyClient()
def create_extraction_graph():
extraction_graph = ExtractionGraph.from_yaml_file("./pipeline_graph.yaml")
client.create_extraction_graph(extraction_graph)
if __name__ == "__main__":
create_extraction_graph()
请注意,我们使用创建的 yaml 文件来定义这些提取器和策略的结构。
完成后,我们就可以运行代码来创建我们的管道了,确保你在第三个终端,Indexify 服务器和提取器工作者在各自的终端运行。
在打开的第三个终端中,运行以下命令来执行脚本,以创建包含 yaml 文件中定义的所有提取器策略的提取器图。
$ python3 create_pipeline.py
完成后,你可以返回 Indexify 用户界面,使用侧边面板进行导航,下面是我拍摄的一些图片,向你展示这一点:
数据输入
最后,也是最重要的部分,Indexify 做了一项伟大的工作,即在你完成前面所有步骤后,将数据通过管道运行到矢量数据库的过程变得非常简单。在项目目录中创建一个文件,将其命名为 ingest.py,并在其中添加以下代码块。事实上,我不应该用 "代码块 "这个词,而应该用 "代码行"。
from indexify import IndexifyClient, ExtractionGraph
from langchain_community.document_loaders import WikipediaLoader
client = IndexifyClient()
def load_data(player):
docs = WikipediaLoader(query=player, load_max_docs=1).load()
for doc in docs:
client.add_documents("wiki_extraction_pipeline", doc.page_content)
if __name__ == "__main__":
load_data("Micheal Jordan")
load_data("Micheal Jackson")
在这里,我们只是使用维基百科文档加载器来获取一些有关Micheal Jordan。Micheal Jackson具体信息。你可以加载任意多的信息。
在运行代码之前,请确保导出环境变量,这是你的 OpenAI API 密钥,在此使用以下命令
$ export OPENAI_API_KEY=xxxxxxxxxxxxxxx
现在,让我们运行这些函数,看看会得到什么结果:
得益于 Indexify 的并行能力,这个过程很快,比我预期的还要快。
确保 Indexify 服务器和提取器工作者都在另外两个终端上运行。
现在,让我们回到浏览器中的 Indexify UI,在内容选项卡下,我们现在应该能看到一些内容:
点击其中一个创建的内容后,我得到了这样的结果:
你可以看到这里的文本,即分块提取器策略是如何对文本进行分块的
在索引选项卡下,你可以点击策略名称列下的 wikiembedding 链接。
你将看到以下内容
然后,你可以点击内容 ID 查看内容。
太棒了!现在你知道如何使用 Indexify 来摄取数据了,这里的数据来自互联网,可以是视频、音频、文本或 PDF,也可以是我们刚刚看到的在线内容。
接下来,我们将查询矢量存储,以获得存储的实体和块的列表。
查询 Indexify
我希望我们能以编程方式与我们的矢量索引进行交互,并获取所有已识别实体的列表,以及由我们分别传入提取图的提取策略创建的所有块。
要查询 Indexify,我们必须输入管道名称和我们要检索信息的提取策略名称。
为此,让我们创建一个名为 query_indexify.py 的文件,并在其中添加以下内容。
from indexify import IndexifyClient
client = IndexifyClient()
ingested_content_list = client.list_content("wiki_extraction_pipeline")
content_id = ingested_content_list[2].id
entities = client.get_extracted_content(
content_id,
"wiki_extraction_pipeline",
"entity-extractor")
print("########### Entities ###########")
print(entities)
chunks = client.get_extracted_content(
content_id,
"wiki_extraction_pipeline",
"chunker")
print("########### Chunks ###########")
print(chunks)
创建 RAG 应用程序
既然我们已经对数据进行了分块和嵌入,现在就可以开始创建一个简单的 RAG 应用程序了。我将向你展示如何使用 OpenAI 和 Llama-index 来完成。
使用 OpenAI 创建 RAG
要使用 OpenAI 创建 RAG 应用程序,请创建一个名为 rag_openai.py 的文件,并添加以下代码行:
from openai import OpenAI
from indexify import IndexifyClient
client = IndexifyClient()
client_openai = OpenAI(
api_key="your_openai_api_key",
)
def query_database(question: str, index: str, top_k=3):
retrieved_results = client.search_index(
name=index, query=question, top_k=top_k)
context = "\n-".join([item["text"] for item in retrieved_results])
print(context)
response = client_openai.chat.completions.create(
messages=[
{
"role": "user",
"content": f"Answer the question, based on the context.\n question: {question} \n context: {context}",
},
],
model="gpt-3.5-turbo",
)
return response.choices[0].message.content
if __name__ == "__main__":
index_name = "wiki_extraction_pipeline.wikiembedding.embedding"
indexes = client.indexes()
print(f"Vector indexes present: {indexes}, querying index: {index_name}")
print(
query_database(
"Compare Michael Jordan and Michael Jackson",
"wiki_extraction_pipeline.wikiembedding.embedding",
4,
)
)
使用 Indexify 和 Mistral 的 Streamlit RAG 应用程序
作为总结,我希望我们基于 Indexify、Llama-index 和用于 LLM 协调功能的 Mistral 创建一个简单的 Streamlit 应用程序。我的目的是让你对 Indexify 有一个更广阔的模块化设计视角。
首先安装以下库
$ pip install streamlit python-decouple llama-index
$ pip install llama-index-llms-mistralai
你还需要一个 Mistral API 密钥。
获得 API 密钥后,创建一个 .env 文件并将其添加到文件中。该 .env 文件应位于项目的根目录下。
.env 文件的内容应如下:
MISTRAL_API_KEY=xxxxxxxxxxxxxxxxxx
完成后,打开 rag_llama_index.py 文件并添加以下内容:
import streamlit as st
from llama_index.llms.mistralai import MistralAI
from llama_index.core.llms import ChatMessage
from indexify import IndexifyClient
from decouple import config
llm = MistralAI(api_key=config("MISTRAL_API_KEY"))
client = IndexifyClient()
def query_database(question: str, index: str, top_k=3):
retrieved_results = client.search_index(
name=index, query=question, top_k=top_k)
context = "\n-".join([item["text"] for item in retrieved_results])
return context
def ai_responses(user_query):
context = query_database(
user_query, "wiki_extraction_pipeline.wikiembedding.embedding", 4)
print(context)
messages = [
ChatMessage(role="system",
content=(
"You are a helpful AI assistant. Answer user queries based on the context.\n"
f"Context: {context}"
)),
ChatMessage(role="user", content=user_query),
]
resp = llm.stream_chat(messages)
for r in resp:
yield r.delta
st.title("Hello, Welcome To Indexify")
# Initialize chat history
if "messages" not in st.session_state:
st.session_state.messages = []
# Display chat messages from history on app rerun
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.markdown(message["content"])
# Accept user input
if prompt := st.chat_input("What is up?"):
# Display user message in chat message container
with st.chat_message("user"):
st.markdown(prompt)
# Add user message to chat history
st.session_state.messages.append({"role": "user", "content": prompt})
# Display assistant response in chat message container
with st.chat_message("assistant"):
response = st.write_stream(ai_responses(prompt))
# Add assistant response to chat history
st.session_state.messages.append(
{"role": "assistant", "content": response})
现在,进入终端运行以下命令:
$ streamlit run rag_llama_index.py
将链接复制并粘贴到浏览器中,完成后就可以进行测试了。以下是我提出的一些问题:
以下是我终端中的输出结果,可以看到 Indexity 正在检索相关上下文,Mistral LLM 在 Llama-index 的协调下使用这些上下文来回答用户问题。
结论
在这篇文章中,我们探讨了使用 Indexify 构建数据提取管道,了解了提取图、提取策略,并为我们的 LLM 应用程序构建了整个数据提取管道。