数据流动流式任务最佳实践

更新时间:

为了便于CPFS智算版与OSS Bucket中的单文件粒度持续性的数据流动,您可以通过创建流式任务实现。

方案概览

实现某一个目录下不同文件的导入导出,只需要4步:

  1. 创建数据流动:通过创建数据流动,建立CPFS智算版文件系统任意子目录到OSS Bucket下任意prefix的映射。

  2. 创建流式任务:通过调用CreateDataFlowTask创建流式导入或导出任务,建立源端目录到目的端的通道。创建成功后,流式任务的状态一直保持为运行中,但实际不会流动数据,还需要为其创建流式子任务。

  3. 创建流式任务的子任务:接着通过调用CreateDataFlowSubTask提交不同文件的导入或导出子任务。

  4. 查询流式子任务状态:最后通过调用DescribeDataFlowSubTask查询已提交的子任务进度与状态。当调用结果中Status值为COMPLETEProgress值为10000时,则表示源数据已全部导入或导出至目标目录。

前提条件

  • 已创建CPFS智算版文件系统。具体操作,请参见创建文件系统

  • 已为目标OSS Bucket设置标签(key: cpfs-dataflow, value: true),且在数据流动的使用过程中,不能删除和修改该标签,否则CPFS智算版数据流动无法访问Bucket的数据。具体操作,请参见管理存储空间标签

  • 为了防止多个数据流动向同一个OSS Bucket导出数据时产生数据冲突,需要该OSS Bucket开启版本控制。更多信息,请参见版本控制概述

重要

使用数据流动流式任务,CPFS智算版文件系统的版本号必须为2.6.0及以上版本。关于如何查看文件系统的版本号,请参见查询文件系统版本号

流式导入任务

