使用OSS Connector for AI/ML高效完成数据训练任务

本文将为您详细介绍如何快速使用OSS Connector for AI/ML来高效配合数据模型的创建以及训练工作。

部署环境

  • 操作系统:Linux x86-64

  • glibc:>=2.17

  • Python:3.8-3.12

  • PyTorch: >=2.0

  • 使用OSS Checkpoint功能需Linux内核支持userfaultfd

    说明

    Ubuntu系统为例,您可以执行sudo grep CONFIG_USERFAULTFD /boot/config-$(uname -r)命令确认Linux是否支持userfaultfd,当返回结果中显示CONFIG_USERFAULTFD=y时,则表示内核支持。返回结果显示CONFIG_USERFAULTFD=n时,则表示内核不支持,即无法使用OSS Checkpoint功能。

快速安装

以下内容为Python3.12版本安装OSS Connector for AI/ML示例:

  1. Linux操作系统或基于Linux操作系统构建镜像所生成容器空间内,执行pip3.12 install osstorchconnector命令安装OSS Connector for AI/ML。

    pip3.12 install osstorchconnector
  2. 执行pip3.12 show osstorchconnector查看是否安装成功。

    pip3.12 show osstorchconnector

    当返回结果中显示osstorchconnector的版本信息时表示OSS Connector for AI/ML安装成功。

    image

配置

  1. 创建访问凭证配置文件。

    mkdir -p /root/.alibabacloud && touch /root/.alibabacloud/credentials
  2. 添加访问凭证配置并保存。

    示例中的<Access-key-id><Access-key-secret>请分别替换为RAM用户的AccessKey ID、AccessKeySecret。关于如何创建AccessKey IDAccessKeySecret,请参见创建AccessKey,配置项说明以及使用临时访问凭证配置请参见配置访问凭证

    {
      "AccessKeyId": "LTAI************************",
      "AccessKeySecret": "At32************************"
    }
  3. 创建OSS Connector配置文件。

    mkdir -p /etc/oss-connector/ && touch /etc/oss-connector/config.json
  4. 添加OSS Connector相关配置并保存。配置项说明请参见配置OSS Connector

    正常情况下使用以下默认配置即可。

    {
        "logLevel": 1,
        "logPath": "/var/log/oss-connector/connector.log",
        "auditPath": "/var/log/oss-connector/audit.log",
        "datasetConfig": {
            "prefetchConcurrency": 24,
            "prefetchWorker": 2
        },
        "checkpointConfig": {
            "prefetchConcurrency": 24,
            "prefetchWorker": 4,
            "uploadConcurrency": 64
        }
    }
    

示例

以下示例旨在使用PyTorch创建一个手写数字识别模型。该模型使用由OssMapDataset构建的MNIST数据集,同时借助OssCheckpoint来实现模型检查点的保存和加载。

import io
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from PIL import Image
from torch.utils.data import DataLoader
from osstorchconnector import OssMapDataset
from osstorchconnector import OssCheckpoint

# 定义超参数。
EPOCHS = 1
BATCH_SIZE = 64
LEARNING_RATE = 0.001
CHECKPOINT_READ_URI = "oss://you_bucketname/epoch.ckpt"  # 读取OSS中检查点的地址。
CHECKPOINT_WRITE_URI = "oss://you_bucketname/epoch.ckpt" # 保存检查点到OSS的地址。
ENDPOINT = "oss-cn-hangzhou-internal.aliyuncs.com"       # 访问OSS的内网地域节点地址,使用此地址需ECS实例与OSS实例处于同一地域。
CONFIG_PATH = "/etc/oss-connector/config.json"           # OSS Connector配置文件路径。
CRED_PATH = "/root/.alibabacloud/credentials"            # 访问凭证配置文件路径。
OSS_URI = "oss://you_bucketname/mninst/"                 # OSS中Bucket资源路径地址。

# 创建OssCheckpoint对象,用于训练过程中将检查点保存到OSS以及从OSS中读取检查点。
checkpoint = OssCheckpoint(endpoint=ENDPOINT, cred_path=CRED_PATH, config_path=CONFIG_PATH)

# 定义简单的卷积神经网络。
class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        input_size = 224
        after_conv1 = (input_size - 3 + 2*1) 
        after_pool1 = after_conv1 // 2  
        after_conv2 = (after_pool1 - 3 + 2*1) // 1 + 1  
        after_pool2 = after_conv2 // 2  
        flattened_size = 64 * after_pool2 * after_pool2
        self.fc1 = nn.Linear(flattened_size, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = nn.ReLU()(self.conv1(x)) 
        x = nn.MaxPool2d(2)(x)
        x = nn.ReLU()(self.conv2(x))
        x = nn.MaxPool2d(2)(x)
        x = x.view(x.size(0), -1)
        x = nn.ReLU()(self.fc1(x))
        x = self.fc2(x)
        return x

# 数据预处理。
trans = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5])
])
def transform(object):
    try:
        img = Image.open(io.BytesIO(object.read())).convert('L')
        val = trans(img)
    except Exception as e:
        raise e
    label = 0
    return val, torch.tensor(label)

# 加载OssMapDataset数据集。
train_dataset = OssMapDataset.from_prefix(OSS_URI, endpoint=ENDPOINT, transform=transform, cred_path=CRED_PATH, config_path=CONFIG_PATH)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, num_workers=32, prefetch_factor=2,shuffle=True)

# 初始化模型、损失函数与优化器。
model = SimpleCNN()
criterion = nn.CrossEntropyLoss()  
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)  

# 训练模型。
for epoch in range(EPOCHS):
    for i, (images, labels) in enumerate(train_loader):
        optimizer.zero_grad()  
        outputs = model(images)  
        loss = criterion(outputs, labels)  
        loss.backward()  
        optimizer.step()  
        if (i + 1) % 100 == 0:
            print(f'Epoch [{epoch + 1}/{EPOCHS}], Step [{i + 1}/{len(train_loader)}], Loss: {loss.item():.4f}')
    # 使用OssCheckpoint对象保存检查点。
    with checkpoint.writer(CHECKPOINT_WRITE_URI) as writer:
        torch.save(model.state_dict(), writer)
        print("-------------------------")
        print(model.state_dict)

# 使用OssCheckpoint对象读取检查点。
with checkpoint.reader(CHECKPOINT_READ_URI) as reader:
   state_dict = torch.load(reader)

# 加载模型。
model = SimpleCNN()
model.load_state_dict(state_dict)
model.eval()