Go分片上传

OSS提供的分片上传(Multipart Upload)功能,将要上传的较大文件(Object)分成多个分片(Part)来分别上传,上传完成后再调用CompleteMultipartUpload接口将这些Part组合成一个Object来达到断点续传的效果。

注意事项

  • 本文以华东1(杭州)外网Endpoint为例。如果您希望通过与OSS同地域的其他阿里云产品访问OSS,请使用内网Endpoint。关于OSS支持的RegionEndpoint的对应关系,请参见OSS访问域名、数据中心、开放端口

  • 本文以从环境变量读取访问凭证为例。如何配置访问凭证,请参见配置访问凭证

  • 本文以OSS域名新建OSSClient为例。如果您希望通过自定义域名、STS等方式新建OSSClient,请参见初始化

  • 要分片上传,您必须有oss:PutObject权限。具体操作,请参见RAM用户授权自定义的权限策略

  • Go SDK 2.2.5及以上版本支持以下示例代码中包含的所有属性。

分片上传流程

分片上传(Multipart Upload)分为以下三个步骤:

  1. 初始化一个分片上传事件。

    调用Bucket.InitiateMultipartUpload方法返回OSS创建的全局唯一的uploadID。

  2. 上传分片。

    调用Bucket.UploadPart方法上传分片数据。

    说明
    • 对于同一个uploadID,分片号(partNumber)标识了该分片在整个文件内的相对位置。如果使用同一个分片号上传了新的数据,那么OSS上该分片已有的数据将会被覆盖。

    • OSS将收到的分片数据的MD5值放在ETag头内返回给用户。

    • OSS计算上传数据的MD5值,并与SDK计算的MD5值比较,如果不一致则返回InvalidDigest错误码。

  3. 完成分片上传。

    所有分片上传完成后,调用Bucket.CompleteMultipartUpload方法将所有分片合并成完整的文件。

示例代码

您可以使用以下代码进行一次完整的分片上传流程。

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/aliyun/aliyun-oss-go-sdk/oss"
)

func main() {
	// 从环境变量中获取访问凭证。运行本代码示例之前,请确保已设置环境变量OSS_ACCESS_KEY_ID和OSS_ACCESS_KEY_SECRET。
	provider, err := oss.NewEnvironmentVariableCredentialsProvider()
	if err != nil {
		log.Fatalf("Error: %v", err)
	}

	// 创建OSSClient实例。
	// yourEndpoint填写Bucket对应的Endpoint,以华东1(杭州)为例,填写为https://oss-cn-hangzhou.aliyuncs.com。其它Region请按实际情况填写。
	// yourRegion填写Bucket所在地域,以华东1(杭州)为例,填写为cn-hangzhou。其它Region请按实际情况填写。
	clientOptions := []oss.ClientOption{oss.SetCredentialsProvider(&provider)}
	clientOptions = append(clientOptions, oss.Region("yourRegion"))
	// 设置签名版本
	clientOptions = append(clientOptions, oss.AuthVersion(oss.AuthV4))
	client, err := oss.New("yourEndpoint", "", "", clientOptions...)
	if err != nil {
		log.Fatalf("Error: %v", err)
	}

	// 设置存储空间名称
	bucketName := "examplebucket"
	// 设置Object完整路径。Object完整路径中不能包含Bucket名称。
	objectName := "exampleobject.txt"
	// 设置本地文件的完整路径。如果未指定本地路径,则默认从示例程序所属项目对应本地路径中上传文件。
	localFilename := "/localpath/exampleobject.txt"

	bucket, err := client.Bucket(bucketName)
	if err != nil {
		log.Fatalf("Error: %v", err)
	}

	// 设置分片大小(单位:字节),指定5MB为分片大小。
	partSize := int64(5 * 1024 * 1024)

	// 调用分片上传函数。
	if err := uploadMultipart(bucket, objectName, localFilename, partSize); err != nil {
		log.Fatalf("Failed to upload multipart: %v", err)
	}

}

