TFX方式:在生产管道中验证数据

2024年06月26日 由 alex 发表 138 0

数据验证是一种守护系统,它将验证数据是否具有适合管道使用的格式。


阅读本文,了解为什么验证在 ML 管道中至关重要以及机器学习验证的 5 个阶段。


TensorFlow 数据验证

TensorFlow 数据验证(TFDV)是 TFX 生态系统的一部分,可用于验证 ML 管道中的数据。


TFDV 通过比较训练数据和服务数据,计算描述性统计、模式并识别异常。这可以确保训练数据和服务数据保持一致,不会在管道中破坏或产生意外预测。


谷歌希望 TFDV 从 ML 流程的最初阶段就开始使用。因此,他们确保 TFDV 可以与笔记本一起使用。我们在这里也要这样做。


首先,我们需要使用 pip 安装 tensorflow-data-validation 库。最好创建一个虚拟环境,然后开始安装。


需要注意的是: 安装前,请确保 TFX 库的版本兼容性


pip install tensorflow-data-validationdata-validation


以下是我们在数据验证过程中要遵循的步骤:


  1. 从训练数据生成统计数据
  2. 从训练数据推断模式
  3. 生成评估数据的统计数据并与训练数据进行比较
  4. 识别和修复异常
  5. 检查漂移和数据偏差
  6. 保存模式


我们将在此使用 3 种数据集:训练数据、评估数据和服务数据,以模拟实时使用情况。使用训练数据训练 ML 模型。评估数据又称测试数据,是数据的一部分,用于在训练阶段结束后立即测试模型。服务数据将在生产环境中呈现给模型,以便进行预测。


第 1 步:准备工作

我们将使用来自 Kaggle 的泰坦尼克号宇宙飞船数据集。


4


数据由数值和分类数据混合组成。它是一个分类数据集,类标签为Transported。它保存的值为 True 或 False。


5


完成必要的导入,并定义 csv 文件的路径。实际数据集包含训练数据和测试数据。我手动引入了一些错误,并将文件保存为 "titanic_test_anomalies.csv"。


在这里,我们将使用 ANOMALOUS_DATA 作为评估数据,使用 TEST_DATA 作为服务数据。


import tensorflow_data_validation as tfdv
import tensorflow as tf
TRAIN_DATA = '/data/titanic_train.csv'
TEST_DATA = '/data/titanic_test.csv'
ANOMALOUS_DATA = '/data/titanic_test_anomalies.csv'


第 2 步:从训练数据生成统计数据

第1步是分析训练数据并确定其统计属性。TFDV 具有 generate_statistics_from_csv 函数,可直接从 csv 文件读取数据。如果数据是 TFRecord 文件,TFDV 还有一个 generate_statistics_from_tfrecord 函数。


visualize_statistics(可视化统计)函数会显示一个 8 点摘要,以及一些有用的图表,帮助我们了解数据的基本统计信息。这就是 Facets 视图。一些需要我们注意的关键细节用红色突出显示。这里还有大量其他分析数据的功能。


# Generate statistics for training data
train_stats=tfdv.generate_statistics_from_csv(TRAIN_DATA)
tfdv.visualize_statistics(train_stats)


6


在这里,我们可以看到年龄和 RoomService 特征中的缺失值需要进行估算。我们还看到 RoomService 有 65.52% 的零值。这是特定数据的分布方式,因此我们不认为这是异常现象,继续向前推进。


第3步:从训练数据推断模式

在圆满解决所有问题后,我们就可以使用函数 infer_schema 来推断模式。


schema=tfdv.infer_schema(statistics=train_stats)
tfdv.display_schema(schema=schema)


模式通常分为两部分。第一部分介绍数据类型、存在、有效性及其域等详细信息。第二部分介绍域构成的值。


7


8


第4步:生成评估数据的统计数据并与训练数据进行比较

现在我们获取评估数据并生成统计数据。我们需要了解如何处理异常数据,因此我们将使用 ANOMALOUS_DATA 作为评估数据。我们手动将异常情况引入该数据。


生成统计数据后,我们将数据可视化。可视化可以单独用于评估数据(就像我们对训练数据所做的那样),但将评估数据的统计数据与训练数据的统计数据进行比较更有意义。这样,我们就能了解评估数据与训练数据的不同之处。


# Generate statistics for evaluation data
eval_stats=tfdv.generate_statistics_from_csv(ANOMALOUS_DATA)
tfdv.visualize_statistics(lhs_statistics = train_stats, rhs_statistics = eval_stats,
                          lhs_name = "Training Data", rhs_name = "Evaluation Data")


9


在这里,我们可以看到评估数据中没有 RoomService 特征(大红旗)。其他特征看起来还不错,因为它们的分布与训练数据类似。


不过,在生产环境中,仅靠目测是不够的,所以我们要请 TFDV 进行实际分析并报告是否一切正常。


第 5 步:识别并修复异常情况

下一步是验证从评估数据中获得的统计数据。我们将把它与使用训练数据生成的模式进行比较。display_anomalies 函数将为我们提供 TFDV 已识别异常的表格视图和说明。


# Identifying Anomalies
anomalies=tfdv.validate_statistics(statistics=eval_stats, schema=schema)
tfdv.display_anomalies(anomalies)


10


从表中我们可以看到,我们的评估数据缺少两列(Transported 和 RoomService),目的地特征的域中多了一个名为 "异常 "的值(训练数据中没有),CryoSleep 和 VIP 特征的值为 "TRUE "和 "FALSE",训练数据中没有,最后,有 5 个特征包含整数值,而模式期望的是浮点数。


