场景示例

更新时间:

本文介绍运行任务并查看结果、重试失败文件的完整流程。

场景一:运行任务

以下示例用于创建通道、创建代理、创建数据地址、创建迁移任务,运行任务并查看运行结果。

说明
  • 在不使用代理的情况下,请忽略下面代码中的创建通道和代理的部分。使用代理前需要先部署代理,详情请参见 代理管理

package main

import (
	"fmt"
	openapipackage "github.com/alibabacloud-go/darabonba-openapi/v2/client"
	mgwpackage "github.com/alibabacloud-go/hcs-mgw-20240626/client"
	"os"
)

/** 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。*/
var accessKeyId = os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
var accessKeySecret = os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
/** 填写主账号ID。*/
var userId = "11470***876***55";

func main()  {
	// 这里以北京区域为例。
	endpoint := "mgw.cn-beijing.aliyuncs.com"

	/** 步骤1: 初始化。 */
	config := openapipackage.Config{
		AccessKeyId:     &accessKeyId,
		AccessKeySecret: &accessKeySecret,
		Endpoint:        &endpoint,
	}
	client, err := mgwpackage.NewClient(&config)
	if err != nil{
		fmt.Errorf("create client failed, %v", err)
		return
	}
	// MaxBandwidth, MaxQps 默认为0, 表示没有限制, MaxBandWidth的单位是bit,请按照实际需求填写。
	maxBandwidth := int64(1073741824)
	maxQps := int32(1000)
	request := &mgwpackage.CreateTunnelRequest{
		ImportTunnel: &mgwpackage.CreateTunnelInfo{
			TunnelQos : &mgwpackage.TunnelQos{
				MaxBandwidth : &maxBandwidth,
				MaxQps : &maxQps,
			},
		},
	}

        /** 步骤2:创建通道和代理(可选,请根据实际情况选择是否创建通道和代理),代理可以创建多个关联一个通道。 */
	resp, err := client.CreateTunnel(&userId, request)
	if err != nil {
		fmt.Errorf("create tunnel failed, %v", err)
		return
	}
	fmt.Println("通道ID:" + *resp.Headers["x-oss-import-tunnel-id"])
	tunnelId := *resp.Headers["x-oss-import-tunnel-id"]

	// 填写代理名称。
	agentName := "exampleagent"
	// 使用网络,公网请填写public, 专线或者VPN请填写vpc。
	agentEndpoint := "public"
	// 部署方式,目前仅支持填写default。
	deployMethod := "default"
	info := mgwpackage.CreateAgentInfo{
		AgentEndpoint : &agentEndpoint,
		DeployMethod : &deployMethod,
		Name : &agentName,
		TunnelId : &tunnelId,
	}
	_, err = client.CreateAgent(&userId, &mgwpackage.CreateAgentRequest{ImportAgent: &info})
	if err != nil {
		fmt.Errorf("create agent failed, %v", err)
		return
	}

	/** 步骤3:创建源地址和目的地址并验证可用性,这里以创建源为oss,目的为oss的地址为例,其它源示例请参见数据地址篇创建数据地址小节。 */
	// 填写数据地址名称。
	srcAddress := "examplesrcaddress"
	srcAddressType := "oss"
	// 以下参数请根据实际值填写。
	srcRegionId := "oss-cn-beijing"
	srcBucket := "examplebucket"
	srcPrefix := "***/"
	srcRoleName := "rolename_******"
	// 关联的代理列表,多个代理可使用,分隔。在不使用代理的情况下,该项可不填。
	agentList :=  agentName
	srcDetail := mgwpackage.AddressDetail{
		AddressType:  &srcAddressType,
		RegionId:     &srcRegionId,
		Bucket:       &srcBucket,
		Prefix:       &srcPrefix,
		Role:         &srcRoleName,
		AgentList:    &agentList,
	}
	_, err = client.CreateAddress(&userId, &mgwpackage.CreateAddressRequest{
		ImportAddress: &mgwpackage.CreateAddressInfo{
			Name: &srcAddress,
			AddressDetail: &srcDetail,
		}})
	if err != nil{
		fmt.Errorf("create address failed, %v", err)
		return
	}
	// 验证源端数据地址可用性。
	verifySrcAddrResp, err := client.VerifyAddress(&userId, &srcAddress)
	if err != nil {
		fmt.Errorf("verify address failed, %v", err)
		return
	}
	if *verifySrcAddrResp.Body.VerifyAddressResponse.Status != "available" {
		fmt.Errorf("verify address status failed, status: %v", *verifySrcAddrResp.Body.VerifyAddressResponse.Status)
		return
	}

	// 填写数据地址名称。
	destAddress := "exampledestaddress"
	destAddressType := "oss"
	// 以下参数请根据实际值填写。
	destRegionId := "oss-cn-beijing"
	destBucket := "examplebucket"
	destPrefix := "***/"
	destRoleName := "rolename_******"
	detail := mgwpackage.AddressDetail{
		AddressType:  &destAddressType,
		RegionId:     &destRegionId,
		Bucket:       &destBucket,
		Prefix:       &destPrefix,
		Role:         &destRoleName,
	}
	_, err = client.CreateAddress(&userId, &mgwpackage.CreateAddressRequest{
		ImportAddress: &mgwpackage.CreateAddressInfo{
			Name: &destAddress,
			AddressDetail: &detail,
		}})
	if err != nil{
		fmt.Errorf("create address failed, %v", err)
		return
	}
	verifyDestAddrResp, err := client.VerifyAddress(&userId, &srcAddress)
	if err != nil {
		fmt.Errorf("verify address failed, %v", err)
		return
	}
	if *verifyDestAddrResp.Body.VerifyAddressResponse.Status != "available" {
		fmt.Errorf("verify address status failed, status: %v", *verifySrcAddrResp.Body.VerifyAddressResponse.Status)
		return
	}

	/** 步骤4:创建并启动任务 */
	jobName := "examplejob"
	/*
          overwriteMode和transferMode需要组合使用,具体组合含义如下
          always,all 全覆盖
          always,lastmodified 根据文件最后修改时间覆盖
          never,all 不覆盖
        */
	/* 文件覆盖方式, 可能的值 1.never  2.always */
	overwriteMode := "always"
	/* 文件传输的方式, 可能的值为 1.changed 2.all 3.lastmodified */
	transferMode := "lastmodified"

	// maxBandWidth,maxImportTaskQps 根据实际需求值填写。
	maxImportTaskQps := int64(1000)
	maxBandWidth := int64(2147483648)
	// 如果maxImportTaskQps值为0或者不设置,会被设置为默认值;MaxBandWidth值为0或者不设置,会被设置为默认值,maxBandWidth值单位为bits。
	importQos := mgwpackage.ImportQos{
		MaxImportTaskQps: &maxImportTaskQps,
		MaxBandWidth:     &maxBandWidth,
	}

	// 配置过滤规则,包含文件类型过滤器、文件过滤器、时间过滤器,具体参数含义请参看API文档。
	// 文件类型过滤器,适用于localfs。
	excludeSymlink := false
	excludeDir := false
	fileTypeFilters := &mgwpackage.FileTypeFilters{
		&excludeDir, &excludeSymlink,
	}

	// 文件过滤器,根据实际需求值填写。
	var includeRegex []*string
	var excludeRegex []*string
	jpgFile := ".*.jpg"
	gifFile := ".*.gif"
	txtFile := ".*.txt"
	jsFile := ".*.js"
	includeRegex = append(includeRegex, &jpgFile, &gifFile)
	excludeRegex = append(excludeRegex, &txtFile, &jsFile)
	KeyFilters := &mgwpackage.KeyFilters{
		Excludes: &mgwpackage.KeyFilterItem{
			excludeRegex,
		}, Includes: &mgwpackage.KeyFilterItem{
			includeRegex,
		},
	}
	// 时间过滤器, 时间格式遵循UTC时间格式,根据实际需求值填写。
	includeStartTime := "2006-01-01T00:00:00Z"
	includeEndTime := "2007-01-01T00:00:00Z"
	excludeStartTime := "2009-01-01T00:00:00Z"
	excludeEndTime := "2010-01-01T00:00:00Z"
	includeTimeFilter := []*mgwpackage.TimeFilter{{
		&includeStartTime, &includeEndTime,
	}}
	includeLastModifyFilters := &mgwpackage.LastModifyFilterItem{
		includeTimeFilter,
	}
	excludeTimeFilter := []*mgwpackage.TimeFilter{{
		EndTime: &excludeStartTime, StartTime: &excludeEndTime,
	}}
	excludeLastModifyFilters := &mgwpackage.LastModifyFilterItem{
		TimeFilter: excludeTimeFilter,
	}
	lastModifiedFilters := &mgwpackage.LastModifiedFilters{
		Excludes: excludeLastModifyFilters, Includes: includeLastModifyFilters,
	}
	filterRule := mgwpackage.FilterRule{
		LastModifiedFilters: lastModifiedFilters,
		KeyFilters:          KeyFilters,
		FileTypeFilters:     fileTypeFilters,
	}

	// 配置调度规则,具体参数含义请参看API文档。
	maxScheduleCount := int64(5)
	startCronExpression := "0 0 10 * * ?"
	suspendCronExpression := "0 0 14 * * ?"
	scheduleRule := mgwpackage.ScheduleRule{
		MaxScheduleCount:      &maxScheduleCount,
		StartCronExpression:   &startCronExpression,
		SuspendCronExpression: &suspendCronExpression,
	}
	_, err = client.CreateJob(&userId, &mgwpackage.CreateJobRequest{ImportJob: &mgwpackage.CreateJobInfo{
		Name:          &jobName,
		TransferMode:  &transferMode,
		OverwriteMode: &overwriteMode,
		SrcAddress:    &srcAddress,
		DestAddress:   &destAddress,
		ImportQos:     &importQos,
		FilterRule:    &filterRule,
		ScheduleRule:  &scheduleRule,
	}})
	if err != nil {
		fmt.Errorf("create job failed, %v", err)
		return
	}

	status := "IMPORT_JOB_LAUNCHING"
	_, err = client.UpdateJob(&userId, &jobName, &mgwpackage.UpdateJobRequest{
		&mgwpackage.UpdateJobInfo{Status: &status},
	})
	if err != nil {
		fmt.Errorf("update job failed, %v", err)
		return
	}
	
	/** 步骤5:循环查看当前任务状态 */
	for {
	        jobResp, err := client.GetJob(&userId, &jobName, &mgwpackage.GetJobRequest{})
		if err != nil {
			fmt.Errorf("get job status failed, %v", err)
			return
		}
		if *jobResp.Body.ImportJob.Status == "IMPORT_JOB_INTERRUPTED" {
			fmt.Errorf("job is interrupted")
			return
		}
		if *jobResp.Body.ImportJob.Status == "IMPORT_JOB_FINISHED" {
			fmt.Printf("job is finished")
			break;
		}
		time.Sleep(60)
	}
	
	/** 步骤6:任务结束后查看结果。 */
	resp, err := client.ListJobHistory(&userId, &jobName, &mgwpackage.ListJobHistoryRequest{})
	if err != nil {
	        fmt.Errorf("listJobHistory failed,%v", err)
		return
	}
	fmt.Println(resp.Body.JobHistoryList.JobHistory)
}

