使用PyTorch Profiler和TensorBoard解决数据输入管道上的瓶颈

2023年08月30日 由 alex 发表 835 0

这是关于GPU-based PyTorch工作负载的性能分析和优化的文章。本文中我们将聚焦于训练数据输入流水线。在典型的训练应用中,主机的CPU负责加载、预处理和整理数据,然后将其送入GPU进行训练。输入流水线中的瓶颈问题会出现在主机无法跟上GPU速度的情况下。这将导致GPU(训练设置中最昂贵的资源)在等待过载的主机提供数据输入时处于空闲状态。在本文我们详细讨论了输入流水线中的瓶颈问题,并给出不同的解决方法,例如:


1. 选择具有更适合你工作负载的CPU到GPU计算比率的训练实例。


3. 通过将部分CPU的预处理活动转移到GPU上改善CPU和GPU之间的工作负载平衡。


4. 将一些CPU计算卸载到辅助CPU工作设备上。


特色


当然,解决数据输入流水线的性能瓶颈的第一步是要识别和理解它。在本文中,我们将演示如何使用PyTorch Profiler及其相关的TensorBoard插件来做到这一点。


我们将定义一个简单的PyTorch模型,并以迭代的方式对其性能进行剖析,识别瓶颈并尝试解决它们。我们将在Amazon EC2 g5.2xlarge实例上运行实验(包含NVIDIA A10G GPU和8个虚拟CPU),并使用官方的AWS PyTorch 2.0 Docker镜像。请记住,我们描述的一些行为在不同版本的PyTorch之间可能会有所不同。


玩具模型


在接下来的代码块中,我们介绍了我们将用于演示的玩具示例。 我们首先定义一个简单的图像分类模型。 模型的输入是一个批次的256x256 YUV图像,输出是其相关的批次的语义类别预测。


from math import log2
import torch
import torch.nn as nn
import torch.nn.functional as F
img_size = 256
num_classes = 10
hidden_size = 30
# toy CNN classification model
class Net(nn.Module):
    def __init__(self, img_size=img_size, num_classes=num_classes):
        super().__init__()
        self.conv_in = nn.Conv2d(3, hidden_size, 3, padding='same')
        num_hidden = int(log2(img_size))
        hidden = []
        for i in range(num_hidden):
            hidden.append(nn.Conv2d(hidden_size, hidden_size, 3, padding='same'))
            hidden.append(nn.ReLU())
            hidden.append(nn.MaxPool2d(2))
        self.hidden = nn.Sequential(*hidden)
        self.conv_out = nn.Conv2d(hidden_size, num_classes, 3, padding='same')
    def forward(self, x):
        x = F.relu(self.conv_in(x))
        x = self.hidden(x)
        x = self.conv_out(x)
        x = torch.flatten(x, 1)
        return x


下面的代码块包含了我们的数据集定义。我们的数据集包含一万个jpeg图像文件路径及其相关的(随机生成的)语义标签。为了简化我们的演示,我们假设所有的jpeg文件路径都指向同一张图像 —— 文章开头的彩色“瓶颈”图片。


import numpy as np
from PIL import Image
from torchvision.datasets.vision import VisionDataset
input_img_size = [533, 800]
class FakeDataset(VisionDataset):
    def __init__(self, transform):
        super().__init__(root=None, transform=transform)
        size = 10000
        self.img_files = [f'0.jpg' for i in range(size)]
        self.targets = np.random.randint(low=0,high=num_classes,
                                         size=(size),dtype=np.uint8).tolist()
    def __getitem__(self, index):
        img_file, target = self.img_files[index], self.targets[index]
        with torch.profiler.record_function('PIL open'):
            img = Image.open(img_file)
        if self.transform is not None:
            img = self.transform(img)
        return img, target
    def __len__(self):
        return len(self.img_files)


请注意,我们使用torch.profiler.record_function上下文管理器包装了文件读取器。


我们的输入数据流水线对图像进行了以下转换:


1. PILToTensor将PIL图像转换为PyTorch Tensor。


