场景示例

更新时间:

本文介绍一些常见场景的代码示例,如从零开始运行一个迁移任务、重试失败文件等。

场景一:从零开始运行迁移任务

以下示例展示从零开始运行一个迁移任务,分为如下步骤:

  1. 创建通道,无需代理跳过此步骤。

  2. 创建代理,无需代理跳过此步骤。

  3. 创建源数据地址和目的数据地址。

  4. 创建迁移任务。

  5. 运行任务并查看运行结果。

说明

在不使用代理的情况下,请忽略下面代码中的创建通道和代理的部分。使用代理前需要先部署代理,详情请参见 代理管理

package main

import (
    "fmt"
    "log"
    "os"
    "time"

    openapipackage "github.com/alibabacloud-go/darabonba-openapi/v2/client"
    mgwpackage "github.com/alibabacloud-go/hcs-mgw-20240626/client"
)

/** 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。*/
var accessKeyId = os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
var accessKeySecret = os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")

/** 填写主账号ID。*/
var userId = "11470***876***55"

func main() {
    // 这里以北京区域为例。
    endpoint := "mgw.cn-beijing.aliyuncs.com"

    /** 步骤1: 初始化。 */
    config := openapipackage.Config{
        AccessKeyId:     &accessKeyId,
        AccessKeySecret: &accessKeySecret,
        Endpoint:        &endpoint,
    }
    client, err := mgwpackage.NewClient(&config)
    if err != nil {
        log.Printf("create client failed: %v", err)
        return
    }
    // MaxBandwidth, MaxQps 默认为0, 表示没有限制, MaxBandWidth的单位是bit,请按照实际需求填写。
    maxBandwidth := int64(1073741824)
    maxQps := int32(1000)
    request := &mgwpackage.CreateTunnelRequest{
        ImportTunnel: &mgwpackage.CreateTunnelInfo{
            TunnelQos: &mgwpackage.TunnelQos{
                MaxBandwidth: &maxBandwidth,
                MaxQps:       &maxQps,
            },
        },
    }

    /** 步骤2:创建通道和代理(可选,请根据实际情况选择是否创建通道和代理),代理可以创建多个关联一个通道。 */
    resp, err := client.CreateTunnel(&userId, request)
    if err != nil {
        log.Printf("create tunnel failed: %v", err)
        return
    }
    fmt.Println("通道ID:" + *resp.Headers["x-oss-import-tunnel-id"])
    tunnelId := *resp.Headers["x-oss-import-tunnel-id"]

    // 填写代理名称。
    agentName := "exampleagent"
    // 使用网络,公网请填写public, 专线或者VPN请填写vpc。
    agentEndpoint := "public"
    // 部署方式,目前仅支持填写default。
    deployMethod := "default"
    info := mgwpackage.CreateAgentInfo{
        AgentEndpoint: &agentEndpoint,
        DeployMethod:  &deployMethod,
        Name:          &agentName,
        TunnelId:      &tunnelId,
    }
    _, err = client.CreateAgent(&userId, &mgwpackage.CreateAgentRequest{ImportAgent: &info})
    if err != nil {
        log.Printf("create agent failed: %v", err)
        return
    }

    /** 步骤3:创建源地址和目的地址并验证可用性,这里以创建源为oss,目的为oss的地址为例,其它源示例请参见数据地址篇创建数据地址小节。 */
    // 填写数据地址名称。
    srcAddress := "examplesrcaddress"
    srcAddressType := "oss"
    // 以下参数请根据实际值填写。
    srcRegionId := "oss-cn-beijing"
    srcBucket := "examplebucket"
    srcPrefix := "***/"
    srcRoleName := "rolename_******"
    // 关联的代理列表,多个代理可使用,分隔。在不使用代理的情况下,该项可不填。
    agentList := agentName
    srcDetail := mgwpackage.AddressDetail{
        AddressType: &srcAddressType,
        RegionId:    &srcRegionId,
        Bucket:      &srcBucket,
        Prefix:      &srcPrefix,
        Role:        &srcRoleName,
        AgentList:   &agentList,
    }
    _, err = client.CreateAddress(&userId, &mgwpackage.CreateAddressRequest{
        ImportAddress: &mgwpackage.CreateAddressInfo{
            Name:          &srcAddress,
            AddressDetail: &srcDetail,
        }})
    if err != nil {
        log.Printf("create address failed: %v", err)
        return
    }
    // 验证源端数据地址可用性。
    verifySrcAddrResp, err := client.VerifyAddress(&userId, &srcAddress)
    if err != nil {
        log.Printf("verify address failed: %v", err)
        return
    }
    if *verifySrcAddrResp.Body.VerifyAddressResponse.Status != "available" {
        log.Printf("verify address status failed, status: " + *verifySrcAddrResp.Body.VerifyAddressResponse.Status)
        return
    }

    // 填写数据地址名称。
    destAddress := "exampledestaddress"
    destAddressType := "oss"
    // 以下参数请根据实际值填写。
    destRegionId := "oss-cn-beijing"
    destBucket := "examplebucket"
    destPrefix := "***/"
    destRoleName := "rolename_******"
    detail := mgwpackage.AddressDetail{
        AddressType: &destAddressType,
        RegionId:    &destRegionId,
        Bucket:      &destBucket,
        Prefix:      &destPrefix,
        Role:        &destRoleName,
    }
    _, err = client.CreateAddress(&userId, &mgwpackage.CreateAddressRequest{
        ImportAddress: &mgwpackage.CreateAddressInfo{
            Name:          &destAddress,
            AddressDetail: &detail,
        }})
    if err != nil {
        log.Printf("create address failed: %v", err)
        return
    }
    verifyDestAddrResp, err := client.VerifyAddress(&userId, &srcAddress)
    if err != nil {
        log.Printf("verify address failed: %v", err)
        return
    }
    if *verifyDestAddrResp.Body.VerifyAddressResponse.Status != "available" {
        log.Printf("verify address status failed, status: " + *verifySrcAddrResp.Body.VerifyAddressResponse.Status)
        return
    }

    /** 步骤4:创建并启动任务 */
    jobName := "examplejob"
    /*
       overwriteMode和transferMode需要组合使用,具体组合含义如下
       always,all 全覆盖
       always,lastmodified 根据文件最后修改时间覆盖
       never,changed 不覆盖
    */
    /* 文件覆盖方式, 可能的值 1.never  2.always */
    overwriteMode := "always"
    /* 文件传输的方式, 可能的值为 1.changed 2.all 3.lastmodified */
    transferMode := "lastmodified"

    // maxBandWidth,maxImportTaskQps 根据实际需求值填写。
    maxImportTaskQps := int64(1000)
    maxBandWidth := int64(2147483648)
    // 如果maxImportTaskQps值为0或者不设置,会被设置为默认值;MaxBandWidth值为0或者不设置,会被设置为默认值,maxBandWidth值单位为bits。
    importQos := mgwpackage.ImportQos{
        MaxImportTaskQps: &maxImportTaskQps,
        MaxBandWidth:     &maxBandWidth,
    }

    // 配置过滤规则,包含文件类型过滤器、文件过滤器、时间过滤器,具体参数含义请参看API文档。
    // 文件类型过滤器,适用于localfs。
    excludeSymlink := false
    excludeDir := false
    fileTypeFilters := &mgwpackage.FileTypeFilters{
        &excludeDir, &excludeSymlink,
    }

    // 文件过滤器,根据实际需求值填写。
    var includeRegex []*string
    var excludeRegex []*string
    jpgFile := ".*.jpg"
    gifFile := ".*.gif"
    txtFile := ".*.txt"
    jsFile := ".*.js"
    includeRegex = append(includeRegex, &jpgFile, &gifFile)
    excludeRegex = append(excludeRegex, &txtFile, &jsFile)
    KeyFilters := &mgwpackage.KeyFilters{
        Excludes: &mgwpackage.KeyFilterItem{
            excludeRegex,
        }, Includes: &mgwpackage.KeyFilterItem{
            includeRegex,
        },
    }
    // 时间过滤器, 时间格式遵循UTC时间格式,根据实际需求值填写。
    includeStartTime := "2006-01-01T00:00:00Z"
    includeEndTime := "2007-01-01T00:00:00Z"
    excludeStartTime := "2009-01-01T00:00:00Z"
    excludeEndTime := "2010-01-01T00:00:00Z"
    includeTimeFilter := []*mgwpackage.TimeFilter{{
        &includeStartTime, &includeEndTime,
    }}
    includeLastModifyFilters := &mgwpackage.LastModifyFilterItem{
        includeTimeFilter,
    }
    excludeTimeFilter := []*mgwpackage.TimeFilter{{
        EndTime: &excludeStartTime, StartTime: &excludeEndTime,
    }}
    excludeLastModifyFilters := &mgwpackage.LastModifyFilterItem{
        TimeFilter: excludeTimeFilter,
    }
    lastModifiedFilters := &mgwpackage.LastModifiedFilters{
        Excludes: excludeLastModifyFilters, Includes: includeLastModifyFilters,
    }
    filterRule := mgwpackage.FilterRule{
        LastModifiedFilters: lastModifiedFilters,
        KeyFilters:          KeyFilters,
        FileTypeFilters:     fileTypeFilters,
    }

    // 配置调度规则,具体参数含义请参看API文档。
    maxScheduleCount := int64(5)
    startCronExpression := "0 0 10 * * ?"
    suspendCronExpression := "0 0 14 * * ?"
    scheduleRule := mgwpackage.ScheduleRule{
        MaxScheduleCount:      &maxScheduleCount,
        StartCronExpression:   &startCronExpression,
        SuspendCronExpression: &suspendCronExpression,
    }
    _, err = client.CreateJob(&userId, &mgwpackage.CreateJobRequest{ImportJob: &mgwpackage.CreateJobInfo{
        Name:          &jobName,
        TransferMode:  &transferMode,
        OverwriteMode: &overwriteMode,
        SrcAddress:    &srcAddress,
        DestAddress:   &destAddress,
        ImportQos:     &importQos,
        FilterRule:    &filterRule,
        ScheduleRule:  &scheduleRule,
    }})
    if err != nil {
        log.Printf("create job failed: %v", err)
        return
    }

    status := "IMPORT_JOB_LAUNCHING"
    _, err = client.UpdateJob(&userId, &jobName, &mgwpackage.UpdateJobRequest{
        &mgwpackage.UpdateJobInfo{Status: &status},
    })
    if err != nil {
        log.Printf("update job failed: %v", err)
        return
    }

    /** 步骤5:循环查看当前任务状态 */
    for {
        jobResp, err := client.GetJob(&userId, &jobName, &mgwpackage.GetJobRequest{})
        if err != nil {
            log.Printf("get job status failed: %v", err)
            return
        }
        if *jobResp.Body.ImportJob.Status == "IMPORT_JOB_INTERRUPTED" {
            log.Printf("job is interrupted")
            return
        }
        if *jobResp.Body.ImportJob.Status == "IMPORT_JOB_FINISHED" {
            fmt.Println("job is finished")
            break
        }
        time.Sleep(60)
    }

    /** 步骤6:任务结束后查看结果。 */
    historyResp, err := client.ListJobHistory(&userId, &jobName, &mgwpackage.ListJobHistoryRequest{})
    if err != nil {
        log.Printf("listJobHistory failed: %v", err)
        return
    }
    fmt.Println(historyResp.Body.JobHistoryList.JobHistory)
}

