在Amazon SageMaker上使用Ray实现有效负载平衡

2023年09月05日 由 alex 发表 471 0

本文重点讨论我们在训练深度学习模型时遇到的更常见的性能瓶颈之一,即数据预处理瓶颈。当我们的GPU(或替代加速器)-通常是我们训练设置中最昂贵的资源-在等待过度任务的CPU资源的数据输入时发现自己处于空闲时,就会出现数据预处理瓶颈。


1


在本文中关于这个问题,我们讨论并演示了解决这类瓶颈的不同方法,包括:


1. 选择一个CPU与GPU计算比更适合你的工作量的训练实例。


2. 通过将部分CPU操作转移到GPU来改善CPU和GPU之间的工作负载平衡。


3. 将一些CPU计算卸载到辅助CPU工作设备。


我们使用TensorFlow Data Service API演示了第三个选项,这是TensorFlow特定的解决方案,其中可以使用gRPC作为底层通信协议将部分输入数据处理卸载到其他设备上。


使用Ray进行负载均衡


在本文中,我们将展示另一种使用辅助CPU worker的方法,该方法旨在将通用解决方案的可靠性与tensorflow特定API的简单性和易用性结合起来。我们将演示的方法将使用来自光线数据库的光线数据集。通过充分利用Ray的资源管理和分布式调度系统,Ray Data能够以可扩展和分布式的方式运行我们的培训数据输入管道。特别地,我们将以这样一种方式配置我们的Ray Dataset,即库将自动检测并利用所有可用的CPU资源来预处理训练数据。我们将进一步用Ray AIR Trainer来包装我们的模型训练循环,以便无缝缩放到多GPU设置。


在Amazon SageMaker上部署Ray集群


在多节点环境中使用Ray框架及其提供的实用程序的先决条件是Ray集群的部署。一般来说,设计、部署、管理和维护这样的计算集群可能是一项艰巨的任务,通常需要专门的开发工程师(或工程师团队)。这可能会给一些开发团队带来不可逾越的障碍。在本文呢中,我们将演示如何使用AWS的管理培训服务Amazon SageMaker来克服这个障碍。特别是,我们将创建一个包含GPU实例和CPU实例的SageMaker异构集群,并在启动时使用它来部署Ray集群。然后,我们将在这个Ray集群上运行Ray AIR训练应用程序,同时依靠Ray的后端在集群中的所有资源上执行有效的负载平衡。当培训申请完成后,Ray集群将自动拆除。以这种方式使用SageMaker,使我们能够部署和使用Ray集群,而不需要通常与集群管理相关的开销。


Ray是一个强大的框架,支持广泛的机器学习工作负载。在本文中,我们将使用Ray 2.6.1版本演示它的一些功能和API。


玩具的例子


为了方便我们的讨论,我们将定义和训练一个简单的PyTorch(2.0)基于视觉转换器的分类模型,我们将在由随机图像和标签组成的合成数据集上训练该模型。Ray AIR文档包括各种各样的示例,演示如何使用Ray AIR构建不同类型的训练工作负载。我们在这里创建的脚本大致遵循PyTorch图像分类器示例中描述的步骤。


定义光线数据集和预处理器


Ray AIR Trainer API区分原始数据集和预处理管道,预处理管道在将数据集元素输入训练循环之前应用于数据集元素。对于我们的原始Ray数据集,我们创建了一个大小为num_records的简单整数范围。接下来,我们定义我们想要应用于数据集的预处理器。我们的Ray preprocessor包含两个组件:第一个组件是BatchMapper,它将原始整数映射到随机的图像标签对。第二个是TorchVisionPreprocessor,它对我们的随机批执行torchvision变换,将它们转换为PyTorch张量并应用一系列GaussianBlur操作。GaussianBlur操作旨在模拟一个相对繁重的数据预处理管道。两个预处理器使用链式预处理器组合在一起。下面的代码块演示了光线数据集和预处理器的创建:


