让我们谈谈计算机视觉任务的一些真实案例。乍一看,分类问题似乎非常简单,而且这也是事实。但在现实世界中,你经常会遇到很多限制,比如模型的速度、大小以及能否在移动设备上运行的能力。此外,你可能还有几个任务,而为每个任务单独建立一个模型并不是最好的选择。至少在你可以优化系统架构并使用更少的模型时,你应该这样做。但是,你也不想失去准确性,对吗?
因此,当考虑到所有的限制和优化时,你的任务就变得更加复杂。我想举一个多类别分类问题的例子,即使在视觉上它们可能并不那么相似。
我将从一个简单的任务开始:分类判断一张图片是真正的纸质文件,还是屏幕上有某些文档的图像。这可能是一个平板电脑/手机或者一个大屏幕。
这个任务很简单。首先,从数据集中开始收集数据,确保数据能够代表性、干净且足够大。然后选择一个符合速度、准确性和可导出性等约束条件的模型,并使用普通的训练流程,只需注意数据的不平衡。这样可以获得相当不错的结果。
但是,假设现在你需要添加一个新的功能,使得你的模型可以分类判断输入是否是一个文档的图像,或者是一包薯片/罐头等非文档物品,或一些营销材料。这个任务与你最初的任务相比并不那么重要,也不那么困难。
这是我们数据集的结构:
dataset/
├── documents/
│ ├── img_1.jpg
| ...
│ └── img_100.jpg
├── screens/
│ ├── img_1.jpg
| ...
│ └── img_100.jpg
├── not a documents/
│ ├── img_1.jpg
│ ...
│ └── img_100.jpg
├── train.csv
├── val.csv
└── test.csv
而CSV文件结构:
documents/img_1.jpg | 0
not a document/img_1.jpg | 1
screens/img_1.jpg | 2
...
第一列包含图像的相对路径,第二列是类别标识。现在让我们讨论两种解决这个任务的方法。
三输出神经元方法(简单)
由于我们希望拥有一个最优的系统架构,我们不打算为每个小任务再添加一个新的二元分类器模型。首先想到的方法是将这个(不是文档类)作为第三个类别添加到我们的原始模型中,这样我们就会得到类别如下:'文档','屏幕','非文档'。
这是一个可行的选项,但也许这些任务的重要性不相等,而且在视觉上这些类别可能并不相似,你可能希望为分类层提取出稍微不同的特征。此外,我们不要忘记不要丢失原始任务的准确性是非常重要的。
采用二元分类的双头方法(自定义)
另一种方法是主要使用一个主干网络和两个二元分类的头部。这样我们就会有一个模型用于两个任务,每个任务都是独立的,并且我们可以对每个任务进行很好的控制。
速度实际上不会受到影响(我在一张图像上使用3060进行推理时,推理时间变慢了约5-7%),模型的大小会稍微增加(在我这里,导出到TFLlite后,模型大小从500KB增加到700KB)。我们的情况下另一个方便之处是加权损失函数,使得第一个头的损失函数的权重是第二头的N倍。这样我们可以确保我们的重点在第一个(主要)任务上,并且不容易在此任务上失去准确性。
以下是它的外观:
我正在使用SuffleNetV2来完成这个任务,并且我将这个架构分为两个部分,从最后的卷积层开始。每个head都有自己的最后一个卷积层、全局池化和用于分类的全连接层。
代码示例
现在我们了解了模型的架构,很明显我们需要对训练流程进行一些修改,从数据集生成器开始。在编写数据集和数据加载器的代码时,我们现在需要为每次迭代返回1张图像和2个标签。第一个标签将用于第一个head,第二个标签将用于第二个head,让我们看一个代码示例:
class CustomDataset(Dataset):
def __init__(
self,
root_path: Path,
split: pd.DataFrame,
train_mode: bool,
) -> None:
self.root_path = root_path
self.split = split
self.img_size = (256, 256)
self.norm = ([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
self._init_augs(train_mode)
def _init_augs(self, train_mode: bool) -> None:
if train_mode:
self.transform = transforms.Compose(
[
transforms.Resize(self.img_size),
transforms.Lambda(self._convert_rgb),
transforms.RandomRotation(10),
transforms.ToTensor(),
transforms.Normalize(*self.norm),
]
)
else:
self.transform = transforms.Compose(
[
transforms.Resize(self.img_size),
transforms.Lambda(self._convert_rgb),
transforms.ToTensor(),
transforms.Normalize(*self.norm),
]
)
def _convert_rgb(self, x: torch.Tensor) -> torch.Tensor:
return x.convert("RGB")
def __getitem__(self, idx: int) -> Tuple[torch.Tensor, int, int]:
image_path, label = self.split.iloc[idx]
image = Image.open(self.root_path / image_path)
image.draft("RGB", self.img_size)
image = ImageOps.exif_transpose(image) # fix rotation
image = self.transform(image)
label_lcd = int(label == 2)
label_other = int(label == 1)
return image, label_lcd, label_other
def __len__(self) -> int:
return len(self.split)
我们只对__getitem__感兴趣,其中我们将标签分为label_lcd和label_other(我们的两个头)。label_lcd表示屏幕为1,其他情况为0。label_other表示非文档为1,其他情况为0。
对于我们的架构,我们有以下内容:
class CustomShuffleNet(nn.Module):
def __init__(self, n_outputs_1: int, n_outputs_2: int) -> None:
super(CustomShuffleNet, self).__init__()
self.base_model = models.shufflenet_v2_x0_5(
weights=models.ShuffleNet_V2_X0_5_Weights.DEFAULT
)
# Create head convolution layers
self.head1_conv = self._create_head_conv()
self.head2_conv = self._create_head_conv()
# Create fully connected layers for both heads
in_features = self.base_model.fc.in_features
del self.base_model.fc
self.fc1 = nn.Linear(in_features, n_outputs_1)
self.fc2 = nn.Linear(in_features, n_outputs_2)
def _create_head_conv(self) -> nn.Module:
return nn.Sequential(
nn.Conv2d(192, 1024, kernel_size=1, stride=1, bias=False),
nn.BatchNorm2d(1024),
nn.ReLU(inplace=True),
)
def forward(self, x: torch.Tensor) -> torch.Tensor:
x = self.base_model.conv1(x)
x = self.base_model.maxpool(x)
x = self.base_model.stage2(x)
x = self.base_model.stage3(x)
x = self.base_model.stage4(x)
# Pass through the separate convolutions for each head
x1 = self.head1_conv(x)
x1 = x1.mean([2, 3]) # globalpool for first head
out1 = self.fc1(x1)
x2 = self.head2_conv(x)
x2 = x2.mean([2, 3]) # globalpool for second head
out2 = self.fc2(x2)
return out1, out2
从最后一个卷积层(包括在内)开始,架构被分成两个并行的头部。现在模型有两个输出,因为我们需要。
训练循环:
def train(
train_loader: DataLoader,
val_loader: DataLoader,
device: str,
model: nn.Module,
loss_func: nn.Module,
optimizer: torch.optim.Optimizer,
scheduler: torch.optim.lr_scheduler,
epochs: int,
path_to_save: Path,
) -> None:
best_metric = 0
wandb.watch(model, log_freq=100)
for epoch in range(1, epochs + 1):
model.train()
with tqdm(train_loader, unit="batch") as tepoch:
for inputs, labels_1, labels_2 in tepoch:
inputs, labels_1, labels_2 = (
inputs.to(device),
labels_1.to(device),
labels_2.to(device),
)
tepoch.set_description(f"Epoch {epoch}/{epochs}")
optimizer.zero_grad()
outputs_1, outputs_2 = model(inputs)
loss_1 = loss_func(outputs_1, labels_1)
loss_2 = loss_func(outputs_2, labels_2)
loss = 2 * loss_1 + loss_2
loss.backward()
optimizer.step()
tepoch.set_postfix(loss=loss.item())
metrics = evaluate(
test_loader=val_loader, model=model, device=device, mode="val"
)
if scheduler is not None:
scheduler.step()
if metrics["f1_1"] > best_metric:
best_metric = metrics["f1_1"]
print("Saving new best model...")
path_to_save.parent.mkdir(parents=True, exist_ok=True)
torch.save(model.state_dict(), path_to_save)
wandb_logger(loss, metrics, mode="val")
我们从数据集中获得图像、label_1和label_2,将图像(实际上是一个批次)通过模型运行,然后计算两次损失(每个头输出1次)。我们将主要损失乘以2,以便专注于我们的“主”头部。当然,我们需要更改一些指标计算的方式来适应我们的双头模型你可以在仓库中找到一个完整的示例)。还有一个重要的事情是,我们根据从我们的“主”头部获得的度量保存我们的模型。
结果
将训练流水线中的F1分数进行比较没有意义,因为它们是针对3个类和2个类计算的,而我们对指标感兴趣。因此,我使用了一个特定的测试数据集,运行了两个模型,并分别比较了文档/屏幕任务和文档/非文档任务的精确度和召回率。
两个模型都使用256x256的输入尺寸,但我还添加了一个简单的3个输出神经元方法的320x320输入尺寸版本,因为它的推理时间与双头模型几乎相同,所以很有趣进行比较。第二个任务对于我的情况来说是一个简单的任务,两种方法得到了完全相同的结果,但是在主要任务中有一些差异。
+----------------------------+-----------+-----------+--------------+
| Model (img size) | Precision | Recall | Latency (s)* |
+----------------------------+-----------+-----------+--------------+
| Three output neurons (256) | 0.993 | 0.855 | 0.027 |
| Three output neurons (320) | 1.0 | 0.846 | 0.029 |
| Two heads (256) | 1.0 | 0.873 | 0.029 |
+----------------------------+-----------+-----------+--------------+
这是我们所需要的提升!双头模型在次要任务上具有相同的分数,但在主要任务上具有相同或更高的精度和召回率。而且这是在真实世界数据上得出的结果(而非来自训练/验证/测试集)。
总结
分类很简单,但面对真实世界的各种限制就变得更加困难。优化子任务,尽量避免为每个大任务创建K个模型。定制化模型和训练流程,以便更好地控制。测试假设,运行实验并保存结果(使用hydra、wandb等工具)。