通过Kafka Connect写入OSS Tables

更新时间:
复制为 MD 格式

OSS Tables 兼容 Apache Iceberg REST Catalog 协议。可以通过 Kafka Connect 的 Iceberg Sink Connector 将 Kafka 消息实时写入 OSS Tables 中的表,实现流式数据入湖。

步骤一:环境准备

下载依赖JAR

将以下 JAR 包放入 Kafka Connect 的插件目录(plugin.path 指定的路径)。

JAR

版本要求

说明

iceberg-aws-bundle-1.10.1.jar

匹配iceberg版本

提供 S3FileIO 实现及 REST Catalog SigV4 签名认证所需的 AWS SDK。

iceberg-aws-1.10.1.jar

匹配iceberg版本

提供 SigV4 签名和 S3FileIO 实现。版本需与 iceberg-aws-bundle 一致。

iceberg-parquet-1.10.1.jar

匹配iceberg版本

Parquet 文件格式写入支持。

hadoop-client-runtime-3.3.6.jar

3.3.6

Hadoop 运行时依赖(Iceberg 内部加载需要),版本可按需调整。

hadoop-client-api-3.3.6.jar

3.3.6

Hadoop API 依赖(Iceberg 内部加载需要),版本可按需调整。

failsafe-3.3.2.jar

3.3.2

Iceberg SnapshotProducer 运行时需要此依赖,缺少会导致 ClassNotFoundException。

步骤二:创建Table Bucket

在开始写入数据之前,需要创建 Table Bucket 和 Namespace。可以使用 ossutil 或 AWS CLI 创建。

方式一:使用ossutil

1. 安装或升级 ossutil

安装ossutil 2.3.0以上版本,如已安装 ossutil,可执行以下命令升级到最新版本:

ossutil update -f

2. 配置凭证

执行 ossutil config 命令,按提示输入 AccessKey ID、AccessKey Secret 和 Region。

3. 创建 Table Bucket

ossutil tables-api create-table-bucket --name {table bucket名称} --endpoint http://{endpint} --region {region}

命令执行成功后,返回结果中包含 Table Bucket ARN,请记录该值。

4. 创建 Namespace

ossutil tables-api create-namespace --table-bucket-arn {Table Bucket ARN} --namespace {Namespace名称} --endpoint http://{endpint}
重要

Namespace 和 Table 名称不能包含连字符(-),可使用下划线(_),这是因为名称会用于 SQL 语句中的标识符。

5. 创建 Table

您可以选择以下任一方式创建 Iceberg 表:

  • 通过其他计算引擎创建(如 Spark)。

  • 通过 ossutil 创建:先将表 schema 保存为 JSON 文件,再调用 create-table

    以下示例的 schema 文件 schema.json 定义了 3 个字段:

    {
      "iceberg": {
        "schema": {
          "fields": [
            {"name": "event_id", "type": "string", "required": true},
            {"name": "event_time", "type": "string"},
            {"name": "event_type", "type": "string"}
          ]
        }
      }
    }

    基于 schema 文件创建 Table:

    ossutil tables-api create-table --table-bucket-arn <Table Bucket ARN> --namespace <Namespace名称> --name <表名称> --format ICEBERG --metadata file://schema.json --endpoint http://<Region>-internal.oss-tables.aliyuncs.com

方式二:使用AWS CLI

OSS Tables 兼容 S3 Tables API,也可以使用 AWS CLI 管理 Table Bucket。

1. 安装 AWS CLI

curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip
sudo ./aws/install

2. 配置凭证

执行 aws configure 命令,按提示输入 AccessKey ID、AccessKey Secret 和 Region。

3. 创建 Table Bucket

aws s3tables --endpoint http://<Region>.oss-tables.aliyuncs.com create-table-bucket --region <Region> --name <Table Bucket名称>

命令执行成功后,返回结果中包含 Table Bucket ARN。

4. 创建 Namespace

aws s3tables --endpoint http://<Region>.oss-tables.aliyuncs.com create-namespace --table-bucket-arn <Table Bucket ARN> --namespace <Namespace名称>

