使用OSS Foreign Table访问Iceberg数据

更新时间:

本文介绍如何在云原生数据仓库 AnalyticDB PostgreSQL 版中,通过OSS FDW(Foreign Data Wrapper)访问Apache Iceberg数据。

背景信息

Apache Iceberg是一种开放的表格式,用于超大规模的分析数据集。它提供了版本控制、Schema演进、分区演进和高效查询等特性。通过AnalyticDB for PostgreSQL提供的Iceberg支持,您可以:

  • 直接访问Iceberg表中的数据,无需导入。

  • 利用AnalyticDB for PostgreSQLMPP架构来加速分析Iceberg数据。

  • 将本地表和Iceberg表进行关联查询、分析等。

  • 与支持Iceberg的其他数据系统(如开源大数据组件Hive、Spark、StarRocks以及云上EMR等)之间实现无缝数据共享。

版本限制

内核版本为7.3.0.0及以上的AnalyticDB for PostgreSQL7.0版实例。

说明

您可以在控制台实例的基本信息页查看内核小版本。如不满足上述版本要求,需要您升级内核小版本

前提条件

  • 使用阿里云账号或具备AliyunGPDBFullAccessAliyunECSFullAccessAliyunEMRFullAccessAliyunOSSFullAccess权限的RAM用户,并为其创建AccessKey IDAccessKey Secret

  • 云原生数据仓库 AnalyticDB PostgreSQL 版实例和OSS存储空间Bucket位于同一地域。

  • EMR集群AnalyticDB for PostgreSQL实例位于同一VPC。

    说明

    除了EMR集群外,您也可以使用其他工具(如Flink、StarRocks、Doris等)写入Iceberg数据。

  • Iceberg表和数据已存在于OSS Bucket。

使用限制

  • 确保AnalyticDB for PostgreSQL实例能够访问Iceberg的存储系统和元数据服务。AnalyticDB for PostgreSQL支持以下存储系统和元数据服务。

    • 存储系统:支持阿里云对象存储OSS。暂不支持OSS-HDFS存储和外部HDFS存储。

    • 元数据服务:Hive Metastore(以下简称HMS)以及基于目录的Hadoop Catalog,其中HMS的连接地址应该为同一地域的内网地址,确保AnalyticDB for PostgreSQL的协调节点与HMS之间的网络畅通。

  • 当前仅支持Iceberg表的查询,暂不支持INSERTUPDATEDELETE等修改操作。

  • 仅支持Iceberg表底层文件格式为ORCParquet。

步骤一:创建OSS Server

使用以下SQL创建OSS Server,指定需要访问的OSS服务端信息。

语法

CREATE SERVER <server_name>
    FOREIGN DATA WRAPPER <fdw_name>
    OPTIONS (
        endpoint '<endpoint_name>',  -- OSS区域节点,请替换为实际使用的区域
        bucket '<bucket_name>',      -- OSS Bucket名称
        catalog_type '<catalog_name>', -- Iceberg catalog类型
        CatalogParams
    );

参数说明

参数

类型

是否必填

说明

server_name

字符串

OSS Server的名称。

fdw_name

字符串

管理服务器的外部数据容器的名称,固定为oss_fdw。

OPTIONS参数选项请参见下表。

参数名称

参数类型

是否必填

描述

endpoint

字符串

访问域名,即访问OSS的入口。云原生数据仓库 AnalyticDB PostgreSQL 版仅支持配置内网域名。详情请参见OSS地域和访问域名公共云小节。

bucket

字符串

数据文件所属的Bucket的名称,获取方法请参见准备工作

说明
  • OSS ServerOSS FDW中必须有一个设置了Bucket。关于OSS FDWBucket相关信息,请参见创建OSS FDW

  • 如果OSS ServerOSS FDW都设置了Bucket,则OSS FDW中的Bucket生效。

catalog_type

字符串

Iceberg Catalog的类型,取值如下:

  • hive:用于访问Hive Catalog。

  • hadoop:用于访问Hadoop Catalog。

  • onemeta:用于访问DMS OneMeta Catalog。

重要

推荐您使用hivehadoop。如需使用onemeta,请提交工单联系技术支持将Iceberg数据写入DMS OneMeta。

CatalogParams

hms_uris

字符串

Hive MetaStoreURL,用于访问Hive Catalog。格式为thrift://<metastore-host>:<port>

  • metastore-hostHive Metastore服务的主机名或内网IP地址。

  • portHive Metastore服务的端口号,默认为9083。

