本文介绍如何使用Iceberg连接器。
背景信息
Apache Iceberg是一种开放的数据湖表格格式。您可以借助Apache Iceberg快速地在HDFS或者云端OSS上构建自己的数据湖存储服务,并借助开源大数据生态的Flink、Spark、Hive、Presto等计算引擎来实现数据湖的分析。
|
类别 |
详情 |
|
支持类型 |
源表和结果表,数据摄入目标端 |
|
运行模式 |
批模式和流模式 |
|
数据格式 |
暂不适用 |
|
特有监控指标 |
暂无 |
|
API种类 |
SQL,数据摄入YAML作业 |
|
是否支持更新或删除结果表数据 |
是 |
特色功能
目前Apache Iceberg提供以下核心能力:
-
基于HDFS或者对象存储构建低成本的轻量级数据湖存储服务。
-
完善的ACID语义。
-
支持历史版本回溯。
-
支持高效的数据过滤。
-
支持Schema Evolution。
-
支持Partition Evolution。
您可以借助Flink高效的容错能力和流处理能力,把海量的日志行为数据实时导入到Apache Iceberg数据湖内,再借助Flink或者其他分析引擎来实现数据价值的提取。
使用限制
-
仅Flink计算引擎VVR 4.0.8及以上版本支持Iceberg连接器。Iceberg连接器需要搭配DLF Catalog一起使用,详情请参见管理DLF-Legacy Catalog。
-
Iceberg连接器支持Apache Iceberg v1和v2表格式,详情请参见Iceberg Table Spec。
说明仅实时计算引擎VVR 8.0.7及以上版本支持v2表格式。
-
流读模式下,仅支持将Append Only的Iceberg表作为源表。
语法结构
CREATE TABLE iceberg_table (
id BIGINT,
data STRING
PRIMARY KEY(`id`) NOT ENFORCED
)
PARTITIONED BY (data)
WITH (
'connector' = 'iceberg',
...
);
WITH参数
通用(源表)
|
参数 |
说明 |
数据类型 |
是否必填 |
默认值 |
备注 |
|
connector |
源表类型 |
String |
是 |
无 |
固定值为 |
|
catalog-name |
Catalog名称 |
String |
是 |
无 |
请填写为自定义的英文名。 |
|
catalog-database |
数据库名称 |
String |
是 |
default |
对应在DLF上创建的数据库名称,例如dlf_db。 说明
如果您没有创建对应的DLF数据库,请创建DLF数据库,详情请参见数据库表及函数。 |
|
io-impl |
分布式文件系统的实现类名 |
String |
是 |
无 |
固定值为 |
|
oss.endpoint |
阿里云对象存储服务OSS的Endpoint |
String |
否 |
无 |
请详情参见地域和Endpoint。 说明
|
|
阿里云账号的AccessKey ID |
String |
是 |
无 |
详情请参见如何查看AccessKey ID和AccessKey Secret信息? 重要
为了避免您的AK信息泄露,建议您使用变量的方式填写AccessKey取值,详情请参见项目变量。 |
|
阿里云账号的AccessKey Secret |
String |
是 |
无 |
|
|
catalog-impl |
Catalog的Class类名 |
String |
是 |
无 |
固定值为 |
|
warehouse |
表数据存放在OSS的路径 |
String |
是 |
无 |
无。 |
|
dlf.catalog-id |
阿里云账号的账号ID |
String |
是 |
无 |
可通过用户信息页面获取账号ID。 |
|
dlf.endpoint |
DLF服务的Endpoint |
String |
是 |
无 |
详情请参见已开通的地域和访问域名。 说明
|
|
dlf.region-id |
DLF服务的地域名 |
String |
是 |
无 |
详情请参见已开通的地域和访问域名。 说明
请和dlf.endpoint选择的地域保持一致。 |
结果表独有
|
参数 |
说明 |
数据类型 |
是否必填 |
默认值 |
备注 |
|
write.operation |
写入操作模式 |
String |
否 |
upsert |
|
|
hive_sync.enable |
是否开启同步元数据到Hive功能 |
boolean |
否 |
false |
参数取值如下:
|
|
hive_sync.mode |
Hive数据同步模式 |
String |
否 |
hms |
|
|
hive_sync.db |
同步到Hive的数据库名称 |
String |
否 |
当前Table在Catalog中的数据库名 |
无。 |
|
hive_sync.table |
同步到Hive的表名称 |
String |
否 |
当前Table名 |
无。 |
|
dlf.catalog.region |
DLF服务的地域名 |
String |
否 |
无 |
详情请参见已开通的地域和访问域名。 说明
|
|
dlf.catalog.endpoint |
DLF服务的Endpoint |
String |
否 |
无 |
详情请参见已开通的地域和访问域名。 说明
|
类型映射
|
Iceberg字段类型 |
Flink字段类型 |
|
BOOLEAN |
BOOLEAN |
|
INT |
INT |
|
LONG |
BIGINT |
|
FLOAT |
FLOAT |
|
DOUBLE |
DOUBLE |
|
DECIMAL(P,S) |
DECIMAL(P,S) |
|
DATE |
DATE |
|
TIME |
TIME 说明
Iceberg时间戳精度为微秒,Flink时间戳精度为毫秒。在使用Flink读取Iceberg数据时,时间精度会对齐到毫秒。 |
|
TIMESTAMP |
TIMESTAMP |
|
TIMESTAMPTZ |
TIMESTAMP_LTZ |
|
STRING |
STRING |
|
FIXED(L) |
BYTES |
|
BINARY |
VARBINARY |
|
STRUCT<...> |
ROW |
|
LIST<E> |
LIST |
|
MAP<K,V> |
MAP |
代码示例
请确认您已创建了OSS Bucket和DLF数据库。详情请参见控制台创建存储空间和数据库表及函数。
在创建DLF数据库选择路径时,建议按照${warehouse}/${database_name}.db格式填写。例如,如果warehouse地址为oss://iceberg-test/warehouse,数据库的名称为dlf_db,则dlf_db的OSS路径需要设置为oss://iceberg-test/warehouse/dlf_db.db。
结果表示例
本示例为您介绍如何通过Datagen连接器随机生成流式数据写入Iceberg表。
CREATE TEMPORARY TABLE datagen(
id BIGINT,
data STRING
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE dlf_iceberg (
id BIGINT,
data STRING
) WITH (
'connector' = 'iceberg',
'catalog-name' = '<yourCatalogName>',
'catalog-database' = '<yourDatabaseName>',
'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
'oss.endpoint' = '<yourOSSEndpoint>',
'access.key.id' = '${secret_values.ak_id}',
'access.key.secret' = '${secret_values.ak_secret}',
'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
'warehouse' = '<yourOSSWarehousePath>',
'dlf.catalog-id' = '<yourCatalogId>',
'dlf.endpoint' = '<yourDLFEndpoint>',
'dlf.region-id' = '<yourDLFRegionId>'
);
INSERT INTO dlf_iceberg SELECT * FROM datagen;
源表示例
-
使用DLF Catalog,将Iceberg源表数据写入到Iceberg结果表中。
CREATE TEMPORARY TABLE src_iceberg ( id BIGINT, data STRING ) WITH ( 'connector' = 'iceberg', 'catalog-name' = '<yourCatalogName>', 'catalog-database' = '<yourDatabaseName>', 'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO', 'oss.endpoint' = '<yourOSSEndpoint>', 'access.key.id' = '${secret_values.ak_id}', 'access.key.secret' = '${secret_values.ak_secret}', 'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog', 'warehouse' = '<yourOSSWarehousePath>', 'dlf.catalog-id' = '<yourCatalogId>', 'dlf.endpoint' = '<yourDLFEndpoint>', 'dlf.region-id' = '<yourDLFRegionId>' ); CREATE TEMPORARY TABLE dst_iceberg ( id BIGINT, data STRING ) WITH ( 'connector' = 'iceberg', 'catalog-name' = '<yourCatalogName>', 'catalog-database' = '<yourDatabaseName>', 'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO', 'oss.endpoint' = '<yourOSSEndpoint>', 'access.key.id' = '${secret_values.ak_id}', 'access.key.secret' = '${secret_values.ak_secret}', 'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog', 'warehouse' = '<yourOSSWarehousePath>', 'dlf.catalog-id' = '<yourCatalogId>', 'dlf.endpoint' = '<yourDLFEndpoint>', 'dlf.region-id' = '<yourDLFRegionId>' ); BEGIN STATEMENT SET; INSERT INTO src_iceberg VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD'), (5, 'EEE'); INSERT INTO dst_iceberg SELECT * FROM src_iceberg; END;
数据摄入
Iceberg连接器可以用于数据摄入YAML作业开发,作为目标端写入。
语法结构
sink:
type: iceberg
name: Iceberg Sink
catalog.properties.rest.signing-region: cn-beijing
catalog.properties.uri: http://cn-beijing-vpc.dlf.aliyuncs.com/iceberg
catalog.properties.warehouse: flink_iceberg
catalog.properties.type: rest
catalog.properties.io-impl: org.apache.iceberg.rest.DlfFileIO
配置项
|
参数 |
说明 |
是否必填 |
数据类型 |
默认值 |
备注 |
|
type |
连接器类型。 |
是 |
STRING |
无 |
固定值为 |
|
name |
目标端名称。 |
否 |
STRING |
无 |
Sink的名称。 |
|
catalog.properties.rest.signing-region |
DLF的Region ID,详见服务接入点。 |
是 |
STRING |
无 |
无 |
|
catalog.properties.uri |
访问DLF Rest Catalog的URI,详见Iceberg REST。 |
是 |
STRING |
无 |
无 |
|
catalog.properties.warehouse |
DLF Catalog名称。 |
是 |
STRING |
无 |
无 |
|
catalog.properties.warehouse |
文件存储的根目录。 |
否 |
STRING |
无 |
无 |
|
catalog.properties.type |
Catalog类型,固定为rest。 |
是 |
STRING |
rest |
无 |
|
catalog.properties.io-impl |
固定值:org.apache.iceberg.rest.DlfFileIO。 |
是 |
STRING |
org.apache.iceberg.rest.DlfFileIO |
无 |
|
partition.key |
每个分区表的分区字段。 |
否 |
STRING |
无 |
每个分区表的分区键,允许为多个表设置多个主键。表之间用 对于需要进行隐式转换的分区,我们可以直接在分区字段上添加隐式转换的函数,例如 |
|
table.properties.* |
创建Iceberg table的参数。 |
否 |
String |
无 |
详情请参见Iceberg table options。 |
复用已有 Catalog
自VVR 11.5版本起,您可以在Flink CDC数据摄入作业中直接引用“数据管理”页面中创建的内置Iceberg Catalog,减少手写连接属性工作量。
sink:
type: iceberg
using.built-in-catalog: iceberg_catalog
目前,数据摄入作业支持自动复用所有 Iceberg Catalog 参数,等价于在 YAML 作业中手动配置 catalog.properties.前缀的参数。
如果希望覆盖以上自动复用的参数,可显式写出相应的 YAML 参数,其具备更高的优先级。
使用示例
Iceberg Catalog为DLF Catalog,写入阿里云数据湖构建的配置示例:
source: type: mysql name: MySQL Source hostname: ${secret_values.mysql.hostname} port: ${mysql.port} username: ${secret_values.mysql.username} password: ${secret_values.mysql.password} tables: ${mysql.source.table} server-id: 8601-8604 sink: type: iceberg name: Iceberg Sink catalog.properties.rest.signing-region: cn-beijing catalog.properties.uri: http://cn-beijing-vpc.dlf.aliyuncs.com/iceberg catalog.properties.warehouse: flink_iceberg catalog.properties.type: rest catalog.properties.io-impl: org.apache.iceberg.rest.DlfFileIO其中,catalog.properties前缀的参数含义请参见创建Iceberg DLF Catalog。
表结构变更
目前,Iceberg作为数据摄入目标端支持以下表结构变更事件:
-
CREATE TABLE EVENT
-
ADD COLUMN EVENT
-
ALTER COLUMN TYPE EVENT(不支持修改主键列的类型)
-
RENAME COLUMN EVENT
-
DROP COLUMN EVENT
-
TRUNCATE TABLE EVENT
-
DROP TABLE EVENT
在下游 Iceberg 表已经存在时,会优先使用已有的表结构进行写入,不会尝试重复建表。
相关文档
Flink支持的连接器,请参见支持的连接器。