任务

更新时间:

本文介绍如何使用SDK调用任务相关的方法。

创建任务

以下示例代码用于创建任务。

import os

from alibabacloud_hcs_mgw20240626.client import Client
from alibabacloud_hcs_mgw20240626.models import ImportQos, FileTypeFilters, FilterRule, LastModifiedFilters, \
    LastModifyFilterItem, TimeFilter, KeyFilters, KeyFilterItem, ScheduleRule, CreateJobRequest, CreateJobInfo
from alibabacloud_tea_openapi.models import Config

if __name__ == "__main__":
    try:
        # 填写主账号ID。
        userid = "11470***876***55"
        # 这里以北京区域为例。
        endpoint = "cn-beijing.mgw.aliyuncs.com"
        config = Config(
            endpoint=endpoint,
            access_key_id=os.environ.get("OSS_ACCESS_KEY_ID"),
            access_key_secret=os.environ.get("OSS_ACCESS_KEY_SECRET"),
        )
        client = Client(config)
        # 填写任务名称。
        job_name = "examplejob"
        # 填写源端数据地址名称。
        src_address="examplesrcaddress"
        # 填写目的端数据地址名称。
        dest_address = "exampledestaddress"

        """
            overwriteMode和transferMode需要组合使用,具体组合含义如下
            always, all
            全覆盖
            always, lastmodified
            根据文件最后修改时间覆盖
            never, all
            不覆盖
        """
        """
            文件覆盖方式, 可能的值
            1.never
            2.always 
        """
        overwrite_mode = "always"
        """
            文件传输的方式, 可能的值为
            1.changed
            2.all
            3.lastmodified
        """
        transfer_mode = "lastmodified"

        # maxBandWidth,maxImportTaskQps根据实际需求值填写。
        max_import_task_qps = 1000
        max_band_width = 2147483648
        # 如果maxImportTaskQps值为0或者不设置,会被设置为默认值;MaxBandWidth值为0或者不设置, 会被设置为默认值,maxBandWidth值单位为bits。
        import_qos = ImportQos(max_import_task_qps=max_import_task_qps, max_band_width=max_band_width)

        # 配置过滤规则,包含文件类型过滤器、文件过滤器、时间过滤器,具体参数含义请参看API文档。
        # 文件类型过滤器,适用于localfs。
        exclude_symlink = False
        exclude_dir = False
        file_type_filters = FileTypeFilters(exclude_dir=exclude_dir, exclude_symlink=exclude_symlink)
        # 文件过滤器,根据实际需求值填写。
        include_regex = [".*.jpg", ".*.gif"]
        exclude_regex = ["txtFile", ".*.js"]
        exclude_regex_item = KeyFilterItem(regex=exclude_regex)
        include_regex_item = KeyFilterItem(regex=include_regex)
        Key_filters = KeyFilters(excludes=exclude_regex_item, includes=include_regex_item)
        # 时间过滤器, 时间格式遵循UTC时间格式,根据实际需求值填写。
        include_start_time = "2006-01-01T00:00:00Z"
        include_end_time = "2007-01-01T00:00:00Z"
        exclude_start_time = "2009-01-01T00:00:00Z"
        exclude_end_time = "2010-01-01T00:00:00Z"
        include_time_filters = [TimeFilter(start_time=include_start_time, end_time=include_end_time)]
        exculde_time_filters = [TimeFilter(start_time=exclude_start_time, end_time=exclude_end_time)]
        include_lastModify_filter_item = LastModifyFilterItem(time_filter=include_time_filters)
        exclude_lastModify_filter_tiem = LastModifyFilterItem(time_filter=exculde_time_filters)
        last_modified_filters = LastModifiedFilters(excludes=exclude_lastModify_filter_tiem,
                                                    includes=include_lastModify_filter_item)

        filterRule = FilterRule(file_type_filters=file_type_filters,
                                key_filters=Key_filters,
                                last_modified_filters=last_modified_filters)

        # 配置调度规则,具体参数含义请参看API文档。
        max_schedule_count = 5
        start_cron_expression = "0 0 10 * * ?"
        suspend_cron_expression = "0 0 14 * * ?"
        schedule_rule = ScheduleRule(max_schedule_count=max_schedule_count, 
                                    suspend_cron_expression=suspend_cron_expression,
                                    start_cron_expression=start_cron_expression)
        client.create_job(userid, CreateJobRequest(CreateJobInfo(
            name=job_name,
            transfer_mode=transfer_mode,
            overwrite_mode=overwrite_mode,
            src_address=src_address,
            dest_address=dest_address,
            import_qos=import_qos,
            filter_rule=filterRule,
            schedule_rule=schedule_rule,
        )))
    except Exception as e:
        print(e)

