本文为您介绍全量数据湖Iceberg源表的DDL定义、WITH参数、类型映射和代码示例。

什么是数据湖Iceberg

Apache Iceberg是一种开放的数据湖表格格式。您可以借助Apache Iceberg快速地在HDFS或者云端OSS上构建自己的数据湖存储服务,并借助开源大数据生态的Flink、Spark、Hive、Presto等计算引擎来实现数据湖的分析。目前Apache Iceberg提供以下核心能力:
  • 基于HDFS或者对象存储构建低成本的轻量级数据湖存储服务。
  • 完善的ACID语义。
  • 支持历史版本回溯。
  • 支持高效的数据过滤。
  • 支持Schema Evolution。
  • 支持Partition Evolution。
您可以借助Flink高效的容错能力和流处理能力,把海量的日志行为数据实时导入到Apache Iceberg数据湖内,再借助Flink或者其他分析引擎来实现数据价值的提取。
说明 Iceberg Connector既可以作为Flink Stream作业的结果表,也可以作为Flink批作业的源表和结果表。本文主要介绍Iceberg表作为Flink批作业的源表。

使用限制

仅Flink计算引擎vvr-4.0.8-flink-1.13及以上版本支持Iceberg Connector。

DDL定义

CREATE TABLE dlf_iceberg (
  id    BIGINT,
  data  STRING
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = '<yourCatalogName>',
  'catalog-type' = 'custom',
  'catalog-database' = '<yourDatabaseName>',
  'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
  'oss.endpoint' = '<yourOSSEndpoint>',  
  'access.key.id' = '<yourAccessKeyId>',
  'access.key.secret' = '<yourAccessKeySecret>',
  'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
  'warehouse' = '<yourOSSWarehousePath>',
  'dlf.catalog-id' = '<yourCatalogId>',
  'dlf.endpoint' = '<yourDLFEndpoint>',  
  'dlf.region-id' = '<yourDLFRegionId>'
);

WITH参数

参数 说明 是否必选 备注
connector 源表类型。 固定值为iceberg
catalog-name Catalog名称。 请填写为自定义的英文名。
catalog-type Catalog类型。 固定值为custom。
catalog-database 数据库名称。 对应用户在DLF上创建的数据库名称,例如dlf_db。
io-impl 分布式文件系统的实现类名。 固定值为org.apache.iceberg.aliyun.oss.OSSFileIO
oss.endpoint 阿里云对象存储服务OSS的Endpoint。 请详情参见访问域名和数据中心
说明
  • 推荐您为oss.endpoint参数配置OSS的VPC Endpoint。例如,如果您选择的地域为cn-hangzhou地域,则oss.endpoint需要配置为oss-cn-hangzhou-internal.aliyuncs.com。
  • 如果您需要跨VPC访问OSS,则请参见如何访问跨VPC里的存储资源?
access.key.id 阿里云账号的Access Key。 获取方法请参见获取AccessKey
access.key.secret 阿里云帐号的Access Secret。 获取方法请参见获取AccessKey
catalog-impl Catalog的Class类名。 固定值为org.apache.iceberg.aliyun.dlf.DlfCatalog
warehouse 表数据存放在OSS的路径。 无。
dlf.catalog-id 阿里云帐号的帐号ID。 登录账号信息,请通过用户信息页面获取。获取登录账号
dlf.endpoint DLF服务的Endpoint。 详情请参见已开通的地域和访问域名
说明
  • 推荐您为dlf.endpoint参数配置DLF的VPC Endpoint。例如,如果您选择的地域为cn-hangzhou地域,则dlf.endpoint参数需要配置为dlf-vpc.cn-hangzhou.aliyuncs.com。
  • 如果您需要跨VPC访问DLF,则请参见如何访问跨VPC里的存储资源?
dlf.region-id DLF服务的地域名。 详情请参见已开通的地域和访问域名
说明 请和dlf.endpoint选择的地域保持一致。