// 分片上传函数
func uploadMultipart(bucket *oss.Bucket, objectName, localFilename string, partSize int64) error {
	// 将本地文件分片
	chunks, err := oss.SplitFileByPartSize(localFilename, partSize)
	if err != nil {
		return fmt.Errorf("failed to split file into chunks: %w", err)
	}

	// 打开本地文件。
	file, err := os.Open(localFilename)
	if err != nil {
		return fmt.Errorf("failed to open file: %w", err)
	}
	defer file.Close()

	// 步骤1:初始化一个分片上传事件。
	imur, err := bucket.InitiateMultipartUpload(objectName)
	if err != nil {
		return fmt.Errorf("failed to initiate multipart upload: %w", err)
	}

	// 步骤2:上传分片。
	var parts []oss.UploadPart
	for _, chunk := range chunks {
		part, err := bucket.UploadPart(imur, file, chunk.Size, chunk.Number)
		if err != nil {
			// 如果上传某个部分失败,尝试取消整个上传任务。
			if abortErr := bucket.AbortMultipartUpload(imur); abortErr != nil {
				log.Printf("Failed to abort multipart upload: %v", abortErr)
			}
			return fmt.Errorf("failed to upload part: %w", err)
		}
		parts = append(parts, part)
	}

	// 指定Object的读写权限为私有,默认为继承Bucket的读写权限。
	objectAcl := oss.ObjectACL(oss.ACLPrivate)

	// 步骤3:完成分片上传。
	_, err = bucket.CompleteMultipartUpload(imur, parts, objectAcl)
	if err != nil {
		// 如果完成上传失败,尝试取消上传。
		if abortErr := bucket.AbortMultipartUpload(imur); abortErr != nil {
			log.Printf("Failed to abort multipart upload: %v", abortErr)
		}
		return fmt.Errorf("failed to complete multipart upload: %w", err)
	}

	log.Printf("Multipart upload completed successfully.")
	return nil
}

常见问题

如何取消分片上传事件?

当您遇到以下场景时,可以使用Bucket.AbortMultipartUpload方法取消分片上传事件。

  1. 文件出错

    • 如果在上传过程中发现文件有错误,如文件损坏或包含恶意代码,您可以选择取消上传以避免潜在的风险。

  2. 网络不稳定

    • 当网络连接不稳定或中断时,可能会导致上传过程中的分片丢失或损坏,您可以选择取消上传并重新开始,以确保数据的完整性和一致性。

  3. 资源限制

    • 当您的存储空间有限,而上传的文件过大,您可以取消上传以释放存储资源,将资源分配给其他更重要的任务。

  4. 误操作:

    • 当不小心启动了一个不必要的上传任务,或者上传了一个错误的文件版本,您可以取消此次上传事件

...
if err = bucket.AbortMultipartUpload(imur); err != nil {
log.Fatalf("failed to abort multipart upload: %w", err)
}

log.Printf("Multipart upload aborted successfully.")

如何列举已上传的分片?

当您遇到以下场景时,可以使用Bucket.ListUploadedParts方法列举某个分片上传事件中已经成功上传的分片。

监控上传进度:

  1. 大型文件上传

    • 当上传非常大的文件时,您通过列举已上传的分片,确保上传过程按照预期进行,并及时发现问题。

  2. 断点续传

    • 在网络不稳定或上传过程中发生中断时,您可以通过查看已上传的分片来决定是否需要重试上传未完成的部分,从而实现断点续传。

  3. 故障排除

    • 如果上传过程中出现错误,通过检查已上传的分片,您可以快速定位问题所在,比如某个特定分片上传失败,然后针对性地解决问题。

  4. 资源管理

    • 对于需要严格控制资源使用情况的场景,通过监控上传进度,可以更好地管理存储空间和带宽资源,确保资源的有效利用。

...	
if lsRes, err := bucket.ListUploadedParts(imur); err != nil {
log.Fatalf("Failed to list uploaded parts: %v", err)
}

