通过Spark访问OSS Tables

更新时间:
复制为 MD 格式

OSS Tables 兼容 Apache Iceberg REST Catalog 标准协议,无需搭建 Hive Metastore 等外部 Catalog 服务。Spark等计算引擎通过Iceberg REST Catalog直接对接OSS Tables,使用标准SQL进行建表、查询、写入等操作。

前提条件

  • 已安装Spark 3.5及以上版本,且Java版本为11或以上。

  • 已创建OSS TablesTable Bucket。如未创建,请参见OSS Tables

步骤一:环境准备

下载依赖JAR

将以下JAR包放入$SPARK_HOME/jars目录,或在启动时通过--jars参数指定。

JAR

说明

iceberg-spark-runtime-3.5_2.12-1.10.1.jar

Iceberg Spark Runtime包,提供IcebergSpark集成能力。请根据Spark版本选择对应的包(如Spark 3.5对应iceberg-spark-runtime-3.5_2.12)。

iceberg-aws-bundle-1.10.1.jar

Iceberg AWS集成包,提供S3FileIO实现及REST Catalog sigv4签名认证所需的AWS SDK。版本需与Runtime包一致。

配置环境变量

Iceberg REST Catalog 使用 sigv4 签名认证,S3FileIO 访问数据面也需要凭证。推荐通过环境变量统一传递,在启动 Spark 之前设置以下环境变量:

说明

环境变量名使用 AWS_ 前缀,是因为 Iceberg 的 sigv4 签名模块和 S3FileIO 复用 AWS SDK 的标准凭证链。实际填入的是您阿里云账号的 AccessKey ID 和 AccessKey Secret。

export AWS_ACCESS_KEY_ID=<阿里云AccessKey ID>
export AWS_SECRET_ACCESS_KEY=<阿里云AccessKey Secret>
export AWS_REGION=<地域,例如cn-hangzhou>
# 可选,使用STS临时凭证时配置
export AWS_SESSION_TOKEN=<阿里云STS TOKEN>

配置Spark 配置项

除了通过环境变量传递凭证外,也可以在 PySpark 或 Spark SQL 中通过 Spark Catalog 相关配置项显式传递凭证。该方式更适用于多 Catalog 场景,或不便设置环境变量的情况。

# S3FileIO数据面凭证
spark.sql.catalog.oss_tables.s3.access-key-id=<阿里云AccessKey ID>
spark.sql.catalog.oss_tables.s3.secret-access-key=<阿里云AccessKey Secret>
spark.sql.catalog.oss_tables.client.region=<地域,例如cn-hangzhou>
# 可选,使用STS临时凭证时配置
spark.sql.catalog.oss_tables.s3.session-token=<阿里云STS TOKEN>

# REST Catalog签名凭证
spark.sql.catalog.oss_tables.rest.access-key-id=<阿里云AccessKey ID>
spark.sql.catalog.oss_tables.rest.secret-access-key=<阿里云AccessKey Secret>
spark.sql.catalog.oss_tables.rest.signing-region=<地域,例如cn-hangzhou>
# 可选,使用STS临时凭证时配置
spark.sql.catalog.oss_tables.rest.session-token=<阿里云STS TOKEN>

步骤二:配置Spark连接

  • OSS Tables提供Iceberg REST Catalog端点,Spark通过该端点管理表元数据。Endpoint格式如下:

    • 内网https://{tableBucketName}.osstables-{region}-internal.aliyuncs.com/iceberg

    • 外网https://{tableBucketName}.osstables-{region}.aliyuncs.com/iceberg

  • OSS Tables 提供S3FileIO访问OSS数据面使用的访问端点,Spark 通过该端点访问表数据。Endpoint格式如下:

    • 内网https://oss-{region}-internal.aliyuncs.com

    • 外网https://oss-{region}.aliyuncs.com

说明

io-impl 请勿配置为 org.apache.iceberg.hadoop.HadoopFileIO

Iceberg 的设计理念是去 List 化,OSS Table Bucket 作为面向 Iceberg 深度优化的存储类型,禁止 List 操作。HadoopFileIO 基于对象存储的文件语义实现,为兼容通用场景会触发 List 操作,因此不适用于 OSS Table Bucket 的数据访问。

通过PySpark启动

