在LlamaIndex中引入Airbyte源

2023年08月30日 由 alex 发表 365 0

内容


现在可以直接在基于LlamaIndex的应用程序中利用Airbyte的Gong、Hubspot、Salesforce、Shopify、Stripe、Typeform和Zendesk Support源,作为数据加载器实现。


特se2


例如,要加载用户的Stripe发票,您可以使用AirbyteStripeLoader。安装它非常简单,只需在本地安装LlamaIndex,然后安装您感兴趣的源即可开始使用:


pip install airbyte-source-stripe
pip install llama-hub


之后,只需下载加载器并传入配置和要加载的流即可:


from llama_hub.airbyte_stripe.base import AirbyteStripeReader
config = {
  "client_secret": "<secret key>",
  "account_id": "<account id>",
  "start_date": "<date from which to start retrieving records from in ISO format, e.g. 2020–10–20T00:00:00Z>"
}
reader = AirbyteStripeReader(config=config)
documents = reader.load_data(stream_name="invoices")


为什么这很重要?


这标志着将Airbyte的300多个数据源作为数据加载器提供给LlamaHub的开端。


Airbyte可以将数据从几乎任何数据源移动到您的数据仓库或向量数据库,为您的LLM用例提供支持。通常情况下,使用Airbyte Cloud或本地Airbyte实例,设置连接,并按计划运行(或通过API触发)以确保数据保持新鲜。


但是,如果您刚开始并且正在本地运行所有内容,使用完整的Airbyte实例(包括UI、调度服务、扩展能力等)可能过于冗长。


通过此版本,您可以在Python运行时直接在LlamaIndex中运行任何基于Python的数据源,而无需启动Airbyte实例或进行API调用到Airbyte Cloud。


在托管和嵌入式Airbyte之间切换


由于底层运行相同的代码,因此每个Airbyte构建的加载器与Airbyte服务中的相应源兼容。这意味着将嵌入式加载流程转移到自托管的Airbyte安装或Airbyte Cloud实例中非常简单。加载器配置对象的模式与输出记录的模式完全兼容。


在托管Airbyte上运行同步时,意味着:


1. 可以使用UI跟踪正在运行的流水线


2. 可通过事件通知来警报同步失败或运行后同步操作


3. 可轻松按计划运行流水线


4. 具有扩展能力


5. 可提供支持程序化用例的API


6. 内置连接状态管理


7. 支持等等


8. 更多


在使用LlamaIndex加载器进行同步时,意味着:


1. 不需要为运行另一个服务而增加负载


2. 对于时间和流水线执行具有完全控制权


结合Airbyte加载器与索引和查询引擎


由于Airbyte加载器的行为类似于常规加载器,因此可以轻松与所有LlamaIndex实用工具结合使用,构建强大的LLM-based应用程序。


relevant_keys = ["customer_name", "total", "currency"]
reader = AirbyteStripeReader(
    config=strip_config,
    record_handler=lambda record, id: Document(
        doc_id=id,
        text=record.data["description"] or "",
        extra_info={
            key: record.data[key] for key in relevant_keys if key in record.data
        },
    ),
)
index = ListIndex.from_documents(reader.load_data(stream_name="invoices"))
query_engine = index.as_query_engine()
question = input("What do you want to know about your customers?")
print(query_engine.query(question))


增量加载


由于您的Python应用程序基本上充当了Airbyte平台,因此您可以完全控制“同步”是如何执行的。例如,如果您的流支持增量同步,则可以通过访问加载器的“last_state”属性仍然从中受益。这样可以仅加载自上次加载以来发生更改的文档,从而有效地更新现有的向量数据库:


import airbyte_cdk.models.airbyte_protocol import AirbyteMessage
with open('stripe_sync_checkpoint.json', 'w') as file:
  file.write(reader.last_state.json())
# later
with open('stripe_sync_checkpoint.json', 'r') as file:
  current_state = AirbyteStateMessage.parse_raw(file.read())
new_docs = reader.load_data(stream_name="invoices", state=current_state)


将 Airbyte 记录映射到 LlamaIndex 文档


默认情况下,每个记录都会作为加载器的一部分映射到一个Document中,记录中的所有不同字段将成为Document的extra_info属性的一部分(extra_info表示每个文档的结构化元数据)。文档的文本部分设置为记录的JSON表示。默认情况下,在文档中定义的任何元数据都将与下游模块中的文本连接在一起,因此记录中的所有字段都将用于在LlamaIndex应用程序中进行嵌入和合成。您可以传递一个记录处理器来自定义此行为,根据数据构建记录的文本部分:


def handle_record(record, id):
  return Document(doc_id=id, text=record.data["title"], extra_info=record.data)
reader = AirbyteGongReader(config=gong_config, record_handler=handle_record)


定制来源


目前,以下Airbyte源已经作为pip软件包提供(更多软件包将陆续推出):


1. gong pip install airbyte-source-gong


2. hubspot pip install airbyte-source-hubspot


3. salesforce pip install airbyte-source-salesforce


4. shopify pip install airbyte-source-shopify


5. stripe pip install airbyte-source-stripe


6. typeform pip install airbyte-source-typeform


7. zendesk Support pip install airbyte-source-zendesk-support


但是,如果您已经实现了自己的自定义Airbyte源,也可以使用AirbyteCDKReader基类将其集成,该基类与Airbyte CDK的源接口兼容:


from llama_index import download_loader
from my_source.source import MyCustomSource # plug in your own source here
AirbyteCDKReader = download_loader(AirbyteCDKReader)
config = {
  # your custom configuration
}
reader = AirbyteCDKReader(source_class=MyCustomSource, config=config)
documents = reader.load_data(stream_name="my-stream")


您还可以通过直接从git安装来安装主Airbyte存储库中的源代码。例如,要获取Github源代码,只需运行以下命令:


pip install "source_github@git+https://github.com/airbytehq/airbyte.git@master#subdirectory=airbyte-integrations/connectors/source-github"


之后,该源代码就可以被应用到AirbyteCDKLoader中了。


from source_github.source import SourceGithub
issues_loader = AirbyteCDKReader(source_class=SourceGithub, config=config)
documents = reader.load_data(stream_name="issues")


文章来源:https://medium.com/llamaindex-blog/introducing-airbyte-sources-within-llamaindex-42209071722f
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
热门职位
Maluuba
20000~40000/月
Cisco
25000~30000/月 深圳市
PilotAILabs
30000~60000/年 深圳市
写评论取消
回复取消