使用OSS数据索引进行大规模数据统计

通过OSS数据索引,您可以高效统计海量文件的数量、大小等信息,相较传统的ListObjects接口统计方式,显著提升统计效率,简化操作流程,适用于大规模数据统计场景。

方案优势

A企业在华南3(广州)地域名为mybucket的存储空间(Bucket)中,存储了2亿个按业务前缀分类的文件,共计180万个目录。使用OSS数据索引后,文件统计时间可减少83%。

传统方式

OSS数据索引

耗时

每日统计花费 2 小时

每日统计花费 20 分钟

复杂度

对于文件数量大于1000的目录,需要多次调用ListObject接口。

每个目录只需调用一次DoMetaQuery接口。

方案概览

使用OSS数据索引进行大规模数据统计的过程如下:

image

要实现以上过程,您只需要:

  1. 开启数据索引:OSS会帮您自动创建索引表,包含OSS元数据、自定义元数据和对象标签。

  2. 发起检索和统计:您需要设置检索条件,然后调用DoMetaQuery接口,OSS会进行快速检索。

最后,OSS会返回符合条件文件的数量、总大小和平均文件大小等统计信息,供您分析使用。

快速体验

步骤一:开启OSS数据索引

使用OSS控制台

  1. 登录OSS管理控制台

  2. 单击Bucket 列表,然后单击目标Bucket名称。

  3. 在左侧导航栏, 选择文件管理 > 数据索引

  4. 数据索引页面,单击立即开启

  5. 选择标量检索,单击确认开启

    image

使用阿里云SDK

Java SDK、Python SDK以及Go SDK支持通过标量检索功能查询满足指定条件的Object。

import com.aliyun.oss.*;
import com.aliyun.oss.common.auth.CredentialsProviderFactory;
import com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider;
import com.aliyun.oss.common.comm.SignVersion;

public class Demo {

    // Endpoint以华南3(广州)为例,其它Region请按实际情况填写。
    private static String endpoint = "https://oss-cn-guangzhou.aliyuncs.com";
    // 填写Bucket名称,例如examplebucket。
    private static String bucketName = "examplebucket";

    public static void main(String[] args) throws com.aliyuncs.exceptions.ClientException {
        // 从环境变量中获取访问凭证。运行本代码示例之前,请确保已设置环境变量OSS_ACCESS_KEY_ID和OSS_ACCESS_KEY_SECRET。
        EnvironmentVariableCredentialsProvider credentialsProvider = CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider();
        // 填写Bucket所在地域。以华南3(广州)为例,Region填写为cn-guangzhou。
        String region = "cn-guangzhou";

        // 创建OSSClient实例。
        ClientBuilderConfiguration clientBuilderConfiguration = new ClientBuilderConfiguration();
        clientBuilderConfiguration.setSignatureVersion(SignVersion.V4);
        OSS ossClient = OSSClientBuilder.create()
                .endpoint(endpoint)
                .credentialsProvider(credentialsProvider)
                .clientConfiguration(clientBuilderConfiguration)
                .region(region)
                .build();

        try {
            // 开启数据索引功能。
            ossClient.openMetaQuery(bucketName);
        } catch (OSSException oe) {
            System.out.println("Error Message:" + oe.getErrorMessage());
            System.out.println("Error Code:" + oe.getErrorCode());
            System.out.println("Request ID:" + oe.getRequestId());
            System.out.println("Host ID:" + oe.getHostId());
        } catch (ClientException ce) {
            System.out.println("Error Message: " + ce.getMessage());
        } finally {
            // 关闭OSSClient。
            ossClient.shutdown();
        }
    }
}

# -*- coding: utf-8 -*-
import oss2
from oss2.credentials import EnvironmentVariableCredentialsProvider

# 从环境变量中获取访问凭证。运行本代码示例之前,请确保已设置环境变量OSS_ACCESS_KEY_ID和OSS_ACCESS_KEY_SECRET。
auth = oss2.ProviderAuthV4(EnvironmentVariableCredentialsProvider())