2. RandomCrop在图像中随机偏移处返回一个256x256的裁剪。


3. RandomMask是一个自定义转换,它创建一个随机的256x256布尔掩码,并将其应用于图像。该转换包括对掩码进行四邻域膨胀操作。


4. ConvertColor是一个自定义转换,将图像格式从RGB转换为YUV。


5. Scale是一个自定义转换,将像素值缩放到[0,1]范围内。


class RandomMask(torch.nn.Module):
    def __init__(self, ratio=0.25):
        super().__init__()
        self.ratio=ratio
    def dilate_mask(self, mask):
        # perform 4 neighbor dilation on mask
        with torch.profiler.record_function('dilation'):
            from scipy.signal import convolve2d
            dilated = convolve2d(mask, [[0, 1, 0],
                                     [1, 1, 1],
                                     [0, 1, 0]], mode='same').astype(bool)
        return dilated
    def forward(self, img):
        with torch.profiler.record_function('random'):
            mask = np.random.uniform(size=(img_size, img_size)) < self.ratio
        dilated_mask = torch.unsqueeze(torch.tensor(self.dilate_mask(mask)),0)
        dilated_mask = dilated_mask.expand(3,-1,-1)
        img[dilated_mask] = 0.
        return img
    def __repr__(self):
        return f"{self.__class__.__name__}(ratio={self.ratio})"
