本文介绍如何使用Iceberg连接器。
背景信息
Apache Iceberg是一种开放的数据湖表格格式。您可以借助Apache Iceberg快速地在HDFS或者云端OSS上构建自己的数据湖存储服务,并借助开源大数据生态的Flink、Spark、Hive、Presto等计算引擎来实现数据湖的分析。
| 类别 | 详情 | 
| 支持类型 | 源表和结果表 | 
| 运行模式 | 批模式和流模式 | 
| 数据格式 | 暂不适用 | 
| 特有监控指标 | 暂无 | 
| API种类 | SQL | 
| 是否支持更新或删除结果表数据 | 是 | 
特色功能
目前Apache Iceberg提供以下核心能力:
- 基于HDFS或者对象存储构建低成本的轻量级数据湖存储服务。 
- 完善的ACID语义。 
- 支持历史版本回溯。 
- 支持高效的数据过滤。 
- 支持Schema Evolution。 
- 支持Partition Evolution。 
- 支持和自建Hive Metastore配合使用,详情请参见使用Hive Catalog,配合自建Hive Metastore(HMS)使用。 
您可以借助Flink高效的容错能力和流处理能力,把海量的日志行为数据实时导入到Apache Iceberg数据湖内,再借助Flink或者其他分析引擎来实现数据价值的提取。
使用限制
- 仅Flink计算引擎VVR 4.0.8及以上版本支持Iceberg连接器。Iceberg连接器需要搭配DLF Catalog一起使用,详情请参见管理DLF Catalog。 
- Iceberg连接器支持Apache Iceberg v1和v2表格式,详情请参见Iceberg Table Spec。 说明- 仅实时计算引擎VVR 8.0.7及以上版本支持v2表格式。 
- 流读模式下,仅支持将Append Only的Iceberg表作为源表。 
语法结构
CREATE TABLE iceberg_table (
  id    BIGINT,
  data  STRING
  PRIMARY KEY(`id`) NOT ENFORCED
)
 PARTITIONED BY (data)
 WITH (
 'connector' = 'iceberg',
  ...
);WITH参数
通用(源表)
| 参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 | 
| connector | 源表类型 | String | 是 | 无 | 固定值为 | 
| catalog-name | Catalog名称 | String | 是 | 无 | 请填写为自定义的英文名。 | 
| catalog-database | 数据库名称 | String | 是 | default | 对应在DLF上创建的数据库名称,例如dlf_db。 说明  如果您没有创建对应的DLF数据库,请创建DLF数据库,详情请参见数据库表及函数。 | 
| io-impl | 分布式文件系统的实现类名 | String | 是 | 无 | 固定值为 | 
| oss.endpoint | 阿里云对象存储服务OSS的Endpoint | String | 否 | 无 | 请详情参见OSS地域和访问域名。 说明  
 | 
| 
 | 阿里云账号的AccessKey ID | String | 是 | 无 | 详情请参见如何查看AccessKey ID和AccessKey Secret信息? 重要  为了避免您的AK信息泄露,建议您使用变量的方式填写AccessKey取值,详情请参见项目变量。 | 
| 
 | 阿里云账号的AccessKey Secret | String | 是 | 无 | |
| catalog-impl | Catalog的Class类名 | String | 是 | 无 | 固定值为 | 
| warehouse | 表数据存放在OSS的路径 | String | 是 | 无 | 无。 | 
| dlf.catalog-id | 阿里云账号的账号ID | String | 是 | 无 | 可通过用户信息页面获取账号ID。 | 
| dlf.endpoint | DLF服务的Endpoint | String | 是 | 无 | 详情请参见已开通的地域和访问域名。 说明  
 | 
| dlf.region-id | DLF服务的地域名 | String | 是 | 无 | 详情请参见已开通的地域和访问域名。 说明  请和dlf.endpoint选择的地域保持一致。 | 
| uri | Hive metastore的thrift URI | String | 仅当使用Hive Catalog时,必填。 | 无 | 配合自建Hive Metastore使用。 | 
结果表独有
| 参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 | 
| write.operation | 写入操作模式 | String | 否 | upsert | 
 | 
| hive_sync.enable | 是否开启同步元数据到Hive功能 | boolean | 否 | false | 参数取值如下: 
 | 
| hive_sync.mode | Hive数据同步模式 | String | 否 | hms | 
 | 
| hive_sync.db | 同步到Hive的数据库名称 | String | 否 | 当前Table在Catalog中的数据库名 | 无。 | 
| hive_sync.table | 同步到Hive的表名称 | String | 否 | 当前Table名 | 无。 | 
| dlf.catalog.region | DLF服务的地域名 | String | 否 | 无 | 详情请参见已开通的地域和访问域名。 说明  
 | 