# 填写Bucket所在地域对应的Endpoint。以华南3(广州)为例,Endpoint填写为https://oss-cn-guangzhou.aliyuncs.com。
endpoint = "https://oss-cn-guangzhou.aliyuncs.com"
# 填写Endpoint对应的Region信息,例如cn-guangzhou。注意,v4签名下,必须填写该参数
region = "cn-guangzhou"

# examplebucket填写存储空间名称。
bucket = oss2.Bucket(auth, endpoint, "examplebucket", region=region)

# 开启数据索引功能。
bucket.open_bucket_meta_query()
package main

import (
	"context"
	"flag"   
	"log"     

	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"          
	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials" 
)

var (
	region     string // 定义一个变量来保存从命令行获取的区域(Region)信息
	bucketName string // 定义一个变量来保存从命令行获取的存储空间名称
)

// init函数在main函数之前执行,用来初始化程序
func init() {
	// 设置命令行参数来指定region
	flag.StringVar(&region, "region", "", "The region in which the bucket is located.")
	// 设置命令行参数来指定bucket名称
	flag.StringVar(&bucketName, "bucket", "", "The name of the bucket.")
}

func main() {
	flag.Parse() // 解析命令行参数

	// 检查是否提供了存储空间名称,如果没有提供,则输出默认参数并退出程序
	if len(bucketName) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, bucket name required") // 记录错误并终止程序
	}

	// 检查是否提供了区域信息,如果没有提供,则输出默认参数并退出程序
	if len(region) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, region required") // 记录错误并终止程序
	}

	// 创建客户端配置,并使用环境变量作为凭证提供者
	cfg := oss.LoadDefaultConfig().
		WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
		WithRegion(region)

	client := oss.NewClient(cfg) // 使用配置创建一个新的OSS客户端实例

	// 构建一个OpenMetaQuery请求,用于开启特定存储空间的元数据查询功能
	request := &oss.OpenMetaQueryRequest{
		Bucket: oss.Ptr(bucketName), // 指定要操作的存储空间名称
	}
	result, err := client.OpenMetaQuery(context.TODO(), request) // 执行请求以开启存储空间的元数据查询功能
	if err != nil {
		log.Fatalf("failed to open meta query %v", err) // 如果有错误发生,记录错误信息并终止程序
	}

	log.Printf("open meta query result:%#v\n", result) // 打印开启元数据查询的结果
}

步骤二:发起检索和统计

使用OSS控制台

检索条件设置

  1. 在左侧导航栏, 选择文件管理 > 数据索引

  2. 存储类型选择标准存储,读写权限选择私有

  3. 通过模糊匹配目录前缀“a/b”来指定目录。

    image

结果输出设置

  1. 按对象的最后修改时间降序排序结果。

  2. 对筛选后文件大小进行求和平均值的计算。

  3. 存储类型进行分组计数,以统计文件数量。

image

  1. 单击立即查询

使用阿里云SDK

import com.aliyun.oss.*;
import com.aliyun.oss.common.auth.CredentialsProviderFactory;
import com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider;
import com.aliyun.oss.common.comm.SignVersion;
import com.aliyun.oss.model.*;

import java.util.ArrayList;
import java.util.List;

public class Demo {

    // Endpoint以华南3(广州)为例,其它Region请按实际情况填写。
    private static String endpoint = "https://oss-cn-guangzhou.aliyuncs.com";
    // 填写Bucket名称,例如examplebucket。
    private static String bucketName = "examplebucket";

