Go分片拷贝

本文介绍如何使用Go SDK V2UploadPartCopy方法,将源Bucket中的多个分片文件拷贝到同一地域下相同或不同目标Bucket中,然后合并成一个完整的文件对象。

注意事项

  • 本文示例代码以华东1(杭州)的地域IDcn-hangzhou为例,默认使用外网Endpoint,如果您希望通过与OSS同地域的其他阿里云产品访问OSS,请使用内网Endpoint。关于OSS支持的RegionEndpoint的对应关系,请参见OSS地域和访问域名

  • 本文以从环境变量读取访问凭证为例。如何配置访问凭证,请参见配置访问凭证

  • 要进行拷贝文件,您必须拥有源文件的读权限及目标Bucket的读写权限。

  • 不支持跨地域拷贝。例如不能将华东1(杭州)地域存储空间中的文件拷贝到华北1(青岛)地域。

  • 拷贝文件时,您需要确保源Bucket和目标Bucket均未设置合规保留策略,否则报错The object you specified is immutable.

方法定义

func (c *Client) UploadPartCopy(ctx context.Context, request *UploadPartCopyRequest, optFns ...func(*Options)) (*UploadPartCopyResult, error)

请求参数列表

参数名

类型

说明

ctx

context.Context

请求的上下文,可以用来设置请求的总时限

request

*UploadPartCopyRequest

设置具体接口的请求参数,具体请参见UploadPartCopyRequest

optFns

...func(*Options)

(可选)配置选项,具体请参见Options

返回值列表

返回值名

类型

说明

result

*UploadPartCopyResult

接口返回值,当 err 为nil 时有效,具体请参见UploadPartCopyResult

err

error

请求的状态,当请求失败时,err 不为 nil

分片拷贝流程

分片拷贝分为以下三个步骤:

  1. 初始化一个分片上传事件。

    调用Client.InitiateMultipartUpload方法返回OSS创建的全局唯一的uploadID。

  2. 上传分片。

    调用Client.UploadPartCopy方法上传分片数据。

    说明
    • 对于同一个uploadID,分片号(partNumber)标识了该分片在整个文件内的相对位置。如果使用同一个分片号上传了新的数据,那么OSS上该分片已有的数据将会被覆盖。

    • OSS将收到的分片数据的MD5值放在ETag头内返回给用户。

    • OSS计算上传数据的MD5值,并与SDK计算的MD5值比较,如果不一致则返回InvalidDigest错误码。

  3. 完成分片上传。

    所有分片上传完成后,调用Client.CompleteMultipartUpload方法将所有分片合并成完整的文件。

示例代码

您可以使用以下代码将多个分片文件从源存储空间拷贝到目标存储空间,然后合并成完整的文件对象。

package main

import (
	"context"
	"flag"
	"log"
	"sync"

	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
)

// 定义全局变量
var (
	region         string // 存储区域
	srcBucketName  string // 源存储空间名称
	srcObjectName  string // 源对象名称
	destBucketName string // 目标存储空间名称
	destObjectName string // 目标对象名称
)

// init函数用于初始化命令行参数
func init() {
	flag.StringVar(&region, "region", "", "The region in which the bucket is located.")
	flag.StringVar(&srcBucketName, "src-bucket", "", "The name of the source bucket.")
	flag.StringVar(&srcObjectName, "src-object", "", "The name of the source object.")
	flag.StringVar(&destBucketName, "dest-bucket", "", "The name of the destination bucket.")
	flag.StringVar(&destObjectName, "dest-object", "", "The name of the destination object.")
}

func main() {
	// 解析命令行参数
	flag.Parse()

	// 定义上传ID
	var uploadId string

	// 检查源存储空间名称是否为空
	if len(srcBucketName) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, source bucket name required")
	}

	// 检查存储区域是否为空
	if len(region) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, region required")
	}

	// 如果目标存储空间名称未指定,则使用源存储空间名称
	if len(destBucketName) == 0 {
		destBucketName = srcBucketName
	}

	// 检查源对象名称是否为空
	if len(srcObjectName) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, source object name required")
	}

	// 检查目标对象名称是否为空
	if len(destObjectName) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, destination object name required")
	}

	// 加载默认配置并设置凭证提供者和区域
	cfg := oss.LoadDefaultConfig().
		WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
		WithRegion(region)

	// 创建OSS客户端
	client := oss.NewClient(cfg)

	// 初始化分片上传请求
	initRequest := &oss.InitiateMultipartUploadRequest{
		Bucket: oss.Ptr(destBucketName),
		Key:    oss.Ptr(destObjectName),
	}
	initResult, err := client.InitiateMultipartUpload(context.TODO(), initRequest)
	if err != nil {
		log.Fatalf("failed to initiate multipart upload %v", err)
	}

	// 打印初始化分片上传的结果
	log.Printf("initiate multipart upload result:%#v\n", *initResult.UploadId)
	uploadId = *initResult.UploadId

	// 初始化等待组和互斥锁
	var wg sync.WaitGroup
	var parts []oss.UploadPart
	count := 3
	var mu sync.Mutex

	// 启动多个goroutine进行分片上传
	for i := 0; i < count; i++ {
		wg.Add(1)
		go func(partNumber int, i int) {
			defer wg.Done()

			// 创建分片上传请求
			partRequest := &oss.UploadPartCopyRequest{
				Bucket:       oss.Ptr(destBucketName), // 目标存储空间名称
				Key:          oss.Ptr(destObjectName), // 目标对象名称
				SourceBucket: oss.Ptr(srcBucketName),  // 源存储空间名称
				SourceKey:    oss.Ptr(srcObjectName),  // 源对象名称
				PartNumber:   int32(partNumber),       // 分片编号
				UploadId:     oss.Ptr(uploadId),       // 上传ID
			}

			// 发送分片上传请求
			partResult, err := client.UploadPartCopy(context.TODO(), partRequest)
			if err != nil {
				log.Fatalf("failed to upload part copy %d: %v", partNumber, err)
			}

			// 记录分片上传结果
			part := oss.UploadPart{
				PartNumber: partRequest.PartNumber,
				ETag:       partResult.ETag,
			}

			// 使用互斥锁保护共享数据
			mu.Lock()
			parts = append(parts, part)
			mu.Unlock()
		}(i+1, i)
	}

	// 等待所有goroutine完成
	wg.Wait()

	// 完成分片上传请求
	request := &oss.CompleteMultipartUploadRequest{
		Bucket:   oss.Ptr(destBucketName),
		Key:      oss.Ptr(destObjectName),
		UploadId: oss.Ptr(uploadId),
		CompleteMultipartUpload: &oss.CompleteMultipartUpload{
			Parts: parts,
		},
	}
	result, err := client.CompleteMultipartUpload(context.TODO(), request)
	if err != nil {
		log.Fatalf("failed to complete multipart upload %v", err)
	}

	// 打印完成分片上传的结果
	log.Printf("complete multipart upload result:%#v\n", result)
}

相关文档