以下示例以 Table Bucket 名称 my-data-lake、地域 cn-hangzhou 为例。

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("OSS Tables Demo") \
    .config("spark.jars", "/path/to/iceberg-spark-runtime-3.5_2.12-1.10.1.jar,"
            "/path/to/iceberg-aws-bundle-1.10.1.jar") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.oss_tables", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.oss_tables.catalog-impl", "org.apache.iceberg.rest.RESTCatalog") \
    .config("spark.sql.catalog.oss_tables.uri", "https://my-data-lake.osstables-cn-hangzhou-internal.aliyuncs.com/iceberg") \
    .config("spark.sql.catalog.oss_tables.warehouse", "my-data-lake") \
    .config("spark.sql.catalog.oss_tables.rest.sigv4-enabled", "true") \
    .config("spark.sql.catalog.oss_tables.rest.signing-region", "cn-hangzhou") \
    .config("spark.sql.catalog.oss_tables.rest.signing-name", "osstables") \
    .config("spark.sql.catalog.oss_tables.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.catalog.oss_tables.s3.endpoint", "https://oss-cn-hangzhou-internal.aliyuncs.com") \
    .getOrCreate()

通过spark-sql启动

spark-sql \
  --jars /path/to/iceberg-spark-runtime-3.5_2.12-1.10.1.jar,/path/to/iceberg-aws-bundle-1.10.1.jar \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.oss_tables=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.oss_tables.catalog-impl=org.apache.iceberg.rest.RESTCatalog \
  --conf spark.sql.catalog.oss_tables.uri=https://my-data-lake.osstables-cn-hangzhou-internal.aliyuncs.com/iceberg \
  --conf spark.sql.catalog.oss_tables.warehouse=my-data-lake \
  --conf spark.sql.catalog.oss_tables.rest.sigv4-enabled=true \
  --conf spark.sql.catalog.oss_tables.rest.signing-region=cn-hangzhou \
  --conf spark.sql.catalog.oss_tables.rest.signing-name=osstables \
  --conf spark.sql.catalog.oss_tables.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
  --conf spark.sql.catalog.oss_tables.s3.endpoint=https://oss-cn-hangzhou-internal.aliyuncs.com

配置参数说明

参数

是否必填

说明

catalog-impl

固定为org.apache.iceberg.rest.RESTCatalog,指定使用REST Catalog。

uri

REST Catalog端点URL。格式:

  • 内网https://{tableBucketName}.osstables-{region}-internal.aliyuncs.com/iceberg

  • 外网https://{tableBucketName}.osstables-{region}.aliyuncs.com/iceberg

warehouse

Table Bucket名称。

rest.sigv4-enabled

固定为true,启用sigv4签名认证。

rest.signing-region

否(环境变量配置凭证时)

REST Catalog签名地域,需与Table Bucket所在地域一致,例如cn-hangzhou。使用AWS_REGION环境变量配置凭证时可省略;通过Spark配置项传递凭证时必填。

rest.signing-name

签名服务名,固定为osstables

io-impl

指定Iceberg读写底层数据文件的FileIO实现,固定为org.apache.iceberg.aws.s3.S3FileIO

s3.endpoint

S3FileIO访问OSS数据面使用的Endpoint,必须包含https://前缀。格式:

  • 内网https://oss-{region}-internal.aliyuncs.com

  • 外网https://oss-{region}.aliyuncs.com

步骤三:使用SQL操作数据

连接成功后,使用标准SQL语句进行数据操作。

管理Namespace

Namespace(命名空间)用于对表进行逻辑分组,作用相当于数据库。

-- 查看现有Namespace
SHOW NAMESPACES IN oss_tables;

-- 创建Namespace
CREATE NAMESPACE oss_tables.my_namespace;

-- 删除Namespace(需先删除其中所有Table)
DROP NAMESPACE oss_tables.my_namespace;

建表与表管理

-- 创建非分区表
CREATE TABLE oss_tables.my_namespace.users (
    id BIGINT NOT NULL COMMENT '用户ID',
    name STRING COMMENT '用户名',
    email STRING COMMENT '邮箱',
    created_at TIMESTAMP COMMENT '创建时间'
) USING iceberg;

-- 创建分区表(按天分区)
CREATE TABLE oss_tables.my_namespace.events (
    id BIGINT NOT NULL,
    event_type STRING,
    data STRING,
    ts TIMESTAMP
) USING iceberg
PARTITIONED BY (days(ts));