这可真不少。那么,让我们开始吧。


修复异常有两种方法:要么(手动)处理评估数据以确保其符合模式,要么修改模式以确保这些异常被接受。同样,领域专家必须决定哪些异常可以接受,哪些必须进行数据处理。


让我们从 "目的地 "功能开始。我们发现了一个新值 "异常",它在训练数据的域列表中是缺失的。让我们把它添加到域中,并说它也是该特征的一个可接受值。


# Adding a new value for 'Destination'
destination_domain=tfdv.get_domain(schema, 'Destination')
destination_domain.value.append('Anomaly')
anomalies=tfdv.validate_statistics(statistics=eval_stats, schema=schema)
tfdv.display_anomalies(anomalies)


我们已经删除了这个异常点,异常点列表中不再显示它。让我们进入下一个异常。


11


观察 VIP 和 CryoSleep 领域,我们会发现训练数据中的值都是小写,而评估数据中的相同值都是大写。一种方法是对数据进行预处理,确保所有数据都转换为小写或大写。不过,我们将在域中添加这些值。由于 VIP 和 CryoSleep 使用相同的值集(true 和 false),我们将 CryoSleep 的域设置为使用 VIP 的域。


# Adding data in CAPS to domain for VIP and CryoSleep
vip_domain=tfdv.get_domain(schema, 'VIP')
vip_domain.value.extend(['TRUE','FALSE'])
# Setting domain of one feature to another
tfdv.set_domain(schema, 'CryoSleep', vip_domain)
anomalies=tfdv.validate_statistics(statistics=eval_stats, schema=schema)
tfdv.display_anomalies(anomalies)


12


将整数特征转换为浮点特征是相当安全的。因此,我们要求评估数据从训练数据的模式中推断数据类型。这就解决了与数据类型相关的问题。


# INT can be safely converted to FLOAT. So we can safely ignore it and ask TFDV to use schema
options = tfdv.StatsOptions(schema=schema, infer_type_from_schema=True)
eval_stats=tfdv.generate_statistics_from_csv(ANOMALOUS_DATA, stats_options=options)
anomalies=tfdv.validate_statistics(statistics=eval_stats, schema=schema)
tfdv.display_anomalies(anomalies)


13


最后,我们发现了最后一组异常情况:训练数据中存在的两列在评估数据中缺失。


Transported "是类别标签,显然不会出现在评估数据中。为了解决训练和评估特征可能不同的情况,我们可以创建多个环境。在这里,我们创建了一个训练环境和一个服务环境。我们指定 "Transported "特征在训练环境中可用,但在服务环境中不可用。


# Transported is the class label and will not be available in Evaluation data.
# To indicate that we set two environments; Training and Serving
schema.default_environment.append('Training')
schema.default_environment.append('Serving')
tfdv.get_feature(schema, 'Transported').not_in_environment.append('Serving')
serving_anomalies_with_environment=tfdv.validate_statistics(
    statistics=eval_stats, schema=schema, environment='Serving')
tfdv.display_anomalies(serving_anomalies_with_environment)


RoomService "是服务环境中不存在的必要功能。这种情况需要领域专家进行人工干预。


继续解决问题,直到获得此输出。


14


所有异常都已解决


第6步:训练服务漂移和偏斜检测

下一步是检查漂移和偏斜。偏斜是由于数据分布不规则造成的。在训练模型的初期,其预测通常是完美的。然而,随着时间的推移,数据分布会发生变化,误分类错误也会开始增加,这就是所谓的漂移。这些问题都需要对模型进行重新训练。


L-infinity 距离用于测量偏斜和漂移。根据 L-infinity 距离设置阈值。如果训练环境中的分析特征与服务环境中的分析特征之间的差异超过了给定的阈值,则认为该特征发生了漂移。对于偏斜,也采用了类似的基于阈值的方法。在我们的示例中,我们将漂移和偏斜的阈值都设置为 0.01。


serving_stats = tfdv.generate_statistics_from_csv(TEST_DATA)
# Skew Comparator
spa_analyze=tfdv.get_feature(schema, 'Spa')
spa_analyze.skew_comparator.infinity_norm.threshold=0.01
# Drift Comparator
CryoSleep_analyze=tfdv.get_feature(schema, 'CryoSleep')
CryoSleep_analyze.drift_comparator.infinity_norm.threshold=0.01
skew_anomalies=tfdv.validate_statistics(statistics=train_stats, schema=schema,
                                        previous_statistics=eval_stats,
                                        serving_statistics=serving_stats)
tfdv.display_anomalies(skew_anomalies)


我们可以看到,"Spa "显示的偏斜水平是可以接受的(因为它没有被列入异常列表),但是 "CryoSleep "显示的漂移水平很高。在创建自动管道时,这些异常情况可用作自动模型再训练的触发器。


15


第7步:保存模式

解决所有异常后,模式可保存为工件,或保存在元数据存储库中,并可在 ML 管道中使用。


# Saving the Schema
from tensorflow.python.lib.io import file_io
from google.protobuf import text_format
file_io.recursive_create_dir('schema')
schema_file = os.path.join('schema', 'schema.pbtxt')
tfdv.write_schema_text(schema, schema_file)


# Loading the Schema
loaded_schema= tfdv.load_schema_text(schema_file)
loaded_schema


文章来源:https://medium.com/towards-data-science/validating-data-in-a-production-pipeline-the-tfx-way-9770311eb7ce
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
写评论取消
回复取消