OSS Tables 兼容 Apache Iceberg REST Catalog 标准协议,无需搭建 Hive Metastore 等外部 Catalog 服务。Spark等计算引擎通过Iceberg REST Catalog直接对接OSS Tables,使用标准SQL进行建表、查询、写入等操作。
前提条件
已安装Spark 3.5及以上版本,且Java版本为11或以上。
已创建OSS Tables的Table Bucket。如未创建,请参见OSS Tables。
步骤一:环境准备
下载依赖JAR包
将以下JAR包放入$SPARK_HOME/jars目录,或在启动时通过--jars参数指定。
JAR包 | 说明 |
Iceberg Spark Runtime包,提供Iceberg的Spark集成能力。请根据Spark版本选择对应的包(如Spark 3.5对应 | |
Iceberg AWS集成包,提供S3FileIO实现及REST Catalog sigv4签名认证所需的AWS SDK。版本需与Runtime包一致。 |
配置环境变量
Iceberg REST Catalog 使用 sigv4 签名认证,S3FileIO 访问数据面也需要凭证。推荐通过环境变量统一传递,在启动 Spark 之前设置以下环境变量:
环境变量名使用 AWS_ 前缀,是因为 Iceberg 的 sigv4 签名模块和 S3FileIO 复用 AWS SDK 的标准凭证链。实际填入的是您阿里云账号的 AccessKey ID 和 AccessKey Secret。
export AWS_ACCESS_KEY_ID=<阿里云AccessKey ID>
export AWS_SECRET_ACCESS_KEY=<阿里云AccessKey Secret>
export AWS_REGION=<地域,例如cn-hangzhou>
# 可选,使用STS临时凭证时配置
export AWS_SESSION_TOKEN=<阿里云STS TOKEN>配置Spark 配置项
除了通过环境变量传递凭证外,也可以在 PySpark 或 Spark SQL 中通过 Spark Catalog 相关配置项显式传递凭证。该方式更适用于多 Catalog 场景,或不便设置环境变量的情况。
# S3FileIO数据面凭证
spark.sql.catalog.oss_tables.s3.access-key-id=<阿里云AccessKey ID>
spark.sql.catalog.oss_tables.s3.secret-access-key=<阿里云AccessKey Secret>
spark.sql.catalog.oss_tables.client.region=<地域,例如cn-hangzhou>
# 可选,使用STS临时凭证时配置
spark.sql.catalog.oss_tables.s3.session-token=<阿里云STS TOKEN>
# REST Catalog签名凭证
spark.sql.catalog.oss_tables.rest.access-key-id=<阿里云AccessKey ID>
spark.sql.catalog.oss_tables.rest.secret-access-key=<阿里云AccessKey Secret>
spark.sql.catalog.oss_tables.rest.signing-region=<地域,例如cn-hangzhou>
# 可选,使用STS临时凭证时配置
spark.sql.catalog.oss_tables.rest.session-token=<阿里云STS TOKEN>步骤二:配置Spark连接
OSS Tables提供Iceberg REST Catalog端点,Spark通过该端点管理表元数据。Endpoint格式如下:
内网:
https://{tableBucketName}.osstables-{region}-internal.aliyuncs.com/iceberg外网:
https://{tableBucketName}.osstables-{region}.aliyuncs.com/iceberg
OSS Tables 提供S3FileIO访问OSS数据面使用的访问端点,Spark 通过该端点访问表数据。Endpoint格式如下:
内网:
https://oss-{region}-internal.aliyuncs.com外网:
https://oss-{region}.aliyuncs.com
io-impl 请勿配置为 org.apache.iceberg.hadoop.HadoopFileIO。
Iceberg 的设计理念是去 List 化,OSS Table Bucket 作为面向 Iceberg 深度优化的存储类型,禁止 List 操作。HadoopFileIO 基于对象存储的文件语义实现,为兼容通用场景会触发 List 操作,因此不适用于 OSS Table Bucket 的数据访问。
通过PySpark启动
以下示例以 Table Bucket 名称 my-data-lake、地域 cn-hangzhou 为例。
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("OSS Tables Demo") \
.config("spark.jars", "/path/to/iceberg-spark-runtime-3.5_2.12-1.10.1.jar,"
"/path/to/iceberg-aws-bundle-1.10.1.jar") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.oss_tables", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.oss_tables.catalog-impl", "org.apache.iceberg.rest.RESTCatalog") \
.config("spark.sql.catalog.oss_tables.uri", "https://my-data-lake.osstables-cn-hangzhou-internal.aliyuncs.com/iceberg") \
.config("spark.sql.catalog.oss_tables.warehouse", "my-data-lake") \
.config("spark.sql.catalog.oss_tables.rest.sigv4-enabled", "true") \
.config("spark.sql.catalog.oss_tables.rest.signing-region", "cn-hangzhou") \
.config("spark.sql.catalog.oss_tables.rest.signing-name", "osstables") \
.config("spark.sql.catalog.oss_tables.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
.config("spark.sql.catalog.oss_tables.s3.endpoint", "https://oss-cn-hangzhou-internal.aliyuncs.com") \
.getOrCreate()通过spark-sql启动
spark-sql \
--jars /path/to/iceberg-spark-runtime-3.5_2.12-1.10.1.jar,/path/to/iceberg-aws-bundle-1.10.1.jar \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.oss_tables=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.oss_tables.catalog-impl=org.apache.iceberg.rest.RESTCatalog \
--conf spark.sql.catalog.oss_tables.uri=https://my-data-lake.osstables-cn-hangzhou-internal.aliyuncs.com/iceberg \
--conf spark.sql.catalog.oss_tables.warehouse=my-data-lake \
--conf spark.sql.catalog.oss_tables.rest.sigv4-enabled=true \
--conf spark.sql.catalog.oss_tables.rest.signing-region=cn-hangzhou \
--conf spark.sql.catalog.oss_tables.rest.signing-name=osstables \
--conf spark.sql.catalog.oss_tables.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.oss_tables.s3.endpoint=https://oss-cn-hangzhou-internal.aliyuncs.com配置参数说明
参数 | 是否必填 | 说明 |
| 是 | 固定为 |
| 是 | REST Catalog端点URL。格式:
|
| 是 | Table Bucket名称。 |
| 是 | 固定为 |
| 否(环境变量配置凭证时) | REST Catalog签名地域,需与Table Bucket所在地域一致,例如 |
| 是 | 签名服务名,固定为 |
| 是 | 指定Iceberg读写底层数据文件的FileIO实现,固定为 |
| 是 | S3FileIO访问OSS数据面使用的Endpoint,必须包含
|
步骤三:使用SQL操作数据
连接成功后,使用标准SQL语句进行数据操作。
管理Namespace
Namespace(命名空间)用于对表进行逻辑分组,作用相当于数据库。
-- 查看现有Namespace
SHOW NAMESPACES IN oss_tables;
-- 创建Namespace
CREATE NAMESPACE oss_tables.my_namespace;
-- 删除Namespace(需先删除其中所有Table)
DROP NAMESPACE oss_tables.my_namespace;建表与表管理
-- 创建非分区表
CREATE TABLE oss_tables.my_namespace.users (
id BIGINT NOT NULL COMMENT '用户ID',
name STRING COMMENT '用户名',
email STRING COMMENT '邮箱',
created_at TIMESTAMP COMMENT '创建时间'
) USING iceberg;
-- 创建分区表(按天分区)
CREATE TABLE oss_tables.my_namespace.events (
id BIGINT NOT NULL,
event_type STRING,
data STRING,
ts TIMESTAMP
) USING iceberg
PARTITIONED BY (days(ts));
-- 查看Namespace中的所有Table
SHOW TABLES IN oss_tables.my_namespace;
-- 查看表结构
DESCRIBE TABLE oss_tables.my_namespace.users;
-- 删除表(OSS Tables 要求必须带 PURGE 关键字,否则会报错:OSS Tables only supports dropping tables with purge enabled)
DROP TABLE oss_tables.my_namespace.users PURGE;数据写入与查询
-- 插入数据
INSERT INTO oss_tables.my_namespace.users VALUES
(1, '张三', 'zhangsan@example.com', TIMESTAMP '2024-01-15 10:30:00'),
(2, '李四', 'lisi@example.com', TIMESTAMP '2024-01-16 14:20:00'),
(3, '王五', 'wangwu@example.com', TIMESTAMP '2024-01-17 09:15:00');
-- 全表查询
SELECT * FROM oss_tables.my_namespace.users;
-- 条件查询
SELECT * FROM oss_tables.my_namespace.users WHERE id = 2;
-- 聚合查询
SELECT COUNT(*) AS total FROM oss_tables.my_namespace.users;
-- 分组聚合
SELECT name, COUNT(*) AS cnt FROM oss_tables.my_namespace.users GROUP BY name;
-- 更新数据
UPDATE oss_tables.my_namespace.users SET name = '赵六' WHERE id = 3;
-- 删除数据
DELETE FROM oss_tables.my_namespace.users WHERE id = 1;
-- 查询验证
SELECT * FROM oss_tables.my_namespace.users ORDER BY id;分区表操作
-- 插入分区数据
INSERT INTO oss_tables.my_namespace.events VALUES
(1, 'click', '{"page": "home"}', TIMESTAMP '2024-01-15 10:30:00'),
(2, 'view', '{"page": "product"}', TIMESTAMP '2024-01-15 11:00:00'),
(3, 'click', '{"page": "detail"}', TIMESTAMP '2024-01-16 09:00:00');
-- 分区裁剪查询(仅扫描匹配分区)
SELECT * FROM oss_tables.my_namespace.events
WHERE ts >= TIMESTAMP '2024-01-15 00:00:00'
AND ts < TIMESTAMP '2024-01-16 00:00:00';
-- 聚合统计
SELECT event_type, COUNT(*) AS cnt
FROM oss_tables.my_namespace.events
GROUP BY event_type;时间旅行查询
Iceberg 支持时间旅行(Time Travel)查询,可读取历史某个时间点的数据快照。
-- 查看快照历史
SELECT snapshot_id, committed_at, operation
FROM oss_tables.my_namespace.users.snapshots;
-- 基于快照ID查询历史数据
SELECT * FROM oss_tables.my_namespace.users
VERSION AS OF <snapshot_id>;
-- 查询指定时间点的数据
SELECT * FROM oss_tables.my_namespace.users
TIMESTAMP AS OF TIMESTAMP '2024-01-16 00:00:00';
-- 查看数据文件分布
SELECT * FROM oss_tables.my_namespace.users.files;注意事项
版本要求:推荐使用Spark 3.5+和Iceberg 1.10.1。Spark版本与iceberg-spark-runtime JAR包的版本号需要匹配(如Spark 3.5对应
iceberg-spark-runtime-3.5_2.12)。凭证配置:S3FileIO 与 REST Catalog sigv4 签名使用同一对 AccessKey ID 和 AccessKey Secret。推荐通过以下环境变量统一配置(同时覆盖 REST Catalog 签名和 S3FileIO 数据访问):
AWS_ACCESS_KEY_ID:阿里云AccessKey IDAWS_SECRET_ACCESS_KEY:阿里云AccessKey SecretAWS_REGION:Table Bucket所在地域,用于REST Catalog签名。设置此环境变量后,Catalog配置中可省略rest.signing-region。
表格式:OSS Tables目前仅支持Iceberg格式。建表时必须指定
USING iceberg。数据维护:OSS Tables内置文件合并、快照清理和未引用文件清理功能,无需在Spark中手动执行Iceberg Maintenance Procedures,详情请参见数据维护。
OSS Tables Endpoint 支持:
内网:
https://{tableBucketName}.osstables-{region}-internal.aliyuncs.com/iceberg外网:
https://{tableBucketName}.osstables-{region}.aliyuncs.com/iceberg