数据流动流式任务最佳实践
为了便于CPFS智算版与OSS Bucket中的单文件粒度持续性的数据流动,您可以通过创建流式任务实现。
方案概览
实现某一个目录下不同文件的导入导出,只需要4步:
创建数据流动:通过创建数据流动,建立CPFS智算版文件系统任意子目录到OSS Bucket下任意prefix的映射。
创建流式任务:通过调用CreateDataFlowTask创建流式导入或导出任务,建立源端目录到目的端的通道。创建成功后,流式任务的状态一直保持为运行中,但实际不会流动数据,还需要为其创建流式子任务。
创建流式任务的子任务:接着通过调用CreateDataFlowSubTask提交不同文件的导入或导出子任务。
查询流式子任务状态:最后通过调用DescribeDataFlowSubTask查询已提交的子任务进度与状态。当调用结果中Status值为COMPLETE;Progress值为10000时,则表示源数据已全部导入或导出至目标目录。
前提条件
已创建CPFS智算版文件系统。具体操作,请参见创建文件系统。
已为目标OSS Bucket设置标签(key: cpfs-dataflow, value: true),且在数据流动的使用过程中,不能删除和修改该标签,否则CPFS智算版数据流动无法访问Bucket的数据。具体操作,请参见管理存储空间标签。
为了防止多个数据流动向同一个OSS Bucket导出数据时产生数据冲突,需要该OSS Bucket开启版本控制。更多信息,请参见版本控制概述。
使用数据流动流式任务,CPFS智算版文件系统的版本号必须为2.6.0及以上版本。关于如何查看文件系统的版本号,请参见查询文件系统版本号。
流式导入任务
本示例以将OSS Bucket(examplebucket)中/bmcpfs/test/file.xml
下的子目录(/test/file)数据迁移至CPFS智算版文件系统(bmcpfs-370jz26fkr2st9****)中/oss/mnt/file.xml
下的子目录(/mnt/file)为例,介绍如何创建流式导入任务和流式导入子任务,实现单文件粒度持续性的数据流动。
创建数据流动。
您可以通过调用API或控制台为目标文件系统创建数据流动,并获取数据流动ID(例如,df-37bae1804cc6****)。
通过调用CreateDataFlow API创建数据流动。
{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", //CPFS智算版文件系统ID。 "SourceStorage": "oss://examplebucket", //源端OSS Bucket的访问地址。 "FileSystemPath": "/oss/", //指定链接到OSS的CPFS智算版文件系统目录,且必须是已有目录。 "SourceStoragePath": "/bmcpfs/", //源端存储Bucket内的访问路径。 }
预期输出:
{ "RequestId": "473469C7-AA6F-4DC5-B3DB-A3DC0D****3E", "DataFlowId": "df-37bae1804cc6****" }
通过控制台创建数据流动。具体操作,请参见管理数据流动。
创建数据流动流式导入任务。
通过调用CreateDataFlowTask API创建数据流动流式导入任务,并保存
TaskId
返回值。{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", //智算CPFS文件系统ID。 "DataFlowId": "df-37bae1804cc6****", //数据流动ID。 "TaskAction": "StreamImport", // 数据流动流式任务类型,导入StreamImport,导出StreamExport。 "DataType": "MetaAndData", // 数据类型,目前仅支持MetaAndData。 "Directory": "/test/", // 同步源相对目录。此场景为OSS Bucket的Bucket Prefix。 "DstDirectory": "/mnt/", // 同步目标相对目录。此场景为CPFS智算版文件系统的目录。 "ConflictPolicy": "SKIP_THE_FILE" // 同名文件冲突策略。OVERWRITE_EXISTING:强制覆盖同名文件;SKIP_THE_FILE:跳过同名文件;KEEP_LATEST:比较更新时间,保留最新版本。 }
预期输出:
{ "RequestId": "2D69A58F-345C-4FDE-88E4-BF518948F518", "TaskId": "task-376a61ab2d80****" }
创建流式导入任务的子任务。
通过调用CreateDataFlowSubTask API提交流式任务的导入子任务。
{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", //智算CPFS文件系统ID。 "DataFlowId": "df-37bae1804cc****", //数据流动ID。 "DataFlowTaskId": "task-376a61ab2d80****", //流式导入任务ID。 "SrcFilePath": "/file.xml", // 流式任务中源目录下的某个文件路径。此场景为OSS Bucket的Bucket对象路径。 "DstFilePath": "/mnt/file.xml" // 流式任务中目标目录下的某个文件路径。此场景为CPFS智算版文件系统的目录。 }
预期输出:
{ "RequestId": "A70BEE5D-76D3-49FB-B58F-1F398211A5C3", "DataFlowSubTaskId": "subTaskId-370kyfmyknxcyzw****" }
查询流式子任务的执行进度和任务状态。
通过调用DescribeDataFlowSubTasks API查询已提交的子任务执行进度与任务状态。不同的Key值对应不同的Value值。更多信息,请参见DescribeDataFlowSubTasks。
本示例通过筛选DataFlowIds(数据流动ID)方式查询子任务的信息。
{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", //CPFS智算版文件系统ID。 "Filters": [ { "Key": "DataFlowIds", "Value": "df-37bae1804cc****" } ] }
预期输出:
{ "RequestId": "98696EF0-1607-4E9D-B01D-F20930B6****", "DataFlowSubTask": { "DataFlowSubTask": [ { "FileSystemId": "bmcpfs-370jz26fkr2st9****", //文件系统ID。 "DataFlowId": "df-37bae1804cc****", //数据流动ID。 "DataFlowTaskId": "task-37b705830bcb****", //数据流动流式任务ID。 "DataFlowSubTaskId": "subTaskId-370kyfmyknxcyzw****",//数据流动流式子任务ID。 "SrcFilePath": "/bmcpfs/test/file.xml",//源文件路径。 "DstFilePath": "/oss/mnt/file.xml", //目标文件路径。 "Status": "COMPLETE", "Progress": 10000, "CreateTime": "2024-10-23 16:28:16", "StartTime": "2024-10-23 16:28:17", "EndTime": "2024-10-23 16:29:22", "ErrorMsg": "",//未返回或者返回为空时,表示没有错误信息。 "ProgressStats": { "BytesTotal": 68, "BytesDone": 68, "ActualBytes": 68, "AverageSpeed": 34 }, "FileDetail": { "ModifyTime": 1725897600000000000, "Size": 68, "Checksum": "crc64:850309505450944****"//文件校验码。 } } ] } }
调用结果中的Progress和Status参数值即为子任务的执行进度和任务状态信息。当任务状态Status值为COMPLETE时,表示任务已完成;当Progress值为10000时,表示数据已全部导入或导出至目标目录。
流式导出任务
本示例以将CPFS智算版文件系统(bmcpfs-370jz26fkr2st9****)中/oss_test/yaml/test/file.png
数据迁移至OSS Bucket(examplebucket)/bmcpfs_test/dataflows/mnt/file.png
为例,介绍如何创建流式导出任务和流式导出子任务,实现单文件粒度持续性的数据流动。
创建数据流动。
您可以通过调用API或控制台为目标文件系统创建数据流动,并获取数据流动ID(例如,df-37bae1804cc6****)。
通过调用CreateDataFlow API创建数据流动。
{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", //CPFS智算版文件系统ID。 "SourceStorage": "oss://examplebucket", //源端OSS Bucket的访问地址。 "FileSystemPath": "/oss/", //指定链接到OSS的CPFS智算版文件系统目录,且必须是已有目录。 "SourceStoragePath": "/bmcpfs/", //源端存储Bucket内的访问路径。 }
预期输出:
{ "RequestId": "473469C7-AA6F-4DC5-B3DB-A3DC0D****3E", "DataFlowId": "df-37bae1804cc6****" }
通过控制台创建数据流动。具体操作,请参见管理数据流动。
创建数据流动流式导出任务。
通过调用CreateDataFlowTask API创建数据流动流式导出任务,并保存
TaskId
返回值。{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", //智算CPFS文件系统ID。 "DataFlowId": "df-37bae1804cc6****", //数据流动ID。 "TaskAction": "StreamImport", // 数据流动流式任务类型,此场景为导出StreamExport。 "DataType": "MetaAndData", // 数据类型,目前仅支持MetaAndData。 "Directory": "/yaml/", // 同步源相对目录。流式导出场景时为CPFS智算版文件系统CPFS目录的相对路径。 "DstDirectory": "/dataflows/", // 同步目标相对目录。流式导出场景为OSS Bucket的Bucket Prefix的相对路径。 "ConflictPolicy": "SKIP_THE_FILE" // 同名文件冲突策略。OVERWRITE_EXISTING:强制覆盖同名文件;SKIP_THE_FILE:跳过同名文件;KEEP_LATEST:比较更新时间,保留最新版本。 }
预期输出:
{ "RequestId": "BC7C825C-5F65-4B56-BEF6-98C56C7C930B", "TaskId": "task-37b705830bcb****" }
创建流式导出任务的子任务。
通过调用CreateDataFlowSubTask API提交流式导出任务的子任务。
{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", //智算CPFS文件系统ID。 "DataFlowId": "df-37bae1804cc****", //数据流动ID。 "DataFlowTaskId": "task-37b705830bcb****", //流式导出任务ID。 "SrcFilePath": "/test/file.png", // 流式任务中Directory目录下的某个文件的相对路径。 "DstFilePath": "/mnt/file.png" // 流式任务中DstDirectory目录下的某个文件的相对路径。 }
预期输出:
{ "RequestId": "A70BEE5D-76D3-49FB-B58F-1F398211A5C3", "DataFlowSubTaskId": "subTaskId-370l4l3x6qsb1z1****" }
查询流式导出子任务的执行进度和任务状态。
通过调用DescribeDataFlowSubTasks API查询已提交的子任务执行进度与任务状态。不同的Key值对应不同的Value值。更多信息,请参见DescribeDataFlowSubTasks。
本示例通过筛选DataFlowIds(数据流动ID)方式查询子任务的信息。
{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", //CPFS智算版文件系统ID。 "Filters": [ { "Key": "DataFlowIds", "Value": "df-37bae1804cc****" } ] }
预期输出:
{ "RequestId": "FCBB356-96CA-135B-84B3-02E6F262B6BD", "DataFlowSubTask": { "DataFlowSubTask": [ { "FileSystemId": "bmcpfs-370jz26fkr2st9****", //文件系统ID。 "DataFlowId": "df-37bae1804cc****", //数据流动ID。 "DataFlowTaskId": "task-37b705830bcb****", //数据流动流式任务ID。 "DataFlowSubTaskId": "subTaskId-370l4l3x6qsb1z1****",//数据流动流式子任务ID。 "SrcFilePath": "/oss_test/yaml/test/file.png",//源文件路径。 "DstFilePath": "/bmcpfs_test/dataflows/mnt/file.png", //目标文件路径。 "Status": "COMPLETE", "Progress": 10000, "CreateTime": "2024-10-23 17:18:16", "StartTime": "2024-10-23 17:18:17", "EndTime": "2024-10-23 17:19:00", "ErrorMsg": "",//未返回或者返回为空时,表示没有错误信息。 "ProgressStats": { "BytesTotal": 68, "BytesDone": 68, "ActualBytes": 68, "AverageSpeed": 34 }, "FileDetail": { "ModifyTime": 1725897600000000000, "Size": 68, "Checksum": "crc64:850309505450944****"//文件校验码。 } } ] } }
调用结果中的Progress和Status参数值即为子任务的执行进度和任务状态信息。当任务状态Status值为COMPLETE时,表示任务已完成;当Progress值为10000时,表示数据已全部导入或导出至目标目录。
相关操作
如果您需要取消流式子任务,可以通过调用CancelDataFlowSubTask API实现。仅支持取消CREATED和RUNNING状态的流式子任务。
{
"FileSystemId": "bmcpfs-370jz26fkr2st9****", //CPFS智算版文件系统ID。
"DataFlowId": "df-37bae1804cc****", //数据流动ID。
"DataFlowTaskId": "task-37b705830bcb****", //流式导入或导出任务ID。
"DataFlowSubTaskId": "subTaskId-370kyfmyknxcyzw****" // 数据流动流式子任务ID。
}