    public static void main(String[] args) throws Exception {

        // 从环境变量中获取访问凭证。运行本代码示例之前,请确保已设置环境变量OSS_ACCESS_KEY_ID和OSS_ACCESS_KEY_SECRET。
        EnvironmentVariableCredentialsProvider credentialsProvider = CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider();
        // 填写Bucket所在地域。以华南3(广州)为例,Region填写为cn-guangzhou。
        String region = "cn-guangzhou";

        // 创建OSSClient实例。
        ClientBuilderConfiguration clientBuilderConfiguration = new ClientBuilderConfiguration();
        clientBuilderConfiguration.setSignatureVersion(SignVersion.V4);
        OSS ossClient = OSSClientBuilder.create()
                .endpoint(endpoint)
                .credentialsProvider(credentialsProvider)
                .clientConfiguration(clientBuilderConfiguration)
                .region(region)
                .build();

        try {
            // 设置查询参数,指定返回文件的最大数量(20个)
            int maxResults = 20;
            // 设置查询条件:文件名包含"a/b",且文件存储类型为"Standard",访问权限为"private"
            // 查询语句使用逻辑运算符"and"来连接多个子查询条件
            String query = "{\n" +
                    "  \"Operation\": \"and\",\n" +
                    "  \"SubQueries\": [\n" +
                    "    {\n" +
                    "      \"Field\": \"Filename\",\n" +
                    "      \"Value\": \"a/b\",\n" +
                    "      \"Operation\": \"match\"\n" +
                    "    },\n" +
                    "    {\n" +
                    "      \"Field\": \"OSSStorageClass\",\n" +
                    "      \"Value\": \"Standard\",\n" +
                    "      \"Operation\": \"eq\"\n" +
                    "    },\n" +
                    "    {\n" +
                    "      \"Field\": \"ObjectACL\",\n" +
                    "      \"Value\": \"private\",\n" +
                    "      \"Operation\": \"eq\"\n" +
                    "    }\n" +
                    "  ]\n" +
                    "}";
            String sort = "FileModifiedTime";// 设置按文件修改时间排序

            // 创建聚合操作实例,用于统计文件的大小(Size)的总和、数量和平均值
            Aggregation aggregationRequest1 = new Aggregation();
            aggregationRequest1.setField("Size");// 设置聚合字段为文件大小
            aggregationRequest1.setOperation("sum");// 计算文件大小的总和

            Aggregation aggregationRequest2 = new Aggregation();
            aggregationRequest2.setField("Size");// 设置聚合字段为文件大小
            aggregationRequest2.setOperation("count");// 计算文件数量

            Aggregation aggregationRequest3 = new Aggregation();
            aggregationRequest3.setField("Size");// 设置聚合字段为文件大小
            aggregationRequest3.setOperation("average");// 计算文件大小的平均值

            // 将所有聚合请求添加到一个列表中
            Aggregations aggregations = new Aggregations();
            List<Aggregation> aggregationList = new ArrayList<>();
            aggregationList.add(aggregationRequest1);// 添加求和聚合
            aggregationList.add(aggregationRequest2);// 添加计数聚合
            aggregationList.add(aggregationRequest3);// 添加平均值聚合
            aggregations.setAggregation(aggregationList);// 将所有聚合操作设置到Aggregations对象中

            // 创建DoMetaQueryRequest请求对象,传入Bucket名称、最大返回文件数、查询条件和排序规则
            DoMetaQueryRequest doMetaQueryRequest = new DoMetaQueryRequest(bucketName, maxResults, query, sort);

            // 将聚合操作添加到Meta查询请求中
            doMetaQueryRequest.setAggregations(aggregations);
            // 设置排序方式为降序(DESC)
            doMetaQueryRequest.setOrder(SortOrder.DESC);

            // 执行Meta查询请求,获取查询结果
            DoMetaQueryResult doMetaQueryResult = ossClient.doMetaQuery(doMetaQueryRequest);
            // 判断查询结果
            if (doMetaQueryResult.getFiles() != null) {
                // 如果文件列表不为空,遍历文件信息并打印
                for (ObjectFile file : doMetaQueryResult.getFiles().getFile()) {
                    System.out.println("Filename: " + file.getFilename()); // 文件名
                    System.out.println("ETag: " + file.getETag());// 文件ETag
                    System.out.println("ObjectACL: " + file.getObjectACL()); // 文件访问权限
                    System.out.println("OssObjectType: " + file.getOssObjectType());// 文件类型
                    System.out.println("OssStorageClass: " + file.getOssStorageClass());// 存储类型
                    System.out.println("TaggingCount: " + file.getOssTaggingCount()); // 标签数量
                    if (file.getOssTagging() != null) {
                        // 打印文件标签
                        for (Tagging tag : file.getOssTagging().getTagging()) {
                            System.out.println("Key: " + tag.getKey());
                            System.out.println("Value: " + tag.getValue());
                        }
                    }
                    if (file.getOssUserMeta() != null) {
                        // 打印用户元数据
                        for (UserMeta meta : file.getOssUserMeta().getUserMeta()) {
                            System.out.println("Key: " + meta.getKey());
                            System.out.println("Value: " + meta.getValue());
                        }
                    }
                }
            } else if (doMetaQueryResult.getAggregations() != null) {
                // 如果有聚合结果,遍历并打印聚合信息
                for (Aggregation aggre : doMetaQueryResult.getAggregations().getAggregation()) {
                    System.out.println("Field: " + aggre.getField());// 聚合字段
                    System.out.println("Operation: " + aggre.getOperation()); // 聚合操作
                    System.out.println("Value: " + aggre.getValue());// 聚合结果值
                    if (aggre.getGroups() != null && aggre.getGroups().getGroup().size() > 0) {
                        // 获取分组聚合的值。
                        System.out.println("Groups value: " + aggre.getGroups().getGroup().get(0).getValue());
                        // 获取分组聚合的总个数。
                        System.out.println("Groups count: " + aggre.getGroups().getGroup().get(0).getCount());
                    }
                }
            } else {
                System.out.println("NextToken: " + doMetaQueryResult.getNextToken());
            }

        } catch (OSSException oe) {
            // 捕获OSS异常并输出相关信息
            System.out.println("Error Message:" + oe.getErrorMessage());
            System.out.println("Error Code:" + oe.getErrorCode());
            System.out.println("Request ID:" + oe.getRequestId());
            System.out.println("Host ID:" + oe.getHostId());
        } catch (ClientException ce) {
            // 捕获客户端异常并输出错误信息
            System.out.println("Error Message: " + ce.getMessage());
        } finally {
            // 确保关闭OSSClient实例
            ossClient.shutdown();
        }
    }
}
# -*- coding: utf-8 -*-
import oss2
from oss2.credentials import EnvironmentVariableCredentialsProvider
from oss2.models import MetaQuery, AggregationsRequest  
import json
# 从环境变量中获取访问凭证。运行本代码示例之前,请确保已设置环境变量OSS_ACCESS_KEY_ID和OSS_ACCESS_KEY_SECRET。
auth = oss2.ProviderAuthV4(EnvironmentVariableCredentialsProvider())

