Scenario Examples
This topic describes the complete workflow for running a migration job, viewing its results, and retrying failed files.
Scenario 1: Run a migration job
The following example creates a tunnel, an agent, data addresses, and a migration job, then runs the job and checks its execution result.
Note
If you do not use an agent, skip the tunnel and agent creation parts of the code below. Before you can use an agent, you must deploy it first. For details, see Agent Management.
import os
import sys
import time
from alibabacloud_hcs_mgw20240626.client import Client
from alibabacloud_hcs_mgw20240626.models import *
from alibabacloud_tea_openapi.models import Config
if __name__ == "__main__":
    try:
        # Specify the UID of the Alibaba Cloud account.
        userid = "11470***876***55"
        # The China (Beijing) region is used as an example.
        endpoint = "cn-beijing.mgw.aliyuncs.com"
        """
        Step 1: Initialize the client.
        """
        config = Config(
            endpoint=endpoint,
            access_key_id=os.environ.get("OSS_ACCESS_KEY_ID"),
            access_key_secret=os.environ.get("OSS_ACCESS_KEY_SECRET"),
        )
        client = Client(config)
        """
        Step 2: Create a tunnel and an agent (optional; create them only if you use agents).
        Multiple agents can be associated with one tunnel.
        """
        # MaxBandwidth and MaxQps default to 0, which means no limit. MaxBandwidth is measured in bits. Set them as needed.
        max_bandwidth = 1073741824
        max_qps = 1000
        resp = client.create_tunnel(userid, CreateTunnelRequest(
            CreateTunnelInfo(tunnel_qos=TunnelQos(max_bandwidth=max_bandwidth, max_qps=max_qps))))
        print("Tunnel ID: " + resp.headers["x-oss-import-tunnel-id"])
        tunnel_id = resp.headers["x-oss-import-tunnel-id"]
        # Specify the agent name.
        agent_name = "exampleagent"
        # Network type: set to public for the Internet, or vpc for Express Connect or VPN connections.
        agent_endpoint = "public"
        # Deployment method. Only default is supported.
        deploy_method = "default"
        client.create_agent(userid, CreateAgentRequest(CreateAgentInfo(
            agent_endpoint=agent_endpoint,
            deploy_method=deploy_method,
            name=agent_name,
            tunnel_id=tunnel_id
        )))
        """
        Step 3: Create the source and destination data addresses and verify that they are available.
        This example uses OSS for both the source and the destination. For examples of other source types,
        see the "Create a data address" section of the data address topic.
        """
        # Specify the data address name.
        src_address = "examplesrcaddress"
        src_address_type = "oss"
        # Set the following parameters to the actual values.
        src_region_id = "oss-cn-beijing"
        src_bucket = "examplebucket"
        src_prefix = "***/"
        src_role_name = "rolename_******"
        # List of associated agents. Separate multiple agents with commas. If you do not use an agent, omit this parameter.
        agent_list = agent_name
        src_detail = AddressDetail(address_type=src_address_type,
                                   region_id=src_region_id,
                                   bucket=src_bucket,
                                   prefix=src_prefix,
                                   role=src_role_name,
                                   agent_list=agent_list)
        client.create_address(userid, CreateAddressRequest(CreateAddressInfo(name=src_address, address_detail=src_detail)))
        response = client.verify_address(userid, src_address)
        if response.body.verify_address_response.status != "available":
            print("verify address status failed, status: %s" % response.body.verify_address_response.status)
            sys.exit()
        # Specify the data address name.
        dest_address = "exampledestaddress"
        dest_address_type = "oss"
        # Set the following parameters to the actual values.
        dest_region_id = "oss-cn-beijing"
        dest_bucket = "examplebucket"
        dest_prefix = "***/"
        dest_role_name = "rolename_******"
        dest_detail = AddressDetail(address_type=dest_address_type,
                                    region_id=dest_region_id,
                                    bucket=dest_bucket,
                                    prefix=dest_prefix,
                                    role=dest_role_name)
        client.create_address(userid, CreateAddressRequest(CreateAddressInfo(name=dest_address, address_detail=dest_detail)))
        response = client.verify_address(userid, dest_address)
        if response.body.verify_address_response.status != "available":
            print("verify address status failed, status: %s" % response.body.verify_address_response.status)
            sys.exit()
        """
        Step 4: Create and start the migration job.
        """
        # Specify the job name.
        job_name = "examplejob"
        # Set the following parameters as needed.
        overwrite_mode = "always"
        transfer_mode = "lastmodified"
        # Set maxBandWidth and maxImportTaskQps as needed.
        max_import_task_qps = 1000
        max_band_width = 2147483648
        # If maxImportTaskQps is 0 or not set, the default value is used. The same applies to maxBandWidth. maxBandWidth is measured in bits.
        import_qos = ImportQos(max_import_task_qps=max_import_task_qps, max_band_width=max_band_width)
        # Configure filter rules, including the file type filter, key filter, and time filter. For parameter details, see the API documentation.
        # File type filter, applicable to localfs sources.
        exclude_symlink = False
        exclude_dir = False
        file_type_filters = FileTypeFilters(exclude_dir=exclude_dir, exclude_symlink=exclude_symlink)
        # Key filter. Set it as needed.
        include_regex = [".*.jpg", ".*.gif"]
        exclude_regex = ["txtFile", ".*.js"]
        exclude_regex_item = KeyFilterItem(regex=exclude_regex)
        include_regex_item = KeyFilterItem(regex=include_regex)
        key_filters = KeyFilters(excludes=exclude_regex_item, includes=include_regex_item)
        # Time filter. Timestamps follow the UTC time format. Set them as needed.
        include_start_time = "2006-01-01T00:00:00Z"
        include_end_time = "2007-01-01T00:00:00Z"
        exclude_start_time = "2009-01-01T00:00:00Z"
        exclude_end_time = "2010-01-01T00:00:00Z"
        include_time_filters = [TimeFilter(start_time=include_start_time, end_time=include_end_time)]
        exclude_time_filters = [TimeFilter(start_time=exclude_start_time, end_time=exclude_end_time)]
        include_lastmodified_filter_item = LastModifyFilterItem(time_filter=include_time_filters)
        exclude_lastmodified_filter_item = LastModifyFilterItem(time_filter=exclude_time_filters)
        last_modified_filters = LastModifiedFilters(excludes=exclude_lastmodified_filter_item,
                                                    includes=include_lastmodified_filter_item)
        filter_rule = FilterRule(file_type_filters=file_type_filters,
                                 key_filters=key_filters,
                                 last_modified_filters=last_modified_filters)
        # Configure the schedule rule. For parameter details, see the API documentation.
        max_schedule_count = 5
        start_cron_expression = "0 0 10 * * ?"
        suspend_cron_expression = "0 0 14 * * ?"
        schedule_rule = ScheduleRule(max_schedule_count=max_schedule_count,
                                     suspend_cron_expression=suspend_cron_expression,
                                     start_cron_expression=start_cron_expression)
        client.create_job(userid, CreateJobRequest(CreateJobInfo(
            name=job_name,
            transfer_mode=transfer_mode,
            overwrite_mode=overwrite_mode,
            src_address=src_address,
            dest_address=dest_address,
            import_qos=import_qos,
            filter_rule=filter_rule,
            schedule_rule=schedule_rule,
        )))
        client.update_job(userid, job_name, UpdateJobRequest(UpdateJobInfo(status="IMPORT_JOB_LAUNCHING")))
        """
        Step 5: Poll the job status.
        """
        while True:
            response = client.get_job(userid, job_name, GetJobRequest())
            if response.body.import_job.status == "IMPORT_JOB_INTERRUPTED":
                print("job is interrupted")
                sys.exit()
            if response.body.import_job.status == "IMPORT_JOB_FINISHED":
                print("job is finished")
                break
            time.sleep(60)
        """
        Step 6: View the results after the job finishes.
        """
        response = client.list_job_history(userid, job_name, ListJobHistoryRequest())
        print(response.body.job_history_list)
    except Exception as e:
        print(e)
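If you run several jobs, the status-polling loop in step 5 can be factored into a small helper. The sketch below is not part of the original example; the helper name wait_for_job and its interval parameter are illustrative, and the sketch only reuses the GetJob call and status strings shown above.

# Minimal helper sketch (illustrative, assuming the same client, userid, and status
# strings as above): polls GetJob until the job finishes or is interrupted.
def wait_for_job(client, userid, job_name, interval=60):
    while True:
        response = client.get_job(userid, job_name, GetJobRequest())
        status = response.body.import_job.status
        if status == "IMPORT_JOB_INTERRUPTED":
            return False
        if status == "IMPORT_JOB_FINISHED":
            return True
        time.sleep(interval)

Calling wait_for_job(client, userid, job_name) right after the UpdateJob call would replace the inline while loop in step 5.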
Scenario 2: Retry failed files
After a migration job completes, some files may fail to migrate. The migration service generates a list of these failed files. The following example first retrieves the details of the failed-file list, then creates a new data address based on that information, and finally creates a retry sub-job. In this way, you can re-migrate the failed files.
import os
import sys
import time
from datetime import datetime
from alibabacloud_hcs_mgw20240626.client import Client
from alibabacloud_hcs_mgw20240626.models import *
from alibabacloud_tea_openapi.models import Config
def retry(client, userid, job_name, src_address, dest_address, runtime_id):
    """
    Step 5: Create a new data address from the failed-file list information and verify that it is available.
    """
    result_response = client.get_job_result(userid, job_name, GetJobResultRequest(
        runtime_id=runtime_id
    ))
    job_response = client.get_job(userid, job_name, GetJobRequest())
    address_response = client.get_address(userid, src_address)
    # Use the current timestamp as a string suffix to make the new names unique.
    now_time = str(int(datetime.now().timestamp()))
    retry_src_address = src_address + "_retry_" + now_time
    client.create_address(userid, CreateAddressRequest(CreateAddressInfo(
        name=retry_src_address,
        address_detail=AddressDetail(
            address_type=result_response.body.import_job_result.address_type,
            inv_location=result_response.body.import_job_result.inv_location,
            inv_bucket=result_response.body.import_job_result.inv_bucket,
            inv_access_id=result_response.body.import_job_result.inv_access_id,
            inv_access_secret=result_response.body.import_job_result.inv_access_secret,
            inv_path=result_response.body.import_job_result.inv_path,
            inv_region_id=result_response.body.import_job_result.inv_region_id,
            domain=address_response.body.import_address.address_detail.domain,
            region_id=address_response.body.import_address.address_detail.region_id,
            access_id=address_response.body.import_address.address_detail.access_id,
            access_secret=address_response.body.import_address.address_detail.access_secret,
            agent_list=address_response.body.import_address.address_detail.agent_list,
            prefix=address_response.body.import_address.address_detail.prefix,
            role=address_response.body.import_address.address_detail.role,
            inv_role=address_response.body.import_address.address_detail.inv_role,
            bucket=address_response.body.import_address.address_detail.bucket
        )
    )))
    verify_response = client.verify_address(userid, retry_src_address)
    if verify_response.body.verify_address_response.status != "available":
        print("verify address failed, status: %s" % verify_response.body.verify_address_response.status)
        sys.exit()
    """
    Step 6: Create and start the retry sub-job.
    """
    retry_job_name = job_name + "_retry_" + now_time
    client.create_job(userid, CreateJobRequest(CreateJobInfo(
        name=retry_job_name,
        transfer_mode=job_response.body.import_job.transfer_mode,
        overwrite_mode=job_response.body.import_job.overwrite_mode,
        filter_rule=job_response.body.import_job.filter_rule,
        create_report=job_response.body.import_job.create_report,
        audit=job_response.body.import_job.audit,
        src_address=retry_src_address,
        dest_address=dest_address,
    )))
    client.update_job(userid, retry_job_name, UpdateJobRequest(UpdateJobInfo(
        status="IMPORT_JOB_LAUNCHING"
    )))


if __name__ == "__main__":
    try:
        # Specify the UID of the Alibaba Cloud account.
        userid = "11470***876***55"
        # The China (Beijing) region is used as an example.
        endpoint = "cn-beijing.mgw.aliyuncs.com"
        """
        Step 1: Initialize the client.
        """
        config = Config(
            endpoint=endpoint,
            access_key_id=os.environ.get("OSS_ACCESS_KEY_ID"),
            access_key_secret=os.environ.get("OSS_ACCESS_KEY_SECRET"),
        )
        client = Client(config)
        # Set the following parameters to the actual values.
        job_name = "examplejob"
        runtime_id = 1
        src_address = "examplesrcaddress"
        dest_address = "exampledestaddress"
        """
        Step 2: Poll the job status until the job finishes or is interrupted.
        """
        while True:
            response = client.get_job(userid, job_name, GetJobRequest())
            if response.body.import_job.status == "IMPORT_JOB_INTERRUPTED":
                print("job is interrupted")
                sys.exit()
            if response.body.import_job.status == "IMPORT_JOB_FINISHED":
                print("job is finished")
                break
            time.sleep(60)
        """
        Step 3: Call ListJobHistory to check whether there are failed files.
        """
        count = 1
        marker = ""
        response = client.list_job_history(userid, job_name, ListJobHistoryRequest(
            marker=marker,
            count=count
        ))
        if len(response.body.job_history_list.job_history) < 1:
            print("list job history failed, length < 1")
            sys.exit()
        if response.body.job_history_list.job_history[0].failed_count <= 0:
            print("job %s no need retry" % job_name)
            sys.exit()
        """
        Step 4: Call GetJobResult in a loop and wait until the result is Ready.
        """
        while True:
            response = client.get_job_result(userid, job_name, GetJobResultRequest(
                runtime_id=runtime_id
            ))
            if response.body.import_job_result.ready_retry == "NoNeed":
                print("job no need retry")
                sys.exit()
            if response.body.import_job_result.ready_retry == "Ready":
                break
            time.sleep(60)
        retry(client, userid, job_name, src_address, dest_address, runtime_id)
    except Exception as e:
        print(e)
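To watch the retry sub-job after it is launched, you can poll it with the same GetJob call used in step 2. The sketch below is not part of the original example: it assumes retry() has been modified to end with "return retry_job_name"; the calls and status strings are the ones already used above.

        # Hypothetical follow-up sketch: assumes retry() returns retry_job_name.
        # Reuses the GetJob polling loop to watch the retry sub-job, then prints its history.
        retry_job_name = retry(client, userid, job_name, src_address, dest_address, runtime_id)
        while True:
            response = client.get_job(userid, retry_job_name, GetJobRequest())
            status = response.body.import_job.status
            if status == "IMPORT_JOB_INTERRUPTED":
                print("retry job is interrupted")
                break
            if status == "IMPORT_JOB_FINISHED":
                print("retry job is finished")
                print(client.list_job_history(userid, retry_job_name, ListJobHistoryRequest()).body.job_history_list)
                break
            time.sleep(60)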