更新任务

说明

更新任务操作可分为更新任务状态和更新任务限流等,具体内容请参见API任务相关文档。

以下示例代码用于更新任务状态。

import os

from alibabacloud_hcs_mgw20240626.client import Client
from alibabacloud_hcs_mgw20240626.models import UpdateJobRequest, UpdateJobInfo, ImportQos
from alibabacloud_tea_openapi.models import Config

if __name__ == "__main__":
    try:
        # 填写主账号ID。
        userid = "11470***876***55"
        # 这里以北京区域为例。
        endpoint = "cn-beijing.mgw.aliyuncs.com"
        config = Config(
            endpoint=endpoint,
            access_key_id=os.environ.get("OSS_ACCESS_KEY_ID"),
            access_key_secret=os.environ.get("OSS_ACCESS_KEY_SECRET"),
        )
        client = Client(config)
        # 填写任务名称。
        job_name = "examplejob"
              
        # 允许的状态值有IMPORT_JOB_LAUNCHING(启动任务), IMPORT_JOB_SUSPEND(暂停任务),IMPORT_JOB_CLOSING(关闭任务)。
        status = "IMPORT_JOB_LAUNCHING"
        client.update_job(userid, job_name, UpdateJobRequest(UpdateJobInfo(
            status=status
        )))
    except Exception as e:
        print(e)
               

以下示例代码用于更新任务限流。

import os

from alibabacloud_hcs_mgw20240626.client import Client
from alibabacloud_hcs_mgw20240626.models import UpdateJobRequest, UpdateJobInfo, ImportQos
from alibabacloud_tea_openapi.models import Config

if __name__ == "__main__":
    try:
        # 填写主账号ID。
        userid = "11470***876***55"
        # 这里以北京区域为例。
        endpoint = "cn-beijing.mgw.aliyuncs.com"
        config = Config(
            endpoint=endpoint,
            access_key_id=os.environ.get("OSS_ACCESS_KEY_ID"),
            access_key_secret=os.environ.get("OSS_ACCESS_KEY_SECRET"),
        )
        client = Client(config)
        # 填写任务名称。
        job_name = "examplejob"
        # maxBandWidth、maxImportTaskQps请根据需求填写。
        max_band_width = 1610612736
        max_import_task_qps = 1500
        client.update_job(userid, job_name, UpdateJobRequest(UpdateJobInfo(
            import_qos=ImportQos(
                max_band_width=max_band_width,
                max_import_task_qps=max_import_task_qps,
            )
        )))
    except Exception as e:
        print(e)

获取任务详情

以下示例代码用于获取特定任务的详细信息(通过任务名称)。

import os

from alibabacloud_hcs_mgw20240626.client import Client
from alibabacloud_hcs_mgw20240626.models import GetJobRequest
from alibabacloud_tea_openapi.models import Config

if __name__ == "__main__":
    try:
        # 填写主账号ID。
        userid = "11470***876***55"
        # 这里以北京区域为例。
        endpoint = "cn-beijing.mgw.aliyuncs.com"
        config = Config(
            endpoint=endpoint,
            access_key_id=os.environ.get("OSS_ACCESS_KEY_ID"),
            access_key_secret=os.environ.get("OSS_ACCESS_KEY_SECRET"),
        )
        client = Client(config)
        # 填写任务名称。
        job_name = "examplejob"
        response = client.get_job(userid, job_name, GetJobRequest())
        print(response.body.import_job)
    except Exception as e:
        print(e)
        

以下示例代码用于获取指定任务的详情(通过任务ID)。

import os

from alibabacloud_hcs_mgw20240626.client import Client
from alibabacloud_hcs_mgw20240626.models import GetJobRequest
from alibabacloud_tea_openapi.models import Config

if __name__ == "__main__":
    try:
        # 填写主账号ID。
        userid = "11470***876***55"
        # 这里以北京区域为例。
        endpoint = "cn-beijing.mgw.aliyuncs.com"
        config = Config(
            endpoint=endpoint,
            access_key_id=os.environ.get("OSS_ACCESS_KEY_ID"),
            access_key_secret=os.environ.get("OSS_ACCESS_KEY_SECRET"),
        )
        client = Client(config)
        # 填写任务ID。
        version = "b4155550-****-4371-****-9c7337348021"
        response = client.get_job(userid, version, GetJobRequest(by_version=""))
        print(response.body.import_job)
    except Exception as e:
        print(e)
        

正常返回示例