| dlf.catalog.endpoint | DLF服务的Endpoint | String | 否 | 无 | 详情请参见已开通的地域和访问域名。 说明  
 | 
类型映射
| Iceberg字段类型 | Flink字段类型 | 
| BOOLEAN | BOOLEAN | 
| INT | INT | 
| LONG | BIGINT | 
| FLOAT | FLOAT | 
| DOUBLE | DOUBLE | 
| DECIMAL(P,S) | DECIMAL(P,S) | 
| DATE | DATE | 
| TIME | TIME 说明  Iceberg时间戳精度为微秒,Flink时间戳精度为毫秒。在使用Flink读取Iceberg数据时,时间精度会对齐到毫秒。 | 
| TIMESTAMP | TIMESTAMP | 
| TIMESTAMPTZ | TIMESTAMP_LTZ | 
| STRING | STRING | 
| FIXED(L) | BYTES | 
| BINARY | VARBINARY | 
| STRUCT<...> | ROW | 
| LIST<E> | LIST | 
| MAP<K,V> | MAP | 
代码示例
请确认您已创建了OSS Bucket和DLF数据库。详情请参见控制台创建存储空间和数据库表及函数。
在创建DLF数据库选择路径时,建议按照${warehouse}/${database_name}.db格式填写。例如,如果warehouse地址为oss://iceberg-test/warehouse,数据库的名称为dlf_db,则dlf_db的OSS路径需要设置为oss://iceberg-test/warehouse/dlf_db.db。
结果表示例
本示例为您介绍如何通过Datagen连接器随机生成流式数据写入Iceberg表。
CREATE TEMPORARY TABLE datagen(
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE dlf_iceberg (
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '<yourCatalogName>',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '<yourOSSEndpoint>',  
  'access.key.id' = '${secret_values.ak_id}',
  'access.key.secret' = '${secret_values.ak_secret}',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '<yourOSSWarehousePath>',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = '<yourDLFEndpoint>',  
  'dlf.region-id' = '<yourDLFRegionId>'
);
INSERT INTO dlf_iceberg SELECT * FROM datagen;源表示例
- 使用Hive Catalog,配合自建Hive Metastore(HMS)使用。 - 您需要保证Flink与HMS集群网络互通,数据将存储在 - oss://<bucket>/<path>/<database-name>/flink_table目录下。- CREATE TEMOPORY TABLE flink_table ( id BIGINT, data STRING ) WITH ( 'connector'='iceberg', 'catalog-name'='<yourCatalogName>', 'catalog-database'='<yourDatabaseName>', 'uri'='thrift://<ip>:<post>', 'warehouse'='oss://<bucket>/<path>', 'io-impl'='org.apache.iceberg.aliyun.oss.OSSFileIO', 'access-key-id'='<aliyun ak>', 'access-key-secret'='<aliyun sk>', 'oss.endpoint'='<yourOSSEndpoint>' );
- 使用DLF Catalog,将Iceberg源表数据写入到Iceberg结果表中。 - CREATE TEMPORARY TABLE src_iceberg ( id BIGINT, data STRING ) WITH ( 'connector' = 'iceberg', 'catalog-name' = '<yourCatalogName>', 'catalog-database' = '<yourDatabaseName>', 'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO', 'oss.endpoint' = '<yourOSSEndpoint>', 'access.key.id' = '${secret_values.ak_id}', 'access.key.secret' = '${secret_values.ak_secret}', 'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog', 'warehouse' = '<yourOSSWarehousePath>', 'dlf.catalog-id' = '<yourCatalogId>', 'dlf.endpoint' = '<yourDLFEndpoint>', 'dlf.region-id' = '<yourDLFRegionId>' ); CREATE TEMPORARY TABLE dst_iceberg ( id BIGINT, data STRING ) WITH ( 'connector' = 'iceberg', 'catalog-name' = '<yourCatalogName>', 'catalog-database' = '<yourDatabaseName>', 'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO', 'oss.endpoint' = '<yourOSSEndpoint>', 'access.key.id' = '${secret_values.ak_id}', 'access.key.secret' = '${secret_values.ak_secret}', 'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog', 'warehouse' = '<yourOSSWarehousePath>', 'dlf.catalog-id' = '<yourCatalogId>', 'dlf.endpoint' = '<yourDLFEndpoint>', 'dlf.region-id' = '<yourDLFRegionId>' ); BEGIN STATEMENT SET; INSERT INTO src_iceberg VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD'), (5, 'EEE'); INSERT INTO dst_iceberg SELECT * FROM src_iceberg; END;
相关文档
Flink支持的连接器,请参见支持的连接器。