Iceberg

更新时间:2025-04-14 05:39:18

本文介绍如何使用Iceberg连接器。

背景信息

Apache Iceberg是一种开放的数据湖表格格式。您可以借助Apache Iceberg快速地在HDFS或者云端OSS上构建自己的数据湖存储服务,并借助开源大数据生态的Flink、Spark、Hive、Presto等计算引擎来实现数据湖的分析。

类别

详情

类别

详情

支持类型

源表和结果表

运行模式

批模式和流模式

数据格式

暂不适用

特有监控指标

暂无

API种类

SQL

是否支持更新或删除结果表数据

特色功能

目前Apache Iceberg提供以下核心能力:

  • 基于HDFS或者对象存储构建低成本的轻量级数据湖存储服务。

  • 完善的ACID语义。

  • 支持历史版本回溯。

  • 支持高效的数据过滤。

  • 支持Schema Evolution。

  • 支持Partition Evolution。

  • 支持和自建Hive Metastore配合使用,详情请参见使用Hive Catalog,配合自建Hive Metastore(HMS)使用。

说明

您可以借助Flink高效的容错能力和流处理能力,把海量的日志行为数据实时导入到Apache Iceberg数据湖内,再借助Flink或者其他分析引擎来实现数据价值的提取。

使用限制

  • Flink计算引擎VVR 4.0.8及以上版本支持Iceberg连接器。Iceberg连接器需要搭配DLF Catalog一起使用,详情请参见管理DLF Catalog

  • Iceberg连接器支持Apache Iceberg v1v2表格式,详情请参见Iceberg Table Spec

    说明

    仅实时计算引擎VVR 8.0.7及以上版本支持v2表格式。

  • 流读模式下,仅支持将Append OnlyIceberg表作为源表。

语法结构

CREATE TABLE iceberg_table (
  id    BIGINT,
  data  STRING
  PRIMARY KEY(`id`) NOT ENFORCED
)
 PARTITIONED BY (data)
 WITH (
 'connector' = 'iceberg',
  ...
);

WITH参数

通用(源表)

参数

说明

数据类型

是否必填

默认值

备注

参数

说明

数据类型

是否必填

默认值

备注

connector

源表类型

String

固定值为iceberg

catalog-name

Catalog名称

String

请填写为自定义的英文名。

catalog-database

数据库名称

String

default

对应在DLF上创建的数据库名称,例如dlf_db。

说明

如果您没有创建对应的DLF数据库,请创建DLF数据库,详情请参见数据库表及函数

io-impl

分布式文件系统的实现类名

String

固定值为org.apache.iceberg.aliyun.oss.OSSFileIO

oss.endpoint

阿里云对象存储服务OSSEndpoint

String

请详情参见OSS地域和访问域名

说明
  • 推荐您为oss.endpoint参数配置OSSVPC Endpoint。例如,如果您选择的地域为cn-hangzhou地域,则oss.endpoint需要配置为oss-cn-hangzhou-internal.aliyuncs.com。

  • 如果您需要跨VPC访问OSS,则请参见如何访问跨VPC的其他服务?

  • access.key.id:VVR 8.0.6及以下版本

  • access-key-id:VVR 8.0.7及以上版本

阿里云账号的AccessKey ID

String

详情请参见如何查看AccessKey IDAccessKey Secret信息?

重要

为了避免您的AK信息泄露,建议您使用变量的方式填写AccessKey取值,详情请参见项目变量

  • access.key.secret:VVR 8.0.6及以下版本

  • access-key-secret:VVR 8.0.7及以上版本

阿里云账号的AccessKey Secret

String

catalog-impl

CatalogClass类名

String

固定值为org.apache.iceberg.aliyun.dlf.DlfCatalog

warehouse

表数据存放在OSS的路径

String

无。

dlf.catalog-id

阿里云账号的账号ID

String

可通过用户信息页面获取账号ID。

dlf.endpoint

DLF服务的Endpoint

String

详情请参见已开通的地域和访问域名

说明
  • 推荐您为dlf.endpoint参数配置DLFVPC Endpoint。例如,如果您选择的地域为cn-hangzhou地域,则dlf.endpoint参数需要配置为dlf-vpc.cn-hangzhou.aliyuncs.com

  • 如果您需要跨VPC访问DLF,则请参见空间管理与操作

dlf.region-id

DLF服务的地域名

String

详情请参见已开通的地域和访问域名

说明

请和dlf.endpoint选择的地域保持一致。

uri

Hive metastorethrift URI

String

仅当使用Hive Catalog时,必填。

配合自建Hive Metastore使用。

结果表独有

参数

说明

数据类型

是否必填

默认值

备注

参数

说明

数据类型

是否必填

默认值

备注

write.operation

写入操作模式

String

upsert

  • upsert(默认):数据更新。

  • insert:数据追加写入。

  • bulk_insert:批量写入(不更新)。

hive_sync.enable

是否开启同步元数据到Hive功能

boolean

false

参数取值如下:

  • true:开启

  • false(默认值):不开启。

hive_sync.mode

Hive数据同步模式

String

hms

  • hms(默认值):采用Hive Metastore或者DLF Catalog时,需要设置hms。

  • jdbc:采用jdbc Catalog时,需要设置为jdbc。

hive_sync.db

同步到Hive的数据库名称

String

当前TableCatalog中的数据库名

无。

hive_sync.table

同步到Hive的表名称

String

当前Table

无。

dlf.catalog.region

DLF服务的地域名

String