重要
  • 配置该参数前,请确保AnalyticDB for PostgreSQL实例的master节点可以访问HMS,例如已为EMR集群master节点所属的ECS安全组添加9083端口的入方向规则。

  • 如果您的EMR集群中Hive MetaStore开启了高可用模式,此处可以填写多个HMS地址并用逗号分隔。例如:"thrift://<HMS IP地址1>:<HMS端口号1>,thrift://<HMS IP地址2>:<HMS 端口号2>,thrift://<HMS IP地址3>:<HMS端口号3>"

warehouse

字符串

Iceberg数据仓库的OSS路径,用于访问Hadoop Catalog。

dms_endpoint

字符串

阿里云DMS Endpoint,用于访问DMS OneMeta。您可根据服务接入点获取Endpoint与地域的对应关系。

dms_region

字符串

阿里云DMS地域ID,用于访问DMS OneMeta。在DMS控制台点击对应Catalog实例地区对应的地域ID

更多详情请参见使用OSS Foreign Table进行数据湖分析

步骤二:创建OSS User Mapping

创建OSS Server后,您还需要创建一个访问OSS Server的用户。您可以使用CREATE USER MAPPING语句创建OSS User Mapping,用于定义AnalyticDB PostgreSQL数据库用户与访问OSS Server用户的映射关系。更多介绍,请参见CREATE USER MAPPING

语法

CREATE USER MAPPING FOR {username | USER | CURRENT_USER | PUBLIC}
    SERVER <server_name>
    OPTIONS (
        id '<AccessKey ID>',               -- 访问OSS的AccessKey ID
        key '<AccessKey Secret>'           -- 访问OSS的AccessKey Secret
        dms_id '<AccessKey ID>',           -- 访问DMS的AccessKey ID
        dms_key '<AccessKey Secret>'       -- 访问DMS的AccessKey Secret
    );

参数说明

参数

类型

是否必填

说明

username

字符串

是,四选一

指定映射的AnalyticDB PostgreSQL实例的用户名。

USER

字符串

映射当前的AnalyticDB PostgreSQL实例的用户名。

CURRENT_USER

字符串

PUBLIC

字符串

匹配所有AnalyticDB PostgreSQL实例的用户名,包括以后创建的用户。

server_name

字符串

OSS Server的名称。

OPTIONS参数选项请参见下表。

参数

类型

是否必填

说明

id

字符串

AccessKey ID,获取方法,请参见创建AccessKey

key

字符串

AccessKey Secret,获取方法,请参见创建AccessKey

dms_id

字符串

用于访问DMS OneMetaAccessKey ID,可与本表格参数id共用同一个AccessKey ID。仅在需要访问DMS OneMeta时配置该参数。

dms_key

字符串

用于访问DMS OneMetaAccessKey Secret,可与本表格参数key共用同一个AccessKey Key。仅在需要访问DMS OneMeta时配置该参数。

步骤三:创建OSS FDW

拥有OSS Server和访问OSS Server的用户后,您可以开始创建OSS FDW。您可以使用CREATE FOREIGN TABLE语句创建OSS FDW,映射到Iceberg表。

重要

Iceberg的基本数据类型会自动映射到AnalyticDB for PostgreSQL的数据类型,如BIGINT映射到bigint,STRING映射到text等。为了创建外表时确保类型兼容,您可查看OSS Foreign Table数据类型对照表获取更多映射类型详情。

语法

CREATE FOREIGN TABLE <table_name> (
    <column_name> <data_type>
    [,...]
  )
    SERVER <server_name>
    OPTIONS (
        format 'iceberg',                 -- 指定格式为Iceberg
        dms_catalog_name '<dms-catalog-name>', --DMS Onemeta Catalog 名称
        database_name '<database_name>',  -- Iceberg数据库名
        table_name '<iceberg_table_name>' -- Iceberg表名    
);

参数说明

参数名称

类型

是否必填

说明

table_name

字符串

OSS FDW名称。

column_name

字符串

列名。

data_type

字符串

该列的数据类型。

server_name

字符串

OSS Server的名称。

OPTIONS参数选项请参见下表。

参数

类型

是否必填

说明

format

字符串

此处设置为iceberg,表示使用Iceberg格式。

dms_catalog_name

字符串

DMS Onemeta Catalog名称,仅在需要访问DMS OneMeta时配置该参数。您可以在DMS控制台点击对应Catalog实例查看。