# 填写Bucket所在地域对应的Endpoint。以华南3(广州)为例,Endpoint填写为https://oss-cn-guangzhou.aliyuncs.com。
endpoint = "https://oss-cn-guangzhou.aliyuncs.com"
# 填写Endpoint对应的Region信息,例如cn-guangzhou。注意,v4签名下,必须填写该参数
region = "cn-guangzhou"

# 填写存储空间名称,以examplebucket为例。
bucket = oss2.Bucket(auth, endpoint, "examplebucket", region=region)

# 查询条件:文件名包含"a/b",且文件存储类型为"Standard",访问权限为"private"
query = {
    "Operation": "and",
    "SubQueries": [
        {"Field": "Filename", "Value": "a/b", "Operation": "match"},
        {"Field": "OSSStorageClass", "Value": "Standard", "Operation": "eq"},
        {"Field": "ObjectACL", "Value": "private", "Operation": "eq"}
    ]
}
# 将字典转换为JSON字符串
query_json = json.dumps(query)

# 创建聚合操作实例,用于统计文件的大小(Size)的总和、数量和平均值
aggregations = [
    AggregationsRequest(field="Size", operation="sum"),  # 计算文件大小的总和
    AggregationsRequest(field="Size", operation="count"),  # 计算文件的数量
    AggregationsRequest(field="Size", operation="average")  # 计算文件大小的平均值
]


