场景示例
更新时间:
本文介绍运行任务并查看结果、重试失败文件的完整流程。
场景一:运行任务
以下示例用于创建通道、创建代理、创建数据地址、创建迁移任务,运行任务并查看运行结果。
说明
在不使用代理的情况下,请忽略下面代码中的创建通道和代理的部分。使用代理前需要先部署代理,详情请参见 代理管理。
package main
import (
"fmt"
openapipackage "github.com/alibabacloud-go/darabonba-openapi/v2/client"
mgwpackage "github.com/alibabacloud-go/hcs-mgw-20240626/client"
"os"
)
/** 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。*/
var accessKeyId = os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
var accessKeySecret = os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
/** 填写主账号ID。*/
var userId = "11470***876***55";
func main() {
// 这里以北京区域为例。
endpoint := "mgw.cn-beijing.aliyuncs.com"
/** 步骤1: 初始化。 */
config := openapipackage.Config{
AccessKeyId: &accessKeyId,
AccessKeySecret: &accessKeySecret,
Endpoint: &endpoint,
}
client, err := mgwpackage.NewClient(&config)
if err != nil{
fmt.Errorf("create client failed, %v", err)
return
}
// MaxBandwidth, MaxQps 默认为0, 表示没有限制, MaxBandWidth的单位是bit,请按照实际需求填写。
maxBandwidth := int64(1073741824)
maxQps := int32(1000)
request := &mgwpackage.CreateTunnelRequest{
ImportTunnel: &mgwpackage.CreateTunnelInfo{
TunnelQos : &mgwpackage.TunnelQos{
MaxBandwidth : &maxBandwidth,
MaxQps : &maxQps,
},
},
}
/** 步骤2:创建通道和代理(可选,请根据实际情况选择是否创建通道和代理),代理可以创建多个关联一个通道。 */
resp, err := client.CreateTunnel(&userId, request)
if err != nil {
fmt.Errorf("create tunnel failed, %v", err)
return
}
fmt.Println("通道ID:" + *resp.Headers["x-oss-import-tunnel-id"])
tunnelId := *resp.Headers["x-oss-import-tunnel-id"]
// 填写代理名称。
agentName := "exampleagent"
// 使用网络,公网请填写public, 专线或者VPN请填写vpc。
agentEndpoint := "public"
// 部署方式,目前仅支持填写default。
deployMethod := "default"
info := mgwpackage.CreateAgentInfo{
AgentEndpoint : &agentEndpoint,
DeployMethod : &deployMethod,
Name : &agentName,
TunnelId : &tunnelId,
}
_, err = client.CreateAgent(&userId, &mgwpackage.CreateAgentRequest{ImportAgent: &info})
if err != nil {
fmt.Errorf("create agent failed, %v", err)
return
}
/** 步骤3:创建源地址和目的地址并验证可用性,这里以创建源为oss,目的为oss的地址为例,其它源示例请参见数据地址篇创建数据地址小节。 */
// 填写数据地址名称。
srcAddress := "examplesrcaddress"
srcAddressType := "oss"
// 以下参数请根据实际值填写。
srcRegionId := "oss-cn-beijing"
srcBucket := "examplebucket"
srcPrefix := "***/"
srcRoleName := "rolename_******"
// 关联的代理列表,多个代理可使用,分隔。在不使用代理的情况下,该项可不填。
agentList := agentName
srcDetail := mgwpackage.AddressDetail{
AddressType: &srcAddressType,
RegionId: &srcRegionId,
Bucket: &srcBucket,
Prefix: &srcPrefix,
Role: &srcRoleName,
AgentList: &agentList,
}
_, err = client.CreateAddress(&userId, &mgwpackage.CreateAddressRequest{
ImportAddress: &mgwpackage.CreateAddressInfo{
Name: &srcAddress,
AddressDetail: &srcDetail,
}})
if err != nil{
fmt.Errorf("create address failed, %v", err)
return
}
// 验证源端数据地址可用性。
verifySrcAddrResp, err := client.VerifyAddress(&userId, &srcAddress)
if err != nil {
fmt.Errorf("verify address failed, %v", err)
return
}
if *verifySrcAddrResp.Body.VerifyAddressResponse.Status != "available" {
fmt.Errorf("verify address status failed, status: %v", *verifySrcAddrResp.Body.VerifyAddressResponse.Status)
return
}
// 填写数据地址名称。
destAddress := "exampledestaddress"
destAddressType := "oss"
// 以下参数请根据实际值填写。
destRegionId := "oss-cn-beijing"
destBucket := "examplebucket"
destPrefix := "***/"
destRoleName := "rolename_******"
detail := mgwpackage.AddressDetail{
AddressType: &destAddressType,
RegionId: &destRegionId,
Bucket: &destBucket,
Prefix: &destPrefix,
Role: &destRoleName,
}
_, err = client.CreateAddress(&userId, &mgwpackage.CreateAddressRequest{
ImportAddress: &mgwpackage.CreateAddressInfo{
Name: &destAddress,
AddressDetail: &detail,
}})
if err != nil{
fmt.Errorf("create address failed, %v", err)
return
}
verifyDestAddrResp, err := client.VerifyAddress(&userId, &srcAddress)
if err != nil {
fmt.Errorf("verify address failed, %v", err)
return
}
if *verifyDestAddrResp.Body.VerifyAddressResponse.Status != "available" {
fmt.Errorf("verify address status failed, status: %v", *verifySrcAddrResp.Body.VerifyAddressResponse.Status)
return
}
/** 步骤4:创建并启动任务 */
jobName := "examplejob"
/*
overwriteMode和transferMode需要组合使用,具体组合含义如下
always,all 全覆盖
always,lastmodified 根据文件最后修改时间覆盖
never,all 不覆盖
*/
/* 文件覆盖方式, 可能的值 1.never 2.always */
overwriteMode := "always"
/* 文件传输的方式, 可能的值为 1.changed 2.all 3.lastmodified */
transferMode := "lastmodified"
// maxBandWidth,maxImportTaskQps 根据实际需求值填写。
maxImportTaskQps := int64(1000)
maxBandWidth := int64(2147483648)
// 如果maxImportTaskQps值为0或者不设置,会被设置为默认值;MaxBandWidth值为0或者不设置,会被设置为默认值,maxBandWidth值单位为bits。
importQos := mgwpackage.ImportQos{
MaxImportTaskQps: &maxImportTaskQps,
MaxBandWidth: &maxBandWidth,
}
// 配置过滤规则,包含文件类型过滤器、文件过滤器、时间过滤器,具体参数含义请参看API文档。
// 文件类型过滤器,适用于localfs。
excludeSymlink := false
excludeDir := false
fileTypeFilters := &mgwpackage.FileTypeFilters{
&excludeDir, &excludeSymlink,
}
// 文件过滤器,根据实际需求值填写。
var includeRegex []*string
var excludeRegex []*string
jpgFile := ".*.jpg"
gifFile := ".*.gif"
txtFile := ".*.txt"
jsFile := ".*.js"
includeRegex = append(includeRegex, &jpgFile, &gifFile)
excludeRegex = append(excludeRegex, &txtFile, &jsFile)
KeyFilters := &mgwpackage.KeyFilters{
Excludes: &mgwpackage.KeyFilterItem{
excludeRegex,
}, Includes: &mgwpackage.KeyFilterItem{
includeRegex,
},
}
// 时间过滤器, 时间格式遵循UTC时间格式,根据实际需求值填写。
includeStartTime := "2006-01-01T00:00:00Z"
includeEndTime := "2007-01-01T00:00:00Z"
excludeStartTime := "2009-01-01T00:00:00Z"
excludeEndTime := "2010-01-01T00:00:00Z"
includeTimeFilter := []*mgwpackage.TimeFilter{{
&includeStartTime, &includeEndTime,
}}
includeLastModifyFilters := &mgwpackage.LastModifyFilterItem{
includeTimeFilter,
}
excludeTimeFilter := []*mgwpackage.TimeFilter{{
EndTime: &excludeStartTime, StartTime: &excludeEndTime,
}}
excludeLastModifyFilters := &mgwpackage.LastModifyFilterItem{
TimeFilter: excludeTimeFilter,
}
lastModifiedFilters := &mgwpackage.LastModifiedFilters{
Excludes: excludeLastModifyFilters, Includes: includeLastModifyFilters,
}
filterRule := mgwpackage.FilterRule{
LastModifiedFilters: lastModifiedFilters,
KeyFilters: KeyFilters,
FileTypeFilters: fileTypeFilters,
}
// 配置调度规则,具体参数含义请参看API文档。
maxScheduleCount := int64(5)
startCronExpression := "0 0 10 * * ?"
suspendCronExpression := "0 0 14 * * ?"
scheduleRule := mgwpackage.ScheduleRule{
MaxScheduleCount: &maxScheduleCount,
StartCronExpression: &startCronExpression,
SuspendCronExpression: &suspendCronExpression,
}
_, err = client.CreateJob(&userId, &mgwpackage.CreateJobRequest{ImportJob: &mgwpackage.CreateJobInfo{
Name: &jobName,
TransferMode: &transferMode,
OverwriteMode: &overwriteMode,
SrcAddress: &srcAddress,
DestAddress: &destAddress,
ImportQos: &importQos,
FilterRule: &filterRule,
ScheduleRule: &scheduleRule,
}})
if err != nil {
fmt.Errorf("create job failed, %v", err)
return
}
status := "IMPORT_JOB_LAUNCHING"
_, err = client.UpdateJob(&userId, &jobName, &mgwpackage.UpdateJobRequest{
&mgwpackage.UpdateJobInfo{Status: &status},
})
if err != nil {
fmt.Errorf("update job failed, %v", err)
return
}
/** 步骤5:循环查看当前任务状态 */
for {
jobResp, err := client.GetJob(&userId, &jobName, &mgwpackage.GetJobRequest{})
if err != nil {
fmt.Errorf("get job status failed, %v", err)
return
}
if *jobResp.Body.ImportJob.Status == "IMPORT_JOB_INTERRUPTED" {
fmt.Errorf("job is interrupted")
return
}
if *jobResp.Body.ImportJob.Status == "IMPORT_JOB_FINISHED" {
fmt.Printf("job is finished")
break;
}
time.Sleep(60)
}
/** 步骤6:任务结束后查看结果。 */
resp, err := client.ListJobHistory(&userId, &jobName, &mgwpackage.ListJobHistoryRequest{})
if err != nil {
fmt.Errorf("listJobHistory failed,%v", err)
return
}
fmt.Println(resp.Body.JobHistoryList.JobHistory)
}
场景二:重试失败文件
迁移任务运行完成后,可能有失败文件,迁移服务会为这些失败文件构造一份失败文件列表,以下示例先获取失败文件列表的详情,然后据此创建一个新的数据地址,再创建一个重试子任务。通过这种方式,您能够重新迁移这些失败的文件。
package main
import (
"fmt"
openapipackage "github.com/alibabacloud-go/darabonba-openapi/v2/client"
mgwpackage "github.com/alibabacloud-go/hcs-mgw-20240626/client"
"github.com/alibabacloud-go/hcs-mgw-20240626/util"
"os"
"strconv"
"time"
)
/** 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。*/
var accessKeyId = os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
var accessKeySecret = os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
/** 填写主账号ID。*/
var userId = "11470***876***55";
func main() {
// 这里以北京区域为例。
endpoint := "mgw.cn-beijing.aliyuncs.com"
/** 步骤1:初始化。 */
config := openapipackage.Config{
AccessKeyId: &accessKeyId,
AccessKeySecret: &accessKeySecret,
Endpoint: &endpoint,
}
client, err := mgwpackage.NewClient(&config)
if err != nil{
fmt.Errorf("create client failed, %v", err)
return
}
/** 步骤2:循环查看任务状态,查看任务是否结束或者中断。 */
// 下列参数值请根据实际需求填写。
runtimeId := int32(1)
jobName := "examplejob"
srcAddress := "examplesrcaddress"
destAddress := "exampledestaddress"
for {
jobResp, err := client.GetJob(&userId, &jobName, &mgwpackage.GetJobRequest{})
if err != nil {
fmt.Errorf("get job status failed, %v", err)
return
}
if *jobResp.Body.ImportJob.Status == "IMPORT_JOB_INTERRUPTED" {
fmt.Errorf("job is interrupted, can not retry")
return
}
if *jobResp.Body.ImportJob.Status == "IMPORT_JOB_FINISHED" {
break
}
time.Sleep(60)
}
/** 步骤3:ListJobHistory,查看是否有失败文件。 */
count := int32(1)
marker := ""
listHistoryResp, err := client.ListJobHistory(&userId, &jobName, &mgwpackage.ListJobHistoryRequest{
Marker: &marker,
Count: &count,
RuntimeId: &runtimeId,
})
if err != nil || len(listHistoryResp.Body.JobHistoryList.JobHistory) < 1 {
fmt.Errorf("list job history failed, %v", err)
return
}
if *listHistoryResp.Body.JobHistoryList.JobHistory[0].FailedCount <= 0 {
fmt.Sprintf("job %v no need retry", jobName)
return
}
/** 步骤4:循环GetJobResult,等待Ready。 */
for {
result, err := client.GetJobResult(&userId, &jobName, &mgwpackage.GetJobResultRequest{
RuntimeId: &runtimeId,
})
if err != nil {
fmt.Errorf("get job result failed, %v", err)
return
}
if *result.Body.ImportJobResult.ReadyRetry == "NoNeed" {
fmt.Sprintf("job %v no need retry", jobName)
return
}
if *result.Body.ImportJobResult.ReadyRetry == "Ready" {
break
}
time.Sleep(60)
}
err = Retry(client, jobName, srcAddress, destAddress, runtimeId)
if err != nil {
fmt.Errorf("retry job failed, %v", err)
return
}
}
func Retry(client *mgwpackage.Client, jobName string, srcAddress string, destAddress string, runtimeId int32) error {
/** 步骤5:根据失败文件清单信息创建新的数据地址并且验证是否可用。 */
resultResp, err := client.GetJobResult(&userId, &jobName, &mgwpackage.GetJobResultRequest{
RuntimeId: &runtimeId,
})
jobResp, err := client.GetJob(&userId, &jobName, &mgwpackage.GetJobRequest{})
if err != nil {
return err
}
addressResp, err := client.GetAddress(&userId, &srcAddress)
if err != nil {
return err
}
nowTime := strconv.FormatInt(time.Now().Unix(), 10)
retrySrcAddress := srcAddress + "_retry_" + nowTime
_, err = client.CreateAddress(&userId, &mgwpackage.CreateAddressRequest{
ImportAddress: &mgwpackage.CreateAddressInfo{
Name: &retrySrcAddress,
AddressDetail: &mgwpackage.AddressDetail{
AddressType: resultResp.Body.ImportJobResult.AddressType,
InvLocation: resultResp.Body.ImportJobResult.InvLocation,
InvBucket: resultResp.Body.ImportJobResult.InvBucket,
InvAccessId: resultResp.Body.ImportJobResult.InvAccessId,
InvAccessSecret: resultResp.Body.ImportJobResult.InvAccessSecret,
InvPath: resultResp.Body.ImportJobResult.InvPath,
InvRegionId: resultResp.Body.ImportJobResult.InvRegionId,
Domain: addressResp.Body.ImportAddress.AddressDetail.Domain,
RegionId: addressResp.Body.ImportAddress.AddressDetail.RegionId,
AccessId: addressResp.Body.ImportAddress.AddressDetail.AccessId,
AccessSecret: addressResp.Body.ImportAddress.AddressDetail.AccessSecret,
AgentList: addressResp.Body.ImportAddress.AddressDetail.AgentList,
Prefix: addressResp.Body.ImportAddress.AddressDetail.Prefix,
Role: addressResp.Body.ImportAddress.AddressDetail.Role,
InvRole: addressResp.Body.ImportAddress.AddressDetail.InvRole,
Bucket: addressResp.Body.ImportAddress.AddressDetail.Bucket,
},
}})
if err != nil {
return err
}
verifyResp, err := client.VerifyAddress(&userId, &retrySrcAddress)
if err != nil {
return err
} else if *verifyResp.Body.VerifyAddressResponse.Status != "available" {
return fmt.Errorf("verify job %v failed, status: %v", jobName, *verifyResp.Body.VerifyAddressResponse.Status)
}
/** 步骤6:创建重试子任务并启动。 */
retryJobName := jobName + "_retry_" + nowTime
_, err = client.CreateJob(&userId, &mgwpackage.CreateJobRequest{ImportJob: &mgwpackage.CreateJobInfo{
Name: &retryJobName,
TransferMode: jobResp.Body.ImportJob.TransferMode,
OverwriteMode: jobResp.Body.ImportJob.OverwriteMode,
FilterRule: jobResp.Body.ImportJob.FilterRule,
CreateReport: jobResp.Body.ImportJob.CreateReport,
Audit: jobResp.Body.ImportJob.Audit,
SrcAddress: &retrySrcAddress,
DestAddress: &destAddress,
ParentVersion: resultResp.Body.ImportJobResult.Version,
}})
if err != nil {
return err
}
launchStatus := "IMPORT_JOB_LAUNCHING"
_, err = client.UpdateJob(&userId, &retryJobName, &mgwpackage.UpdateJobRequest{
ImportJob: &mgwpackage.UpdateJobInfo{
Status: &launchStatus,
}})
return err
}
文档内容是否对您有帮助?