import ray
from typing import Dict, Tuple
import numpy as np
import torchvision.transforms as transforms
from ray.data.preprocessors import Chain, BatchMapper, TorchVisionPreprocessor
def get_ds(batch_size, num_records):
    # create a raw Ray tabular dataset
    ds = ray.data.range(num_records)
    # map an integer to a random image-label pair
    def synthetic_ds(batch: Tuple[int]) -> Dict[str, np.ndarray]:
        labels = batch['id']
        batch_size = len(labels)
        images = np.random.randn(batch_size, 224, 224, 3).astype(np.float32)
        labels = np.array([label % 1000 for label in labels]).astype(
                                                               dtype=np.int64)
        return {"image": images, "label": labels}
    # the first step of the prepocessor maps batches of ints to
    # random image-label pairs
    synthetic_data = BatchMapper(synthetic_ds, 
                                 batch_size=batch_size, 
                                 batch_format="numpy")
    # we define a torchvision transform that converts the numpy pairs to 
    # tensors and then applies a series of gaussian blurs to simulate
    # heavy preprocessing   
    transform = transforms.Compose(
        [transforms.ToTensor()] + [transforms.GaussianBlur(11)]*10
    )
    # the second step of the prepocessor appplies the torchvision tranform
    vision_preprocessor = TorchVisionPreprocessor(columns=["image"], 
                                                  transform=transform)
    # combine the preprocessing steps
    preprocessor = Chain(synthetic_data, vision_preprocessor)
    return ds, preprocessor


请注意,Ray数据管道将自动使用Ray集群中可用的所有CPU。这包括GPU实例上的CPU资源以及集群中任何其他辅助实例的CPU资源。


定义训练循环


下一步是定义将在每个训练工作线程(例如,gpu)上运行的培训序列。首先,我们使用流行的timm (0.6.13) Python包定义模型,并使用train.torch.prepare_model API对其进行包装。接下来,我们从数据集中提取适当的分片,并定义一个迭代器,该迭代器生成具有所请求批大小的数据批,并将它们复制到训练设备。然后是训练循环本身,它由标准PyTorch代码组成。当我们退出循环时,我们报告结果损失度量。每个工人的培训序列在下面的代码块中演示:


import time
from ray import train
from ray.air import session
import torch.nn as nn
import torch.optim as optim
from timm.models.vision_transformer import VisionTransformer
# build a ViT model using timm
def build_model():
    return VisionTransformer()
# define the training loop per worker
def train_loop_per_worker(config):
    # wrap the PyTorch model with a Ray object
    model = train.torch.prepare_model(build_model())
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
    # get the appropriate dataset shard
    train_dataset_shard = session.get_dataset_shard("train")
    # create an iterator that returns batches from the dataset
    train_dataset_batches = train_dataset_shard.iter_torch_batches(
        batch_size=config["batch_size"],
        prefetch_batches=config["prefetch_batches"],
        device=train.torch.get_device()
    )
    t0 = time.perf_counter()
    for i, batch in enumerate(train_dataset_batches):
        # get the inputs and labels
        inputs, labels = batch["image"], batch["label"]
        # zero the parameter gradients
        optimizer.zero_grad()
        # forward + backward + optimize
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        # print statistics
        if i % 100 == 99:  # print every 100 mini-batches
            avg_time = (time.perf_counter()-t0)/100
            print(f"Iteration {i+1}: avg time per step {avg_time:.3f}")
            t0 = time.perf_counter()
    metrics = dict(running_loss=loss.item())
    session.report(metrics)


定义射线火炬训练器


一旦我们定义了数据管道和训练循环,我们就可以继续设置Ray TorchTrainer。我们以考虑集群中可用资源的方式配置训练器。具体来说,我们根据GPU的数量来设置训练工人的数量,并根据目标GPU上可用的内存来设置批处理大小。我们用训练1000步所需的记录数来构建数据集。


from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig
def train_model():
    # we will configure the number of workers, the size of our
    # dataset, and the size of the data storage according to the
    # available resources 
    num_gpus = int(ray.available_resources().get("GPU", 0))
    
    # set the number of training workers according to the number of GPUs
    num_workers = num_gpus if num_gpus > 0 else 1
    # we set the batch size based on the GPU memory capacity of the
    # Amazon EC2 g5 instance family
    batch_size = 64
    # create a synthetic dataset with enough data to train for 1000 steps
    num_records = batch_size * 1000 * num_workers
    ds, preprocessor = get_ds(batch_size, num_records)
    ds = preprocessor(ds) 
    trainer = TorchTrainer(
        train_loop_per_worker=train_loop_per_worker,
        train_loop_config={"batch_size": batch_size},
        datasets={"train": ds},
        scaling_config=ScalingConfig(num_workers=num_workers, 
                                     use_gpu=num_gpus > 0),
    )
    trainer.fit()


部署Ray集群并运行训练序列