# 创建MetaQuery请求对象,指定查询条件、最大返回文件数、排序字段和方式以及聚合操作
do_meta_query_request = MetaQuery(
    max_results=20,  # 返回最多20个文件
    query=query_json,  # 设置查询条件
    sort="FileModifiedTime",  # 按文件修改时间排序
    order="desc",  # 降序排序
    aggregations=aggregations  # 设置聚合操作
)

# 执行Meta查询请求,获取查询结果
result = bucket.do_bucket_meta_query(do_meta_query_request)

# 打印查询结果中满足条件的文件信息
if result.files:
    for file in result.files:
        print(f"Filename: {file.file_name}")  # 打印文件名
        print(f"ETag: {file.etag}")  # 打印文件的ETag
        print(f"ObjectACL: {file.object_acl}")  # 打印文件的访问控制列表(ACL)
        print(f"OssObjectType: {file.oss_object_type}")  # 打印文件的OSS对象类型
        print(f"OssStorageClass: {file.oss_storage_class}")  # 打印文件的存储类型
        print(f"TaggingCount: {file.oss_tagging_count}")  # 打印文件的标签数量
        
        # 打印文件的所有标签
        if file.oss_tagging:
            for tag in file.oss_tagging:
                print(f"Key: {tag.key}")  # 打印标签的Key
                print(f"Value: {tag.value}")  # 打印标签的Value
        
        # 打印文件的用户元数据
        if file.oss_user_meta:
            for meta in file.oss_user_meta:
                print(f"Key: {meta.key}")  # 打印用户元数据的Key
                print(f"Value: {meta.value}")  # 打印用户元数据的Value


# 打印聚合结果
if result.aggregations:
    for aggre in result.aggregations:
        print(f"Field: {aggre.field}")  # 打印聚合操作字段
        print(f"Operation: {aggre.operation}")  # 打印聚合操作类型(如sum、count、average)
        print(f"Value: {aggre.value}")  # 打印聚合结果值
package main

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"log"

	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
	"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
)

var (
	region     string // 定义一个变量来保存从命令行获取的区域(Region)信息
	bucketName string // 定义一个变量来保存从命令行获取的存储空间名称
)

// init函数在main函数之前执行,用来初始化程序
func init() {
	// 设置命令行参数来指定region,默认为空字符串
	flag.StringVar(&region, "region", "", "The region in which the bucket is located.")
	// 设置命令行参数来指定bucket名称,默认为空字符串
	flag.StringVar(&bucketName, "bucket", "", "The name of the bucket.")
}

