Scenario examples

This topic describes the complete workflow for running a migration job, viewing its results, and retrying failed files.

Scenario 1: Run a job

The following example creates a tunnel, an agent, data addresses, and a migration job, then runs the job and checks its execution result.

Note
  • If you do not use an agent, ignore the tunnel and agent creation parts in the following code. An agent must be deployed before it can be used. For more information, see Agent management.

import os
import sys
import time

from alibabacloud_hcs_mgw20240626.client import Client
from alibabacloud_hcs_mgw20240626.models import *
from alibabacloud_tea_openapi.models import Config

if __name__ == "__main__":
    try:
        # Specify the UID of your Alibaba Cloud account.
        userid = "11470***876***55"
        # The China (Beijing) region is used in this example.
        endpoint = "cn-beijing.mgw.aliyuncs.com"

        """ 
        步骤1: 初始化。
        """
        config = Config(
            endpoint=endpoint,
            access_key_id=os.environ.get("OSS_ACCESS_KEY_ID"),
            access_key_secret=os.environ.get("OSS_ACCESS_KEY_SECRET"),
        )
        client = Client(config)

        """
        步骤2:创建通道和代理(可选,请根据实际情况选择是否创建通道和代理),代理可以创建多个关联一个通道。 
        """
        # MaxBandwidth, MaxQps默认为0, 表示没有限制, MaxBandWidth的单位是bit,请按照实际需求填写。
        max_bandwidth = 1073741824
        max_qps = 1000
        
        resp = client.create_tunnel(userid, CreateTunnelRequest(
            CreateTunnelInfo(tunnel_qos=TunnelQos(max_bandwidth=max_bandwidth, max_qps=max_qps))))
        print("通道ID:" + resp.headers["x-oss-import-tunnel-id"])
        tunnel_id = resp.headers["x-oss-import-tunnel-id"]
        # Specify the agent name.
        agent_name = "exampleagent"
        # Network type: set to public for the Internet, or vpc for Express Connect or VPN connections.
        agent_endpoint = "public"
        # Deployment method. Currently only default is supported.
        deploy_method = "default"
        client.create_agent(userid, CreateAgentRequest(CreateAgentInfo(
            agent_endpoint=agent_endpoint,
            deploy_method=deploy_method,
            name=agent_name,
            tunnel_id=tunnel_id
        )))

        """ 
        步骤3:创建源地址和目的地址并验证可用性,这里以创建源为oss,目的为oss的地址为例,其它源示例请参见数据地址篇创建数据地址小节。
        """
        # Specify the name of the source data address.
        src_address = "examplesrcaddress"
        src_address_type = "oss"
        # Set the following parameters to the actual values.
        src_region_id = "oss-cn-beijing"
        src_bucket = "examplebucket"
        src_prefix = "***/"
        src_role_name = "rolename_******"
        # The list of associated agents. Separate multiple agents with commas. Leave this unset if you do not use an agent.
        agent_list = agent_name
        src_detail = AddressDetail(address_type=src_address_type,
                                   region_id=src_region_id,
                                   bucket=src_bucket,
                                   prefix=src_prefix,
                                   role=src_role_name,
                                   agent_list=agent_list)
        client.create_address(userid, CreateAddressRequest(CreateAddressInfo(name=src_address, address_detail=src_detail)))
        response = client.verify_address(userid, src_address)
        if response.body.verify_address_response.status != "available":
            print("verify address status failed, status: ", response.body.verify_address_response.status)
            sys.exit()
        # Specify the name of the destination data address.
        dest_address = "exampledestaddress"
        dest_address_type = "oss"
        # Set the following parameters to the actual values.
        dest_region_id = "oss-cn-beijing"
        dest_bucket = "examplebucket"
        dest_prefix = "***/"
        dest_role_name = "rolename_******"
        dest_detail = AddressDetail(address_type=dest_address_type,
                                    region_id=dest_region_id,
                                    bucket=dest_bucket,
                                    prefix=dest_prefix,
                                    role=dest_role_name)
        client.create_address(userid, CreateAddressRequest(CreateAddressInfo(name=dest_address, address_detail=dest_detail)))
        response = client.verify_address(userid, dest_address)
        if response.body.verify_address_response.status != "available":
            print("verify address status failed, status: %s" % response.body.verify_address_response.status)
            sys.exit()

        """
         ** 步骤4:创建并启动任务。
        """
        # 填写任务名称。
        job_name = "examplejob"
        # 请根据实际情况填写参数。
        overwrite_mode = "always"
        transfer_mode = "lastmodified"

        # maxBandWidth,maxImportTaskQps根据实际需求值填写。
        max_import_task_qps = 1000
        max_band_width = 2147483648
        # 如果maxImportTaskQps值为0或者不设置,会被设置为默认值;MaxBandWidth值为0或者不设置, 会被设置为默认值,maxBandWidth值单位为bits。
        import_qos = ImportQos(max_import_task_qps=max_import_task_qps, max_band_width=max_band_width)

        # Configure filter rules, which include a file type filter, a key filter, and a time filter. For details about the parameters, see the API documentation.
        # File type filter. Applicable to localfs sources.
        exclude_symlink = False
        exclude_dir = False
        file_type_filters = FileTypeFilters(exclude_dir=exclude_dir, exclude_symlink=exclude_symlink)
        # Key filter. Set the regular expressions as needed.
        include_regex = [".*.jpg", ".*.gif"]
        exclude_regex = ["txtFile", ".*.js"]
        exclude_regex_item = KeyFilterItem(regex=exclude_regex)
        include_regex_item = KeyFilterItem(regex=include_regex)
        key_filters = KeyFilters(excludes=exclude_regex_item, includes=include_regex_item)
        # Time filter. The timestamps follow the UTC time format. Set them as needed.
        include_start_time = "2006-01-01T00:00:00Z"
        include_end_time = "2007-01-01T00:00:00Z"
        exclude_start_time = "2009-01-01T00:00:00Z"
        exclude_end_time = "2010-01-01T00:00:00Z"
        include_time_filters = [TimeFilter(start_time=include_start_time, end_time=include_end_time)]
        exclude_time_filters = [TimeFilter(start_time=exclude_start_time, end_time=exclude_end_time)]
        include_lastmodified_filter_item = LastModifyFilterItem(time_filter=include_time_filters)
        exclude_lastmodified_filter_item = LastModifyFilterItem(time_filter=exclude_time_filters)
        last_modified_filters = LastModifiedFilters(excludes=exclude_lastmodified_filter_item,
                                                    includes=include_lastmodified_filter_item)

        filter_rule = FilterRule(file_type_filters=file_type_filters,
                                 key_filters=key_filters,
                                 last_modified_filters=last_modified_filters)

        # Configure the scheduling rule. For details about the parameters, see the API documentation.
        max_schedule_count = 5
        start_cron_expression = "0 0 10 * * ?"
        suspend_cron_expression = "0 0 14 * * ?"
        schedule_rule = ScheduleRule(max_schedule_count=max_schedule_count,
                                     suspend_cron_expression=suspend_cron_expression,
                                     start_cron_expression=start_cron_expression)
        client.create_job(userid, CreateJobRequest(CreateJobInfo(
            name=job_name,
            transfer_mode=transfer_mode,
            overwrite_mode=overwrite_mode,
            src_address=src_address,
            dest_address=dest_address,
            import_qos=import_qos,
            filter_rule=filter_rule,
            schedule_rule=schedule_rule,
        )))
        client.update_job(userid, job_name, UpdateJobRequest(UpdateJobInfo(status="IMPORT_JOB_LAUNCHING")))

        """
        步骤5:循环查看当前任务状态。
        """
        while True:
            response = client.get_job(userid, job_name, GetJobRequest())
            if response.body.import_job.status == "IMPORT_JOB_INTERRUPTED":
                print("job is interrupted")
                sys.exit()
            if response.body.import_job.status == "IMPORT_JOB_FINISHED":
                print("job is finished")
                break
            time.sleep(60)
        """
        步骤6:任务结束后查看结果。
        """
        response = client.list_job_history(userid, job_name, ListJobHistoryRequest())
        print(response.body.job_history_list)
    except Exception as e:
        print(e)
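
Step 6 above only prints the raw job history list. The following sketch shows one way to inspect the individual history entries, for example to check whether any files failed in a run. It reuses the client, userid, and job_name defined in the example above and relies only on the job_history and failed_count fields that also appear in Scenario 2; treat any other fields of a history entry as assumptions that may vary by SDK version.

# A minimal sketch: list the job history and print the failure count of each run.
# Assumes client, userid, and job_name are already defined as in the example above.
history_response = client.list_job_history(userid, job_name, ListJobHistoryRequest())
for history in history_response.body.job_history_list.job_history:
    # failed_count is the same field that Scenario 2 uses to decide whether a retry is needed.
    print("failed count:", history.failed_count)
    if history.failed_count > 0:
        print("some files failed to migrate; see Scenario 2 for how to retry them")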
      

Scenario 2: Retry failed files

After a migration job completes, some files may fail to migrate. The migration service generates a list of these failed files. The following example first retrieves the details of the failed-file list, then creates a new data address based on that information, and finally creates a retry subjob. In this way, you can migrate the failed files again.

import os
import sys
import time
from datetime import datetime

from alibabacloud_hcs_mgw20240626.client import Client
from alibabacloud_hcs_mgw20240626.models import *
from alibabacloud_tea_openapi.models import Config


def retry(client, userid, job_name, src_address, dest_address, runtime_id):
    """
    步骤5:根据失败文件清单信息创建新的数据地址并且验证是否可用。
    """
    result_response = client.get_job_result(userid, job_name, GetJobResultRequest(
        runtime_id=runtime_id
    ))
    job_response = client.get_job(userid, job_name, GetJobRequest())
    address_response = client.get_address(userid, src_address)
    # Convert the timestamp to a string so that it can be appended to the address and job names.
    now_time = str(int(datetime.now().timestamp()))
    retry_src_address = src_address + "_retry_" + now_time
    client.create_address(userid, CreateAddressRequest(CreateAddressInfo(
        name=retry_src_address,
        address_detail=AddressDetail(
            address_type=result_response.body.import_job_result.address_type,
            inv_location=result_response.body.import_job_result.inv_location,
            inv_bucket=result_response.body.import_job_result.inv_bucket,
            inv_access_id=result_response.body.import_job_result.inv_access_id,
            inv_access_secret=result_response.body.import_job_result.inv_access_secret,
            inv_path=result_response.body.import_job_result.inv_path,
            inv_region_id=result_response.body.import_job_result.inv_region_id,
            domain=address_response.body.import_address.address_detail.domain,
            region_id=address_response.body.import_address.address_detail.region_id,
            access_id=address_response.body.import_address.address_detail.access_id,
            access_secret=address_response.body.import_address.address_detail.access_secret,
            agent_list=address_response.body.import_address.address_detail.agent_list,
            prefix=address_response.body.import_address.address_detail.prefix,
            role=address_response.body.import_address.address_detail.role,
            inv_role=address_response.body.import_address.address_detail.inv_role,
            bucket=address_response.body.import_address.address_detail.bucket
        )
    )))
    verify_response = client.verify_address(userid, retry_src_address)
    if verify_response.body.verify_address_response.status != "available":
        print("verify address failed, status: %s" % verify_response.body.verify_address_response.status)
        sys.exit()
    """
    ** 步骤6:创建重试子任务并启动。
    """
    retry_job_name = job_name + "_retry_" + now_time
    client.create_job(userid, CreateJobRequest(CreateJobInfo(
        name=retry_job_name,
        transfer_mode=job_response.body.import_job.transfer_mode,
        overwrite_mode=job_response.body.import_job.overwrite_mode,
        filter_rule=job_response.body.import_job.filter_rule,
        create_report=job_response.body.import_job.create_report,
        audit=job_response.body.import_job.audit,
        src_address=retry_src_address,
        dest_address=dest_address,
    )))
    client.update_job(userid, retry_job_name, UpdateJobRequest(UpdateJobInfo(
        status="IMPORT_JOB_LAUNCHING"
    )))


if __name__ == "__main__":
    try:
        # Specify the UID of your Alibaba Cloud account.
        userid = "11470***876***55"
        # The China (Beijing) region is used in this example.
        endpoint = "cn-beijing.mgw.aliyuncs.com"

        """
        Step 1: Initialize the client.
        """
        config = Config(
            endpoint=endpoint,
            access_key_id=os.environ.get("OSS_ACCESS_KEY_ID"),
            access_key_secret=os.environ.get("OSS_ACCESS_KEY_SECRET"),
        )
        client = Client(config)

        # Set the following parameters to the actual values.
        job_name = "examplejob"
        runtime_id = 1
        src_address = "examplesrcaddress"
        dest_address = "exampledestaddress"

        """
        步骤2:循环查看任务状态,查看任务是否结束或者中断。 
        """
        while True:
            response = client.get_job(userid, job_name, GetJobRequest())
            if response.body.import_job.status == "IMPORT_JOB_INTERRUPTED":
                print("job is interrupted")
                sys.exit()
            if response.body.import_job.status == "IMPORT_JOB_FINISHED":
                print("job is finished")
                break
            time.sleep(60)
        """
        步骤3:ListJobHistory,查看是否有失败文件。
        """
        count = 1
        marker = ""
        response = client.list_job_history(userid, job_name, ListJobHistoryRequest(
            count
        ))
        if len(response.body.job_history_list.job_history) < 1:
            print("list job history failed, length < 1")
            sys.exit()
        if response.body.job_history_list.job_history[0].failed_count <= 0:
            print("job %s does not need a retry" % job_name)
            sys.exit()
        """
        步骤4:循环GetJobResult,等待Ready。
        """
        while True:
            response = client.get_job_result(userid, job_name, GetJobResultRequest(
                runtime_id=runtime_id
            ))
            if response.body.import_job_result.ready_retry == "NoNeed":
                print("job does not need a retry")
                sys.exit()
            if response.body.import_job_result.ready_retry == "Ready":
                break
            time.sleep(60)
        retry(client, userid, job_name, src_address, dest_address, runtime_id)
    except Exception as e:
        print(e)
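
After the retry subjob is launched, you can wait for it to finish with the same polling loop that Step 2 uses. The sketch below is a hedged helper: it assumes retry() is modified to return retry_job_name (the function above does not return it), and it uses only the get_job call and status values that already appear in the examples.

# A minimal sketch of waiting for the retry subjob to finish.
# Assumes retry() is modified to return retry_job_name; the original function does not return it.
def wait_for_job(client, userid, name):
    while True:
        response = client.get_job(userid, name, GetJobRequest())
        status = response.body.import_job.status
        if status == "IMPORT_JOB_INTERRUPTED":
            print("job %s is interrupted" % name)
            return False
        if status == "IMPORT_JOB_FINISHED":
            print("job %s is finished" % name)
            return True
        # The 60-second interval matches the polling loops used in both scenarios.
        time.sleep(60)

Once the retry subjob finishes, you can call list_job_history or get_job_result on it in the same way as in Scenario 1 to confirm that the previously failed files have been migrated.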