内容
现在可以直接在基于LlamaIndex的应用程序中利用Airbyte的Gong、Hubspot、Salesforce、Shopify、Stripe、Typeform和Zendesk Support源,作为数据加载器实现。
例如,要加载用户的Stripe发票,您可以使用AirbyteStripeLoader。安装它非常简单,只需在本地安装LlamaIndex,然后安装您感兴趣的源即可开始使用:
pip install airbyte-source-stripe
pip install llama-hub
之后,只需下载加载器并传入配置和要加载的流即可:
from llama_hub.airbyte_stripe.base import AirbyteStripeReader
config = {
"client_secret": "<secret key>",
"account_id": "<account id>",
"start_date": "<date from which to start retrieving records from in ISO format, e.g. 2020–10–20T00:00:00Z>"
}
reader = AirbyteStripeReader(config=config)
documents = reader.load_data(stream_name="invoices")
为什么这很重要?
这标志着将Airbyte的300多个数据源作为数据加载器提供给LlamaHub的开端。
Airbyte可以将数据从几乎任何数据源移动到您的数据仓库或向量数据库,为您的LLM用例提供支持。通常情况下,使用Airbyte Cloud或本地Airbyte实例,设置连接,并按计划运行(或通过API触发)以确保数据保持新鲜。
但是,如果您刚开始并且正在本地运行所有内容,使用完整的Airbyte实例(包括UI、调度服务、扩展能力等)可能过于冗长。
通过此版本,您可以在Python运行时直接在LlamaIndex中运行任何基于Python的数据源,而无需启动Airbyte实例或进行API调用到Airbyte Cloud。
在托管和嵌入式Airbyte之间切换
由于底层运行相同的代码,因此每个Airbyte构建的加载器与Airbyte服务中的相应源兼容。这意味着将嵌入式加载流程转移到自托管的Airbyte安装或Airbyte Cloud实例中非常简单。加载器配置对象的模式与输出记录的模式完全兼容。
在托管Airbyte上运行同步时,意味着:
1. 可以使用UI跟踪正在运行的流水线
2. 可通过事件通知来警报同步失败或运行后同步操作
3. 可轻松按计划运行流水线
4. 具有扩展能力
5. 可提供支持程序化用例的API
6. 内置连接状态管理
7. 支持等等
8. 更多
在使用LlamaIndex加载器进行同步时,意味着:
1. 不需要为运行另一个服务而增加负载
2. 对于时间和流水线执行具有完全控制权
结合Airbyte加载器与索引和查询引擎
由于Airbyte加载器的行为类似于常规加载器,因此可以轻松与所有LlamaIndex实用工具结合使用,构建强大的LLM-based应用程序。
relevant_keys = ["customer_name", "total", "currency"]
reader = AirbyteStripeReader(
config=strip_config,
record_handler=lambda record, id: Document(
doc_id=id,
text=record.data["description"] or "",
extra_info={
key: record.data[key] for key in relevant_keys if key in record.data
},
),
)
index = ListIndex.from_documents(reader.load_data(stream_name="invoices"))
query_engine = index.as_query_engine()
question = input("What do you want to know about your customers?")
print(query_engine.query(question))
增量加载
由于您的Python应用程序基本上充当了Airbyte平台,因此您可以完全控制“同步”是如何执行的。例如,如果您的流支持增量同步,则可以通过访问加载器的“last_state”属性仍然从中受益。这样可以仅加载自上次加载以来发生更改的文档,从而有效地更新现有的向量数据库:
import airbyte_cdk.models.airbyte_protocol import AirbyteMessage
with open('stripe_sync_checkpoint.json', 'w') as file:
file.write(reader.last_state.json())
# later
with open('stripe_sync_checkpoint.json', 'r') as file:
current_state = AirbyteStateMessage.parse_raw(file.read())
new_docs = reader.load_data(stream_name="invoices", state=current_state)
默认情况下,每个记录都会作为加载器的一部分映射到一个Document中,记录中的所有不同字段将成为Document的extra_info属性的一部分(extra_info表示每个文档的结构化元数据)。文档的文本部分设置为记录的JSON表示。默认情况下,在文档中定义的任何元数据都将与下游模块中的文本连接在一起,因此记录中的所有字段都将用于在LlamaIndex应用程序中进行嵌入和合成。您可以传递一个记录处理器来自定义此行为,根据数据构建记录的文本部分:
def handle_record(record, id):
return Document(doc_id=id, text=record.data["title"], extra_info=record.data)
reader = AirbyteGongReader(config=gong_config, record_handler=handle_record)
定制来源
目前,以下Airbyte源已经作为pip软件包提供(更多软件包将陆续推出):
1. gong pip install airbyte-source-gong
2. hubspot pip install airbyte-source-hubspot
3. salesforce pip install airbyte-source-salesforce
4. shopify pip install airbyte-source-shopify
5. stripe pip install airbyte-source-stripe
6. typeform pip install airbyte-source-typeform
7. zendesk Support pip install airbyte-source-zendesk-support
但是,如果您已经实现了自己的自定义Airbyte源,也可以使用AirbyteCDKReader基类将其集成,该基类与Airbyte CDK的源接口兼容:
from llama_index import download_loader
from my_source.source import MyCustomSource # plug in your own source here
AirbyteCDKReader = download_loader(AirbyteCDKReader)
config = {
# your custom configuration
}
reader = AirbyteCDKReader(source_class=MyCustomSource, config=config)
documents = reader.load_data(stream_name="my-stream")
您还可以通过直接从git安装来安装主Airbyte存储库中的源代码。例如,要获取Github源代码,只需运行以下命令:
pip install "source_github@git+https://github.com/airbytehq/airbyte.git@master#subdirectory=airbyte-integrations/connectors/source-github"
之后,该源代码就可以被应用到AirbyteCDKLoader中了。
from source_github.source import SourceGithub
issues_loader = AirbyteCDKReader(source_class=SourceGithub, config=config)
documents = reader.load_data(stream_name="issues")