func main() {
	flag.Parse() // 解析命令行参数

	// 检查是否提供了存储空间名称,如果没有提供,则输出默认参数并退出程序
	if len(bucketName) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, bucket name required")
	}

	// 检查是否提供了区域信息,如果没有提供,则输出默认参数并退出程序
	if len(region) == 0 {
		flag.PrintDefaults()
		log.Fatalf("invalid parameters, region required")
	}

	// 创建客户端配置,并使用环境变量作为凭证提供者和指定的区域
	cfg := oss.LoadDefaultConfig().
		WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
		WithRegion(region)

	client := oss.NewClient(cfg) // 使用配置创建一个新的OSS客户端实例

	// 构建查询条件,文件名包含"a/b",存储类型为"Standard",访问权限为"private"
	query := map[string]interface{}{
		"Operation": "and",
		"SubQueries": []map[string]interface{}{
			{"Field": "Filename", "Value": "a/b", "Operation": "match"},
			{"Field": "OSSStorageClass", "Value": "Standard", "Operation": "eq"},
			{"Field": "ObjectACL", "Value": "private", "Operation": "eq"},
		},
	}
	// 将查询条件转为JSON字符串
	queryJSON, err := json.Marshal(query)
	if err != nil {
		log.Fatalf("failed to marshal query %v", err)
	}

	// 创建聚合操作,用于统计文件大小的总和、数量和平均值
	aggregations := []oss.MetaQueryAggregation{
		{Field: oss.Ptr("Size"), Operation: oss.Ptr("sum")},     // 计算文件大小总和
		{Field: oss.Ptr("Size"), Operation: oss.Ptr("count")},   // 计算文件数量
		{Field: oss.Ptr("Size"), Operation: oss.Ptr("average")}, // 计算文件大小平均值
	}

	// 构建一个DoMetaQuery请求,用于执行元数据查询
	request := &oss.DoMetaQueryRequest{
		Bucket: oss.Ptr(bucketName), // 指定要查询的存储空间名称
		MetaQuery: &oss.MetaQuery{
			MaxResults: oss.Ptr(int64(20)),         // 最大返回结果数:20个结果
			Query:      oss.Ptr(string(queryJSON)), // 查询条件:大小大于1MB的对象
			Sort:       oss.Ptr("Size"),            // 排序字段:按对象大小排序
			Order:      oss.MetaQueryOrderAsc,      // 排序顺序:升序
			Aggregations: &oss.MetaQueryAggregations{
				Aggregations: aggregations}, // 聚合操作:计算文件大小的总和、数量和平均值
		},
	}
	result, err := client.DoMetaQuery(context.TODO(), request) // 发送请求以执行元数据查询
	if err != nil {
		log.Fatalf("failed to do meta query %v", err)
	}

	// 打印NextToken,用于分页查询下一页的数据
	fmt.Printf("NextToken:%s\n", *result.NextToken)

	// 遍历返回的结果,打印出每个文件的详细信息
	for _, file := range result.Files {
		fmt.Printf("File name: %s\n", *file.Filename)
		fmt.Printf("size: %d\n", file.Size)
		fmt.Printf("File Modified Time:%s\n", *file.FileModifiedTime)
		fmt.Printf("Oss Object Type:%s\n", *file.OSSObjectType)
		fmt.Printf("Oss Storage Class:%s\n", *file.OSSStorageClass)
		fmt.Printf("Object ACL:%s\n", *file.ObjectACL)
		fmt.Printf("ETag:%s\n", *file.ETag)
		fmt.Printf("Oss CRC64:%s\n", *file.OSSCRC64)
		if file.OSSTaggingCount != nil {
			fmt.Printf("Oss Tagging Count:%d\n", *file.OSSTaggingCount)
		}

		// 打印对象的标签信息
		for _, tagging := range file.OSSTagging {
			fmt.Printf("Oss Tagging Key:%s\n", *tagging.Key)
			fmt.Printf("Oss Tagging Value:%s\n", *tagging.Value)
		}

		// 打印用户自定义元数据信息
		for _, userMeta := range file.OSSUserMeta {
			fmt.Printf("Oss User Meta Key:%s\n", *userMeta.Key)
			fmt.Printf("Oss User Meta Key Value:%s\n", *userMeta.Value)
		}
	}

	// 打印聚合结果
	for _, aggregation := range result.Aggregations {
		fmt.Printf("Aggregation Field:%s\n", *aggregation.Field)
		fmt.Printf("Aggregation Operation:%s\n", *aggregation.Operation)
		fmt.Printf("Aggregation Value:%f\n", *aggregation.Value)
	}

}

步骤三:结果验证

使用OSS控制台

如图所示,符合条件的100个标准存储类型文件总大小为19.53MB,平均每个文件约200KB。

image

使用阿里云SDK

如图所示,符合条件的100个标准存储类型文件总大小为19.53MB,平均每个文件约200KB。

image

了解更多

  • 如果您的程序自定义要求较高,您可以直接发起REST API请求。直接发起REST API请求需要手动编写代码计算签名。更多信息,请参见签名版本4DoMetaQuery API