database_name

字符串

目标Iceberg表所在的database名称,OSS上的完整路径为oss://<BucketName>/<path-to-warehouse>/<database_name>

table_name

字符串

目标Iceberg表的名称,OSS上的完整路径为oss://<BucketName>/<path-to-warehouse>/<database_name>/<table_name>

步骤四:访问Iceberg数据

创建OSS Foreign Table后,您可以像查询普通表一样查询Iceberg数据。

SELECT * FROM <iceberg_table_name>;
SELECT COUNT(*) FROM <iceberg_table_name>;

使用示例

本章节演示如何在云原生数据仓库 AnalyticDB PostgreSQL 版中访问Iceberg数据。

示例一:访问EMRIceberg数据(Hadoop Catalog)

步骤一:登录EMR实例

使用SSH方式登录EMR集群

步骤二:登录Spark SQL

执行以下命令,通过Spark SQL读写Iceberg配置。

spark-sql       --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
                --conf spark.sql.catalog.hadoop=org.apache.iceberg.spark.SparkCatalog \
                --conf spark.sql.catalog.hadoop.type=hadoop \
                --conf spark.sql.catalog.hadoop.warehouse=oss://testBucketName/warehouse
说明

请将--conf spark.sql.catalog.hadoop.warehouse=oss://testBucketName/warehouse中的OSS路径替换为实际OSS路径。

当出现如下信息时,表示已进入spark-sql命令行。

spark-sql>

步骤三:创建Iceberg表并写入数据

创建hadoop数据库。

CREATE DATABASE IF NOT EXISTS hadoop.hadoop_db;

创建测试表并写入数据。

-- 创建测试表
CREATE TABLE IF NOT EXISTS hadoop.hadoop_db.hadoop_sample(
    id BIGINT COMMENT 'unique id', 
    data STRING
) 
    USING iceberg;
-- 写入数据
INSERT INTO hadoop.hadoop_db.hadoop_sample VALUES (1, 'a'), (2, 'b'), (3, 'c');

查询数据。

SELECT * FROM hadoop.hadoop_db.hadoop_sample;

步骤四:验证OSS文件结构

登录OSS控制台,在目标Bucket中确认Iceberg元数据及数据文件结构如下。

oss://testBucketName/warehouse/hadoop_db/hadoop_sample/  
├── metadata/
│   ├── metadata.json  
│   ├── snapshots.avro  
│   └── manifests.avro
└── data/  
    └── parquet files...

步骤五:创建Iceberg外表

根据EMR创建的Iceberg表结构,在AnalyticDB for PostgreSQL执行以下SQL创建Iceberg外表。

-- 创建OSS SERVER
CREATE SERVER oss_hadoop_srv
    FOREIGN DATA WRAPPER oss_fdw
    OPTIONS (
        endpoint 'oss-cn-hangzhou-********.aliyuncs.com', 
        bucket 'testBucketName',
        catalog_type 'hadoop',
        warehouse 'oss://testBucketName/warehouse'
  );
-- 创建OSS User Mapping
CREATE USER MAPPING FOR PUBLIC
    SERVER oss_hadoop_srv
    OPTIONS (
        id 'LTAI****************',           -- 访问OSS的AccessKey ID
        key 'yourAccessKeySecret'       -- 访问OSS的AccessKey Secret
  );
-- 创建OSS FDW
CREATE FOREIGN TABLE sample (
    id BIGINT,
    data text
  )
    SERVER oss_hadoop_srv 
    OPTIONS (
        format 'iceberg',
        database_name 'hadoop_db',
        table_name 'hadoop_sample'
  );

步骤六:执行联邦查询

使用如下SQL查询数据。

SELECT * FROM sample;

示例二:访问EMRIceberg数据(Hive Catalog)

步骤一:登录EMR实例

使用SSH方式登录EMR集群

步骤二:登录Spark SQL

执行以下命令,通过Spark SQL读写Iceberg配置。

spark-sql       --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
                --conf spark.sql.catalog.hive=org.apache.iceberg.spark.SparkCatalog \
                --conf spark.sql.catalog.hive.type=hive

当出现如下信息时,表示已进入spark-sql命令行。

spark-sql>

步骤三:创建Iceberg表并写入数据

创建hive数据库。

CREATE DATABASE IF NOT EXISTS hive.hive_db
    COMMENT '数据库存储在OSS上'
    LOCATION 'oss://testBucketName/warehouse/hive_db.db/';