场景二:重试失败文件

迁移任务运行完成后,可能有失败文件,迁移服务会为这些失败文件构造一份失败文件列表,以下示例先获取失败文件列表的详情,然后据此创建一个新的数据地址,再创建一个重试子任务。通过这种方式,您能够重新迁移这些失败的文件。

package main

import (
	"fmt"
	openapipackage "github.com/alibabacloud-go/darabonba-openapi/v2/client"
	mgwpackage "github.com/alibabacloud-go/hcs-mgw-20240626/client"
	"github.com/alibabacloud-go/hcs-mgw-20240626/util"
	"os"
	"strconv"
	"time"
)

/** 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。*/
var accessKeyId = os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
var accessKeySecret = os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
/** 填写主账号ID。*/
var userId = "11470***876***55";

func main()  {
	// 这里以北京区域为例。
	endpoint := "mgw.cn-beijing.aliyuncs.com"

	/** 步骤1:初始化。 */
	config := openapipackage.Config{
		AccessKeyId:     &accessKeyId,
		AccessKeySecret: &accessKeySecret,
		Endpoint:        &endpoint,
	}
	client, err := mgwpackage.NewClient(&config)
	if err != nil{
		fmt.Errorf("create client failed, %v", err)
		return
	}
	
        /** 步骤2:循环查看任务状态,查看任务是否结束或者中断。 */
	// 下列参数值请根据实际需求填写。
	runtimeId := int32(1)
	jobName := "examplejob"
	srcAddress := "examplesrcaddress"
	destAddress := "exampledestaddress"

	for {
		jobResp, err := client.GetJob(&userId, &jobName, &mgwpackage.GetJobRequest{})
		if err != nil {
			fmt.Errorf("get job status failed, %v", err)
			return
		}
		if *jobResp.Body.ImportJob.Status == "IMPORT_JOB_INTERRUPTED" {
			fmt.Errorf("job is interrupted, can not retry")
			return
		}
		if *jobResp.Body.ImportJob.Status == "IMPORT_JOB_FINISHED" {
			break
		}
		time.Sleep(60)
	}
        /** 步骤3:ListJobHistory,查看是否有失败文件。 */
	count  := int32(1)
	marker := ""
	listHistoryResp, err := client.ListJobHistory(&userId, &jobName, &mgwpackage.ListJobHistoryRequest{
		Marker: &marker,
		Count:  &count,
		RuntimeId: &runtimeId,
	})
	if err != nil || len(listHistoryResp.Body.JobHistoryList.JobHistory) < 1 {
		fmt.Errorf("list job history failed, %v", err)
		return
	}
	if *listHistoryResp.Body.JobHistoryList.JobHistory[0].FailedCount <= 0 {
		fmt.Sprintf("job %v no need retry", jobName)
		return
	}
	
	/** 步骤4:循环GetJobResult,等待Ready。 */
	for {
		result, err := client.GetJobResult(&userId, &jobName, &mgwpackage.GetJobResultRequest{
			RuntimeId: &runtimeId,
		})
		if err != nil {
			fmt.Errorf("get job result failed, %v", err)
			return
		}
		if *result.Body.ImportJobResult.ReadyRetry == "NoNeed" {
			fmt.Sprintf("job %v no need retry", jobName)
			return
		}
		if *result.Body.ImportJobResult.ReadyRetry == "Ready" {
			break
		}
		time.Sleep(60)
	}
	err = Retry(client, jobName, srcAddress, destAddress, runtimeId)
	if err != nil {
		fmt.Errorf("retry job failed, %v", err)
		return
	}
}