5. 创建 Table

  • 通过其他计算引擎(如 Spark)创建表

  • 使用 AWS CLI 创建。使用 AWS CLI 时,先将完整的入参保存为 JSON 文件 create-table.json,再调用 create-table

    {
      "tableBucketARN": "<Table Bucket ARN>",
      "namespace": "<Namespace名称>",
      "name": "<表名称>",
      "format": "ICEBERG",
      "metadata": {
        "iceberg": {
          "schema": {
            "fields": [
              {"name": "event_id", "type": "string", "required": true},
              {"name": "event_time", "type": "string"},
              {"name": "event_type", "type": "string"}
            ]
          }
        }
      }
    }
    aws s3tables --endpoint http://<Region>.oss-tables.aliyuncs.com create-table --cli-input-json file://create-table.json

6. 管理后台维护任务

OSS Tables 支持自动执行 Iceberg 表的后台维护(如文件清理、文件合并等),通过 AWS CLI 可以查询和配置维护任务。

查询 Table 维护任务状态:

aws s3tables get-table-maintenance-job-status \
   --table-bucket-arn=<Table Bucket ARN> \
   --namespace=<Namespace名称> \
   --name=<表名>

配置 Bucket 级维护策略(文件清理):

aws s3tables put-table-bucket-maintenance-configuration \
   --table-bucket-arn <Table Bucket ARN> \
   --type icebergUnreferencedFileRemoval \
   --value '{"status":"enabled","settings":{"icebergUnreferencedFileRemoval":{"unreferencedDays":4,"nonCurrentDays":10}}}'

配置 Table 级维护策略(小文件合并):

aws s3tables put-table-maintenance-configuration \
   --table-bucket-arn <Table Bucket ARN> \
   --type icebergCompaction \
   --namespace <Namespace名称> \
   --name <表名> \
   --value='{"status":"enabled","settings":{"icebergCompaction":{"targetFileSizeMB":256}}}'

步骤三:配置Kafka Connect

OSS Tables 提供 Iceberg REST Catalog 端点,Kafka Connect 通过 Iceberg Sink Connector 连接该端点写入数据。

  • 外网Endpoint格式:

    https://<REGION>.oss-tables.aliyuncs.com/iceberg
  • 内网Endpoint格式:

    https://<REGION>-internal.oss-tables.aliyuncs.com/iceberg

Connector配置

创建 Iceberg Sink Connector 时,指定 Connector 类为 org.apache.iceberg.connect.IcebergSinkConnector,并配置以下属性:

# --- Iceberg Catalog (REST) ---
iceberg.catalog.type: rest
iceberg.catalog.uri: https://<REGION>-internal.oss-tables.aliyuncs.com/iceberg
iceberg.catalog.rest.sigv4-enabled: true
iceberg.catalog.rest.signing-region: <Region>
iceberg.catalog.warehouse: <Table Bucket ARN>
iceberg.catalog.rest.signing-name: osstables
iceberg.catalog.rest.access-key-id: <AccessKey ID>
iceberg.catalog.rest.secret-access-key: <AccessKey Secret>


# --- Force S3FileIO (catalog returns oss:// but storage is S3-compatible) ---
iceberg.catalog.io-impl: org.apache.iceberg.aws.s3.S3FileIO

# --- S3FileIO 存储配置 ---
iceberg.catalog.s3.endpoint: https://oss-<Region>-internal.aliyuncs.com
iceberg.catalog.s3.access-key-id: <AccessKey ID>
iceberg.catalog.s3.secret-access-key: <AccessKey Secret>
iceberg.catalog.s3.path-style-access: true
iceberg.catalog.client.region: <Region>

# --- 数据格式转换 ---
key.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: false
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: false
重要

如果使用较高版本的 AWS SDK(2.20+),可能出现签名错误:aws-chunked encoding is not supported with the specified x-amz-content-sha256 value。此时需要在 Kafka Connect 的 Java 启动参数中添加以下 JVM 选项:

