OSS Tables 兼容 Apache Iceberg REST Catalog 协议。可以通过 Kafka Connect 的 Iceberg Sink Connector 将 Kafka 消息实时写入 OSS Tables 中的表,实现流式数据入湖。
步骤一:环境准备
下载依赖JAR包
将以下 JAR 包放入 Kafka Connect 的插件目录(plugin.path 指定的路径)。
JAR包 | 版本要求 | 说明 |
匹配iceberg版本 | 提供 S3FileIO 实现及 REST Catalog SigV4 签名认证所需的 AWS SDK。 | |
匹配iceberg版本 | 提供 SigV4 签名和 S3FileIO 实现。版本需与 iceberg-aws-bundle 一致。 | |
匹配iceberg版本 | Parquet 文件格式写入支持。 | |
3.3.6 | Hadoop 运行时依赖(Iceberg 内部加载需要),版本可按需调整。 | |
3.3.6 | Hadoop API 依赖(Iceberg 内部加载需要),版本可按需调整。 | |
3.3.2 | Iceberg SnapshotProducer 运行时需要此依赖,缺少会导致 ClassNotFoundException。 |
步骤二:创建Table Bucket
在开始写入数据之前,需要创建 Table Bucket 和 Namespace。可以使用 ossutil 或 AWS CLI 创建。
方式一:使用ossutil
1. 安装或升级 ossutil
请安装ossutil 2.3.0以上版本,如已安装 ossutil,可执行以下命令升级到最新版本:
ossutil update -f2. 配置凭证
执行 ossutil config 命令,按提示输入 AccessKey ID、AccessKey Secret 和 Region。
3. 创建 Table Bucket
ossutil tables-api create-table-bucket --name {table bucket名称} --endpoint http://{endpint} --region {region}命令执行成功后,返回结果中包含 Table Bucket ARN,请记录该值。
4. 创建 Namespace
ossutil tables-api create-namespace --table-bucket-arn {Table Bucket ARN} --namespace {Namespace名称} --endpoint http://{endpint}Namespace 和 Table 名称不能包含连字符(-),可使用下划线(_),这是因为名称会用于 SQL 语句中的标识符。
5. 创建 Table
您可以选择以下任一方式创建 Iceberg 表:
通过其他计算引擎创建(如 Spark)。
通过 ossutil 创建:先将表 schema 保存为 JSON 文件,再调用
create-table。以下示例的 schema 文件
schema.json定义了 3 个字段:{ "iceberg": { "schema": { "fields": [ {"name": "event_id", "type": "string", "required": true}, {"name": "event_time", "type": "string"}, {"name": "event_type", "type": "string"} ] } } }基于 schema 文件创建 Table:
ossutil tables-api create-table --table-bucket-arn <Table Bucket ARN> --namespace <Namespace名称> --name <表名称> --format ICEBERG --metadata file://schema.json --endpoint http://<Region>-internal.oss-tables.aliyuncs.com
方式二:使用AWS CLI
OSS Tables 兼容 S3 Tables API,也可以使用 AWS CLI 管理 Table Bucket。
1. 安装 AWS CLI
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip
sudo ./aws/install2. 配置凭证
执行 aws configure 命令,按提示输入 AccessKey ID、AccessKey Secret 和 Region。
3. 创建 Table Bucket
aws s3tables --endpoint http://<Region>.oss-tables.aliyuncs.com create-table-bucket --region <Region> --name <Table Bucket名称>命令执行成功后,返回结果中包含 Table Bucket ARN。
4. 创建 Namespace
aws s3tables --endpoint http://<Region>.oss-tables.aliyuncs.com create-namespace --table-bucket-arn <Table Bucket ARN> --namespace <Namespace名称>5. 创建 Table
通过其他计算引擎(如 Spark)创建表
使用 AWS CLI 创建。使用 AWS CLI 时,先将完整的入参保存为 JSON 文件
create-table.json,再调用create-table。{ "tableBucketARN": "<Table Bucket ARN>", "namespace": "<Namespace名称>", "name": "<表名称>", "format": "ICEBERG", "metadata": { "iceberg": { "schema": { "fields": [ {"name": "event_id", "type": "string", "required": true}, {"name": "event_time", "type": "string"}, {"name": "event_type", "type": "string"} ] } } } }aws s3tables --endpoint http://<Region>.oss-tables.aliyuncs.com create-table --cli-input-json file://create-table.json
6. 管理后台维护任务
OSS Tables 支持自动执行 Iceberg 表的后台维护(如文件清理、文件合并等),通过 AWS CLI 可以查询和配置维护任务。
查询 Table 维护任务状态:
aws s3tables get-table-maintenance-job-status \
--table-bucket-arn=<Table Bucket ARN> \
--namespace=<Namespace名称> \
--name=<表名>配置 Bucket 级维护策略(文件清理):
aws s3tables put-table-bucket-maintenance-configuration \
--table-bucket-arn <Table Bucket ARN> \
--type icebergUnreferencedFileRemoval \
--value '{"status":"enabled","settings":{"icebergUnreferencedFileRemoval":{"unreferencedDays":4,"nonCurrentDays":10}}}'配置 Table 级维护策略(小文件合并):
aws s3tables put-table-maintenance-configuration \
--table-bucket-arn <Table Bucket ARN> \
--type icebergCompaction \
--namespace <Namespace名称> \
--name <表名> \
--value='{"status":"enabled","settings":{"icebergCompaction":{"targetFileSizeMB":256}}}'步骤三:配置Kafka Connect
OSS Tables 提供 Iceberg REST Catalog 端点,Kafka Connect 通过 Iceberg Sink Connector 连接该端点写入数据。
外网Endpoint格式:
https://<REGION>.oss-tables.aliyuncs.com/iceberg内网Endpoint格式:
https://<REGION>-internal.oss-tables.aliyuncs.com/iceberg
Connector配置
创建 Iceberg Sink Connector 时,指定 Connector 类为 org.apache.iceberg.connect.IcebergSinkConnector,并配置以下属性:
# --- Iceberg Catalog (REST) ---
iceberg.catalog.type: rest
iceberg.catalog.uri: https://<REGION>-internal.oss-tables.aliyuncs.com/iceberg
iceberg.catalog.rest.sigv4-enabled: true
iceberg.catalog.rest.signing-region: <Region>
iceberg.catalog.warehouse: <Table Bucket ARN>
iceberg.catalog.rest.signing-name: osstables
iceberg.catalog.rest.access-key-id: <AccessKey ID>
iceberg.catalog.rest.secret-access-key: <AccessKey Secret>
# --- Force S3FileIO (catalog returns oss:// but storage is S3-compatible) ---
iceberg.catalog.io-impl: org.apache.iceberg.aws.s3.S3FileIO
# --- S3FileIO 存储配置 ---
iceberg.catalog.s3.endpoint: https://oss-<Region>-internal.aliyuncs.com
iceberg.catalog.s3.access-key-id: <AccessKey ID>
iceberg.catalog.s3.secret-access-key: <AccessKey Secret>
iceberg.catalog.s3.path-style-access: true
iceberg.catalog.client.region: <Region>
# --- 数据格式转换 ---
key.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: false
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: false如果使用较高版本的 AWS SDK(2.20+),可能出现签名错误:aws-chunked encoding is not supported with the specified x-amz-content-sha256 value。此时需要在 Kafka Connect 的 Java 启动参数中添加以下 JVM 选项:
-Daws.requestChecksumCalculation=when_required
-Daws.responseChecksumValidation=when_required配置参数说明
参数 | 是否必填 | 说明 |
| 是 | 固定为 |
| 是 | REST Catalog 端点 URL。格式:
|
| 是 | Table Bucket ARN。格式: |
| 是 | 固定为 |
| 是 | 固定为 |
| 是 | 固定为 |
| 是 | OSS 数据面端点。格式:
|
| 是 | 固定为 |
权限配置
使用 RAM 用户或 STS 临时凭证访问 OSS Tables 时,需确保对应身份具备所需的操作权限。
资源定义
Table Bucket ARN:
acs:osstables:<Region>:<阿里云账号ID>:bucket/<bucket_name>Table ARN:
acs:osstables:<Region>:<阿里云账号ID>:bucket/<bucket_name>/table/<table_id>
Action 定义
下表列出 OSS Tables 支持的 Action,及其是否支持跨账号授权:
分类 | Action | 跨账号访问 |
Table Bucket 级别 |
| 不允许 |
| 允许 | |
| 不允许 | |
| 允许 | |
| 允许 | |
| 允许 | |
| 允许 | |
| 允许 | |
| 不允许 | |
| 不允许 | |
| 不允许 | |
| 允许 | |
| 允许 | |
| 不允许 | |
| 不允许 | |
| 不允许 | |
Table 级别 |
| 允许 |
| 允许 | |
| 不允许 | |
| 不允许 | |
| 不允许 | |
| 允许 | |
| 允许 | |
| 允许 | |
| 允许 | |
| 允许 | |
| 允许 | |
| 允许 | |
| 允许 | |
| 不允许 | |
| 不允许 | |
| 允许 |
Iceberg REST操作与权限映射
下表列出 Iceberg REST Catalog 各操作所需的 OSS Action:
Iceberg REST 操作 | 所需 OSS Action |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|