计算机视觉有几个子领域,而图像分割就是其中之一。如果你正在对图像进行分割,那么你就是在决定图像中哪些内容是可见的——这可以通过在像素级别对图像进行分类(classification)来实现——或者通过在像素级别从图像中推断相关的实数值信息(当执行回归任务时)来实现。
在图像分割领域,一个突出的架构是U-Net。该全卷积架构因其形状而得名,它首先对图像进行压缩,然后扩展得到输出结果。在这个压缩路径中,模型会构建一个学习到的特征的层次结构,而跳跃连接(skip connections)则有助于在扩展路径中将这些特征转换回相关的模型输出。
本文将重点介绍其实用实现。今天,我们将学习如何从零开始构建一个U-Net架构。我们将使用TensorFlow和Keras来完成这一任务。首先,我们将从高层次上简要回顾U-Net的组件。然后,我们将逐步介绍如何自己实现U-Net。最后,我们将使用Oxford-IIIT宠物数据集从头开始训练该网络,向你展示可以实现的效果以及如何进一步改进!
什么是U-Net?
当你向一位计算机视觉工程师询问图像分割时,U-Net这个词很可能会出现在他们的解释中。
U-Net以其形状而得名,是一种卷积架构,最初由Ronneberger等人(2015年)提出,用于生物医学科学领域。更具体地说,它用于细胞分割,并且与该领域之前使用的方法相比效果非常好。
U-Net由三个组件组组成:
请注意,在原始的U-Net架构中,输出的宽度和高度低于输入的宽度和高度(572x572像素与388x388像素)。这是由架构本身导致的,可以通过使用另一种默认架构(如ResNet)作为你的骨干架构来避免。
通过使用U-Net等架构,可以学习对特定图像重要的特征,同时利用这些信息生成更高分辨率的输出。表示像素级别类别索引的映射图就是这样的输出。通过继续阅读,你将学习如何构建一个U-Net来实现这一目标!
使用 TensorFlow 和 Keras 构建 U-Net 模型
既然你已经从高层次上理解了 U-Net 的工作原理,那么是时候构建一个了。打开你的集成开发环境 (IDE),创建一个 Python 文件(例如 unet.py),或者打开一个 Jupyter Notebook。同时,确保你已经安装了所需的依赖项,接下来将介绍这些依赖项。然后,我们就可以开始编写代码了!
前提条件
要运行今天的代码,重要的是你需要在环境中安装一些依赖项。
首先,你需要一个较新的 Python 版本——最好是 3.12+。
此外,你还需要安装 tensorflow 和 matplotlib。这些可以通过 pip 包管理器进行安装。安装完成后,你就可以开始了。
当今的结构
构建 U-Net 模型可以分为三个独立的任务,除了指定模型导入外:
之后,你将把所有内容合并成一个完整的可运行程序。
让我们从模型配置开始。
导入库
你的代码的第一行将涵盖你在代码其余部分中需要的导入。让我们简要介绍一下:
import os
import tensorflow
from tensorflow.keras.layers import Conv2D,\
MaxPool2D, Conv2DTranspose, Input, Activation,\
Concatenate, CenterCrop
from tensorflow.keras import Model
from tensorflow.keras.initializers import HeNormal
from tensorflow.keras.optimizers import schedules, Adam
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tensorflow.keras.callbacks import TensorBoard
from tensorflow.keras.utils import plot_model
import tensorflow_datasets as tfds
import matplotlib.pyplot as plt
U-Net 配置定义
在我看来,将各种配置选项散布在模型中是一种不好的做法。相反,我更喜欢将它们定义在一个地方,这样我就可以在整个模型中重复使用它们(而且,如果我需要将模型部署到生产环境中,我可以例如通过 JSON 环境变量提供我的配置,该变量可以很容易地作为字典读取到 Python 中)。以下是配置定义的样子。下面,我们将讨论各个组件:
'''
U-NET CONFIGURATION
'''
def configuration():
''' Get configuration. '''
return dict(
data_train_prc = 80,
data_val_prc = 90,
data_test_prc = 100,
num_filters_start = 64,
num_unet_blocks = 3,
num_filters_end = 3,
input_width = 100,
input_height = 100,
mask_width = 60,
mask_height = 60,
input_dim = 3,
optimizer = Adam,
loss = SparseCategoricalCrossentropy,
initializer = HeNormal(),
batch_size = 50,
buffer_size = 50,
num_epochs = 50,
metrics = ['accuracy'],
dataset_path = os.path.join(os.getcwd(), 'data'),
class_weights = tensorflow.constant([1.0, 1.0, 2.0]),
validation_sub_splits = 5,
lr_schedule_percentages = [0.2, 0.5, 0.8],
lr_schedule_values = [3e-4, 1e-4, 1e-5, 1e-6],
lr_schedule_class = schedules.PiecewiseConstantDecay
)
回顾一下,数据集必须被划分为训练集、验证集和测试集。训练集是最大且最主要的数据集,允许你在训练过程中进行前向和后向传递以及优化。然而,由于你已经见过这个数据集,因此在训练过程中会使用验证集来评估每个 epoch 后的性能。最后,由于模型最终也可能在验证集上过拟合,因此存在一个测试集,该测试集在训练过程中根本不使用。相反,它用于模型评估,以检查你的模型在未见过的数据上的表现。如果模型在未见过的数据上表现良好,那么它更有可能在现实世界中也表现良好。
在你的模型配置中,data_train_prc、data_val_prc 和 data_test_prc 用于表示特定划分的结束百分比。在上述配置中,80、90 和 100 意味着你的数据集的 0-80% 将用于训练目的,80-90%(即总共 10%)用于验证,90-100%(也是 10%)用于测试。你稍后会看到,以这种方式指定它们是很好的,因为 tfds.load 允许我们将两个数据集(训练/测试)重新组合并划分为三个!
在第一个 U-Net 卷积块中生成的特征图数量将是 64。总共,你的网络将由 3 个 U-Net 块组成(上面的草图有 5 个,但我们发现 3 个在这个数据集上效果更好),并且在最终的 1x1 卷积层中将有 3 个特征图。它被设置为 3,因为我们的数据集有三个可能的类别可以分配给每个像素——换句话说,它应该等于你数据集中的类别数量。
我们的输入图像的宽度和高度将是 100 像素。输入的维度将是 3 个通道(它是一个 RGB 图像)。
输出掩码的宽度和高度将是 60 像素。确实,在原始的 U-Net 架构中,输入和输出大小并不相等!
在模型方面,我们使用 Adam 优化器、稀疏分类交叉熵和 He 正态初始化。对于 Adam 优化器,我们使用一种称为分段常数衰减(PiecewiseConstantDecay)的学习率调度。这种调度确保在预定义的训练时间后,学习率被设置为预配置的值。我们从 3e-4(即 0.0003)的学习率开始,并在训练的 20%、50% 和 80% 后分别降低到 1e-4、1e-5 和 1e-6。降低学习率将帮助你更好地向最优解移动。
在训练方面,我们生成 50 个像素的批次,并使用 50 的缓冲区大小进行洗牌,然后训练模型 50 个 epoch。
作为额外的指标,我们使用准确率。
我们的数据集将位于当前工作目录的 data 子文件夹中。为了验证目的,使用了 5 个子划分。
当你使用不平衡的数据集进行训练时,为目标预测分配类别权重可能是一个好主意。这将使权重较低的类别得到更多的重视。
好了,那是最重要但相对枯燥的部分。现在让我们构建一些 U-Net 块吧!
U-Net 构建块
回顾一下,U-Net 由一个收缩路径组成,该路径本身由卷积块构建而成,以及一个由上采样块构建的扩展路径。在每个单独的层级(除了收缩路径中连接到扩展路径头部的最后一个层级)上,卷积块的输出通过跳跃连接连接到上采样块。
你将从构建卷积块开始,并在收缩路径中创建许多这样的块。然后,你将为上采样块和扩展路径做同样的事情。
卷积块
以下是你的 conv_block 的结构:
'''
U-NET BUILDING BLOCKS
'''
def conv_block(x, filters, last_block):
'''
U-Net convolutional block.
Used for downsampling in the contracting path.
'''
config = configuration()
# First Conv segment
x = Conv2D(filters, (3, 3),\
kernel_initializer=config.get("initializer"))(x)
x = Activation("relu")(x)
# Second Conv segment
x = Conv2D(filters, (3, 3),\
kernel_initializer=config.get("initializer"))(x)
x = Activation("relu")(x)
# Keep Conv output for skip input
skip_input = x
# Apply pooling if not last block
if not last_block:
x = MaxPool2D((2, 2), strides=(2,2))(x)
return x, skip_input
根据Ronneberger等人(2015年)的论文,每个卷积块(convolutional block)由两个3×3的卷积层(convolutional layers)组成,每个卷积层的输出都会经过ReLU激活函数处理。根据该配置,由于使用了ReLU激活函数,因此采用了He初始化方法。
该卷积块的结构是重复应用两次3×3卷积(不进行填充的卷积),每次卷积后都接一个修正线性单元(ReLU)以及一个步幅为2的2×2最大池化(max pooling)操作以实现下采样。
Ronneberger等人(2015年)
回顾上图可知,在每一层中,卷积块中卷积操作的输出会作为跳跃连接(skip connection)传递给对应层级上采样块(upsampling block)中的第一个上采样层。
同时,会对相同的输出应用最大池化操作,以便该输出能够被下一个卷积块使用。
在上面的代码中,你可以看到卷积层的输出被赋值给skip_input。随后,如果这不是最后一个卷积块,你会看到应用了池化大小为2×2、步幅为2的MaxPool2D操作。
处理后的张量x和跳跃连接skip_input都会被返回。需要注意的是,这种情况也发生在最后一层。真正重要的是我们如何处理返回的值,你会看到在构建完整的收缩路径时,当它是最后一层时,我们不会使用跳跃连接。
收缩路径和跳跃连接
让我们创建另一个名为contracting_path的函数。在这个函数中,你将构建属于收缩路径的卷积块。根据你上面的代码,这些卷积块将在其层次级别上执行特征学习,随后执行最大池化操作,为下一个卷积块准备张量。
在原始的U-Net中,在每个“下采样步骤”(即最大池化,尽管严格来说,常规卷积也是一个下采样步骤)中,特征通道的数量都会加倍。
在每个下采样步骤中,我们将特征通道的数量加倍。
Ronneberger等人(2015年)
在创建收缩路径时,你需要考虑到这一点。这就是为什么你将使用实用函数compute_number_of_filters(你接下来会定义它)来计算每个卷积块中使用的滤波器数量。给定起始数量为64,对于你今天正在构建的3块U-Net(根据你的模型配置),那将是64、128和256。对于Ronneberger等人(2014年)中的原始5块U-Net,那将是64、128、256、512和1024。
接下来,你创建一个列表,用于存储卷积提供的张量。它作为跳跃连接的容器。
现在,是创建实际块的时候了。通过使用enumerate,你可以创建一个枚举器,该枚举器输出(索引,值),并且你正在这样做以创建一个for循环,该循环同时提供块编号(索引)和该特定块中的滤波器数量(block_num_filters)。在循环中,你检查它是否是最后一个块,并让输入通过卷积块,根据卷积块的级别设置滤波器数量。
然后,如果它不是最后一个块,你将skip_input添加到skip_inputs容器中。
最后,你返回x(它现在已经通过了整个收缩路径)和在执行此操作时产生的跳跃连接张量skip_inputs。
def contracting_path(x):
'''
U-Net contracting path.
Initializes multiple convolutional blocks for
downsampling.
'''
config = configuration()
# Compute the number of feature map filters per block
num_filters = [compute_number_of_filters(index)\
for index in range(config.get("num_unet_blocks"))]
# Create container for the skip input Tensors
skip_inputs = []
# Pass input x through all convolutional blocks and
# add skip input Tensor to skip_inputs if not last block
for index, block_num_filters in enumerate(num_filters):
last_block = index == len(num_filters)-1
x, skip_input = conv_block(x, block_num_filters,\
last_block)
if not last_block:
skip_inputs.append(skip_input)
return x, skip_inputs
实用函数:计算特征图的数量
在contracting_path的定义中,你使用了compute_number_of_filters来计算在特定卷积块中必须使用的滤波器数量/必须生成的特征图数量。
这个实用函数实际上非常简单:你取第一个卷积块中的滤波器数量(根据你的模型配置,这是64),然后将其乘以2的level次方。例如,在第三层(索引 = 2)时,你的卷积块有64 x 2² = 256个滤波器。
def compute_number_of_filters(block_number):
'''
Compute the number of filters for a specific
U-Net block given its position in the contracting path.
'''
return configuration().get("num_filters_start") * (2 ** block_number)
上采样块
到目前为止,你已经编写了用于对输入数据进行下采样的代码。现在是时候为扩展路径构建模块了。让我们添加另一个函数,你将其称为upconv_block。它接受一些输入、预期的滤波器数量、与上采样块的层次级别相对应的跳跃输入张量,以及关于它是否是最后一个块的信息。
根据U-Net的设计,第一步是执行上采样。例如,在右侧的图像中,一个52x52x512的张量被上采样为一个104x104x512的张量。
在计算机视觉模型中,有两种主要的上采样方法:
因此,对你的输入张量x进行的第一个处理是通过Conv2DTranspose进行上采样。
然后,让我们讨论以下重要细节——应用于跳跃连接(skip connection)的裁剪。
请注意,在任意级别L的卷积块的输出的前两个维度的形状大于相应上采样块中这些维度的形状。例如,在下面的示例中,你看到一个形状为136x136像素的跳跃连接必须与一个104x104像素的张量连接。
扩展路径中的每个步骤都包括对特征图进行上采样,然后进行2x2卷积(“上卷积”),将特征通道数减半,与收缩路径中相应裁剪的特征图连接,然后进行两个3x3卷积,每个卷积后面都跟着一个ReLU。
Ronneberger等人(2015)
这是不可能的。Ronneberger等人(2015)在U-Net的原始实现中,通过从卷积块生成的特征图中取中心裁剪来缓解这个问题。这个中心裁剪与上采样后的张量具有相同的宽度和高度;在我们的例子中,这是104x104像素。现在,两个张量可以连接了。
为了进行这种裁剪,你使用TensorFlow的CenterCrop层,从上采样张量指定的目标宽度和高度出发,从跳跃输入中取中心裁剪。
然后,你使用Concatenate层将裁剪后的跳跃输入与上采样后的张量连接起来,之后你可以继续处理整个张量。根据Ronneberger等人(2015)的论文和上面的引用,这一步使用两个3x3卷积,每个卷积后面都跟着一个ReLU激活函数。
在最后一层,使用1x1卷积将每个64分量的特征向量映射到所需的类别数。
Ronneberger等人(2015)
最后,在最后一层,你应用一个1x1卷积(保持宽度和高度维度不变),输出一个第三维度为C的张量。在这里,C代表所需的类别数——这是我们在模型配置中作为num_filters_end所拥有的,确实,对于今天的数据集来说,这是三个类别!:)
以下是创建你的上采样块的代码:
def upconv_block(x, filters, skip_input, last_block = False):
'''
U-Net upsampling block.
Used for upsampling in the expansive path.
'''
config = configuration()
# Perform upsampling
x = Conv2DTranspose(filters//2, (2, 2), strides=(2, 2),\
kernel_initializer=config.get("initializer"))(x)
shp = x.shape
# Crop the skip input, keep the center
cropped_skip_input = CenterCrop(height = x.shape[1],\
width = x.shape[2])(skip_input)
# Concatenate skip input with x
concat_input = Concatenate(axis=-1)([cropped_skip_input, x])
# First Conv segment
x = Conv2D(filters//2, (3, 3),
kernel_initializer=config.get("initializer"))(concat_input)
x = Activation("relu")(x)
# Second Conv segment
x = Conv2D(filters//2, (3, 3),
kernel_initializer=config.get("initializer"))(x)
x = Activation("relu")(x)
# Prepare output if last block
if last_block:
x = Conv2D(config.get("num_filters_end"), (1, 1),
kernel_initializer=config.get("initializer"))(x)
return x
使用跳跃连接的扩展路径
与收缩路径类似,你也需要组合扩展路径中的上采样层。
与收缩路径类似,你还需要计算扩展路径中各块的过滤器数量。不过,这次你是从末尾开始计数——即,从块的数量减一开始,因为你是从大量的过滤器减少到少量的过滤器。
然后,你遍历过滤器数量,判断是否是最后一个块,并计算要从中获取跳跃输入的层级,然后将张量通过你的上采样块。
现在,如果你将张量输入到所有已组合的块中,它们将完整地通过收缩路径和扩展路径。是时候将你的U-Net组件缝合在一起了!
def expansive_path(x, skip_inputs):
'''
U-Net expansive path.
Initializes multiple upsampling blocks for upsampling.
'''
num_filters = [compute_number_of_filters(index)\
for index in range(configuration()\
.get("num_unet_blocks")-1, 0, -1)]
skip_max_index = len(skip_inputs) - 1
for index, block_num_filters in enumerate(num_filters):
skip_index = skip_max_index - index
last_block = index == len(num_filters)-1
x = upconv_block(x, block_num_filters,\
skip_inputs[skip_index], last_block)
return x
U-Net 构建器
这是一个相对简单的函数。它通过配置的输入数据的高度、宽度和维度来构建输入形状,然后将其传递给一个 Input 层——这是 TensorFlow 表示输入数据的方式。
然后,你的输入会通过 contracting_path(收缩路径),该路径会生成收缩后的数据以及每个卷积块的输出,用于跳跃连接。
这些输出随后被传递给 expansive_path(扩展路径),该路径生成扩展后的数据。请注意,我们选择显式地不建模 Softmax 激活函数,因为按照 TensorFlow 的建议,我们将其推到损失函数中。最后,我们使用输入数据作为起点,扩展后的数据作为终点,来初始化 Model 类。该模型被命名为 U-Net。
def build_unet():
''' Construct U-Net. '''
config = configuration()
input_shape = (config.get("input_height"),\
config.get("input_width"), config.get("input_dim"))
# Construct input layer
input_data = Input(shape=input_shape)
# Construct Contracting path
contracted_data, skip_inputs = contracting_path(input_data)
# Construct Expansive path
expanded_data = expansive_path(contracted_data, skip_inputs)
# Define model
model = Model(input_data, expanded_data, name="U-Net")
return model
U-Net 训练过程函数
既然你已经创建了模型构建块,那么是时候开始创建用于训练 U-Net 的函数了。这些是你将要创建的函数:
初始化模型
你有一个用于创建模型的函数。然而,那只是一个框架——因为模型需要使用损失函数进行初始化,需要配置优化器,等等。
因此,让我们创建一个名为 init_model 的函数,该函数允许你执行这些操作。它接受每个 epoch 的步数(steps per epoch),这些步数来自你稍后将添加的数据集配置。
在该定义中发生以下操作:
'''
U-NET TRAINING PROCESS BUILDING BLOCKS
'''
def init_model(steps_per_epoch):
'''
Initialize a U-Net model.
'''
config = configuration()
model = build_unet()
# Retrieve compilation input
loss_init = config.get("loss")(from_logits=True)
metrics = config.get("metrics")
num_epochs = config.get("num_epochs")
# Construct LR schedule
boundaries = [int(num_epochs * percentage * steps_per_epoch)\
for percentage in config.get("lr_schedule_percentages")]
lr_schedule = config.get("lr_schedule_class")(boundaries, config.get("lr_schedule_values"))
# Init optimizer
optimizer_init = config.get("optimizer")(learning_rate = lr_schedule)
# Compile the model
model.compile(loss=loss_init, optimizer=optimizer_init, metrics=metrics)
# Plot the model
plot_model(model, to_file="unet.png")
# Print model summary
model.summary()
return model
这是你的模型(在视觉上)呈现的样子。确实,它是个 U 型!
加载数据集
在今天的教程中,为了训练你的模型,你将使用 Parkhi 等人(2012)发布的 Oxford-IIT Pets 数据集:
我们创建了一个包含 37 个类别的宠物数据集,每个类别大约有 200 张图像。这些图像在尺度、姿态和光照方面有很大的变化。所有图像都有相关的品种、头部感兴趣区域(ROI)和像素级三值图(trimap)分割的标注。
—— Parkhi 等人(2012)
我们使用这个数据集是因为它在 TensorFlow 数据集中可用,这使得加载它更加容易,并且它自带了分割掩码(segmentation mask)。例如,下面是一张输入图像及其对应的分割掩码:
加载数据集非常简单。由于 TensorFlow 数据集仅包含训练数据和测试数据,并且由于你需要三个数据划分(训练集、验证集和测试集),你将根据模型配置重新定义数据划分,并将其传递给 tfds.load。通过返回信息(with_info=True),你将能够稍后读取一些有趣的元数据。
def load_dataset():
''' Return dataset with info. '''
config = configuration()
# Retrieve percentages
train = config.get("data_train_prc")
val = config.get("data_val_prc")
test = config.get("data_test_prc")
# Redefine splits over full dataset
splits = [f'train[:{train}%]+test[:{train}%]',\
f'train[{train}%:{val}%]+test[{train}%:{val}%]',\
f'train[{val}%:{test}%]+test[{val}%:{test}%]']
# Return data
return tfds.load('oxford_iiit_pet:3.*.*', split=splits, data_dir=configuration()\
.get("dataset_path"), with_info=True)
数据预处理
数据集在用于深度学习模型之前需要进行预处理。这就是为什么今天的教程也需要你编写一些预处理代码。更准确地说,你将执行以下预处理步骤:
现在,让我们为每个步骤编写代码。
执行图像归一化只需将张量转换为 float32 格式并除以 255.0。除此之外,你还需要从掩码的类别中减去 1,因为它们的范围是 1-3,而我们希望它们的范围是 0-2:
def normalize_sample(input_image, input_mask):
''' Normalize input image and mask class. '''
# Cast image to float32 and divide by 255
input_image = tensorflow.cast(input_image, tensorflow.float32) / 255.0
# Bring classes into range [0, 2]
input_mask -= 1
return input_image, input_mask
接下来,你在样本级别预处理的定义中实现这一操作。输入图像会被调整为模型配置中指定的大小,掩码(mask)也同样如此。最后,输入图像和掩码都会被归一化处理,并返回。
def preprocess_sample(data_sample):
''' Resize and normalize dataset samples. '''
config = configuration()
# Resize image
input_image = tensorflow.image.resize(data_sample['image'],\
(config.get("input_width"), config.get("input_height")))
# Resize mask
input_mask = tensorflow.image.resize(data_sample['segmentation_mask'],\
(config.get("mask_width"), config.get("mask_height")))
# Normalize input image and mask
input_image, input_mask = normalize_sample(input_image, input_mask)
return input_image, input_mask
数据增强(Data augmentation)允许 TensorFlow 对输入张量(Tensors)执行任意的图像操作。在今天的教程中,你将通过随机水平翻转和垂直翻转样本(samples)来实现数据增强。我们在调用这些操作时使用相同的随机种子(seed),以确保输入和标签以相同的方式进行变换。
def data_augmentation(inputs, labels):
''' Perform data augmentation. '''
# Use the same seed for deterministic randomness over both inputs and labels.
seed = 36
# Feed data through layers
inputs = tensorflow.image.random_flip_left_right(inputs, seed=seed)
inputs = tensorflow.image.random_flip_up_down(inputs, seed=seed)
labels = tensorflow.image.random_flip_left_right(labels, seed=seed)
labels = tensorflow.image.random_flip_up_down(labels, seed=seed)
return inputs, labels
接下来是计算样本权重。给定每个类别的权重,你通过 reduce_sum 计算这些类别权重的相对比例(或相对影响力)。随后,你计算每个类别的样本权重,并将其作为额外的数组返回,以便在 model.fit 中使用。
def compute_sample_weights(image, mask):
''' Compute sample weights for the image given class. '''
# Compute relative weight of class
class_weights = configuration().get("class_weights")
class_weights = class_weights/tensorflow.reduce_sum(class_weights)
# Compute same-shaped Tensor as mask with sample weights per
# mask element.
sample_weights = tensorflow.gather(class_weights,indices=\
tensorflow.cast(mask, tensorflow.int32))
return image, mask, sample_weights
最后,你可以将上述所有定义整合到数据集级别的预处理中。根据数据集类型的不同,预处理方式也会有所不同:
def preprocess_dataset(data, dataset_type, dataset_info):
''' Fully preprocess dataset given dataset type. '''
config = configuration()
batch_size = config.get("batch_size")
buffer_size = config.get("buffer_size")
# Preprocess data given dataset type.
if dataset_type == "train" or dataset_type == "val":
# 1. Perform preprocessing
# 2. Cache dataset for improved performance
# 3. Shuffle dataset
# 4. Generate batches
# 5. Repeat
# 6. Perform data augmentation
# 7. Add sample weights
# 8. Prefetch new data before it being necessary.
return (data
.map(preprocess_sample)
.cache()
.shuffle(buffer_size)
.batch(batch_size)
.repeat()
.map(data_augmentation)
.map(compute_sample_weights)
.prefetch(buffer_size=tensorflow.data.AUTOTUNE))
else:
# 1. Perform preprocessing
# 2. Generate batches
return (data
.map(preprocess_sample)
.batch(batch_size))
训练回调
接下来要写一些实用函数。如果你熟悉 TensorFlow,那么你可能了解 Keras 回调(callbacks)。这些回调可以在训练过程的特定步骤中执行某些操作。
今天,我们使用这些回调将 TensorBoard 日志记录集成到你的模型中。这样,你就能够在训练过程中以及训练结束后评估模型的进展和训练情况。
def training_callbacks():
''' Retrieve initialized callbacks for model.fit '''
return [
TensorBoard(
log_dir=os.path.join(os.getcwd(), "unet_logs"),
histogram_freq=1,
write_images=True
)
]
数据可视化
最后一个实用函数与数据可视化相关。我们希望了解模型的性能会如何,因此我们将构建一个可视化工具,用于显示源图像、实际掩码(mask)、预测掩码以及叠加在源图像之上的预测掩码。为此,我们需要创建一个函数,该函数能够根据模型预测生成掩码:
def probs_to_mask(probs):
''' Convert Softmax output into mask. '''
pred_mask = tensorflow.argmax(probs, axis=2)
return pred_mask
在第三个维度(即通道维度)上,它只是选取具有最大值的类别索引并返回该索引。实际上,这等同于选择一个类别。
你将这个功能集成到 generate_plot 函数中,该函数使用 Matplotlib 来生成四个图,分别是源图像、实际掩码、预测掩码以及叠加后的图像:
def generate_plot(img_input, mask_truth, mask_probs):
''' Generate a plot of input, truthy mask and probability mask. '''
fig, axs = plt.subplots(1, 4)
fig.set_size_inches(16, 6)
# Plot the input image
axs[0].imshow(img_input)
axs[0].set_title("Input image")
# Plot the truthy mask
axs[1].imshow(mask_truth)
axs[1].set_title("True mask")
# Plot the predicted mask
predicted_mask = probs_to_mask(mask_probs)
axs[2].imshow(predicted_mask)
axs[2].set_title("Predicted mask")
# Plot the overlay
config = configuration()
img_input_resized = tensorflow.image.resize(img_input, (config.get("mask_width"), config.get("mask_height")))
axs[3].imshow(img_input_resized)
axs[3].imshow(predicted_mask, alpha=0.5)
axs[3].set_title("Overlay")
# Show the plot
plt.show()
将所有内容整合到一个可运行的示例中
最后一步是将所有内容整合到一个可运行的示例中:
def main():
''' Run full training procedure. '''
# Load config
config = configuration()
batch_size = config.get("batch_size")
validation_sub_splits = config.get("validation_sub_splits")
num_epochs = config.get("num_epochs")
# Load data
(training_data, validation_data, testing_data), info = load_dataset()
# Make training data ready for model.fit and model.evaluate
train_batches = preprocess_dataset(training_data, "train", info)
val_batches = preprocess_dataset(validation_data, "val", info)
test_batches = preprocess_dataset(testing_data, "test", info)
# Compute data-dependent variables
train_num_samples = tensorflow.data.experimental.cardinality(training_data).numpy()
val_num_samples = tensorflow.data.experimental.cardinality(validation_data).numpy()
steps_per_epoch = train_num_samples // batch_size
val_steps_per_epoch = val_num_samples // batch_size // validation_sub_splits
# Initialize model
model = init_model(steps_per_epoch)
# Train the model
model.fit(train_batches, epochs=num_epochs, batch_size=batch_size,\
steps_per_epoch=steps_per_epoch, verbose=1,
validation_steps=val_steps_per_epoch, callbacks=training_callbacks(),\
validation_data=val_batches)
# Test the model
score = model.evaluate(test_batches, verbose=0)
print(f'Test loss: {score[0]} / Test accuracy: {score[1]}')
# Take first batch from the test images and plot them
for images, masks in test_batches.take(1):
# Generate prediction for each image
predicted_masks = model.predict(images)
# Plot each image and masks in batch
for index, (image, mask) in enumerate(zip(images, masks)):
generate_plot(image, mask, predicted_masks[index])
if index > 4:
break
if __name__ == '__main__':
main()
训练我们的 U-Net
现在,让我们训练我们的模型!打开一个终端,导航到你的 Python 脚本所在的位置,然后运行它。你应该会很快看到训练过程开始 :)
当我从头开始训练(即使用 He 初始化权重)我们的 U-Net 时,训练结果如下:
使用我们的模型生成的图像分割示例
回想一下,训练完成后,模型会从测试集中选取一些示例并输出结果。以下是你们的 U-Net 将产生的结果:
通过模型预训练提升模型性能
尽管有些示例(比如狗)生成了相当不错的叠加效果,但其他示例(比如其中一只猫)的预测结果就差很多。
其中一个关键原因是数据集的大小——尽管 Pets 数据集已经是一个相对较大的数据集,但与其他更贴近实际应用的数据集相比,它还是太小了。虽然数据增强可能已经改善了结果,但它并不是一种能解决所有问题的神奇方法。
然而,除了增加数据集的大小之外,还有一种方法也有效——即不使用随机初始化的权重,而是使用预训练的模型。例如,可以使用 ImageNet 数据集对模型进行预训练。这样,你的模型就已经学会了检测特定的模式,并允许你使用这些预训练的权重来初始化模型。
有很多软件包允许你使用当代卷积神经网络(如 ResNet 等)作为骨干网络,为 TensorFlow 和 Keras 构建 U-Net。更好的是,它们会为这些骨干网络生成权重,让你能够从一个更好的起点开始!