class ConvertColor(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.A=torch.tensor(
            [[0.299, 0.587, 0.114],
             [-0.16874, -0.33126, 0.5],
             [0.5, -0.41869, -0.08131]]
        )
        self.b=torch.tensor([0.,128.,128.])
    def forward(self, img):
        img = img.to(dtype=torch.get_default_dtype())
        img = torch.matmul(self.A,img.view([3,-1])).view(img.shape)
        img = img + self.b[:,None,None]
        return img
    def __repr__(self):
        return f"{self.__class__.__name__}()"

class Scale(object):
    def __call__(self, img):
        return img.to(dtype=torch.get_default_dtype()).div(255)
    def __repr__(self):
        return f"{self.__class__.__name__}()"


我们使用Compose类链式连接这些转换,我们稍微修改了它,以便在每个转换调用周围包含一个torch.profiler.record_function上下文管理器。


import torchvision.transforms as T
class CustomCompose(T.Compose):
    def __call__(self, img):
        for t in self.transforms:
            with torch.profiler.record_function(t.__class__.__name__):
                img = t(img)
        return img
transform = CustomCompose(
    [T.PILToTensor(),
     T.RandomCrop(img_size),
     RandomMask(),
     ConvertColor(),
     Scale()])


在下面的代码块中,我们定义了数据集和数据加载器。我们配置了DataLoader使用一个自定义的collate函数,在该函数中我们使用torch.profiler.record_function上下文管理器包装了默认的collate函数。


train_set = FakeDataset(transform=transform)
def custom_collate(batch):
    from torch.utils.data._utils.collate import default_collate
    with torch.profiler.record_function('collate'):
        batch = default_collate(batch)
    image, label = batch
    return image, label

train_loader = torch.utils.data.DataLoader(train_set, batch_size=256,
                                           collate_fn=custom_collate,
                                           num_workers=4, pin_memory=True)


最后,我们定义了模型、损失函数、优化器和训练循环,我们使用一个性能分析器上下文管理器将它们包装起来。


from statistics import mean, variance
from time import time
device = torch.device("cuda:0")
model = Net().cuda(device)
criterion = nn.CrossEntropyLoss().cuda(device)
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
model.train()
t0 = time()
times = []
with torch.profiler.profile(
    schedule=torch.profiler.schedule(wait=10, warmup=2, active=10, repeat=1),
    on_trace_ready=torch.profiler.tensorboard_trace_handler('/tmp/prof'),
    record_shapes=True,
    profile_memory=True,
    with_stack=True
) as prof:
    for step, data in enumerate(train_loader):
        with torch.profiler.record_function('h2d copy'):
            inputs, labels = data[0].to(device=device, non_blocking=True), \
                             data[1].to(device=device, non_blocking=True)
        if step >= 40:
            break
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        optimizer.zero_grad(set_to_none=True)
        loss.backward()
        optimizer.step()
        prof.step()
        times.append(time()-t0)
        t0 = time()
print(f'average time: {mean(times[1:])}, variance: {variance(times[1:])}')



在接下来的部分中,我们将使用PyTorch Profiler及其相关的TensorBoard插件来评估我们模型的性能。我们的重点将放在性能分析器报告的跟踪视图上。


初始性能结果


我们定义的脚本报告的平均步骤时间为1.3秒,平均GPU利用率非常低,只有18.21%。在下面的图片中,我们捕获了作为TensorBoard插件跟踪视图中显示的性能结果:


1


我们可以看到,每第4个训练步骤中都包含着一个长时间(约5.5秒)的数据加载期间,在这个期间GPU完全空闲。这种情况之所以发生在每4个步骤中,与我们选择的DataLoader worker的数量直接相关 —— 四个。每4个步骤中,我们发现所有worker都在忙于为下一个批次产生样本,而GPU在等待。这明显表明数据输入流水线存在瓶颈。问题是要如何分析它?复杂化的是,我们在代码中插入的许多record_function标记在分析跟踪中无处可寻。


DataLoader中使用多个worker是优化性能的关键。不幸的是,这也使得性能分析过程更加困难。虽然存在支持多进程分析的性能分析工具(例如,可以查看VizTracer),但我们在本文中的方法是在单进程模式下运行、分析和优化我们的模型(即,没有DataLoader worker),然后将优化方法应用于多worker模式。诚然,优化独立函数的速度并不能保证多个(并行)调用相同函数的性能也会得到改善。但正如我们将在本文中看到的,这种策略将使我们能够识别和解决一些核心问题,这些问题在其他方式下我们无法识别,并且至少在这里讨论的问题中,我们会发现两种模式上性能影响之间存在很强的相关性。但就在我们应用这种策略之前,让我们调整一下我们对worker数量的选择。


优化1:调整多进程策略


确定多进程/多线程应用程序中线程或进程的最佳数量可能有些棘手。一方面,如果我们选择一个过低的数字,可能会浪费CPU资源。另一方面,如果我们选择的数字过高,就有可能出现thrashing的情况,这是一种不希望出现的情况,操作系统在大部分时间内都在处理多线程/多进程,而非运行我们的代码。对于PyTorch训练工作负载,建议测试不同的DataLoader num_workers设置。一个好的起点是根据主机上的CPU数量来设置此数字(例如,num_workers:=num_cpus/num_gpus)。在我们的例子中,Amazon EC2 g5.2xlarge有八个虚拟CPU,实际上将DataLoader worker的数量增加到八个可以略微提高平均步骤时间,为1.17秒(提高了11%)。


重要的是,还要注意其他一些不太明显的配置设置可能会影响数据输入流水线使用的线程或进程数。例如,常用于计算机视觉工作负载中的图像预处理的opencv-python库,包括cv2.setNumThreads(int)函数来控制线程的数量。


在下面的图片中,我们捕获了在将num_workers设置为零时运行脚本时的部分跟踪视图。


1-2


以这种方式运行脚本使我们能够看到我们设置的record_function标签,并且可以识别出RandomMask转换,或者更具体地说是我们的膨胀函数,作为检索每个单独样本中耗时最长的操作。


优化2:优化膨胀函数


我们当前的膨胀函数实现使用了2D卷积,通常使用矩阵乘法实现,而这在CPU上运行并不特别快。一种选择是在GPU上运行膨胀函数(如本文所述)。但是,主机设备事务涉及的开销可能超过这种解决方案的潜在性能增益,更不用说我们希望不增加GPU的负载。


在下面的代码块中,我们提出了一个替代方案,更加友好于CPU的膨胀函数实现,它使用布尔运算而不是卷积:


def dilate_mask(self, mask):
        # perform 4 neighbor dilation on mask
        with torch.profiler.record_function('dilation'):
            padded = np.pad(mask, [(1,1),(1,1)])
            dilated = padded[0:-2,1:-1] | padded[1:-1,1:-1] | padded[2:,1:-1] | padded[1:-1,0:-2]| padded[1:-1,2:]
        return dilated


在进行这个修改后,我们的步骤时间降至0.78秒,额外提高了50%。下面是更新的单进程Trace-View显示:


1-3


我们可以看到,膨胀操作已经显著缩小,而最耗时的操作现在是PILToTensor转换。


对PILToTensor函数进行更详细的观察,我们可以看到它包含三个基本操作:


1. 加载PIL图像 —— 由于Image.open的惰性加载属性,在这里加载图像。


2. 将PIL图像转换为numpy数组。


3. 将numpy数组转换为PyTorch Tensor。


尽管图像加载占据了大部分时间,但我们注意到对完整大小的图像应用后续操作只是为了立即对其进行裁剪是非常浪费的。这带给我们下一个优化点。


优化3:重新排序转换


幸运的是,RandomCrop转换可以直接应用于PIL图像,使我们能够将图像尺寸的缩小作为流水线上的第一个操作进行:


transform = CustomCompose(
    [T.RandomCrop(img_size),
     T.PILToTensor(),
     RandomMask(),
     ConvertColor(),
     Scale()])


在进行这个优化之后,我们的步骤时间降至0.72秒,额外优化了8%。下面的Trace视图捕获显示RandomCrop转换现在是主要操作:


1-4


实际上,与之前一样,实际上是PIL图像(延迟)加载导致了瓶颈,而不是随机裁剪。


理想情况下,我们希望能够通过将读取操作限制在我们感兴趣的裁剪区域来进一步优化。不幸的是,在撰写本文时,torchvision不支持此选项。 在将来的文章中,我们将演示如何通过实现自己的自定义decode_and_crop PyTorch运算符来克服这个缺点。


优化4:应用批次转换


在我们当前的实现中,每个图像变换都是在每个图像上独立应用的。然而,一些转换可能在一次性应用于整个批次时运行得更加高效。在下面的代码块中,我们修改了我们的流水线,使得ColorTransformation和Scale转换在我们的自定义collate函数中应用于图像批次:


def batch_transform(img):
    img = img.to(dtype=torch.get_default_dtype())
    A = torch.tensor(
        [[0.299, 0.587, 0.114],
         [-0.16874, -0.33126, 0.5],
         [0.5, -0.41869, -0.08131]]
    )
    b = torch.tensor([0., 128., 128.])
    A = torch.broadcast_to(A, ([img.shape[0],3,3]))
    t_img = torch.bmm(A,img.view(img.shape[0],3,-1))
    t_img = t_img + b[None,:, None]
    return t_img.view(img.shape)/255
def custom_collate(batch):
    from torch.utils.data._utils.collate import default_collate
    with torch.profiler.record_function('collate'):
        batch = default_collate(batch)
    image, label = batch
    with torch.profiler.record_function('batch_transform'):
        image = batch_transform(image)
    return image, label


这个改变实际上导致了步骤时间的轻微增加,达到了0.75秒。虽然对于我们的玩具模型来说没有什么帮助,但将某些操作作为批次转换而不是每个样本转换的能力具有优化某些工作负载的潜力。


结果


我们在本文中应用的连续优化措施使运行时性能提高了80%。然而,尽管不那么严重,输入流水线仍存在瓶颈,GPU仍然被严重低效使用(约30%)。


总结


在本文中,我们聚焦于训练数据输入流水线的性能问题。我们选择了PyTorch Profiler及其相关的TensorBoard插件作为我们的工具,并展示了它们在加速训练速度方面的使用。特别是,我们展示了在零worker下运行DataLoader如何增强我们识别、分析和优化数据输入流水线中的瓶颈的能力。





文章来源:https://towardsdatascience.com/solving-bottlenecks-on-the-data-input-pipeline-with-pytorch-profiler-and-tensorboard-5dced134dbe9
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
写评论取消
回复取消