公司背景
我们创建 Chronon 的目的是为了缓解 ML 从业人员的一个共同痛点:他们将大部分时间都花在了管理为模型提供支持的数据上,而不是建模本身。
在 Chronon 诞生之前,从业人员会使用以下两种方法之一:
Chronon 方法可实现两全其美。Chronon 只需要人工智能从业人员定义一次特征,就能同时支持用于模型训练的离线流程和用于模型推理的在线流程。此外,Chronon 还为特征链、可观测性和数据质量以及特征共享和管理提供了强大的工具。
工作原理
下面,我们将使用快速入门指南中的一个简单示例,探讨支持 Chronon 大部分功能的主要组件。
假设我们是一家大型在线零售商,我们根据用户购物后退货的情况检测到了欺诈向量。我们想训练一个模型来预测给定交易是否可能导致欺诈性退货。每次用户开始结账流程时,我们都将调用该模型。
定义特征
购买数据: 我们可以将购买日志数据汇总到用户级别,以便了解该用户之前在我们平台上的活动。具体来说,我们可以计算用户在不同时间窗口内之前购买金额的总和、数量和平均值。
source = Source(
events=EventSource(
table="data.purchases", # This points to the log table in the warehouse with historical purchase events, updated in batch daily"data.purchases", # This points to the log table in the warehouse with historical purchase events, updated in batch daily
topic="events/purchases", # The streaming source topic
query=Query(
selects=select("user_id","purchase_price"), # Select the fields we care about
time_column="ts") # The event time
))
window_sizes = [Window(length=day, timeUnit=TimeUnit.DAYS) for day in [3, 14, 30]] # Define some window sizes to use below
v1 = GroupBy(
sources=[source],
keys=["user_id"], # We are aggregating by user
online=True,
aggregations=[Aggregation(
input_column="purchase_price",
operation=Operation.SUM,
windows=window_sizes
), # The sum of purchases prices in various windows
Aggregation(
input_column="purchase_price",
operation=Operation.COUNT,
windows=window_sizes
), # The count of purchases in various windows
Aggregation(
input_column="purchase_price",
operation=Operation.AVERAGE,
windows=window_sizes
), # The average purchases by user in various windows
Aggregation(
input_column="purchase_price",
operation=Operation.LAST_K(10),
), # The last 10 purchase prices aggregated as a list
],
)
这将创建一个 `GroupBy`,通过聚合不同时间窗口的不同字段,以 `user_id` 作为主键,将 `purchases` 事件数据转换为有用的特征。
这将原始购买日志数据转换为用户级别的有用特征。
用户数据: 将用户数据转化为特征数据要简单一些,主要是因为我们不必担心执行聚合。在这种情况下,源数据的主键与特征的主键相同,因此我们只需提取列值,而无需对行执行聚合:
source = Source(
entities=EntitySource(
snapshotTable="data.users", # This points to a table that contains daily snapshots of all users"data.users", # This points to a table that contains daily snapshots of all users
query=Query(
selects=select("user_id","account_created_ds","email_verified"), # Select the fields we care about
)
))
v1 = GroupBy(
sources=[source],
keys=["user_id"], # Primary key is the same as the primary key for the source table
aggregations=None, # In this case, there are no aggregations or windows to define
online=True,
)
这将创建一个 `GroupBy`,从 `data.users` 表中提取维度作为特征,并将 `user_id` 作为主键。
将这些特征连接在一起: 接下来,我们需要将之前定义的特征组合成一个视图,该视图既可以回填用于模型训练,也可以作为完整的向量在线用于模型推理。我们可以使用 "连接 "应用程序接口(Join API)来实现这一目标。
对于我们的用例来说,以正确的时间戳计算特征是非常重要的。因为我们的模型是在结账流开始时运行的,所以我们希望在回填中使用相应的时间戳,这样模型训练的特征值在逻辑上就与模型在在线推理中看到的一致。
定义如下所示。请注意,它结合了我们之前在 API 的 right_parts 部分中定义的特征(以及另一个名为 returns 的特征集)。
source = Source(
events=EventSource(
table="data.checkouts", "data.checkouts",
query=Query(
selects=select("user_id"), # The primary key used to join various GroupBys together
time_column="ts",
) # The event time used to compute feature values as-of
))
v1 = Join(
left=source,
right_parts=[JoinPart(group_by=group_by) for group_by in [purchases_v1, returns_v1, users]] # Include the three GroupBys
)
回填/离线计算
用户在使用上述 Join 定义时可能做的第一件事就是对其进行回填,以生成用于模型训练的历史特征值。Chronon 执行这种回填有几个主要优点:
在线计算
Chronon 为在线功能计算抽象了大量复杂性。在上述示例中,它会根据特征是批量特征还是流特征来计算特征。
批处理特征(例如上述用户特征)
由于用户功能建立在批处理表之上,Chronon 只需每天运行一次批处理作业,在批处理数据存储区出现新数据时计算新功能值,并将其上传到在线 KV 存储区以供使用。
流功能(例如上述购买功能)
购买 "功能建立在包含流组件的源上,如源中包含的 "主题 "所示。在这种情况下,除了实时更新的流作业外,Chronon 还将运行批量上传。批处理作业负责:
然后,流作业会向 KV 存储写入更新,以便在获取时保持特征值的最新状态。
在线服务/获取 API
Chronon 提供了一个应用程序接口(API),可在低延迟的情况下获取特征值。我们既可以获取单个 GroupBys(即上文定义的用户或购买功能)的值,也可以获取 Join 的值。下面是一个例子,说明了一个 Join 的请求和响应:
// Fetching all features for user=123
Map<String, String> keyMap = new HashMap<>();
keyMap.put("user", "123")
Fetcher.fetch_join(new Request("quickstart_training_set_v1", keyMap));
// Sample response (map of feature name to value)
'{"purchase_price_avg_3d":14.2341, "purchase_price_avg_14d":11.89352, ...}'
获取用户 123 所有特征的 Java 代码。返回类型是特征名称到特征值的映射。
上述示例使用的是 Java 客户端。此外还有 Scala 客户端和 Python CLI 工具,便于测试和调试:
run.py --mode=fetch -k '{"user_id":123}' -n quickstart/training_set -t join'{"user_id":123}' -n quickstart/training_set -t join
> {"purchase_price_avg_3d":14.2341, "purchase_price_avg_14d":11.89352, ...}
run.py 是快速测试 Chronon 工作流(如获取)的便捷方法。
另一种方法是将这些 API 包装成服务,通过 REST 端点发出请求。Airbnb 采用这种方法在 Ruby 等非 Java 环境中获取功能。
线上线下一致性
Chronon 不仅有助于提高在线-离线准确性,还提供了一种测量方法。测量管道从在线获取请求的日志开始。这些日志包括请求的主键和时间戳,以及获取的特征值。然后,Chronon 将主键和时间戳作为左侧传递给 Join backfill,要求计算引擎回填特征值。然后,它将回填值与实际获取值进行比较,以衡量一致性。
结论
开源只是第一步,我们的愿景是创建一个平台,让人工智能从业者能够就如何利用数据做出最佳决策,并尽可能轻松地实施这些决策。