本示例以将OSS Bucket(examplebucket)中/bmcpfs/test/file.xml下的子目录(/test/file)数据迁移至CPFS智算版文件系统(bmcpfs-370jz26fkr2st9****)中/oss/mnt/file.xml下的子目录(/mnt/file)为例,介绍如何创建流式导入任务和流式导入子任务,实现单文件粒度持续性的数据流动。

  1. 创建数据流动。

    您可以通过调用API或控制台为目标文件系统创建数据流动,并获取数据流动ID(例如,df-37bae1804cc6****)。

    • 通过调用CreateDataFlow API创建数据流动。

      {
        "FileSystemId": "bmcpfs-370jz26fkr2st9****", //CPFS智算版文件系统ID。
        "SourceStorage": "oss://examplebucket", //源端OSS Bucket的访问地址。
        "FileSystemPath": "/oss/",  //指定链接到OSSCPFS智算版文件系统目录,且必须是已有目录。
        "SourceStoragePath": "/bmcpfs/", //源端存储Bucket内的访问路径。
      }

      预期输出:

      {
        "RequestId": "473469C7-AA6F-4DC5-B3DB-A3DC0D****3E",
        "DataFlowId": "df-37bae1804cc6****"
      }
    • 通过控制台创建数据流动。具体操作,请参见管理数据流动

  2. 创建数据流动流式导入任务。

    通过调用CreateDataFlowTask API创建数据流动流式导入任务,并保存TaskId返回值。

    {
      "FileSystemId": "bmcpfs-370jz26fkr2st9****", //智算CPFS文件系统ID。
      "DataFlowId": "df-37bae1804cc6****", //数据流动ID。
      "TaskAction": "StreamImport", // 数据流动流式任务类型,导入StreamImport,导出StreamExport。
      "DataType": "MetaAndData", // 数据类型,目前仅支持MetaAndData。
      "Directory": "/test/", // 同步源相对目录。此场景为OSS BucketBucket Prefix。
      "DstDirectory": "/mnt/", // 同步目标相对目录。此场景为CPFS智算版文件系统的目录。
      "ConflictPolicy": "SKIP_THE_FILE" // 同名文件冲突策略。OVERWRITE_EXISTING:强制覆盖同名文件;SKIP_THE_FILE:跳过同名文件;KEEP_LATEST:比较更新时间,保留最新版本。
    }

    预期输出:

    {
      "RequestId": "2D69A58F-345C-4FDE-88E4-BF518948F518",
      "TaskId": "task-376a61ab2d80****"
    }
  3. 创建流式导入任务的子任务。

    通过调用CreateDataFlowSubTask API提交流式任务的导入子任务。

    {
      "FileSystemId": "bmcpfs-370jz26fkr2st9****", //智算CPFS文件系统ID。
      "DataFlowId": "df-37bae1804cc****", //数据流动ID。
      "DataFlowTaskId": "task-376a61ab2d80****", //流式导入任务ID。
      "SrcFilePath": "/file.xml", // 流式任务中源目录下的某个文件路径。此场景为OSS BucketBucket对象路径。
      "DstFilePath": "/mnt/file.xml" // 流式任务中目标目录下的某个文件路径。此场景为CPFS智算版文件系统的目录。
    }

    预期输出:

    {
      "RequestId": "A70BEE5D-76D3-49FB-B58F-1F398211A5C3",
      "DataFlowSubTaskId": "subTaskId-370kyfmyknxcyzw****"
    }
  4. 查询流式子任务的执行进度和任务状态。

    通过调用DescribeDataFlowSubTasks API查询已提交的子任务执行进度与任务状态。不同的Key值对应不同的Value值。更多信息,请参见DescribeDataFlowSubTasks

    本示例通过筛选DataFlowIds(数据流动ID)方式查询子任务的信息。

    {
      "FileSystemId": "bmcpfs-370jz26fkr2st9****", //CPFS智算版文件系统ID。
      "Filters": [
        {
          "Key": "DataFlowIds",
          "Value": "df-37bae1804cc****"
        }
      ]
    }

    预期输出:

    {
      "RequestId": "98696EF0-1607-4E9D-B01D-F20930B6****",
      "DataFlowSubTask": {
        "DataFlowSubTask": [
          {
            "FileSystemId": "bmcpfs-370jz26fkr2st9****", //文件系统ID。
            "DataFlowId": "df-37bae1804cc****", //数据流动ID。
            "DataFlowTaskId": "task-37b705830bcb****", //数据流动流式任务ID。
            "DataFlowSubTaskId": "subTaskId-370kyfmyknxcyzw****",//数据流动流式子任务ID。
            "SrcFilePath": "/bmcpfs/test/file.xml",//源文件路径。
            "DstFilePath": "/oss/mnt/file.xml", //目标文件路径。
            "Status": "COMPLETE",
            "Progress": 10000,
            "CreateTime": "2024-10-23 16:28:16",
            "StartTime": "2024-10-23 16:28:17",
            "EndTime": "2024-10-23 16:29:22",
            "ErrorMsg": "",//未返回或者返回为空时,表示没有错误信息。
            "ProgressStats": {
              "BytesTotal": 68,
              "BytesDone": 68,
              "ActualBytes": 68,
              "AverageSpeed": 34
            },
            "FileDetail": {
              "ModifyTime": 1725897600000000000,
              "Size": 68,
              "Checksum": "crc64:850309505450944****"//文件校验码。
            }
          }
        ]
      }
    }

    调用结果中的ProgressStatus参数值即为子任务的执行进度和任务状态信息。当任务状态Status值为COMPLETE时,表示任务已完成;当Progress值为10000时,表示数据已全部导入或导出至目标目录。

流式导出任务