func Retry(client *mgwpackage.Client, jobName string, srcAddress string, destAddress string, runtimeId int32)  error {
        /** 步骤5:根据失败文件清单信息创建新的数据地址并且验证是否可用。 */
	resultResp, err := client.GetJobResult(&userId, &jobName, &mgwpackage.GetJobResultRequest{
		RuntimeId: &runtimeId,
	})
	jobResp, err := client.GetJob(&userId, &jobName, &mgwpackage.GetJobRequest{})
	if err != nil {
		return err
	}
	addressResp, err := client.GetAddress(&userId, &srcAddress)
	if err != nil {
		return err
	}
	nowTime := strconv.FormatInt(time.Now().Unix(), 10)
	retrySrcAddress := srcAddress + "_retry_" + nowTime
	_, err = client.CreateAddress(&userId, &mgwpackage.CreateAddressRequest{
		ImportAddress: &mgwpackage.CreateAddressInfo{
			Name: &retrySrcAddress,
			AddressDetail: &mgwpackage.AddressDetail{
				AddressType:     resultResp.Body.ImportJobResult.AddressType,
				InvLocation:     resultResp.Body.ImportJobResult.InvLocation,
				InvBucket:       resultResp.Body.ImportJobResult.InvBucket,
				InvAccessId:     resultResp.Body.ImportJobResult.InvAccessId,
				InvAccessSecret: resultResp.Body.ImportJobResult.InvAccessSecret,
				InvPath:         resultResp.Body.ImportJobResult.InvPath,
				InvRegionId:     resultResp.Body.ImportJobResult.InvRegionId,
				Domain:          addressResp.Body.ImportAddress.AddressDetail.Domain,
				RegionId:        addressResp.Body.ImportAddress.AddressDetail.RegionId,
				AccessId:        addressResp.Body.ImportAddress.AddressDetail.AccessId,
				AccessSecret:    addressResp.Body.ImportAddress.AddressDetail.AccessSecret,
				AgentList:       addressResp.Body.ImportAddress.AddressDetail.AgentList,
				Prefix:          addressResp.Body.ImportAddress.AddressDetail.Prefix,
				Role:            addressResp.Body.ImportAddress.AddressDetail.Role,
				InvRole:         addressResp.Body.ImportAddress.AddressDetail.InvRole,
				Bucket:          addressResp.Body.ImportAddress.AddressDetail.Bucket,
			},
		}})
	if err != nil {
		return err
	}
	verifyResp, err := client.VerifyAddress(&userId, &retrySrcAddress)
	if err != nil {
		return err
	} else if *verifyResp.Body.VerifyAddressResponse.Status != "available" {
		return fmt.Errorf("verify job %v failed, status: %v", jobName, *verifyResp.Body.VerifyAddressResponse.Status)
	}
	
	/** 步骤6:创建重试子任务并启动。 */
	retryJobName := jobName + "_retry_" + nowTime
	_, err = client.CreateJob(&userId, &mgwpackage.CreateJobRequest{ImportJob: &mgwpackage.CreateJobInfo{
		Name:          &retryJobName,
		TransferMode:  jobResp.Body.ImportJob.TransferMode,
		OverwriteMode: jobResp.Body.ImportJob.OverwriteMode,
		FilterRule:    jobResp.Body.ImportJob.FilterRule,
		CreateReport:  jobResp.Body.ImportJob.CreateReport,
		Audit: 	       jobResp.Body.ImportJob.Audit,
		SrcAddress:    &retrySrcAddress,
		DestAddress:   &destAddress,
		ParentVersion: resultResp.Body.ImportJobResult.Version,
	}})
	if err != nil {
		return err
	}
	launchStatus := "IMPORT_JOB_LAUNCHING"
	_, err = client.UpdateJob(&userId, &retryJobName, &mgwpackage.UpdateJobRequest{
		ImportJob: &mgwpackage.UpdateJobInfo{
			Status: &launchStatus,
		}})
	return err
}