场景二:重试失败文件

迁移任务运行完成后,可能有失败文件,迁移服务会为这些失败文件构造一份失败文件列表,以下示例展示如何重试这些失败文件,分为如下步骤:

  1. 获取失败文件列表的详情。

  2. 创建一个新的数据地址。

  3. 创建一个重试子任务。

  4. 运行子任务并查看运行结果。

说明

通过API重试失败文件较为复杂,建议您通过控制台操作。

package main

import (
    "fmt"
    "log"
    "os"
    "strconv"
    "time"

    openapipackage "github.com/alibabacloud-go/darabonba-openapi/v2/client"
    mgwpackage "github.com/alibabacloud-go/hcs-mgw-20240626/client"
)

/** 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。*/
var accessKeyId = os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
var accessKeySecret = os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")

/** 填写主账号ID。*/
var userId = "11470***876***55"

func main() {
    // 这里以北京区域为例。
    endpoint := "mgw.cn-beijing.aliyuncs.com"

    /** 步骤1:初始化。 */
    config := openapipackage.Config{
        AccessKeyId:     &accessKeyId,
        AccessKeySecret: &accessKeySecret,
        Endpoint:        &endpoint,
    }
    client, err := mgwpackage.NewClient(&config)
    if err != nil {
        log.Printf("create client failed: %v", err)
        return
    }

    /** 步骤2:循环查看任务状态,查看任务是否结束或者中断。 */
    // 下列参数值请根据实际需求填写。
    runtimeId := int32(1)
    jobName := "examplejob"
    srcAddress := "examplesrcaddress"
    destAddress := "exampledestaddress"

    for {
        jobResp, err := client.GetJob(&userId, &jobName, &mgwpackage.GetJobRequest{})
        if err != nil {
            log.Printf("get job status failed: %v", err)
            return
        }
        if *jobResp.Body.ImportJob.Status == "IMPORT_JOB_INTERRUPTED" {
            log.Printf("job is interrupted, can not retry")
            return
        }
        if *jobResp.Body.ImportJob.Status == "IMPORT_JOB_FINISHED" {
            break
        }
        time.Sleep(60)
    }
    /** 步骤3:ListJobHistory,查看是否有失败文件。 */
    count := int32(1)
    marker := ""
    listHistoryResp, err := client.ListJobHistory(&userId, &jobName, &mgwpackage.ListJobHistoryRequest{
        Marker:    &marker,
        Count:     &count,
        RuntimeId: &runtimeId,
    })
    if err != nil || len(listHistoryResp.Body.JobHistoryList.JobHistory) < 1 {
        log.Printf("list job history failed: %v", err)
        return
    }
    if *listHistoryResp.Body.JobHistoryList.JobHistory[0].FailedCount <= 0 {
        fmt.Printf("job %v no need retry\n", jobName)
        return
    }

    /** 步骤4:循环GetJobResult,等待Ready。 */
    for {
        result, err := client.GetJobResult(&userId, &jobName, &mgwpackage.GetJobResultRequest{
            RuntimeId: &runtimeId,
        })
        if err != nil {
            log.Printf("get job result failed: %v", err)
            return
        }
        if *result.Body.ImportJobResult.ReadyRetry == "NoNeed" {
            fmt.Printf("job %v no need retry", jobName)
            return
        }
        if *result.Body.ImportJobResult.ReadyRetry == "Ready" {
            break
        }
        time.Sleep(60)
    }
    err = Retry(client, jobName, srcAddress, destAddress, runtimeId)
    if err != nil {
        log.Printf("retry job failed: %v", err)
        return  
    }
}