{
  "ImportJob": {
    "Name": "test_name",
    "SrcAddress": "test_src_address",
    "DestAddress": "test_dest_address",
    "Status": "IMPORT_JOB_DOING",
    "EnableMultiVersioning": false,
    "CreateTime": "2024-05-01T12:00:00.000Z",
    "ModifyTime": "2024-05-01T12:00:00.000Z",
    "Version": "test_id",
    "Audit": {
      "LogMode": "off"
    },
    "OverwriteMode": "always",
    "TransferMode": "all",
    "Tags": "K1:V1,K2:V2",
    "ParentName": "test_parent_name",
    "ParentVersion": "7db93837-a5ee-4e3a-b3c8-800e7947dabc",
    "ConvertSymlinkTarget": false,
    "CreateReport": false,
    "Owner": "test_owner",
    "FilterRule": {
      "KeyFilters": {
        "Includes": {
          "Regex": [
            ".*\\.jpg$"
          ]
        },
        "Excludes": {
          "Regex": [
            ".*\\.jpg$"
          ]
        }
      },
      "LastModifiedFilters": {
        "Includes": {
          "TimeFilter": [
            {
              "StartTime": "2006-01-01T00:00:00Z",
              "EndTime": "2006-12-31T59:59:59Z"
            }
          ]
        },
        "Excludes": {
          "TimeFilter": [
            {
              "StartTime": "2006-01-01T00:00:00Z",
              "EndTime": "2006-12-31T59:59:59Z"
            }
          ]
        }
      },
      "FileTypeFilters": {
        "ExcludeSymlink": true,
        "ExcludeDir": true
      }
    },
    "ImportQos": {
      "MaxBandWidth": 1073741824,
      "MaxImportTaskQps": 1000
    },
    "ScheduleRule": {
      "StartCronExpression": "0 0 * * * ?",
      "SuspendCronExpression": "0 0 * * * ?",
      "MaxScheduleCount": 1
    }
  }
}

列举任务

以下示例代码用于列举账号下所有任务。

import os

from alibabacloud_hcs_mgw20240626.client import Client
from alibabacloud_hcs_mgw20240626.models import ListJobRequest
from alibabacloud_tea_openapi.models import Config

if __name__ == "__main__":
    try:
        # 填写主账号ID。
        userid = "11470***876***55"
        # 这里以北京区域为例。
        endpoint = "cn-beijing.mgw.aliyuncs.com"
        config = Config(
            endpoint=endpoint,
            access_key_id=os.environ.get("OSS_ACCESS_KEY_ID"),
            access_key_secret=os.environ.get("OSS_ACCESS_KEY_SECRET"),
        )
        client = Client(config)
        # 根据实际填写marker,count。
        count = 2
        marker = ""
        response = client.list_job(userid, ListJobRequest(
            marker=marker,
            count=2,
        ))
        for import_job in response.body.import_job_list.import_job:
            print(import_job)
    except Exception as e:
        print(e)
        

获取任务重试信息

说明

该方法的结果主要应用于重试任务的参数配置。

以下示例代码用于获取指定轮次的任务重试信息。

import os

from alibabacloud_hcs_mgw20240626.client import Client
from alibabacloud_hcs_mgw20240626.models import GetJobResultRequest
from alibabacloud_tea_openapi.models import Config

if __name__ == "__main__":
    try:
        # 填写主账号ID。
        userid = "11470***876***55"
        # 这里以北京区域为例。
        endpoint = "cn-beijing.mgw.aliyuncs.com"
        config = Config(
            endpoint=endpoint,
            access_key_id=os.environ.get("OSS_ACCESS_KEY_ID"),
            access_key_secret=os.environ.get("OSS_ACCESS_KEY_SECRET"),
        )
        client = Client(config)

        # 填写任务名称。
        job_name = "examplejob"
        # 填写任务轮次。
        runtime_id = 1
        response = client.get_job_result(userid, job_name, GetJobResultRequest(
            runtime_id=runtime_id
        ))
        print(response.body.import_job_result)
    except Exception as e:
        print(e)
        

正常返回示例

{
  "ImportJobResult": {
    "ReadyRetry": "Ready",
    "InvPath": "mainfest.json",
    "InvBucket": "test_sys_bucket",
    "InvDomain": "test_domain",
    "InvLocation": "oss",
    "InvAccessId": "test_access_id",
    "InvAccessSecret": "test_secret_key",
    "InvRegionId": "test_region_id",
    "AddressType": "ossinv",
    "TotalObjectCount": 1000,
    "CopiedObjectCount": 800,
    "FailedObjectCount": 200,
    "TotalObjectSize": 1000,
    "CopiedObjectSize": 800,
    "Version": "test_job_id"
  }
}

列举任务历史记录

说明