-- 查看Namespace中的所有Table
SHOW TABLES IN oss_tables.my_namespace;

-- 查看表结构
DESCRIBE TABLE oss_tables.my_namespace.users;

-- 删除表(OSS Tables 要求必须带 PURGE 关键字,否则会报错:OSS Tables only supports dropping tables with purge enabled)
DROP TABLE oss_tables.my_namespace.users PURGE;

数据写入与查询

-- 插入数据
INSERT INTO oss_tables.my_namespace.users VALUES
    (1, '张三', 'zhangsan@example.com', TIMESTAMP '2024-01-15 10:30:00'),
    (2, '李四', 'lisi@example.com', TIMESTAMP '2024-01-16 14:20:00'),
    (3, '王五', 'wangwu@example.com', TIMESTAMP '2024-01-17 09:15:00');

-- 全表查询
SELECT * FROM oss_tables.my_namespace.users;

-- 条件查询
SELECT * FROM oss_tables.my_namespace.users WHERE id = 2;

-- 聚合查询
SELECT COUNT(*) AS total FROM oss_tables.my_namespace.users;

-- 分组聚合
SELECT name, COUNT(*) AS cnt FROM oss_tables.my_namespace.users GROUP BY name;

-- 更新数据
UPDATE oss_tables.my_namespace.users SET name = '赵六' WHERE id = 3;

-- 删除数据
DELETE FROM oss_tables.my_namespace.users WHERE id = 1;

-- 查询验证
SELECT * FROM oss_tables.my_namespace.users ORDER BY id;

分区表操作

-- 插入分区数据
INSERT INTO oss_tables.my_namespace.events VALUES
    (1, 'click', '{"page": "home"}', TIMESTAMP '2024-01-15 10:30:00'),
    (2, 'view', '{"page": "product"}', TIMESTAMP '2024-01-15 11:00:00'),
    (3, 'click', '{"page": "detail"}', TIMESTAMP '2024-01-16 09:00:00');

-- 分区裁剪查询(仅扫描匹配分区)
SELECT * FROM oss_tables.my_namespace.events
WHERE ts >= TIMESTAMP '2024-01-15 00:00:00'
  AND ts < TIMESTAMP '2024-01-16 00:00:00';

-- 聚合统计
SELECT event_type, COUNT(*) AS cnt
FROM oss_tables.my_namespace.events
GROUP BY event_type;

时间旅行查询

Iceberg 支持时间旅行(Time Travel)查询,可读取历史某个时间点的数据快照。

-- 查看快照历史
SELECT snapshot_id, committed_at, operation
FROM oss_tables.my_namespace.users.snapshots;

-- 基于快照ID查询历史数据
SELECT * FROM oss_tables.my_namespace.users
VERSION AS OF <snapshot_id>;

-- 查询指定时间点的数据
SELECT * FROM oss_tables.my_namespace.users
TIMESTAMP AS OF TIMESTAMP '2024-01-16 00:00:00';

-- 查看数据文件分布
SELECT * FROM oss_tables.my_namespace.users.files;

注意事项

  • 版本要求:推荐使用Spark 3.5+和Iceberg 1.10.1。Spark版本与iceberg-spark-runtime JAR包的版本号需要匹配(如Spark 3.5对应iceberg-spark-runtime-3.5_2.12)。

  • 凭证配置:S3FileIO 与 REST Catalog sigv4 签名使用同一对 AccessKey ID 和 AccessKey Secret。推荐通过以下环境变量统一配置(同时覆盖 REST Catalog 签名和 S3FileIO 数据访问):

    • AWS_ACCESS_KEY_ID:阿里云AccessKey ID

    • AWS_SECRET_ACCESS_KEY:阿里云AccessKey Secret

    • AWS_REGION:Table Bucket所在地域,用于REST Catalog签名。设置此环境变量后,Catalog配置中可省略rest.signing-region

  • 表格式:OSS Tables目前仅支持Iceberg格式。建表时必须指定USING iceberg

  • 数据维护:OSS Tables内置文件合并、快照清理和未引用文件清理功能,无需在Spark中手动执行Iceberg Maintenance Procedures,详情请参见数据维护

  • OSS Tables Endpoint 支持

    • 内网https://{tableBucketName}.osstables-{region}-internal.aliyuncs.com/iceberg

    • 外网https://{tableBucketName}.osstables-{region}.aliyuncs.com/iceberg