func Retry(client *mgwpackage.Client, jobName string, srcAddress string, destAddress string, runtimeId int32) error {
    /** 步骤5:根据失败文件清单信息创建新的数据地址并且验证是否可用。 */
    resultResp, err := client.GetJobResult(&userId, &jobName, &mgwpackage.GetJobResultRequest{
        RuntimeId: &runtimeId,
    })
    jobResp, err := client.GetJob(&userId, &jobName, &mgwpackage.GetJobRequest{})
    if err != nil {
        return err
    }
    addressResp, err := client.GetAddress(&userId, &srcAddress)
    if err != nil {
        return err
    }
    nowTime := strconv.FormatInt(time.Now().Unix(), 10)
    retrySrcAddress := srcAddress + "_retry_" + nowTime
    _, err = client.CreateAddress(&userId, &mgwpackage.CreateAddressRequest{
        ImportAddress: &mgwpackage.CreateAddressInfo{
            Name: &retrySrcAddress,
            AddressDetail: &mgwpackage.AddressDetail{
                AddressType:     resultResp.Body.ImportJobResult.AddressType,
                InvLocation:     resultResp.Body.ImportJobResult.InvLocation,
                InvBucket:       resultResp.Body.ImportJobResult.InvBucket,
                InvAccessId:     resultResp.Body.ImportJobResult.InvAccessId,
                InvAccessSecret: resultResp.Body.ImportJobResult.InvAccessSecret,
                InvPath:         resultResp.Body.ImportJobResult.InvPath,
                InvRegionId:     resultResp.Body.ImportJobResult.InvRegionId,
                Domain:          addressResp.Body.ImportAddress.AddressDetail.Domain,
                RegionId:        addressResp.Body.ImportAddress.AddressDetail.RegionId,
                AccessId:        addressResp.Body.ImportAddress.AddressDetail.AccessId,
                AccessSecret:    addressResp.Body.ImportAddress.AddressDetail.AccessSecret,
                AgentList:       addressResp.Body.ImportAddress.AddressDetail.AgentList,
                Prefix:          addressResp.Body.ImportAddress.AddressDetail.Prefix,
                Role:            addressResp.Body.ImportAddress.AddressDetail.Role,
                Bucket:          addressResp.Body.ImportAddress.AddressDetail.Bucket,
            },
        }})
    if err != nil {
        return err
    }
    verifyResp, err := client.VerifyAddress(&userId, &retrySrcAddress)
    if err != nil {
        return err
    } else if *verifyResp.Body.VerifyAddressResponse.Status != "available" {
        return fmt.Errorf("verify job %v failed, status: %v", jobName, *verifyResp.Body.VerifyAddressResponse.Status)
    }

    /** 步骤6:创建重试子任务并启动。 */
    retryJobName := jobName + "_retry_" + nowTime
    _, err = client.CreateJob(&userId, &mgwpackage.CreateJobRequest{ImportJob: &mgwpackage.CreateJobInfo{
        Name:          &retryJobName,
        TransferMode:  jobResp.Body.ImportJob.TransferMode,
        OverwriteMode: jobResp.Body.ImportJob.OverwriteMode,
        FilterRule:    jobResp.Body.ImportJob.FilterRule,
        CreateReport:  jobResp.Body.ImportJob.CreateReport,
        Audit:         jobResp.Body.ImportJob.Audit,
        SrcAddress:    &retrySrcAddress,
        DestAddress:   &destAddress,
        ParentVersion: resultResp.Body.ImportJobResult.Version,
    }})
    if err != nil {
        return err
    }
    launchStatus := "IMPORT_JOB_LAUNCHING"
    _, err = client.UpdateJob(&userId, &retryJobName, &mgwpackage.UpdateJobRequest{
        ImportJob: &mgwpackage.UpdateJobInfo{
            Status: &launchStatus,
        }})
    return err
}