该方法主要用于查询指定任务的历史记录,详细信息请参见与任务相关的 API 文档。

以下示例代码用于列举指定任务的历史记录。

import os

from alibabacloud_hcs_mgw20240626.client import Client
from alibabacloud_hcs_mgw20240626.models import ListJobHistoryRequest
from alibabacloud_tea_openapi.models import Config

if __name__ == "__main__":
    try:
        # 填写主账号ID。
        userid = "11470***876***55"
        # 这里以北京区域为例。
        endpoint = "cn-beijing.mgw.aliyuncs.com"
        config = Config(
            endpoint=endpoint,
            access_key_id=os.environ.get("OSS_ACCESS_KEY_ID"),
            access_key_secret=os.environ.get("OSS_ACCESS_KEY_SECRET"),
        )
        client = Client(config)

        # 填写任务名称。
        job_name = "examplejob"
        # 下列参数值请根据实际需求填写。
        marker = ""
        count = 2
        response = client.list_job_history(userid, job_name, ListJobHistoryRequest(marker=marker, count=count))
        for job_history in response.body.job_history_list.job_history:
            print(job_history)
    except Exception as e:
        print(e)
             

删除任务

以下示例代码用于删除指定任务。

import os

from alibabacloud_hcs_mgw20240626.client import Client
from alibabacloud_hcs_mgw20240626.models import DeleteJobRequest
from alibabacloud_tea_openapi.models import Config

if __name__ == "__main__":
    try:
        # 填写主账号ID。
        userid = "11470***876***55"
        # 这里以北京区域为例。
        endpoint = "cn-beijing.mgw.aliyuncs.com"
        config = Config(
            endpoint=endpoint,
            access_key_id=os.environ.get("OSS_ACCESS_KEY_ID"),
            access_key_secret=os.environ.get("OSS_ACCESS_KEY_SECRET"),
        )
        client = Client(config)

        # 填写任务名称。
        job_name = "examplejob"
        client.delete_job(userid, job_name, DeleteJobRequest())
    except Exception as e:
        print(e)
        

创建任务迁移报告

说明

该方法为异步方法,调用后端将开始准备生成迁移报告。是否生成完毕需要通过获取任务迁移报告的方法进行确认,具体详情请参见任务相关部分的API文档。

以下示例代码用于创建指定任务迁移报告。

import os

from alibabacloud_hcs_mgw20240626.client import Client
from alibabacloud_hcs_mgw20240626.models import CreateReportRequest, CreateReportInfo
from alibabacloud_tea_openapi.models import Config

if __name__ == "__main__":
    try:
        # 填写主账号ID。
        userid = "11470***876***55"
        # 这里以北京区域为例。
        endpoint = "cn-beijing.mgw.aliyuncs.com"
        config = Config(
            endpoint=endpoint,
            access_key_id=os.environ.get("OSS_ACCESS_KEY_ID"),
            access_key_secret=os.environ.get("OSS_ACCESS_KEY_SECRET"),
        )
        client = Client(config)

        # 填写任务名称,必填。
        job_name = "examplejob"
        # 填写任务ID,必填。
        version = "b4155550-****-4371-****-9c7337348021"
        # 填写任务轮次,必填。
        runtime_id = 1
        client.create_report(userid, CreateReportRequest(
            CreateReportInfo(
                job_name=job_name,
                version=version,
                runtime_id=runtime_id
            )
        ))
    except Exception as e:
        print(e)
        

查询任务迁移报告

说明

调用该方法可以查询指定任务的迁移报告,依据此方法可判断迁移报告是否已生成完毕,并获取已生成的迁移报告。

以下示例代码用于查询指定任务迁移报告。

import os

from alibabacloud_hcs_mgw20240626.client import Client
from alibabacloud_hcs_mgw20240626.models import GetReportRequest
from alibabacloud_tea_openapi.models import Config

if __name__ == "__main__":
    try:
        # 填写主账号ID。
        userid = "11470***876***55"
        # 这里以北京区域为例。
        endpoint = "cn-beijing.mgw.aliyuncs.com"
        config = Config(
            endpoint=endpoint,
            access_key_id=os.environ.get("OSS_ACCESS_KEY_ID"),
            access_key_secret=os.environ.get("OSS_ACCESS_KEY_SECRET"),
        )
        client = Client(config)

        # 填写任务ID,必填。
        version = "b4155550-****-4371-****-9c7337348021"
        # 填写任务轮次,必填。
        runtime_id = 1
        response = client.get_report(userid, GetReportRequest(
            runtime_id=runtime_id,
            version=version
        ))
        print(response.body.get_report_response)
    except Exception as e:
        print(e)