类型映射

Iceberg字段类型 Flink字段类型
BOOLEAN BOOLEAN
INT INT
LONG BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
DECIMAL(P,S) DECIMAL(P,S)
DATE DATE
TIME TIME
说明 Iceberg时间戳精度为微秒,Flink时间戳精度为毫秒。在使用Flink读取Iceberg数据时,时间精度会对齐到毫秒。
TIMESTAMP TIMESTAMP
TIMESTAMPTZ TIMESTAMP_LTZ
STRING STRING
FIXED(L) BYTES
BINARY VARBINARY
STRUCT<...> ROW
LIST<E> LIST
MAP<K, V> MAP

代码示例

以下内容将为您介绍如何通过Flink作业将Iceberg源表的数据导入到另一个Iceberg结果表中,实现基本的ETL操作。

  1. 确认已成功创建了DLF数据库。
    如果您没有创建对应的DLF数据库,请创建DLF数据库,详情请参见创建元数据库
    说明 创建数据库选择OSS路径时,请将名称设置为${warehouse}/${database_name}.db。例如,如果您将warehouse地址设置为oss://iceberg-test/warehouse,数据库的名称设置为dlf_db,则dlf_db的OSS路径需要设置为oss://iceberg-test/warehouse/dlf_db.db
  2. 编写Flink Batch SQL作业生成测试数据,并导入到另一个Iceberg表中。
    1. 在左侧导航栏,单击作业开发
    2. 在页面左上角,单击新建,文件类型选择为批作业的SQL。
      新建批作业
    3. 在文本编辑区域编写SQL,代码示例如下。
      CREATE TEMPORARY TABLE src_iceberg (
        id    BIGINT,
        data  STRING
      ) WITH (
        'connector' = 'iceberg',
        'catalog-name' = '<yourCatalogName>',
        'catalog-type' = 'custom',
        'catalog-database' = '<yourDatabaseName>',
        'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
        'oss.endpoint' = '<yourOSSEndpoint>',  
        'access.key.id' = '<yourAccessKeyId>',
        'access.key.secret' = '<yourAccessKeySecret>',
        'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
        'warehouse' = '<yourOSSWarehousePath>',
        'dlf.catalog-id' = '<yourCatalogId>',
        'dlf.endpoint' = '<yourDLFEndpoint>',  
        'dlf.region-id' = '<yourDLFRegionId>'
      );
      
      CREATE TEMPORARY TABLE dst_iceberg (
        id    BIGINT,
        data  STRING
      ) WITH (
        'connector' = 'iceberg',
        'catalog-name' = '<yourCatalogName>',
        'catalog-type' = 'custom',
        'catalog-database' = '<yourDatabaseName>',
        'io-impl' = 'org.apache.iceberg.aliyun.oss.OSSFileIO',
        'oss.endpoint' = '<yourOSSEndpoint>',  
        'access.key.id' = '<yourAccessKeyId>',
        'access.key.secret' = '<yourAccessKeySecret>',
        'catalog-impl' = 'org.apache.iceberg.aliyun.dlf.DlfCatalog',
        'warehouse' = '<yourOSSWarehousePath>',
        'dlf.catalog-id' = '<yourCatalogId>',
        'dlf.endpoint' = '<yourDLFEndpoint>',  
        'dlf.region-id' = '<yourDLFRegionId>'
      );
      
      BEGIN STATEMENT SET;
      
      INSERT INTO src_iceberg VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD'), (5, 'EEE');
      INSERT INTO dst_iceberg SELECT * FROM src_iceberg;
      
      END;
  3. 在作业开发页面右侧高级配置面版中,引擎版本配置为vvr-4.0.8-flink-1.13
    引擎版本
  4. 单击验证
  5. 单击上线
  6. 在OSS控制台查看写入的测试数据。
    等整个批作业完成后,您就可以在OSS路径下查看dst_iceberg表中写入的数据了。