本示例以将CPFS智算版文件系统(bmcpfs-370jz26fkr2st9****)中/oss_test/yaml/test/file.png数据迁移至OSS Bucket(examplebucket)/bmcpfs_test/dataflows/mnt/file.png为例,介绍如何创建流式导出任务和流式导出子任务,实现单文件粒度持续性的数据流动。

  1. 创建数据流动。

    您可以通过调用API或控制台为目标文件系统创建数据流动,并获取数据流动ID(例如,df-37bae1804cc6****)。

    • 通过调用CreateDataFlow API创建数据流动。

      {
        "FileSystemId": "bmcpfs-370jz26fkr2st9****", //CPFS智算版文件系统ID。
        "SourceStorage": "oss://examplebucket", //源端OSS Bucket的访问地址。
        "FileSystemPath": "/oss/",  //指定链接到OSSCPFS智算版文件系统目录,且必须是已有目录。
        "SourceStoragePath": "/bmcpfs/", //源端存储Bucket内的访问路径。
      }

      预期输出:

      {
        "RequestId": "473469C7-AA6F-4DC5-B3DB-A3DC0D****3E",
        "DataFlowId": "df-37bae1804cc6****"
      }
    • 通过控制台创建数据流动。具体操作,请参见管理数据流动

  2. 创建数据流动流式导出任务。

    通过调用CreateDataFlowTask API创建数据流动流式导出任务,并保存TaskId返回值。

    {
      "FileSystemId": "bmcpfs-370jz26fkr2st9****", //智算CPFS文件系统ID。
      "DataFlowId": "df-37bae1804cc6****", //数据流动ID。
      "TaskAction": "StreamImport", // 数据流动流式任务类型,此场景为导出StreamExport。
      "DataType": "MetaAndData", // 数据类型,目前仅支持MetaAndData。
      "Directory": "/yaml/", // 同步源相对目录。流式导出场景时为CPFS智算版文件系统CPFS目录的相对路径。
      "DstDirectory": "/dataflows/", // 同步目标相对目录。流式导出场景为OSS BucketBucket Prefix的相对路径。
      "ConflictPolicy": "SKIP_THE_FILE" // 同名文件冲突策略。OVERWRITE_EXISTING:强制覆盖同名文件;SKIP_THE_FILE:跳过同名文件;KEEP_LATEST:比较更新时间,保留最新版本。
    }

    预期输出:

    {
      "RequestId": "BC7C825C-5F65-4B56-BEF6-98C56C7C930B",
      "TaskId": "task-37b705830bcb****"
    }
  3. 创建流式导出任务的子任务。

    通过调用CreateDataFlowSubTask API提交流式导出任务的子任务。

    {
      "FileSystemId": "bmcpfs-370jz26fkr2st9****", //智算CPFS文件系统ID。
      "DataFlowId": "df-37bae1804cc****", //数据流动ID。
      "DataFlowTaskId": "task-37b705830bcb****", //流式导出任务ID。
      "SrcFilePath": "/test/file.png", // 流式任务中Directory目录下的某个文件的相对路径。
      "DstFilePath": "/mnt/file.png" // 流式任务中DstDirectory目录下的某个文件的相对路径。
    }

    预期输出:

    {
      "RequestId": "A70BEE5D-76D3-49FB-B58F-1F398211A5C3",
      "DataFlowSubTaskId": "subTaskId-370l4l3x6qsb1z1****"
    }
  4. 查询流式导出子任务的执行进度和任务状态。

    通过调用DescribeDataFlowSubTasks API查询已提交的子任务执行进度与任务状态。不同的Key值对应不同的Value值。更多信息,请参见DescribeDataFlowSubTasks

    本示例通过筛选DataFlowIds(数据流动ID)方式查询子任务的信息。

    {
      "FileSystemId": "bmcpfs-370jz26fkr2st9****", //CPFS智算版文件系统ID。
      "Filters": [
        {
          "Key": "DataFlowIds",
          "Value": "df-37bae1804cc****"
        }
      ]
    }

    预期输出:

    {
      "RequestId": "FCBB356-96CA-135B-84B3-02E6F262B6BD",
      "DataFlowSubTask": {
        "DataFlowSubTask": [
          {
            "FileSystemId": "bmcpfs-370jz26fkr2st9****", //文件系统ID。
            "DataFlowId": "df-37bae1804cc****", //数据流动ID。
            "DataFlowTaskId": "task-37b705830bcb****", //数据流动流式任务ID。
            "DataFlowSubTaskId": "subTaskId-370l4l3x6qsb1z1****",//数据流动流式子任务ID。
            "SrcFilePath": "/oss_test/yaml/test/file.png",//源文件路径。
            "DstFilePath": "/bmcpfs_test/dataflows/mnt/file.png", //目标文件路径。
            "Status": "COMPLETE",
            "Progress": 10000,
            "CreateTime": "2024-10-23 17:18:16",
            "StartTime": "2024-10-23 17:18:17",
            "EndTime": "2024-10-23 17:19:00",
            "ErrorMsg": "",//未返回或者返回为空时,表示没有错误信息。
            "ProgressStats": {
              "BytesTotal": 68,
              "BytesDone": 68,
              "ActualBytes": 68,
              "AverageSpeed": 34
            },
            "FileDetail": {
              "ModifyTime": 1725897600000000000,
              "Size": 68,
              "Checksum": "crc64:850309505450944****"//文件校验码。
            }
          }
        ]
      }
    }

    调用结果中的ProgressStatus参数值即为子任务的执行进度和任务状态信息。当任务状态Status值为COMPLETE时,表示任务已完成;当Progress值为10000时,表示数据已全部导入或导出至目标目录。

相关操作

如果您需要取消流式子任务,可以通过调用CancelDataFlowSubTask API实现。仅支持取消CREATEDRUNNING状态的流式子任务。

{
  "FileSystemId": "bmcpfs-370jz26fkr2st9****", //CPFS智算版文件系统ID。
  "DataFlowId": "df-37bae1804cc****", //数据流动ID。
  "DataFlowTaskId": "task-37b705830bcb****", //流式导入或导出任务ID。
  "DataFlowSubTaskId": "subTaskId-370kyfmyknxcyzw****" // 数据流动流式子任务ID。
}