创建测试表并写入数据。

-- 创建测试表
CREATE TABLE IF NOT EXISTS hive.hive_db.hive_sample(
    id BIGINT COMMENT 'unique id', 
    data STRING
) 
    USING iceberg;
-- 写入数据
INSERT INTO hive.hive_db.hive_sample VALUES (1, 'a'), (2, 'b'), (3, 'c');

查询数据。

SELECT * FROM hive.hive_db.hive_sample;

步骤四:验证OSS存储结构

登录OSS控制台,在目标Bucket中确认Iceberg元数据及数据文件结构如下。

oss://testBucketName/warehouse/hive_db.db/hive_sample/ 
├── metadata/
│   ├── metadata.json  
│   ├── snapshots.avro  
│   └── manifests.avro
└── data/  
    └── parquet files...

步骤五:创建Iceberg外表

根据EMR创建的Iceberg表结构,在AnalyticDB for PostgreSQL执行以下SQL创建Iceberg外表。

-- 创建OSS SERVER
CREATE SERVER oss_hive_srv
    FOREIGN DATA WRAPPER oss_fdw
    OPTIONS (
        endpoint 'oss-cn-hangzhou-********.aliyuncs.com', 
        bucket 'testBucketName',
        catalog_type 'hive',
        hms_uris 'thrift://192.168.XXX.XXX:9083'
);
-- 创建OSS User Mapping
CREATE USER MAPPING FOR PUBLIC
    SERVER oss_hive_srv
    OPTIONS (
        id 'LTAI****************',           -- 访问OSS的AccessKey ID
        key 'yourAccessKeySecret'       -- 访问OSS的AccessKey Secret
    );
-- 创建OSS FDW
CREATE FOREIGN TABLE hive_sample(
    id BIGINT,
    data text
)
    SERVER oss_hive_srv OPTIONS (
        format 'iceberg',
        database_name 'hive_db',
        table_name 'hive_sample'
);

步骤六:执行联邦查询

使用如下SQL查询数据。

SELECT * FROM hive_sample;

示例三:访问DMS OneMetaIceberg数据

重要

如需使用DMS OneMeta Catalog,请提交工单联系技术支持将Iceberg数据写入DMS OneMeta。

步骤一:创建Iceberg外表

根据DMS OneMeta中创建的Iceberg表结构,在AnalyticDB for PostgreSQL执行以下SQL创建Iceberg外表。

-- 创建SERVER
CREATE SERVER oss_dms_serv
    FOREIGN DATA WRAPPER oss_fdw
    OPTIONS (
        endpoint 'oss-cn-hangzhou-********.aliyuncs.com',
        bucket 'testBucketName',
        catalog_type 'onemeta',
        dms_endpoint '<dms_endpoint_name>',
        dms_region '<dms_region_id>'
  );
-- 创建OSS User Mapping
CREATE USER MAPPING FOR PUBLIC
    SERVER oss_dms_serv
    OPTIONS ( 
        id 'LTAI****************',           -- 访问OSS的AccessKey ID
        key 'yourAccessKeySecret',       -- 访问OSS的AccessKey Secret
        dms_id 'LTAI****************',       -- 访问DMS的AccessKey ID
        dms_key 'yourAccessKeySecret'   -- 访问DMS的AccessKey Secret
    );
-- 创建外表
CREATE FOREIGN TABLE sample(
    id BIGINT,
    data text
)
    SERVER oss_dms_serv OPTIONS (
        format 'iceberg',
        dms_catalog_name '<dms-catalog-name>',
        database_name '<dms-database-name>',
        table_name '<dms-table-name>'
);

步骤二:执行联邦查询

使用如下SQL查询数据。

SELECT * FROM sample;

常见问题

Iceberg表中的数据类型如何映射到AnalyticDB for PostgreSQL

Iceberg的基本数据类型会自动映射到AnalyticDB for PostgreSQL的数据类型,如BIGINT映射到bigint,STRING映射到text等。为了创建外表时确保类型兼容,您可查看OSS Foreign Table数据类型对照表获取更多映射类型详情。

能否对Iceberg外表进行写入操作?

不能。AnalyticDB for PostgreSQL仅支持查询Iceberg外表。

如果Iceberg表所在Schema变更怎么办?

Iceberg表所在的Schema发生变更时,需要重新创建AnalyticDB for PostgreSQL中的外表DDL以反映最新的表结构。