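OSS Tables provides automatic maintenance, which runs three optimization tasks: compaction, snapshot management, and unreferenced file cleanup. These tasks continuously improve Table query performance and reduce storage costs, and they do not require you to configure external compute resources.
The three optimization tasks apply at different resource levels:
Unreferenced file cleanup: configured at the Bucket level. Automatically identifies and deletes objects that are no longer referenced by any table snapshot.
Compaction: configured at the table level. Merges small files into larger files to improve query performance.
Snapshot management: configured at the table level. Automatically removes expired snapshots to reduce the number of metadata files and the storage overhead.
After you create a Table Bucket and a Table, the system automatically applies the following default maintenance configuration.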
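Maintenance feature | Configuration level | Parameter | Default value |
Unreferenced file cleanup | Bucket level | status | enabled |
Unreferenced file cleanup | Bucket level | unreferencedDays | 3 |
Unreferenced file cleanup | Bucket level | nonCurrentDays | 10 |
Compaction | Table level | status | enabled |
Compaction | Table level | targetFileSizeMB | 512 |
Compaction | Table level | strategy | auto |
Snapshot management | Table level | status | enabled |
Snapshot management | Table level | maxSnapshotAgeHours | 120 |
Snapshot management | Table level | minSnapshotsToKeep | 1 |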
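As an illustration, the defaults in the table above can be written as the JSON value that the maintenance configuration commands in this document accept. The following is a minimal sketch and not part of any SDK: the helper name build_value and the DEFAULTS dictionary are hypothetical, while the type keys (icebergUnreferencedFileRemoval, icebergCompaction, icebergSnapshotManagement) and the payload shape follow the ossutil examples in this document.

```python
import json

# Default settings copied from the table above.
# The dictionary layout is an assumption made for this sketch.
DEFAULTS = {
    "icebergUnreferencedFileRemoval": {"unreferencedDays": 3, "nonCurrentDays": 10},
    "icebergCompaction": {"targetFileSizeMB": 512, "strategy": "auto"},
    "icebergSnapshotManagement": {"maxSnapshotAgeHours": 120, "minSnapshotsToKeep": 1},
}


def build_value(maintenance_type, status="enabled", **overrides):
    """Return a maintenance value payload: the defaults plus any overrides.

    Hypothetical helper; it only builds a dictionary and calls no service.
    """
    if maintenance_type not in DEFAULTS:
        raise ValueError(f"unknown maintenance type: {maintenance_type}")
    if status not in ("enabled", "disabled"):
        raise ValueError("status must be 'enabled' or 'disabled'")

    settings = dict(DEFAULTS[maintenance_type])
    unknown = set(overrides) - set(settings)
    if unknown:
        raise ValueError(f"unknown parameters: {sorted(unknown)}")
    settings.update(overrides)

    return {"status": status, "settings": {maintenance_type: settings}}


if __name__ == "__main__":
    value = build_value("icebergCompaction", targetFileSizeMB=256, strategy="binpack")
    # The printed JSON can be passed as the --value argument of ossutil.
    print(json.dumps(value, separators=(",", ":")))
```

Keeping the defaults in one place makes it easy to see which parameters a given call actually changes.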
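Service-linked role
The automatic maintenance tasks of OSS Tables (compaction, unreferenced file cleanup, and snapshot management) are run by a backend service. This service obtains access to Table Buckets through the service-linked role AliyunServiceRoleForOssTableMaintenance. For more information about service-linked roles, see Service-linked roles.
Authorization
The first time you open the Table Bucket List page in the OSS console, if the current account has not yet created this service-linked role, the top of the page shows the message: Missing permission: AliyunServiceRoleForOssTableMaintenance. Click Create Now in the message to complete the authorization. After authorization, the OSS Tables backend maintenance service starts working automatically.
If you operate as a RAM user, make sure that the user has permission to create service-linked roles. The Alibaba Cloud account owner can attach the following policy to the RAM user: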
{
  "Version": "1",
  "Statement": [
    {
      "Action": "ram:CreateServiceLinkedRole",
      "Resource": "*",
      "Effect": "Allow",
      "Condition": {
        "StringEquals": {
          "ram:ServiceName": "tablemaintenance.oss.aliyuncs.com"
        }
      }
    }
  ]
}
Permissions
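The policy attached to this service-linked role (AliyunServiceRolePolicyForOssTableMaintenance) contains the following permissions, which are used to read table configuration, run data maintenance, and update metadata:
Permission (Action) | Purpose |
oss:GetTableBucketMaintenanceConfiguration | Read the Bucket-level maintenance configuration |
oss:GetTableMaintenanceConfiguration | Read the Table-level maintenance configuration |
oss:ListTableBuckets | List Table Buckets |
oss:GetTableBucket | Get Table Bucket information |
oss:ListTables | List Tables |
oss:GetTable | Get Table information |
oss:GetTableData | Read table data files (to run compaction and cleanup) |
oss:PutTableData | Write table data files (new files produced by compaction) |
oss:GetTableMetadataLocation | Get the metadata file location |
oss:UpdateTableMetadataLocation | Update the metadata file location (commit changes after maintenance completes) |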
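Unreferenced file cleanup
Unreferenced file cleanup is a Bucket-level optimization that applies to all Tables in a Table Bucket. It identifies and deletes every object that is not referenced by any table snapshot. During data writes, updates, and deletes, events such as failed write jobs or snapshot expiration can leave behind data files (such as Parquet, Avro, or ORC files) that no metadata references any longer. These orphaned files keep consuming storage space.
When unreferenced file cleanup is enabled, any Parquet, Avro, or ORC file that you PUT into the Table Bucket and that does not belong to a table is also treated as an unreferenced file and is cleaned up automatically.
Unreferenced file cleanup works in two phases: an unreferenced file is first marked as non-current (after the wait controlled by unreferencedDays), and it is deleted after it has remained non-current for the specified number of days (controlled by nonCurrentDays).
The following table describes the configuration parameters of unreferenced file cleanup.
Parameter | Description | Valid values | Default value |
status | Whether unreferenced file cleanup is enabled. | enabled or disabled. | enabled |
unreferencedDays | Number of days to wait, after a file is no longer referenced by any metadata, before it is marked as non-current. | 1 to 2147483647. | 3 |
nonCurrentDays | Number of days to wait, after a file is marked as non-current, before it is deleted. | 1 to 2147483647. | 10 |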
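The two-phase timing can be sketched as simple date arithmetic. The function below is a hypothetical helper (cleanup_timeline is not an OSS API). It assumes that both waits are counted in whole days from the moment a file stops being referenced. The scheduling of the backend service is not described here and may differ, so the results are best read as the earliest possible times.

```python
from datetime import datetime, timedelta

MAX_DAYS = 2147483647  # upper bound of the documented value range


def cleanup_timeline(unreferenced_since, unreferenced_days=3, non_current_days=10):
    """Return (marked_non_current_at, eligible_for_deletion_at).

    Hypothetical sketch of the two-phase mechanism:
    phase 1 waits unreferenced_days, phase 2 waits non_current_days.
    """
    for name, days in (("unreferencedDays", unreferenced_days),
                       ("nonCurrentDays", non_current_days)):
        if not 1 <= days <= MAX_DAYS:
            raise ValueError(f"{name} must be between 1 and {MAX_DAYS}")

    # Note: values near the upper bound exceed what datetime can represent
    # and raise OverflowError; realistic settings are far smaller.
    marked_at = unreferenced_since + timedelta(days=unreferenced_days)
    delete_at = marked_at + timedelta(days=non_current_days)
    return marked_at, delete_at


if __name__ == "__main__":
    marked, deleted = cleanup_timeline(datetime(2025, 1, 1))
    print("marked non-current no earlier than:", marked)
    print("deleted no earlier than:", deleted)
```

With the default values, a file is removed no earlier than 13 days after it stops being referenced (3 days plus 10 days).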
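Console
Use the OSS console to view and modify the automatic maintenance configuration of a Table Bucket.
Procedure:
Log on to the OSS console. In the left-side navigation pane, choose Table Bucket List.
Click the name of the target Table Bucket to open the Bucket details page.
Select the Data Maintenance tab.
In the Unreferenced File Cleanup section, view the current configuration:
Status: shows whether the feature is enabled.
Unreferenced days: number of days to wait, after a file is no longer referenced by any metadata, before it is marked as non-current. The default is 3 days.
Non-current days: number of days to wait, after a file is marked as non-current, before it is deleted. The default is 10 days.
Click Edit, modify the unreferenced file cleanup parameters, and then click Save.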
ossutil
Use ossutil to modify the unreferenced file cleanup configuration:
ossutil tables-api put-table-bucket-maintenance-configuration \
--table-bucket-arn {ARN} \
--type icebergUnreferencedFileRemoval \
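--value '{"status":"enabled","settings":{"icebergUnreferencedFileRemoval":{"unreferencedDays":5,"nonCurrentDays":15}}}'
SDK
The following code shows how to use the SDK to query and modify the unreferenced file cleanup configuration of a Table Bucket.
Python
Query the unreferenced file cleanup configuration: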
import argparse
import alibabacloud_oss_v2 as oss
import alibabacloud_oss_v2.tables as oss_tables

parser = argparse.ArgumentParser(description="get table bucket maintenance configuration sample")
parser.add_argument('--region', help='The region in which the table bucket is located.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS Tables.')
parser.add_argument('--table-bucket-arn', help='The ARN of the table bucket.', required=True)

def main():
    args = parser.parse_args()

    # Load credentials from environment variables.
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    cfg.region = args.region
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    client = oss_tables.Client(cfg)
    result = client.get_table_bucket_maintenance_configuration(
        oss_tables.models.GetTableBucketMaintenanceConfigurationRequest(
            table_bucket_arn=args.table_bucket_arn,
        )
    )
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' table bucket arn: {result.table_bucket_arn},'
          f' configuration: {result.configuration}')

if __name__ == "__main__":
    main()
Modify the unreferenced file cleanup configuration:
import argparse
import alibabacloud_oss_v2 as oss
import alibabacloud_oss_v2.tables as oss_tables

parser = argparse.ArgumentParser(description="put table bucket maintenance configuration sample")
parser.add_argument('--region', help='The region in which the table bucket is located.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS Tables.')
parser.add_argument('--table-bucket-arn', help='The ARN of the table bucket.', required=True)
parser.add_argument('--type', help='The maintenance type, e.g., icebergUnreferencedFileRemoval.', required=True)

def main():
    args = parser.parse_args()

    # Load credentials from environment variables.
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    cfg.region = args.region
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    client = oss_tables.Client(cfg)

    # New settings for unreferenced file cleanup.
    value = {
        'status': 'enabled',
        'settings': {
            'icebergUnreferencedFileRemoval': {
                'unreferencedDays': 7,
                'nonCurrentDays': 30
            }
        }
    }
    result = client.put_table_bucket_maintenance_configuration(
        oss_tables.models.PutTableBucketMaintenanceConfigurationRequest(
            table_bucket_arn=args.table_bucket_arn,
            type=args.type,
            value=value,
        )
    )
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id}')
    print(f'successfully updated maintenance configuration for: {args.table_bucket_arn}')

if __name__ == "__main__":
    main()
Go
Query the unreferenced file cleanup configuration:
package main
import (
"context"
"flag"
"log"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/tables"
)
var (
region string
tableBucketArn string
)
func init() {
flag.StringVar(&region, "region", "", "The region in which the bucket is located.")
flag.StringVar(&tableBucketArn, "table-bucket-arn", "", "The arn of the table bucket.")
}
func main() {
flag.Parse()
if len(tableBucketArn) == 0 {
flag.PrintDefaults()
log.Fatalf("invalid parameters, table bucket arn required")
}
if len(region) == 0 {
flag.PrintDefaults()
log.Fatalf("invalid parameters, region required")
}
cfg := oss.LoadDefaultConfig().
WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
WithRegion(region)
client := tables.NewTablesClient(cfg)
result, err := client.GetTableBucketMaintenanceConfiguration(context.TODO(), &tables.GetTableBucketMaintenanceConfigurationRequest{
TableBucketARN: oss.Ptr(tableBucketArn),
})
if err != nil {
log.Fatalf("failed to get table bucket maintenance configuration %v", err)
}
log.Printf("get table bucket maintenance configuration result:%#v\n", result)
}
Modify the unreferenced file cleanup configuration:
package main
import (
"context"
"flag"
"log"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/tables"
)
var (
region string
tableBucketArn string
)
func init() {
flag.StringVar(&region, "region", "", "The region in which the bucket is located.")
flag.StringVar(&tableBucketArn, "table-bucket-arn", "", "The arn of the table bucket.")
}
func main() {
flag.Parse()
if len(tableBucketArn) == 0 {
flag.PrintDefaults()
log.Fatalf("invalid parameters, table bucket arn required")
}
if len(region) == 0 {
flag.PrintDefaults()
log.Fatalf("invalid parameters, region required")
}
cfg := oss.LoadDefaultConfig().
WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
WithRegion(region)
client := tables.NewTablesClient(cfg)
result, err := client.PutTableBucketMaintenanceConfiguration(context.TODO(), &tables.PutTableBucketMaintenanceConfigurationRequest{
TableBucketARN: oss.Ptr(tableBucketArn),
Type: oss.Ptr("icebergUnreferencedFileRemoval"),
Value: &tables.MaintenanceValue{
Settings: &tables.MaintenanceSettings{
IcebergUnreferencedFileRemoval: &tables.SettingsDetail{
UnreferencedDays: oss.Ptr(4),
NonCurrentDays: oss.Ptr(10),
},
},
Status: oss.Ptr("enabled"),
},
})
if err != nil {
log.Fatalf("failed to put table bucket maintenance configuration %v", err)
}
log.Printf("put table bucket maintenance configuration result:%#v\n", result)
}
Java
Query the unreferenced file cleanup configuration:
import com.aliyun.sdk.service.oss2.credentials.EnvironmentVariableCredentialsProvider;
import com.aliyun.sdk.service.oss2.tables.OSSTablesClient;
import com.aliyun.sdk.service.oss2.tables.models.*;
public class GetTableBucketMaintenanceConfigurationSample {
public static void main(String[] args) throws Exception {
String region = "cn-hangzhou";
String tableBucketARN = "acs:osstables:cn-hangzhou:1234567890:bucket/mytable-bucket";
try (OSSTablesClient client = OSSTablesClient.newBuilder()
.credentialsProvider(new EnvironmentVariableCredentialsProvider())
.region(region)
.build()) {
GetTableBucketMaintenanceConfigurationRequest request = GetTableBucketMaintenanceConfigurationRequest.newBuilder()
.tableBucketARN(tableBucketARN)
.build();
GetTableBucketMaintenanceConfigurationResult result = client.getTableBucketMaintenanceConfiguration(request);
System.out.printf("Status code:%d, request id:%s%n",
result.statusCode(), result.requestId());
System.out.printf("Successfully got maintenance configuration for table bucket: %s%n", tableBucketARN);
System.out.printf("Table Bucket ARN: %s%n", result.tableBucketARN());
result.configuration().forEach((key, value) -> {
System.out.printf("Configuration Type: %s, Status: %s%n", key, value.status());
});
} catch (Exception e) {
System.out.println("Error: " + e.getMessage());
}
}
}
Modify the unreferenced file cleanup configuration:
import com.aliyun.sdk.service.oss2.credentials.EnvironmentVariableCredentialsProvider;
import com.aliyun.sdk.service.oss2.tables.OSSTablesClient;
import com.aliyun.sdk.service.oss2.tables.models.*;
public class PutTableBucketMaintenanceConfigurationSample {
public static void main(String[] args) throws Exception {
String region = "cn-hangzhou";
String tableBucketARN = "acs:osstables:cn-hangzhou:1234567890:bucket/mytable-bucket";
String type = "icebergUnreferencedFileRemoval";
String status = "enabled";
int unreferencedDays = 3;
int nonCurrentDays = 3;
IcebergUnreferencedFileRemovalSettings removalSettings = IcebergUnreferencedFileRemovalSettings.newBuilder()
.unreferencedDays(unreferencedDays)
.nonCurrentDays(nonCurrentDays)
.build();
TableBucketMaintenanceSettings settings = TableBucketMaintenanceSettings.newBuilder()
.icebergUnreferencedFileRemoval(removalSettings)
.build();
TableBucketMaintenanceConfigurationValue value = TableBucketMaintenanceConfigurationValue.newBuilder()
.status(status)
.settings(settings)
.build();
try (OSSTablesClient client = OSSTablesClient.newBuilder()
.credentialsProvider(new EnvironmentVariableCredentialsProvider())
.region(region)
.build()) {
PutTableBucketMaintenanceConfigurationRequest request = PutTableBucketMaintenanceConfigurationRequest.newBuilder()
.tableBucketARN(tableBucketARN)
.type(type)
.value(value)
.build();
PutTableBucketMaintenanceConfigurationResult result = client.putTableBucketMaintenanceConfiguration(request);
System.out.printf("Status code:%d, request id:%s%n",
result.statusCode(), result.requestId());
System.out.printf("Successfully configured maintenance for table bucket: %s%n", tableBucketARN);
} catch (Exception e) {
System.out.println("Error: " + e.getMessage());
}
}
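}
API
The unreferenced file cleanup configuration is available through the following API operations: GetTableBucketMaintenanceConfiguration and PutTableBucketMaintenanceConfiguration.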
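Compaction
Compaction is a table-level optimization. In Apache Iceberg tables, frequent streaming writes or small-batch writes produce many small files, so queries must open and scan too many files, which lowers query efficiency. Compaction merges multiple small files into files of the target size, which reduces the file count, improves query performance, and lowers metadata overhead.
The following table describes the configuration parameters of compaction.
Parameter | Description | Valid values | Default value |
status | Whether automatic compaction is enabled. | enabled or disabled. | enabled |
targetFileSizeMB | Target file size after merging, in MB. The compaction job merges files smaller than this size into files close to this size. | 1 to 2147483647. | 512 |
strategy | Compaction strategy. The strategy determines how files are merged and whether data is sorted. | auto, binpack, sort, or z-order. When set to sort or z-order, a sort order must already be defined in the table metadata. | auto |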
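To make the role of targetFileSizeMB concrete, the sketch below groups small files into merge groups whose combined size stays at or below the target. It is illustrative only: the function name plan_binpack is hypothetical, and the first-fit-decreasing heuristic is an assumption chosen for clarity. This document does not describe the algorithm the service actually uses.

```python
def plan_binpack(file_sizes_mb, target_file_size_mb=512):
    """Group files smaller than the target into merge groups.

    Hypothetical first-fit-decreasing sketch: each group's total size
    stays at or below target_file_size_mb. Files already at or above the
    target are left alone, and so are groups that contain a single file.
    """
    if target_file_size_mb < 1:
        raise ValueError("targetFileSizeMB must be at least 1")

    # Only files below the target are candidates; place the largest first.
    small = sorted((s for s in file_sizes_mb if s < target_file_size_mb), reverse=True)

    groups = []
    for size in small:
        for group in groups:
            if sum(group) + size <= target_file_size_mb:
                group.append(size)
                break
        else:
            # No existing group has room, so start a new one.
            groups.append([size])

    # A group with one file would produce the same file again, so skip it.
    return [g for g in groups if len(g) > 1]


if __name__ == "__main__":
    for group in plan_binpack([600, 300, 200, 100, 50], target_file_size_mb=512):
        print(group, "->", sum(group), "MB")
```

In this example the 600 MB file is left untouched because it already exceeds the target, and the four smaller files are merged into two output files.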
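The compaction strategies are defined as follows:
auto: The system selects the most suitable strategy automatically. Tables with a sort order are compacted with the sort strategy. Tables without a sort order are compacted with the binpack strategy.
binpack: Merges files by size only and does not change the data order. This is the fastest strategy and suits scenarios with no sorting requirement.
sort: Sorts the data by the sort order defined on the table, then merges. Suits range queries. Requires a sort order defined in the table metadata.
z-order: Sorts on multiple columns at once along a Z-order curve, then merges. Suits multi-dimensional queries. Requires a sort order defined in the table metadata.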
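The rules above can be summarized in a few lines of code. The helper below is a hypothetical sketch (resolve_strategy is not an SDK function). It only mirrors the documented behavior: auto resolves to sort or binpack depending on whether the table has a sort order, and sort and z-order are rejected for tables without one. How the service reports such an error is not covered here.

```python
VALID_STRATEGIES = ("auto", "binpack", "sort", "z-order")


def resolve_strategy(strategy, has_sort_order):
    """Return the strategy that would effectively apply to a table.

    Hypothetical helper: has_sort_order stands for "a sort order is
    defined in the table metadata".
    """
    if strategy not in VALID_STRATEGIES:
        raise ValueError(f"strategy must be one of {VALID_STRATEGIES}")

    if strategy == "auto":
        # auto: sort for tables with a sort order, binpack otherwise.
        return "sort" if has_sort_order else "binpack"

    if strategy in ("sort", "z-order") and not has_sort_order:
        raise ValueError(f"{strategy} requires a sort order in the table metadata")

    return strategy


if __name__ == "__main__":
    print(resolve_strategy("auto", has_sort_order=True))
    print(resolve_strategy("auto", has_sort_order=False))
    print(resolve_strategy("binpack", has_sort_order=True))
```

Running a check like this before submitting a configuration is one way to catch a sort or z-order setting on a table that has no sort order.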
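View and configure compaction.
Console
View and modify the compaction configuration on the Data Maintenance page of the Table.
Log on to the OSS console. In the left-side navigation pane, choose Table Bucket List.
Click the name of the target Table Bucket, then click the name of the target Table in the Table list to open the Table details page.
Select the Data Maintenance tab.
In the Compaction section, view the current configuration:
Status: shows whether the feature is enabled.
Target file size: the target file size after merging. The default is 512 MB.
Compaction strategy: the strategy currently in use. The default is auto.
Job status: shows the status and time of the most recent compaction job.
Click Edit, modify the compaction parameters, and then click Save.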
ossutil
Use ossutil to modify the compaction configuration:
ossutil tables-api put-table-maintenance-configuration \
--table-bucket-arn {ARN} \
--namespace mynamespace \
--name mytable \
--type icebergCompaction \
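--value '{"status":"enabled","settings":{"icebergCompaction":{"targetFileSizeMB":256,"strategy":"binpack"}}}'
SDK
The following code shows how to use the SDK to query and modify the compaction configuration of a Table.
Python
Query the compaction configuration: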
import argparse
import alibabacloud_oss_v2 as oss
import alibabacloud_oss_v2.tables as oss_tables

parser = argparse.ArgumentParser(description="get table maintenance configuration sample")
parser.add_argument('--region', help='The region in which the table bucket is located.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS Tables.')
parser.add_argument('--table-bucket-arn', help='The ARN of the table bucket.', required=True)
parser.add_argument('--namespace', help='The namespace of the table.', required=True)
parser.add_argument('--name', help='The name of the table.', required=True)

def main():
    args = parser.parse_args()

    # Load credentials from environment variables.
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    cfg.region = args.region
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    client = oss_tables.Client(cfg)
    result = client.get_table_maintenance_configuration(
        oss_tables.models.GetTableMaintenanceConfigurationRequest(
            table_bucket_arn=args.table_bucket_arn,
            namespace=args.namespace,
            name=args.name,
        )
    )
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' table arn: {result.table_arn},'
          f' configuration: {result.configuration}')

if __name__ == "__main__":
    main()
Modify the compaction configuration:
import argparse
import alibabacloud_oss_v2 as oss
import alibabacloud_oss_v2.tables as oss_tables

parser = argparse.ArgumentParser(description="put table maintenance configuration sample")
parser.add_argument('--region', help='The region in which the table bucket is located.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS Tables.')
parser.add_argument('--table-bucket-arn', help='The ARN of the table bucket.', required=True)
parser.add_argument('--namespace', help='The namespace of the table.', required=True)
parser.add_argument('--name', help='The name of the table.', required=True)
parser.add_argument('--type', help='The maintenance type, e.g., icebergCompaction.', required=True)

def main():
    args = parser.parse_args()

    # Load credentials from environment variables.
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    cfg.region = args.region
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    client = oss_tables.Client(cfg)

    # New settings for compaction.
    value = {
        'status': 'enabled',
        'settings': {
            'icebergCompaction': {
                'targetFileSizeMB': 512,
                'strategy': 'auto'
            }
        }
    }
    result = client.put_table_maintenance_configuration(
        oss_tables.models.PutTableMaintenanceConfigurationRequest(
            table_bucket_arn=args.table_bucket_arn,
            namespace=args.namespace,
            name=args.name,
            type=args.type,
            value=value,
        )
    )
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id}')
    print(f'successfully updated maintenance configuration for: {args.namespace}/{args.name}')

if __name__ == "__main__":
    main()
Go
Query the compaction configuration:
package main
import (
"context"
"flag"
"log"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/tables"
)
var (
region string
tableBucketArn string
namespace string
name string
)
func init() {
flag.StringVar(&region, "region", "", "The region in which the bucket is located.")
flag.StringVar(&tableBucketArn, "table-bucket-arn", "", "The arn of the table bucket.")
flag.StringVar(&namespace, "namespace", "", "The name of the namespace.")
flag.StringVar(&name, "name", "", "The name of the table.")
}
func main() {
flag.Parse()
if len(region) == 0 {
flag.PrintDefaults()
log.Fatalf("invalid parameters, region required")
}
if len(tableBucketArn) == 0 {
flag.PrintDefaults()
log.Fatalf("invalid parameters, table bucket arn required")
}
if len(namespace) == 0 {
flag.PrintDefaults()
log.Fatalf("invalid parameters, namespace name required")
}
if len(name) == 0 {
flag.PrintDefaults()
log.Fatalf("invalid parameters, table name required")
}
cfg := oss.LoadDefaultConfig().
WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
WithRegion(region)
client := tables.NewTablesClient(cfg)
result, err := client.GetTableMaintenanceConfiguration(context.TODO(), &tables.GetTableMaintenanceConfigurationRequest{
TableBucketARN: oss.Ptr(tableBucketArn),
Namespace: oss.Ptr(namespace),
Name: oss.Ptr(name),
})
if err != nil {
log.Fatalf("failed to get table maintenance configuration %v", err)
}
log.Printf("get table maintenance configuration result:%#v\n", result)
}
Modify the compaction configuration:
package main
import (
"context"
"flag"
"log"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/tables"
)
var (
region string
tableBucketArn string
namespace string
name string
)
func init() {
flag.StringVar(&region, "region", "", "The region in which the bucket is located.")
flag.StringVar(&tableBucketArn, "table-bucket-arn", "", "The arn of the table bucket.")
flag.StringVar(&namespace, "namespace", "", "The name of the namespace.")
flag.StringVar(&name, "name", "", "The name of the table.")
}
func main() {
flag.Parse()
if len(region) == 0 {
flag.PrintDefaults()
log.Fatalf("invalid parameters, region required")
}
if len(tableBucketArn) == 0 {
flag.PrintDefaults()
log.Fatalf("invalid parameters, table bucket arn required")
}
if len(namespace) == 0 {
flag.PrintDefaults()
log.Fatalf("invalid parameters, namespace name required")
}
if len(name) == 0 {
flag.PrintDefaults()
log.Fatalf("invalid parameters, table name required")
}
cfg := oss.LoadDefaultConfig().
WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
WithRegion(region)
client := tables.NewTablesClient(cfg)
// icebergCompaction type
result, err := client.PutTableMaintenanceConfiguration(context.TODO(), &tables.PutTableMaintenanceConfigurationRequest{
TableBucketARN: oss.Ptr(tableBucketArn),
Namespace: oss.Ptr(namespace),
Name: oss.Ptr(name),
Type: oss.Ptr("icebergCompaction"),
Value: &tables.TableMaintenanceValue{
Status: oss.Ptr("enabled"),
Settings: &tables.TableMaintenanceSettings{
IcebergCompaction: &tables.IcebergCompactionSettingsDetail{
TargetFileSizeMB: oss.Ptr(400),
Strategy: oss.Ptr("auto"),
},
},
},
})
// icebergSnapshotManagement type
//result, err := client.PutTableMaintenanceConfiguration(context.TODO(), &tables.PutTableMaintenanceConfigurationRequest{
// TableBucketARN: oss.Ptr(tableBucketArn),
// Namespace: oss.Ptr(namespace),
// Name: oss.Ptr(name),
// Type: oss.Ptr("icebergSnapshotManagement"),
// Value: &tables.TableMaintenanceValue{
// Status: oss.Ptr("enabled"),
// Settings: &tables.TableMaintenanceSettings{
// IcebergSnapshotManagement: &tables.IcebergSnapshotManagementSettingsDetail{
// MaxSnapshotAgeHours: oss.Ptr(350),
// MinSnapshotsToKeep: oss.Ptr(1),
// },
// },
// },
//})
if err != nil {
log.Fatalf("failed to put table maintenance configuration %v", err)
}
log.Printf("put table maintenance configuration result:%#v\n", result)
}
Java
Query the compaction configuration:
import com.aliyun.sdk.service.oss2.credentials.EnvironmentVariableCredentialsProvider;
import com.aliyun.sdk.service.oss2.tables.OSSTablesClient;
import com.aliyun.sdk.service.oss2.tables.models.*;
public class GetTableMaintenanceConfigurationSample {
public static void main(String[] args) throws Exception {
String region = "cn-hangzhou";
String tableBucketARN = "acs:osstables:cn-hangzhou:1234567890:bucket/mytable-bucket";
String namespace = "mynamespace";
String name = "mytable";
try (OSSTablesClient client = OSSTablesClient.newBuilder()
.credentialsProvider(new EnvironmentVariableCredentialsProvider())
.region(region)
.build()) {
GetTableMaintenanceConfigurationRequest request = GetTableMaintenanceConfigurationRequest.newBuilder()
.tableBucketARN(tableBucketARN)
.namespace(namespace)
.name(name)
.build();
GetTableMaintenanceConfigurationResult result = client.getTableMaintenanceConfiguration(request);
System.out.printf("Status code:%d, request id:%s%n",
result.statusCode(), result.requestId());
System.out.printf("Table ARN: %s%n", result.tableARN());
if (result.configuration() != null && !result.configuration().isEmpty()) {
System.out.println("Maintenance configurations:");
result.configuration().forEach((type, config) -> {
System.out.printf(" Type: %s, Status: %s%n", type, config.status());
if (config.settings() != null) {
if (config.settings().icebergCompaction() != null) {
System.out.printf(" Compaction - TargetFileSizeMB: %d, Strategy: %s%n",
config.settings().icebergCompaction().targetFileSizeMB(),
config.settings().icebergCompaction().strategy());
}
if (config.settings().icebergSnapshotManagement() != null) {
System.out.printf(" SnapshotManagement - MinSnapshotsToKeep: %d, MaxSnapshotAgeHours: %d%n",
config.settings().icebergSnapshotManagement().minSnapshotsToKeep(),
config.settings().icebergSnapshotManagement().maxSnapshotAgeHours());
}
}
});
} else {
System.out.println("No maintenance configuration found.");
}
} catch (Exception e) {
System.out.println("Error: " + e.getMessage());
}
}
}
Modify the compaction configuration:
import com.aliyun.sdk.service.oss2.credentials.EnvironmentVariableCredentialsProvider;
import com.aliyun.sdk.service.oss2.tables.OSSTablesClient;
import com.aliyun.sdk.service.oss2.tables.models.*;
public class PutTableMaintenanceConfigurationSample {
public static void main(String[] args) throws Exception {
String region = "cn-hangzhou";
String tableBucketARN = "acs:osstables:cn-hangzhou:1234567890:bucket/mytable-bucket";
String namespace = "mynamespace";
String name = "mytable";
String type = "icebergCompaction";
String status = "enabled";
int targetFileSizeMB = 256;
String strategy = "auto";
try (OSSTablesClient client = OSSTablesClient.newBuilder()
.credentialsProvider(new EnvironmentVariableCredentialsProvider())
.region(region)
.build()) {
IcebergCompactionSettings compactionSettings = IcebergCompactionSettings.newBuilder()
.targetFileSizeMB(targetFileSizeMB)
.strategy(strategy)
.build();
TableMaintenanceSettings settings = TableMaintenanceSettings.newBuilder()
.icebergCompaction(compactionSettings)
.build();
TableMaintenanceConfigurationValue value = TableMaintenanceConfigurationValue.newBuilder()
.status(status)
.settings(settings)
.build();
PutTableMaintenanceConfigurationRequest request = PutTableMaintenanceConfigurationRequest.newBuilder()
.tableBucketARN(tableBucketARN)
.namespace(namespace)
.name(name)
.type(type)
.value(value)
.build();
PutTableMaintenanceConfigurationResult result = client.putTableMaintenanceConfiguration(request);
System.out.printf("Status code:%d, request id:%s%n",
result.statusCode(), result.requestId());
System.out.printf("Successfully put table maintenance configuration for table: %s/%s, type: %s%n", namespace, name, type);
} catch (Exception e) {
System.out.println("Error: " + e.getMessage());
}
}
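}
API
The compaction configuration is available through the following API operations: GetTableMaintenanceConfiguration and PutTableMaintenanceConfiguration.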
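Snapshot management
Snapshot management is a table-level optimization. Every write to an Apache Iceberg table creates a new snapshot that records the state of the data at a point in time. As writes accumulate, the growing number of snapshots consumes substantial storage and increases the cost of metadata operations. Snapshot management removes expired snapshots automatically, which reduces storage costs while keeping data safe.
Snapshot management puts data safety first: when minSnapshotsToKeep and maxSnapshotAgeHours conflict, the minimum number of retained snapshots takes priority, so that snapshot cleanup never removes the ability to recover data.
The following table describes the configuration parameters of snapshot management.
Parameter | Description | Valid values | Default value |
status | Whether snapshot management is enabled. | enabled or disabled. | enabled |
maxSnapshotAgeHours | Maximum snapshot retention time, in hours. Snapshots older than this are removed. | 1 to 2147483647. | 120 |
minSnapshotsToKeep | Minimum number of snapshots to retain. This number of the most recent snapshots is kept even if they exceed the maximum retention time. | 1 to 2147483647. | 1 |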
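The safety-first rule can be sketched as follows. The function snapshots_to_expire is a hypothetical helper written for illustration and is not the service's implementation. It assumes that "most recent" means newest by snapshot timestamp and that a snapshot expires only when it is both older than maxSnapshotAgeHours and outside the minSnapshotsToKeep newest snapshots.

```python
from datetime import datetime, timedelta


def snapshots_to_expire(snapshot_times, now,
                        max_snapshot_age_hours=120, min_snapshots_to_keep=1):
    """Return the snapshot timestamps that would be expired, newest first.

    Hypothetical sketch: the newest min_snapshots_to_keep snapshots are
    always retained, even when they are older than the age limit.
    """
    if max_snapshot_age_hours < 1 or min_snapshots_to_keep < 1:
        raise ValueError("both parameters must be at least 1")

    ordered = sorted(snapshot_times, reverse=True)  # newest first
    cutoff = now - timedelta(hours=max_snapshot_age_hours)

    # Skip the protected newest snapshots, then apply the age limit.
    candidates = ordered[min_snapshots_to_keep:]
    return [t for t in candidates if t < cutoff]


if __name__ == "__main__":
    now = datetime(2025, 1, 10)
    snapshots = [datetime(2025, 1, 1), datetime(2025, 1, 2), datetime(2025, 1, 9)]
    for t in snapshots_to_expire(snapshots, now):
        print("expire:", t)
```

With the default values, the newest snapshot is never expired, so a table that has not been written to for more than 120 hours still keeps its current state.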
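View and configure snapshot management.
Console
View and modify the snapshot management configuration on the Data Maintenance page of the Table.
Log on to the OSS console. In the left-side navigation pane, choose Table Bucket List.
Click the name of the target Table Bucket, then click the name of the target Table in the Table list to open the Table details page.
Select the Data Maintenance tab.
In the Snapshot Management section, view the current configuration:
Status: shows whether the feature is enabled.
Maximum snapshot retention time: the maximum time a snapshot is retained. The default is 120 hours.
Minimum snapshots to keep: the minimum number of snapshots to retain. The default is 1.
Job status: shows the status and time of the most recent snapshot cleanup job.
Click Edit, modify the snapshot management parameters, and then click Save.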
ossutil
Use ossutil to modify the snapshot management configuration:
ossutil tables-api put-table-maintenance-configuration \
--table-bucket-arn {ARN} \
--namespace mynamespace \
--name mytable \
--type icebergSnapshotManagement \
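--value '{"status":"enabled","settings":{"icebergSnapshotManagement":{"maxSnapshotAgeHours":72,"minSnapshotsToKeep":3}}}'
SDK
The following code shows how to use the SDK to query and modify the snapshot management configuration of a Table. Snapshot management and compaction share the same table-level maintenance configuration operations (GetTableMaintenanceConfiguration / PutTableMaintenanceConfiguration).
Python
Query the snapshot management configuration: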
import argparse
import alibabacloud_oss_v2 as oss
import alibabacloud_oss_v2.tables as oss_tables

parser = argparse.ArgumentParser(description="get table maintenance configuration sample")
parser.add_argument('--region', help='The region in which the table bucket is located.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS Tables.')
parser.add_argument('--table-bucket-arn', help='The ARN of the table bucket.', required=True)
parser.add_argument('--namespace', help='The namespace of the table.', required=True)
parser.add_argument('--name', help='The name of the table.', required=True)

def main():
    args = parser.parse_args()

    # Load credentials from environment variables.
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    cfg.region = args.region
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    client = oss_tables.Client(cfg)
    result = client.get_table_maintenance_configuration(
        oss_tables.models.GetTableMaintenanceConfigurationRequest(
            table_bucket_arn=args.table_bucket_arn,
            namespace=args.namespace,
            name=args.name,
        )
    )
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' table arn: {result.table_arn},'
          f' configuration: {result.configuration}')

if __name__ == "__main__":
    main()
Modify the snapshot management configuration:
import argparse
import alibabacloud_oss_v2 as oss
import alibabacloud_oss_v2.tables as oss_tables

parser = argparse.ArgumentParser(description="put table maintenance configuration sample")
parser.add_argument('--region', help='The region in which the table bucket is located.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS Tables.')
parser.add_argument('--table-bucket-arn', help='The ARN of the table bucket.', required=True)
parser.add_argument('--namespace', help='The namespace of the table.', required=True)
parser.add_argument('--name', help='The name of the table.', required=True)
parser.add_argument('--type', help='The maintenance type, e.g., icebergSnapshotManagement.', required=True)

def main():
    args = parser.parse_args()

    # Load credentials from environment variables.
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    cfg.region = args.region
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    client = oss_tables.Client(cfg)

    # New settings for snapshot management.
    value = {
        'status': 'enabled',
        'settings': {
            'icebergSnapshotManagement': {
                'maxSnapshotAgeHours': 72,
                'minSnapshotsToKeep': 3
            }
        }
    }
    result = client.put_table_maintenance_configuration(
        oss_tables.models.PutTableMaintenanceConfigurationRequest(
            table_bucket_arn=args.table_bucket_arn,
            namespace=args.namespace,
            name=args.name,
            type=args.type,
            value=value,
        )
    )
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id}')
    print(f'successfully updated maintenance configuration for: {args.namespace}/{args.name}')

if __name__ == "__main__":
    main()
Go
Query the snapshot management configuration:
package main
import (
"context"
"flag"
"log"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/tables"
)
var (
region string
tableBucketArn string
namespace string
name string
)
func init() {
flag.StringVar(&region, "region", "", "The region in which the bucket is located.")
flag.StringVar(&tableBucketArn, "table-bucket-arn", "", "The arn of the table bucket.")
flag.StringVar(&namespace, "namespace", "", "The name of the namespace.")
flag.StringVar(&name, "name", "", "The name of the table.")
}
func main() {
flag.Parse()
if len(region) == 0 {
flag.PrintDefaults()
log.Fatalf("invalid parameters, region required")
}
if len(tableBucketArn) == 0 {
flag.PrintDefaults()
log.Fatalf("invalid parameters, table bucket arn required")
}
if len(namespace) == 0 {
flag.PrintDefaults()
log.Fatalf("invalid parameters, namespace name required")
}
if len(name) == 0 {
flag.PrintDefaults()
log.Fatalf("invalid parameters, table name required")
}
cfg := oss.LoadDefaultConfig().
WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
WithRegion(region)
client := tables.NewTablesClient(cfg)
result, err := client.GetTableMaintenanceConfiguration(context.TODO(), &tables.GetTableMaintenanceConfigurationRequest{
TableBucketARN: oss.Ptr(tableBucketArn),
Namespace: oss.Ptr(namespace),
Name: oss.Ptr(name),
})
if err != nil {
log.Fatalf("failed to get table maintenance configuration %v", err)
}
log.Printf("get table maintenance configuration result:%#v\n", result)
}
Modify the snapshot management configuration:
package main
import (
"context"
"flag"
"log"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/tables"
)
var (
region string
tableBucketArn string
namespace string
name string
)
func init() {
flag.StringVar(&region, "region", "", "The region in which the bucket is located.")
flag.StringVar(&tableBucketArn, "table-bucket-arn", "", "The arn of the table bucket.")
flag.StringVar(&namespace, "namespace", "", "The name of the namespace.")
flag.StringVar(&name, "name", "", "The name of the table.")
}
func main() {
flag.Parse()
if len(region) == 0 {
flag.PrintDefaults()
log.Fatalf("invalid parameters, region required")
}
if len(tableBucketArn) == 0 {
flag.PrintDefaults()
log.Fatalf("invalid parameters, table bucket arn required")
}
if len(namespace) == 0 {
flag.PrintDefaults()
log.Fatalf("invalid parameters, namespace name required")
}
if len(name) == 0 {
flag.PrintDefaults()
log.Fatalf("invalid parameters, table name required")
}
cfg := oss.LoadDefaultConfig().
WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
WithRegion(region)
client := tables.NewTablesClient(cfg)
// icebergSnapshotManagement type
result, err := client.PutTableMaintenanceConfiguration(context.TODO(), &tables.PutTableMaintenanceConfigurationRequest{
TableBucketARN: oss.Ptr(tableBucketArn),
Namespace: oss.Ptr(namespace),
Name: oss.Ptr(name),
Type: oss.Ptr("icebergSnapshotManagement"),
Value: &tables.TableMaintenanceValue{
Status: oss.Ptr("enabled"),
Settings: &tables.TableMaintenanceSettings{
IcebergSnapshotManagement: &tables.IcebergSnapshotManagementSettingsDetail{
MaxSnapshotAgeHours: oss.Ptr(350),
MinSnapshotsToKeep: oss.Ptr(1),
},
},
},
})
if err != nil {
log.Fatalf("failed to put table maintenance configuration %v", err)
}
log.Printf("put table maintenance configuration result:%#v\n", result)
}
Java
Query the snapshot management configuration:
import com.aliyun.sdk.service.oss2.credentials.EnvironmentVariableCredentialsProvider;
import com.aliyun.sdk.service.oss2.tables.OSSTablesClient;
import com.aliyun.sdk.service.oss2.tables.models.*;

public class GetTableMaintenanceConfigurationSample {
    public static void main(String[] args) throws Exception {
        String region = "cn-hangzhou";
        String tableBucketARN = "acs:osstables:cn-hangzhou:1234567890:bucket/mytable-bucket";
        String namespace = "mynamespace";
        String name = "mytable";
        // Credentials are read from environment variables.
        try (OSSTablesClient client = OSSTablesClient.newBuilder()
                .credentialsProvider(new EnvironmentVariableCredentialsProvider())
                .region(region)
                .build()) {
            GetTableMaintenanceConfigurationRequest request = GetTableMaintenanceConfigurationRequest.newBuilder()
                    .tableBucketARN(tableBucketARN)
                    .namespace(namespace)
                    .name(name)
                    .build();
            GetTableMaintenanceConfigurationResult result = client.getTableMaintenanceConfiguration(request);
            System.out.printf("Status code:%d, request id:%s%n",
                    result.statusCode(), result.requestId());
            System.out.printf("Table ARN: %s%n", result.tableARN());
            if (result.configuration() != null && !result.configuration().isEmpty()) {
                System.out.println("Maintenance configurations:");
                // One entry per maintenance type; print whichever settings are present.
                result.configuration().forEach((type, config) -> {
                    System.out.printf("  Type: %s, Status: %s%n", type, config.status());
                    if (config.settings() != null) {
                        if (config.settings().icebergCompaction() != null) {
                            System.out.printf("    Compaction - TargetFileSizeMB: %d, Strategy: %s%n",
                                    config.settings().icebergCompaction().targetFileSizeMB(),
                                    config.settings().icebergCompaction().strategy());
                        }
                        if (config.settings().icebergSnapshotManagement() != null) {
                            System.out.printf("    SnapshotManagement - MinSnapshotsToKeep: %d, MaxSnapshotAgeHours: %d%n",
                                    config.settings().icebergSnapshotManagement().minSnapshotsToKeep(),
                                    config.settings().icebergSnapshotManagement().maxSnapshotAgeHours());
                        }
                    }
                });
            } else {
                System.out.println("No maintenance configuration found.");
            }
        } catch (Exception e) {
            System.out.println("Error: " + e.getMessage());
        }
    }
}
Modify the snapshot management configuration:
import com.aliyun.sdk.service.oss2.credentials.EnvironmentVariableCredentialsProvider;
import com.aliyun.sdk.service.oss2.tables.OSSTablesClient;
import com.aliyun.sdk.service.oss2.tables.models.*;

public class PutTableMaintenanceConfigurationSample {
    public static void main(String[] args) throws Exception {
        String region = "cn-hangzhou";
        String tableBucketARN = "acs:osstables:cn-hangzhou:1234567890:bucket/mytable-bucket";
        String namespace = "mynamespace";
        String name = "mytable";
        String type = "icebergSnapshotManagement";
        String status = "enabled";
        int maxSnapshotAgeHours = 72;
        int minSnapshotsToKeep = 3;
        // Credentials are read from environment variables.
        try (OSSTablesClient client = OSSTablesClient.newBuilder()
                .credentialsProvider(new EnvironmentVariableCredentialsProvider())
                .region(region)
                .build()) {
            // Build the settings from the innermost object outward.
            IcebergSnapshotManagementSettings snapshotManagementSettings = IcebergSnapshotManagementSettings.newBuilder()
                    .maxSnapshotAgeHours(maxSnapshotAgeHours)
                    .minSnapshotsToKeep(minSnapshotsToKeep)
                    .build();
            TableMaintenanceSettings settings = TableMaintenanceSettings.newBuilder()
                    .icebergSnapshotManagement(snapshotManagementSettings)
                    .build();
            TableMaintenanceConfigurationValue value = TableMaintenanceConfigurationValue.newBuilder()
                    .status(status)
                    .settings(settings)
                    .build();
            PutTableMaintenanceConfigurationRequest request = PutTableMaintenanceConfigurationRequest.newBuilder()
                    .tableBucketARN(tableBucketARN)
                    .namespace(namespace)
                    .name(name)
                    .type(type)
                    .value(value)
                    .build();
            PutTableMaintenanceConfigurationResult result = client.putTableMaintenanceConfiguration(request);
            System.out.printf("Status code:%d, request id:%s%n",
                    result.statusCode(), result.requestId());
            System.out.printf("Successfully put table maintenance configuration for table: %s/%s, type: %s%n", namespace, name, type);
        } catch (Exception e) {
            System.out.println("Error: " + e.getMessage());
        }
    }
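}
API
The snapshot management configuration is managed through the following API operations: GetTableMaintenanceConfiguration (query) and PutTableMaintenanceConfiguration (modify).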
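To make the two snapshot management settings more concrete, the following minimal sketch models one plausible reading of how maxSnapshotAgeHours and minSnapshotsToKeep interact. It assumes the usual Apache Iceberg snapshot-expiration convention: a snapshot older than the age limit is expired unless it is among the newest minSnapshotsToKeep snapshots. The function name select_expired_snapshots and the tuple-based data shape are hypothetical and are not part of the OSS Tables SDK. The actual service behavior may differ.

```python
from datetime import datetime, timedelta, timezone


def select_expired_snapshots(snapshots, now, max_snapshot_age_hours, min_snapshots_to_keep):
    """Return the IDs of snapshots that a retention pass would expire.

    snapshots: list of (snapshot_id, committed_at) tuples (hypothetical shape).
    Assumption: the newest `min_snapshots_to_keep` snapshots are always kept;
    any other snapshot older than `max_snapshot_age_hours` is expired.
    """
    cutoff = now - timedelta(hours=max_snapshot_age_hours)
    newest_first = sorted(snapshots, key=lambda s: s[1], reverse=True)
    protected = {sid for sid, _ in newest_first[:min_snapshots_to_keep]}
    return [
        sid
        for sid, committed_at in newest_first
        if sid not in protected and committed_at < cutoff
    ]


now = datetime(2025, 1, 10, tzinfo=timezone.utc)
snapshots = [
    ("s1", now - timedelta(hours=200)),
    ("s2", now - timedelta(hours=100)),
    ("s3", now - timedelta(hours=80)),
    ("s4", now - timedelta(hours=10)),
]

# Same values as the Java sample above: 72 hours, keep at least 3 snapshots.
print(select_expired_snapshots(snapshots, now, 72, 3))  # ['s1']
# Keeping only 1 snapshot lets every snapshot older than 72 hours expire.
print(select_expired_snapshots(snapshots, now, 72, 1))  # ['s3', 's2', 's1']
```

Under this model, s2 and s3 are older than 72 hours but survive the first call because they are among the three newest snapshots.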
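How it works
Trigger timing: Maintenance tasks are scheduled automatically by the system in the background and do not run at fixed times. Typically, each type of maintenance task runs about once every 24 hours for a given table. When the system load is high, a task may start later.
Resource usage: Automatic maintenance tasks do not consume your throughput or QPS quotas and do not affect production jobs.
Transaction isolation: Maintenance tasks rely on Iceberg snapshot isolation. While files are being merged, production jobs can continue to write new data. Read jobs are also unaffected, because the query engine automatically selects the latest valid snapshot.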
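The snapshot isolation described above can be illustrated with a toy model. This is a simplified sketch, not the Iceberg or OSS Tables implementation: each snapshot is modeled as an immutable set of data file names, and the Table class and its methods are hypothetical names introduced only for this example. The point is that compaction commits a new snapshot instead of changing existing ones, so a reader pinned to an older snapshot keeps seeing a consistent file set.

```python
class Table:
    """Toy Iceberg-style table: every commit adds a new immutable snapshot."""

    def __init__(self):
        self.snapshots = [frozenset()]  # snapshot 0 is the empty table

    def current_snapshot_id(self):
        return len(self.snapshots) - 1

    def append(self, *files):
        # A writer commits new data files on top of the latest snapshot.
        self.snapshots.append(self.snapshots[-1] | frozenset(files))
        return self.current_snapshot_id()

    def compact(self, small_files, merged_file):
        # Compaction swaps small files for one merged file in a NEW snapshot;
        # earlier snapshots are left untouched.
        latest = self.snapshots[-1]
        self.snapshots.append((latest - frozenset(small_files)) | {merged_file})
        return self.current_snapshot_id()

    def read(self, snapshot_id):
        return sorted(self.snapshots[snapshot_id])


table = Table()
pinned = table.append("a.parquet", "b.parquet")  # a reader starts on this snapshot
table.compact(["a.parquet", "b.parquet"], "ab-merged.parquet")
table.append("c.parquet")  # a writer keeps committing after compaction

print(table.read(pinned))                       # ['a.parquet', 'b.parquet']
print(table.read(table.current_snapshot_id()))  # ['ab-merged.parquet', 'c.parquet']
```

A new query that starts after the compaction commit reads the latest snapshot and therefore sees the merged file plus any newly written data.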
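Query maintenance job status
Query the execution status of a table's three maintenance tasks (compaction, snapshot management, and unreferenced file cleanup). Each maintenance task returns its own status information independently.
Console
View the job status of each maintenance task on the table's Data Maintenance page.
Log on to the OSS console. In the left-side navigation pane, choose Table Bucket list.
Click the name of the target table bucket. In the Table list, click the name of the target table to open the table details page.
Click the Data Maintenance tab.
View the job status in each maintenance section:
Compaction: shows the execution status of the compaction task (for example, "Failed" or "Succeeded") and the last run time.
Snapshot management: shows the execution status of the snapshot cleanup task and the last run time.
Unreferenced file cleanup: configured at the bucket level. Click the link for editing the configuration in the table bucket to go to the table bucket's Data Maintenance page and view the status there.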
ossutil
ossutil tables-api get-table-maintenance-job-status \
--table-bucket-arn acs:osstables:cn-hangzhou:1234567890:bucket/mytable-bucket \
--namespace mynamespace \
--name mytable
SDK
Python
import argparse
import alibabacloud_oss_v2 as oss
import alibabacloud_oss_v2.tables as oss_tables

parser = argparse.ArgumentParser(description="get table maintenance job status sample")
parser.add_argument('--region', help='The region in which the table bucket is located.', required=True)
parser.add_argument('--endpoint', help='The domain names that other services can use to access OSS Tables.')
parser.add_argument('--table-bucket-arn', help='The ARN of the table bucket.', required=True)
parser.add_argument('--namespace', help='The namespace of the table.', required=True)
parser.add_argument('--name', help='The name of the table.', required=True)


def main():
    args = parser.parse_args()

    # Credentials are read from environment variables.
    credentials_provider = oss.credentials.EnvironmentVariableCredentialsProvider()
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    cfg.region = args.region
    if args.endpoint is not None:
        cfg.endpoint = args.endpoint

    client = oss_tables.Client(cfg)
    result = client.get_table_maintenance_job_status(
        oss_tables.models.GetTableMaintenanceJobStatusRequest(
            table_bucket_arn=args.table_bucket_arn,
            namespace=args.namespace,
            name=args.name,
        )
    )
    print(f'status code: {result.status_code},'
          f' request id: {result.request_id},'
          f' table arn: {result.table_arn},'
          f' status: {result.status}')


if __name__ == "__main__":
    main()
Go
package main

import (
    "context"
    "flag"
    "log"

    "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
    "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
    "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/tables"
)

var (
    region         string
    tableBucketArn string
    namespace      string
    name           string
)

func init() {
    flag.StringVar(&region, "region", "", "The region in which the bucket is located.")
    flag.StringVar(&tableBucketArn, "table-bucket-arn", "", "The arn of the table bucket.")
    flag.StringVar(&namespace, "namespace", "", "The name of the namespace.")
    flag.StringVar(&name, "name", "", "The name of the table.")
}

func main() {
    flag.Parse()
    // All four flags are required.
    if len(region) == 0 {
        flag.PrintDefaults()
        log.Fatalf("invalid parameters, region required")
    }
    if len(tableBucketArn) == 0 {
        flag.PrintDefaults()
        log.Fatalf("invalid parameters, table bucket arn required")
    }
    if len(namespace) == 0 {
        flag.PrintDefaults()
        log.Fatalf("invalid parameters, namespace name required")
    }
    if len(name) == 0 {
        flag.PrintDefaults()
        log.Fatalf("invalid parameters, table name required")
    }
    // Credentials are read from environment variables.
    cfg := oss.LoadDefaultConfig().
        WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
        WithRegion(region)
    client := tables.NewTablesClient(cfg)
    result, err := client.GetTableMaintenanceJobStatus(context.TODO(), &tables.GetTableMaintenanceJobStatusRequest{
        TableBucketARN: oss.Ptr(tableBucketArn),
        Namespace:      oss.Ptr(namespace),
        Name:           oss.Ptr(name),
    })
    if err != nil {
        log.Fatalf("failed to get table maintenance job status %v", err)
    }
    log.Printf("get table maintenance job status result:%#v\n", result)
}
Java
import com.aliyun.sdk.service.oss2.credentials.EnvironmentVariableCredentialsProvider;
import com.aliyun.sdk.service.oss2.tables.OSSTablesClient;
import com.aliyun.sdk.service.oss2.tables.models.*;

public class GetTableMaintenanceJobStatusSample {
    public static void main(String[] args) throws Exception {
        String region = "cn-hangzhou";
        String tableBucketARN = "acs:osstables:cn-hangzhou:1234567890:bucket/mytable-bucket";
        String namespace = "mynamespace";
        String name = "mytable";
        // Credentials are read from environment variables.
        try (OSSTablesClient client = OSSTablesClient.newBuilder()
                .credentialsProvider(new EnvironmentVariableCredentialsProvider())
                .region(region)
                .build()) {
            GetTableMaintenanceJobStatusRequest request = GetTableMaintenanceJobStatusRequest.newBuilder()
                    .tableBucketARN(tableBucketARN)
                    .namespace(namespace)
                    .name(name)
                    .build();
            GetTableMaintenanceJobStatusResult result = client.getTableMaintenanceJobStatus(request);
            System.out.printf("Status code:%d, request id:%s%n",
                    result.statusCode(), result.requestId());
            System.out.printf("Table ARN: %s%n", result.tableARN());
            if (result.jobStatus() != null && !result.jobStatus().isEmpty()) {
                System.out.println("Maintenance job status:");
                // One entry per maintenance type, each with its own status.
                result.jobStatus().forEach((type, status) -> {
                    System.out.printf("  Type: %s, Status: %s%n", type, status.status());
                    if (status.lastRunTimestamp() != null) {
                        System.out.printf("    LastRunTimestamp: %s%n", status.lastRunTimestamp());
                    }
                    if (status.failureMessage() != null && !status.failureMessage().isEmpty()) {
                        System.out.printf("    FailureMessage: %s%n", status.failureMessage());
                    }
                });
            } else {
                System.out.println("No maintenance job status found.");
            }
        } catch (Exception e) {
            System.out.println("Error: " + e.getMessage());
        }
    }
}
API
Call the GetTableMaintenanceJobStatus operation to query the status of maintenance jobs.
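Because each maintenance task reports its status independently and normally runs about once every 24 hours, a small post-processing step can flag tasks that failed or have not run recently. The sketch below is illustrative only. It assumes the job status has already been converted into a plain dictionary keyed by maintenance type, with the fields status, lastRunTimestamp, and failureMessage that the Java sample reads. The dictionary layout, the ISO 8601 timestamp format, the status strings, the key icebergUnreferencedFileRemoval, the 48-hour threshold, and the function name find_problem_jobs are all assumptions, not documented behavior.

```python
from datetime import datetime, timedelta, timezone


def find_problem_jobs(job_status, now, stale_after_hours=48):
    """Return human-readable warnings for failed or overdue maintenance jobs.

    job_status: dict of maintenance type -> dict with optional keys
    'status', 'lastRunTimestamp' (ISO 8601 string), 'failureMessage'.
    This shape is a hypothetical stand-in for the SDK result object.
    """
    problems = []
    for job_type, info in sorted(job_status.items()):
        if info.get("failureMessage"):
            problems.append(f"{job_type}: failed ({info['failureMessage']})")
            continue
        last_run = info.get("lastRunTimestamp")
        if last_run is None:
            problems.append(f"{job_type}: has not run yet")
        elif now - datetime.fromisoformat(last_run) > timedelta(hours=stale_after_hours):
            problems.append(f"{job_type}: no run in the last {stale_after_hours}h")
    return problems


now = datetime(2025, 1, 10, tzinfo=timezone.utc)
sample_status = {
    "icebergCompaction": {
        "status": "Failed",  # status strings here are placeholders
        "lastRunTimestamp": "2025-01-09T20:00:00+00:00",
        "failureMessage": "example failure",
    },
    "icebergSnapshotManagement": {
        "status": "Successful",
        "lastRunTimestamp": "2025-01-06T00:00:00+00:00",
    },
    # Hypothetical key name for the unreferenced file cleanup task.
    "icebergUnreferencedFileRemoval": {
        "status": "Successful",
        "lastRunTimestamp": "2025-01-09T12:00:00+00:00",
    },
}

for line in find_problem_jobs(sample_status, now):
    print(line)
# icebergCompaction: failed (example failure)
# icebergSnapshotManagement: no run in the last 48h
```

The 48-hour threshold leaves slack for the delayed starts that can occur when the system load is high. Adjust it to match your own tolerance.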