数据流动流式任务最佳实践
为了便于CPFS智算版与OSS Bucket中的单文件粒度持续性的数据流动,您可以通过创建流式任务实现。
方案概览
实现某一个目录下不同文件的导入导出,只需要4步:
创建数据流动:通过创建数据流动,建立CPFS智算版文件系统任意子目录到OSS Bucket下任意prefix的映射。
创建流式任务:通过调用CreateDataFlowFlowTask创建流式导入或导出任务。
创建流式任务的子任务:接着通过调用CreateDataFlowSubTask提交不同文件的导入或导出子任务。
查询流式子任务状态:最后通过调用DescribeDataFlowSubTask查询已提交的子任务进度与状态。当调用结果中Status值为COMPLETE;Progress值为10000时,则表示源数据已全部导入或导出至目标目录。
前提条件
已创建CPFS智算版文件系统。具体操作,请参见创建文件系统。
使用数据流动流式任务,CPFS智算版文件系统的版本号必须为2.6.0及以上版本。关于如何查看文件系统的版本号,请参见查询文件系统版本号。
操作步骤
创建数据流动。
您可以通过控制台或调用API为目标文件系统创建数据流动。
通过控制台,请参见管理数据流动。
通过调用API,请参见CreateDataFlow。
创建数据流动流式任务。
通过调用CreateDataFlowTask API创建数据流动流式导入或导出任务。
{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", //智算CPFS文件系统ID。 "DataFlowId": "df-37bae1804cc6****", //数据流动ID。 "TaskAction": "StreamImport", // 数据流动流式任务类型,导入StreamImport,导出StreamExport。 "DataType": "MetaAndData", // 数据类型,目前仅支持MetaAndData。 "Directory": "/", // 同步源相对目录。导入为OSS Bucket的Bucket Prefix,导出为智算CPFS文件系统的目录。 "DstDirectory": "/", // 同步目标相对目录。导入为智算CPFS文件系统的目录,导出为OSS Bucket的Bucket Prefix。 "ConflictPolicy": "SKIP_THE_FILE" // 同名文件冲突策略。OVERWRITE_EXISTING:强制覆盖同名文件;SKIP_THE_FILE:跳过同名文件;KEEP_LATEST:比较更新时间,保留最新版本。 }
创建流式任务的子任务。
通过调用CreateDataFlowSubTask API提交流式任务的导入或导出子任务。
{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", //智算CPFS文件系统ID。 "DataFlowId": "df-37bae1804cc****", //数据流动ID。 "DataFlowTaskId": "task-xxx", //流式导入或导出任务ID。 "SrcFilePath": "/test/file.jpg", // 流式任务中源目录下的某个文件路径。导入为OSS Bucket的Bucket对象路径,导出为智算CPFS文件系统的目录。 "DstFilePath": "/mnt/file.jpg" // 流式任务中目标目录下的某个文件路径。导入为智算CPFS文件系统的目录,导出为OSS Bucket的Bucket对象路径。 }
查询流式子任务的执行进度和任务状态。
通过调用DescribeDataFlowTasks API查询已提交的子任务执行进度与任务状态。不同的Key值对应不同的Value值。更多信息,请参见DescribeDataFlowTasks。
本示例通过筛选DataFlowIds(数据流动ID)方式查询子任务的信息。
{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", //智算CPFS文件系统ID。 "Filters": [ { "Key": "DataFlowIds", "Value": "df-37bae1804cc****" } ] }
调用结果中的Progress和Status参数值即为子任务的执行进度和任务状态信息。当任务状态Status值为COMPLETE时,表示任务已完成;当Progress值为10000时,表示数据已全部导入或导出至目标目录。
相关操作
如果您需要取消流式任务,可以通过调用CancelDataFlowAutoRefresh API实现。仅支持取消CREATED和RUNNING状态的流式任务。
{
"FileSystemId": "bmcpfs-370jz26fkr2st9****", //智算CPFS文件系统ID。
"DataFlowId": "df-37bae1804cc****", //数据流动ID。
"DataFlowTaskId": "task-****", //流式导入或导出任务ID。
"DataFlowSubTaskId": "subTaskId-370kyfmyknxcyzw****" // 数据流动流式任务ID。
}