-Daws.requestChecksumCalculation=when_required
-Daws.responseChecksumValidation=when_required

配置参数说明

参数

是否必填

说明

iceberg.catalog.type

固定为 rest,指定使用 REST Catalog。

iceberg.catalog.uri

REST Catalog 端点 URL。格式:

  • 内网https://{region}-internal.oss-tables.aliyuncs.com/iceberg

  • 外网https://{region}.oss-tables.aliyuncs.com/iceberg

iceberg.catalog.warehouse

Table Bucket ARN。格式:acs:osstables:<Region>:<阿里云账号ID>:bucket/<Table Bucket名称>

iceberg.catalog.rest.sigv4-enabled

固定为 true,启用 SigV4 签名认证。

iceberg.catalog.rest.signing-name

固定为 osstables,OSS Tables 服务端点的 SigV4 签名服务名。

iceberg.catalog.io-impl

固定为 org.apache.iceberg.aws.s3.S3FileIO,使用 S3 协议访问 OSS 数据面。

iceberg.catalog.s3.endpoint

OSS 数据面端点。格式:

  • 内网https://oss-{region}-internal.aliyuncs.com

  • 外网https://oss-{region}.aliyuncs.com

iceberg.catalog.s3.path-style-access

固定为 true,使用 Path-Style 访问模式。

权限配置

使用 RAM 用户或 STS 临时凭证访问 OSS Tables 时,需确保对应身份具备所需的操作权限。

资源定义

  • Table Bucket ARN:acs:osstables:<Region>:<阿里云账号ID>:bucket/<bucket_name>

  • Table ARN:acs:osstables:<Region>:<阿里云账号ID>:bucket/<bucket_name>/table/<table_id>

Action 定义

下表列出 OSS Tables 支持的 Action,及其是否支持跨账号授权:

分类

Action

跨账号访问

Table Bucket 级别

oss:CreateTableBucket

不允许

oss:GetTableBucket

允许

oss:ListTableBuckets

不允许

oss:CreateNamespace

允许

oss:GetNamespace

允许

oss:ListNamespaces

允许

oss:DeleteNamespace

允许

oss:DeleteTableBucket

允许

oss:PutTableBucketPolicy

不允许

oss:GetTableBucketPolicy

不允许

oss:DeleteTableBucketPolicy

不允许

oss:GetTableBucketMaintenanceConfiguration

允许

oss:PutTableBucketMaintenanceConfiguration

允许

oss:PutTableBucketEncryption

不允许

oss:GetTableBucketEncryption

不允许

oss:DeleteTableBucketEncryption

不允许

Table 级别

oss:GetTableMaintenanceConfiguration

允许

oss:PutTableMaintenanceConfiguration

允许

oss:PutTablePolicy

不允许

oss:GetTablePolicy

不允许

oss:DeleteTablePolicy

不允许

oss:CreateTable

允许

oss:GetTable

允许

oss:GetTableMetadataLocation

允许

oss:ListTables

允许

oss:RenameTable

允许

oss:UpdateTableMetadataLocation

允许

oss:GetTableData

允许

oss:PutTableData

允许

oss:GetTableEncryption

不允许

oss:PutTableEncryption

不允许

oss:DeleteTable

允许

Iceberg REST操作与权限映射

下表列出 Iceberg REST Catalog 各操作所需的 OSS Action:

Iceberg REST 操作

所需 OSS Action

getConfig

oss:GetTableBucket

listNamespaces

oss:ListNamespaces

createNamespace

oss:CreateNamespace

loadNamespaceMetadata

oss:GetNamespace

dropNamespace

oss:DeleteNamespace

listTables

oss:ListTables

createTable

oss:CreateTableoss:PutTableData

loadTable

oss:GetTableMetadataLocationoss:GetTableData

updateTable

oss:UpdateTableMetadataLocationoss:PutTableDataoss:GetTableData

dropTable

oss:DeleteTable

renameTable

oss:RenameTable

tableExists

oss:GetTable

namespaceExists

oss:GetNamespace