现在我们定义训练脚本的入口点。在这里,我们设置了Ray集群,并在头部节点上启动了训练序列。我们使用来自SageMaker -training库的Environment类来发现本教程中描述的异构SageMaker集群中的实例。我们将GPU实例组的第一个节点定义为Ray集群头节点,并在所有其他节点上运行适当的命令以将它们连接到集群。我们对头部节点进行编程,让它等待所有节点连接完毕,然后开始训练序列。这确保了Ray在定义和分配底层Ray任务时将利用所有可用资源。


import time
import subprocess
from sagemaker_training import environment
if __name__ == "__main__":
    # use the Environment() class to auto-discover the SageMaker cluster
    env = environment.Environment()
    if env.current_instance_group == 'gpu' and \
             env.current_instance_group_hosts.index(env.current_host) == 0:
        # the head node starts a ray cluster
        p = subprocess.Popen('ray start --head --port=6379',
                             shell=True).wait()
        ray.init()
        # calculate the total number of nodes in the cluster
        groups = env.instance_groups_dict.values()
        cluster_size = sum(len(v['hosts']) for v in list(groups))
        # wait until all SageMaker nodes have connected to the Ray cluster
        connected_nodes = 1
        while connected_nodes < cluster_size:
            time.sleep(1)
            resources = ray.available_resources().keys()
            connected_nodes = sum(1 for s in list(resources) if 'node' in s)
        # call the training sequence
        train_model()
        # tear down the ray cluster
        p = subprocess.Popen("ray down", shell=True).wait()
    else:
        # worker nodes attach to the head node
        head = env.instance_groups_dict['gpu']['hosts'][0]
        p = subprocess.Popen(
            f"ray start --address='{head}:6379'",
            shell=True).wait()
        # utility for checking if the cluster is still alive
        def is_alive():
            from subprocess import Popen
            p = Popen('ray status', shell=True)
            p.communicate()[0]
            return p.returncode

        # keep node alive until the process on head node completes
        while is_alive() == 0:
            time.sleep(10)


基于Amazon SageMaker异构集群的训练


训练脚本完成后,我们现在的任务是将其部署到Amazon SageMaker异构集群。为此,我们按照本教程中描述的步骤进行操作。我们首先创建一个source_dir目录,将train.py脚本放入其中,并创建一个requirements.txt文件,其中包含脚本所依赖的两个pip包timm和ray[air]。这些将自动安装在SageMaker集群中的每个节点上。我们定义了两个SageMaker实例组,第一个实例组只有一个ml.g5.xlarge(包含1个GPU和4个vcpu)、ml.c5.4xlarge(包含16个vCPU)。然后,我们使用SageMaker PyTorch估计器来定义和部署我们的培训作业到云中。


from sagemaker.pytorch import PyTorch
from sagemaker.instance_group import InstanceGroup
cpu_group = InstanceGroup("cpu", "ml.c5.4xlarge", 1)
gpu_group = InstanceGroup("gpu", "ml.g5.xlarge", 1)
estimator = PyTorch(
    entry_point='train.py',
    source_dir='./source_dir',
    framework_version='2.0.0',
    role='<arn role>',
    py_version='py310',
    job_name='hetero-cluster',
    instance_groups=[gpu_group, cpu_group]
)
estimator.fit()


结果


在下表中,我们比较了在两种不同设置下运行训练脚本的运行时结果:单个ml.g5.xlarge GPU实例和包含ml.g5的异构集群。Xlarge实例和ml.c5.4 xlarge。我们使用Amazon CloudWatch评估系统资源利用率,并使用撰写本文时可用的Amazon SageMaker定价估算培训成本(ml.c5.4 xlarge实例每小时0.816美元,ml.g5.xlarge实例每小时1.408美元)。


1-2


单实例实验中相对较高的CPU利用率和较低的GPU利用率表明数据预处理管道存在性能瓶颈。在迁移到异构集群时,这些问题将得到明确解决。不仅GPU的利用率提高了,而且训练速度也提高了。总的来说,培训的价格效率提高了23%。


总结


在本文中,我们展示了如何使用Ray数据集来平衡集群中所有可用CPU工作器的繁重数据预处理管道的负载。这使我们能够通过简单地向训练环境中添加辅助CPU实例来轻松地解决CPU瓶颈。Amazon SageMaker的异构集群支持是在云中运行Ray培训作业的一种引人注目的方式,因为它处理集群管理的所有方面,避免了对专用开发支持的需要。



文章来源:https://medium.com/@chaimrand/effective-load-balancing-with-ray-on-amazon-sagemaker-d3b9020679d3
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
热门职位
Maluuba
20000~40000/月
Cisco
25000~30000/月 深圳市
PilotAILabs
30000~60000/年 深圳市
写评论取消
回复取消