for _, upload := range lsRes.UploadedParts {
log.Printf("List PartNumber: %d, ETag: %s, LastModified: %v\n", upload.PartNumber, upload.ETag, upload.LastModified)
}

列举分片上传事件

当您遇到以下场景时,可以使用Bucket.ListMultipartUploads方法列举某个存储空间所有进行中的分片上传事件。

监控场景:

  1. 批量文件上传管理

    • 当需要上传大量文件时,为了确保所有文件都能正确完成分片上传,您可以使用ListMultipartUploads方法来实时监控所有的分片上传活动。

  2. 故障检测与恢复

    • 在上传过程中如果遇到网络问题或其他故障,可能导致部分分片未能成功上传。通过监控正在进行中的分片上传事件,可以及时发现这些问题,并采取措施恢复上传。

  3. 资源优化与管理

    • 在大规模的文件上传过程中,监控正在进行中的分片上传事件可以帮助优化资源分配,例如根据上传进度调整带宽使用或优化上传策略。

  4. 数据迁移:

    • 在进行大规模的数据迁移项目时,监控所有正在进行的分片上传事件可以确保迁移任务的顺利进行,及时发现并解决任何潜在的问题。

参数设置

参数

说明

Delimiter

用于对Object名字进行分组的字符。所有名字包含指定的前缀且第一次出现Delimiter字符之间的Object作为一组元素。

MaxUploads

限定此次返回分片上传事件的最大数目,默认值和最大值均为1000。

KeyMarker

所有文件名称的字母序大于KeyMarker参数值的分片上传事件,可以与UploadIDMarker参数一同使用来指定返回结果的起始位置。

Prefix

限定返回的文件名称必须以指定的Prefix作为前缀。注意使用Prefix查询时,返回的文件名称中仍会包含Prefix。

UploadIDMarker

KeyMarker参数一同使用来指定返回结果的起始位置。

  • 如果KeyMarker参数未设置,则OSS忽略该参数。

  • 如果KeyMarker参数被设置,查询结果中包含:

    • 所有Object名字的字典序大于KeyMarker参数值的分片上传事件。

    • Object名字等于KeyMarker参数值,但是UploadIDUploadIDMarker参数值大的分片上传事件。

  • 使用默认参数

    ...
    lsRes, err := bucket.ListMultipartUploads(oss.KeyMarker(keyMarker), oss.UploadIDMarker(uploadIdMarker))
    if err != nil {
    log.Fatalf("failed to list multipart uploads: %w", err)
    }
    
    for _, upload := range lsRes.Uploads {
    log.Printf("Upload: %s, UploadID: %s\n", upload.Key, upload.UploadID)
    }
  • 指定前缀为file

    ...
    lsRes, err := bucket.ListMultipartUploads(oss.Prefix('file'))
    if err != nil {
    log.Fatalf("failed to list multipart uploads with prefix: %w", err)
    }
    
    log.Printf("Uploads:", lsRes.Uploads)
  • 指定最多返回100条结果数据

    ...
    lsRes, err := bucket.ListMultipartUploads(oss.MaxUploads(100))
    if err != nil {
    log.Fatalf("failed to list multipart uploads with limit: %w", err)
    }
    
    log.Printf("Uploads:", lsRes.Uploads)
  • 指定前缀为file且最多返回100条结果数据

    ...
    lsRes, err := bucket.ListMultipartUploads(oss.Prefix("file"), oss.MaxUploads(100))
    if err != nil {
    log.Fatalf("failed to list multipart uploads with prefix and limit: %w", err)
    }
    
    log.Printf("Uploads:", lsRes.Uploads)

相关文档

  • 关于分片上传的完整示例代码,请参见GitHub示例

  • 分片上传的完整实现涉及三个API接口,详情如下:

  • 关于取消分片上传事件的API接口说明,请参见AbortMultipartUpload

  • 关于列举已上传分片的API接口说明,请参见ListUploadedParts

  • 关于列举所有执行中的分片上传事件(即已初始化但尚未完成或已取消的分片上传事件)的API接口说明,请参见ListMultipartUploads