详情请参见已开通的地域和访问域名

说明
  • 仅当hive_sync.mode设置为hms时,dlf.catalog.region参数设置才生效。

  • 请和dlf.catalog.endpoint选择的地域保持一致。

dlf.catalog.endpoint

DLF服务的Endpoint

String

详情请参见已开通的地域和访问域名

说明
  • 仅当hive_sync.mode设置为hms时,dlf.catalog.endpoint参数设置才生效。

  • 推荐您为dlf.catalog.endpoint参数配置DLFVPC Endpoint。例如,如果您选择的地域为cn-hangzhou地域,则dlf.catalog.endpoint参数需要配置为dlf-vpc.cn-hangzhou.aliyuncs.com

  • 如果您需要跨VPC访问DLF,则请参见空间管理与操作

类型映射

Iceberg字段类型

Flink字段类型

Iceberg字段类型

Flink字段类型

BOOLEAN

BOOLEAN

INT

INT

LONG

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(P,S)

DECIMAL(P,S)

DATE

DATE

TIME

TIME

说明

Iceberg时间戳精度为微秒,Flink时间戳精度为毫秒。在使用Flink读取Iceberg数据时,时间精度会对齐到毫秒。

TIMESTAMP

TIMESTAMP

TIMESTAMPTZ

TIMESTAMP_LTZ

STRING

STRING

FIXED(L)

BYTES

BINARY

VARBINARY

STRUCT<...>

ROW

LIST<E>

LIST

MAP<K,V>

MAP

代码示例

请确认您已创建了OSS BucketDLF数据库。详情请参见控制台创建存储空间数据库表及函数

说明

在创建DLF数据库选择路径时,建议按照${warehouse}/${database_name}.db格式填写。例如,如果warehouse地址为oss://iceberg-test/warehouse,数据库的名称为dlf_db,则dlf_dbOSS路径需要设置为oss://iceberg-test/warehouse/dlf_db.db

结果表示例

本示例为您介绍如何通过Datagen连接器随机生成流式数据写入Iceberg表。

CREATE TEMPORARY TABLE datagen(
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE dlf_iceberg (
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '<yourCatalogName>',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '<yourOSSEndpoint>',  
  'access.key.id' = '${secret_values.ak_id}',
  'access.key.secret' = '${secret_values.ak_secret}',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '<yourOSSWarehousePath>',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = '<yourDLFEndpoint>',  
  'dlf.region-id' = '<yourDLFRegionId>'
);

INSERT INTO dlf_iceberg SELECT * FROM datagen;

源表示例

  • 使用Hive Catalog,配合自建Hive Metastore(HMS)使用。

    您需要保证FlinkHMS集群网络互通,数据将存储在oss://<bucket>/<path>/<database-name>/flink_table目录下。

    CREATE TEMOPORY TABLE flink_table (
      id   BIGINT,
      data STRING
    ) WITH (
      'connector'='iceberg',
      'catalog-name'='<yourCatalogName>',
      'catalog-database'='<yourDatabaseName>',
      'uri'='thrift://<ip>:<post>',
      'warehouse'='oss://<bucket>/<path>',
      'io-impl'='org.apache.iceberg.aliyun.oss.OSSFileIO',
      'access-key-id'='<aliyun ak>',
      'access-key-secret'='<aliyun sk>',
      'oss.endpoint'='<yourOSSEndpoint>'
    );
  • 使用DLF Catalog,将Iceberg源表数据写入到Iceberg结果表中。

    CREATE TEMPORARY TABLE src_iceberg (
      id    BIGINT,
      data  STRING
    ) WITH (
      'connector' = 'iceberg',
      'catalog-name' = '<yourCatalogName>',
      'catalog-database' = '<yourDatabaseName>',
      'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
      'oss.endpoint' = '<yourOSSEndpoint>',  
      'access.key.id' = '${secret_values.ak_id}',
      'access.key.secret' = '${secret_values.ak_secret}',
      'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
      'warehouse' = '<yourOSSWarehousePath>',
      'dlf.catalog-id' = '<yourCatalogId>',
      'dlf.endpoint' = '<yourDLFEndpoint>',  
      'dlf.region-id' = '<yourDLFRegionId>'
    );
    
    CREATE TEMPORARY TABLE dst_iceberg (
      id    BIGINT,
      data  STRING
    ) WITH (
      'connector' = 'iceberg',
      'catalog-name' = '<yourCatalogName>',
      'catalog-database' = '<yourDatabaseName>',
      'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
      'oss.endpoint' = '<yourOSSEndpoint>',  
      'access.key.id' = '${secret_values.ak_id}',
      'access.key.secret' = '${secret_values.ak_secret}',
      'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
      'warehouse' = '<yourOSSWarehousePath>',
      'dlf.catalog-id' = '<yourCatalogId>',
      'dlf.endpoint' = '<yourDLFEndpoint>',  
      'dlf.region-id' = '<yourDLFRegionId>'
    );
    
    BEGIN STATEMENT SET;
    
    INSERT INTO src_iceberg VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD'), (5, 'EEE');
    INSERT INTO dst_iceberg SELECT * FROM src_iceberg;
    
    END;

相关文档

Flink支持的连接器,请参见支持的连接器

  • 本页导读 (1)
  • 背景信息
  • 特色功能
  • 使用限制
  • 语法结构
  • WITH参数
  • 通用(源表)
  • 结果表独有
  • 类型映射
  • 代码示例
  • 结果表示例
  • 源表示例
  • 相关文档