流式数据湖仓Paimon

流式数据湖仓Paimon连接器推荐配合Paimon Catalog使用,本文为您介绍如何使用流式数据湖仓Paimon连接器。

背景信息

Apache Paimon是一种流批统一的湖存储格式,支持高吞吐的写入和低延迟的查询。目前阿里云开源大数据平台E-MapReduce常见的计算引擎(例如Flink、Spark、Hive或Trino)都与Paimon有着较为完善的集成度。您可以借助Apache Paimon快速地在HDFS或者云端OSS上构建自己的数据湖存储服务,并接入上述计算引擎实现数据湖的分析,详情请参见Apache Paimon

类别

详情

支持类型

源表、维表和结果表,数据摄入目标端

运行模式

流模式和批模式

数据格式

暂不支持

特有监控指标

暂无

API种类

SQL,数据摄入YAML作业

是否支持更新或删除结果表数据

特色功能

目前Apache Paimon提供以下核心能力:

  • 基于HDFS或者对象存储构建低成本的轻量级数据湖存储服务。

  • 支持在流模式与批模式下读写大规模数据集。

  • 支持分钟级到秒级数据新鲜度的批查询和OLAP查询。

  • 支持消费与产生增量数据,可作为传统的离线数仓和新型的流式数仓的各级存储。

  • 支持预聚合数据,降低存储成本与下游计算压力。

  • 支持回溯历史版本的数据。

  • 支持高效的数据过滤。

  • 支持表结构变更。

使用限制

  • 仅Flink计算引擎VVR 6.0.6及以上版本支持Paimon连接器。

  • Paimon与VVR版本对应关系详情如下表所示。

    Paimon社区版本

    实时计算Flink版引擎版本(VVR )

    0.9

    8.0.7、8.0.8、8.0.9

    0.8

    8.0.6

    0.7

    8.0.5

    0.6

    8.0.4

    0.6

    8.0.3

SQL

Paimon连接器可以在SQL作业中使用,作为源表或者结果表。

语法结构

  • 如果您在Paimon Catalog中创建Paimon表,则无需指定connector参数,此时创建Paimon表的语法结构如下。

    CREATE TABLE `<your-paimon-catalog>`.`<your-db>`.paimon_table (
      id BIGINT,
      data STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      ...
    );
    说明

    如果您已在Paimon Catalog中创建了Paimon表,后续无需再次创建表即可直接使用。

  • 如果您在其他Catalog中创建Paimon临时表,则需要指定connector参数与Paimon表的存储路径path,此时创建Paimon表的语法结构如下。

    CREATE TEMPORARY TABLE paimon_table (
      id BIGINT,
      data STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'paimon',
      'path' = '<path-to-paimon-table-files>',
      'auto-create' = 'true', -- 如果指定路径不存在Paimon表数据文件,则会自动创建文件。
      ...
    );

WITH参数

参数

说明

数据类型

是否必填

默认值

备注

connector

表类型。

String

  • 如果在Paimon Catalog中创建Paimon表,则无需填写。

  • 如果在其他Catalog中创建Paimon临时表,则固定值为paimon

path

表存储路径。

String

  • 如果在Paimon Catalog中创建Paimon表,则无需填写。

  • 如果在其他Catalog中创建Paimon临时表,则为表在HDFS或OSS中的存储目录。

auto-create

创建Paimon临时表时,若指定路径不存在Paimon表文件,是否自动创建文件。

Boolean

false

参数取值如下:

  • false(默认):如果指定路径不存在Paimon表文件,则报错。

  • true:如果指定路径不存在,则Flink系统自动创建Paimon表文件。

bucket

每个分区的分桶数。

Integer

1

写入Paimon表的数据将按bucket-key打散至每个bucket中。

说明

建议每个Bucket的数据量在5 GB以下。

bucket-key

分桶关键列。

String

指定将写入Paimon表的数据按哪些列的值打散至不同的Bucket中。

列名之间用英文逗号(,)分隔,例如'bucket-key' = 'order_id,cust_id'会将数据按order_id列和cust_id列的值进行打散。

说明
  • 如果该参数未填写,则按primary key进行打散。

  • 如果Paimon表未指定primary key,则按所有列的值进行打散。

changelog-producer

增量数据产生机制。

String

none

Paimon可以为任意输入数据流产生完整的增量数据(所有的update_after数据都有对应的update_before数据),方便下游消费者。增量数据产生机制的可选值如下:

  • none(默认值):不额外产生增量数据。下游仍然可以流读Paimon表,但读到的增量数据是不完整的(只有update_after数据,没有对应的update_before数据)。

  • input:将输入数据流双写至增量数据文件中,作为增量数据。

  • full-compaction:每次Full Compaction产生完整的增量数据。

  • lookup:每次commit snapshot前产生完整的增量数据。

关于增量数据产生机制的选择,详情请参见增量数据产生机制

full-compaction.delta-commits

Full Compaction最大间隔。

Integer

该参数指定了每commit snapshot多少次之后,一定会进行一次Full Compaction。

lookup.cache-max-memory-size

Paimon维表的内存缓存大小。

String

256 MB

该参数值会同时影响维表缓存大小和lookup changelog-producer的缓存大小,两个机制的缓存大小都由该参数配置。

merge-engine

相同primary key数据的合并机制。

String

deduplicate

参数取值如下:

  • deduplicate:仅保留最新一条。

  • partial-update:用最新数据中非null的列更新相同primary key的现有数据,其它列保持不变。

  • aggregation:通过指定聚合函数进行预聚合。

关于数据合并机制的具体分析,详情请参见数据合并机制

partial-update.ignore-delete

是否忽略delete(-D)类型的消息。

Boolean

false

参数取值如下:

  • true:忽略delete消息。

  • false:不忽略delete消息。您需要通过sequence.field等配置项来设定Sink对于delete数据的处理策略,否则可能会抛出IllegalStateException或IllegalArgumentException报错。

说明
  • 在实时计算引擎VVR 8.0.6及以下版本,该参数只在partial update场景下,merge-engine = partial-update时生效。

  • 在实时计算引擎VVR 8.0.7及以上版本,该参数兼容适配非partial update场景,与ignore-delete参数功能一致,推荐替换成ignore-delete

  • 请您根据实际业务场景,判断出现的delete类型数据是否符合预期,从而决定是否启用该参数。如果delete类型数据所代表的作业语义不符合预期,则抛出错误是更合适的选择。

ignore-delete

是否忽略delete(-D)类型的消息。

Boolean

false

参数取值同partial-update.ignore-delete

说明
  • 仅实时计算引擎VVR 8.0.7及以上版本支持该参数。

  • 与partial-update.ignore-delete参数功能一致,推荐使用ignore-delete参数,并避免同时配置这两参数。

partition.default-name

分区默认名称。

String

__DEFAULT_PARTITION__

如果分区列的值为null或空字符串,将会采用该默认名称作为分区名。

partition.expiration-check-interval

多久检查一次分区过期。

String

1h

详情请参见如何设置分区自动过期?

partition.expiration-time

分区的过期时长。

String

当一个分区的存活时长超过该值时,该分区将会过期,默认永不过期。

一个分区的存活时长由该分区的分区值计算而来,详情请参见如何设置分区自动过期?

partition.timestamp-formatter

将时间字符串转换为时间戳的格式串。

String

设置从分区值提取分区存活时长的格式,详情请参见如何设置分区自动过期?

partition.timestamp-pattern

将分区值转换为时间字符串的格式串。

String

设置从分区值提取分区存活时长的格式,详情请参见如何设置分区自动过期?

scan.bounded.watermark

当Paimon源表产生的数据的watermark超过该值时,Paimon源表将会结束产生数据。

Long

无。

scan.mode

指定Paimon源表的消费位点。

String

default

详情请参见如何设置Paimon源表的消费位点?

scan.snapshot-id

指定Paimon源表从哪个snapshot开始消费。

Integer

详情请参见如何设置Paimon源表的消费位点?

scan.timestamp-millis

指定Paimon源表从哪个时间点开始消费。

Integer

详情请参见如何设置Paimon源表的消费位点?

snapshot.num-retained.max

至多保留几个最新Snapshot不过期。

Integer

2147483647

只要满足该配置或snapshot.time-retained其中之一,并同时满足snapshot.num-retained.min,就会触发Snapshot过期。

snapshot.num-retained.min

至少保留几个最新Snapshot不过期。

Integer

10

无。

snapshot.time-retained

Snapshot产生多久以后会过期。

String

1h

只要满足该配置或snapshot.num-retained.max其中之一,并同时满足snapshot.num-retained.min,就会触发snapshot过期。

write-mode

Paimon表的写入模式。

String

change-log

参数取值如下:

  • change-log:Paimon表支持根据primary key进行数据的插入、删除和更新。

  • append-only:Paimon表只接受数据的插入,且不支持primary key。该模式比change-log模式更加高效。

关于写入模式的具体介绍,详情请参见写入模式

scan.infer-parallelism

是否自动推断Paimon源表的并发度。

Boolean

false

参数取值如下:

  • true:将会根据分桶数自动推断Paimon源表的并发度。

  • false:按VVP配置的默认并发。如果是专家模式就按用户配置的并发。

scan.parallelism

Paimon源表的并发度。

Integer

说明

在作业部署详情 > 资源配置页签中,资源模式为专家模式时,该参数不生效。

sink.parallelism

Paimon结果表的并发度。

Integer

说明

在作业部署详情 > 资源配置页签中,资源模式为专家模式时,该参数不生效。

sink.clustering.by-columns

指定写入Paimon结果表的聚类列。

String

对于Paimon Append Only表(非主键表),在批作业中配置该参数可以启用聚类写入功能,使数据在特定列上按大小范围聚集分布,从而提升该表的查询速度。

多个列名请使用英文逗号(,)进行分隔,例如'col1,col2'

聚类详情请参见Apache Paimon官方文档

sink.delete-strategy​

设定校验策略,确保系统能正确处理回撤(-D/-U)类型消息。

​​

Enum

NONE

校验策略取值及Sink算子应当正确处理回撤消息的行为如下:​

  • ​NONE(默认值):不做校验。​

  • IGNORE_DELETE:Sink算子应当忽略-U和-D类型的消息,不发生回撤。

  • NON_PK_FIELD_TO_NULL:Sink算子应当忽略-U类型的消息,但是在收到-D类型的消息时,保持主键值不变、回撤Schema中其他非主键值。

    主要用在多个Sink同时写入同一张表时部分更新的场景。​

  • DELETE_ROW_ON_PK:Sink算子应当忽略-U类型的消息,但是在收到-D类型的消息时删除主键对应的行。​

  • CHANGELOG_STANDARD:Sink算子应当在收到-U和-D类型的数据时均会删除主键对应的行。​

说明
  • 仅实时计算引擎VVR 8.0.8及以上版本支持该参数。

  • Paimon Sink处理回撤消息的行为实际由ignore-delete、merge-engine等其他配置项的值决定。本配置项不直接影响这部分行为,而是会校验这部分行为是否符合预期策略。在不符合预期策略的情况下,相关校验步骤将终止,并在作业报错中提示您如何修改ignore-delete、merge-engine等其他配置项以符合预期。

说明

更多配置项详情请参见Apache Paimon官方文档

特色功能详解

数据新鲜度与一致性保证

Paimon结果表使用两阶段提交协议,在每次Flink作业的checkpoint期间提交写入的数据,因此数据新鲜度即为Flink作业的checkpoint间隔。每次提交将会产生至多两个snapshot。

当两个Flink作业同时写入一张Paimon表时,如果两个作业的数据没有写入同一个分桶,则能保证serializable级别的一致性。如果两个作业的数据写入了同一个分桶,则只能保证snapshot isolation级别的一致性。也就是说,表中的数据可能混合了两个作业的结果,但不会有数据丢失。

数据合并机制

当Paimon结果表收到多条具有相同primary key的数据时,为了保持primary key的唯一性,Paimon结果表会将这些数据合并成一条数据。通过指定merge-engine参数,您可以指定数据合并的具体行为。数据合并机制详情如下表所示。

合并机制

详情

去重(Deduplicate)

去重机制(deduplicate)是默认的数据合并机制。对于多条具有相同primary key的数据,Paimon结果表仅会保留最新一条数据,并丢弃其它具有primary key的数据。

说明

如果最新一条数据是一条delete消息,所有具有该primary key的数据都将被丢弃。

部分更新(Partial Update)

通过指定部分更新机制(partial-update),您可以通过多条消息对数据进行逐步更新,并最终得到完整的数据。具体来说,具有相同primary key的新数据将会覆盖原来的数据,但值为null的列不会进行覆盖。

例如,假设Paimon结果表按顺序收到了以下三条数据:

  • <1, 23.0, 10, NULL>

  • <1, NULL, NULL, 'This is a book'>

  • <1, 25.2, NULL, NULL>

第一列是primary key,则最终结果为<1, 25.2, 10, 'This is a book'>。

说明
  • 如果需要流读partial-update的结果,必须将changelog-producer参数设置为lookup或full-compaction。

  • partial-update无法处理delete消息。您可以设置partial-update.ignore-delete参数以忽略delete消息。

预聚合(Aggregation)

部分场景下,可能只关心聚合后的值。预聚合机制(aggregation)将具有相同primary key的数据根据您指定的聚合函数进行聚合。对于不属于primary key的每一列,都需要通过fields.<field-name>.aggregate-function指定一个聚合函数,否则该列将默认使用last_non_null_value聚合函数。例如,考虑以下Paimon表的定义。

CREATE TABLE MyTable (
  product_id BIGINT,
  price DOUBLE,
  sales BIGINT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation',
  'fields.price.aggregate-function' = 'max',
  'fields.sales.aggregate-function' = 'sum'
);

price列将会根据max函数进行聚合,而sales列将会根据sum函数进行聚合。给定两条输入数据 <1, 23.0, 15>和 <1, 30.2, 20>,最终结果为<1, 30.2, 35>。当前支持的聚合函数与对应的数据类型如下:

  • sum:支持DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT和DOUBLE。

  • min和max:支持DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP和TIMESTAMP_LTZ。

  • last_value和last_non_null_value:支持所有数据类型。

  • listagg:支持STRING。

  • bool_and和bool_or:支持BOOLEAN。

说明
  • 只有sum函数支持回撤与删除数据,其它聚合函数不支持回撤与删除。如果您需要某些列忽略回撤与删除消息,可以设置'fields.${field_name}.ignore-retract'='true'

  • 如果需要流读aggregation的结果,必须将changelog-producer参数设置为lookup或full-compaction。

增量数据产生机制

通过changelog-producer参数设置相应的增量数据产生机制,Paimon可以为任意输入数据流产生完整的增量数据(所有的update_after数据都有对应的update_before数据)。以下列举了所有的增量数据产生机制,更加详细的介绍请参见Apache Paimon官方文档

机制

详情

None

设置changelog-producer为none(默认值)后,此时,对于同一个primary key,下游的Paimon源表只能看到数据的最新情况。但这些最新情况无法让下游消费者方便地了解完整的增量数据,从而进行正确的计算。因为它只能确定对应数据是否被删除了,或最新数据是什么,无法得知更改之前的数据是什么。

例如,假设下游消费者需要计算某一列的总和,如果消费者只看到了最新数据5,它无法断定该如何更新总和。因为如果之前的数据是4,它应该将总和增加1;如果之前的数据是6,它应该将总和减去1。此类消费者对update_before较为敏感,建议不要将增量数据产生机制配置为None,但是其他增量数据产生机制会带来性能损耗。

说明

如果您的下游是数据库之类的对update_before数据不敏感的消费者,则可以将增量数据产生机制配置为None。因此,建议您根据实际需要配置增量数据产生机制。

Input

设置changelog-producer为input后,Paimon结果表会将输入数据流双写至增量数据文件中,作为增量数据。

因此,只有当输入数据流本身是完整的增量数据时(例如CDC数据),才能使用这一增量数据产生机制。

Lookup

设置changelog-producer为lookup后,Paimon结果表会通过一种类似于维表的点查机制,在每次commit snapshot之前产生本次snapshot对应的完整增量数据。无论输入数据是否为完整的增量数据,这一增量数据产生机制均能产生完整的增量数据。

与下文的Full Compaction机制相比,Lookup机制产生增量数据的时效性更好,但总体来看耗费的资源更多。

推荐在对增量数据的新鲜度有较高要求(例如分钟级)的情况下使用。

Full Compaction

设置changelog-producer为full-compaction后,Paimon结果表会在每一次full compaction时产生完整的增量数据。无论输入数据是否为完整的增量数据,这一增量数据产生机制均能产生完整的增量数据。Full compaction的时间间隔由full-compaction.delta-commits参数指定。

与上文的Lookup机制相比,Full Compaction机制产生增量数据的时效性更差,但它利用了数据的full compaction过程,不产生额外计算,因此总体来看耗费的资源更少。

推荐在对增量数据的新鲜度要求不高(例如小时级)的情况下使用。

写入模式

Paimon表目前支持的写入模式如下。

模式

详情

Change-log

change-log写入模式是Paimon表的默认写入模式。该写入模式支持根据primary key对数据进行插入、删除与更新,您也可以在该写入模式下使用上文提到的数据合并机制与增量数据产生机制。

Append-only

append-only写入模式仅支持数据的插入,且不支持primary key。该模式比change-log模式更加高效,可在对数据新鲜度要求一般的场景下(例如分钟级新鲜度)作为消息队列的替代品。

关于append-only写入模式的详细介绍,请参见Apache Paimon官方文档。在使用append-only写入模式时,需要注意以下两点:

  • 建议您根据实际需求设置bucket-key参数,否则Paimon表将根据所有列的值进行分桶,计算效率较低。

  • append-only写入模式可在一定程度上保证数据的产出顺序,具体的产出顺序为:

    1. 如果两条数据来自不同的分区,若设置了scan.plan-sort-partition参数,则分区值较小的数据将首先产出。否则来自较早创建的分区的数据将首先产出。

    2. 如果两条数据来自同一分区的同一分桶,则较早写入的数据将首先产出。

    3. 如果两条数据来自同一分区的不同分桶,由于不同分桶由不同的并发进行处理,因此不保证两条数据的产出顺序。

作为CTAS和CDAS的目标端

Paimon表支持实时同步单表或整库级别的数据,在同步过程之中如果上游的表结构发生了变更也会实时同步到Paimon表中。详见管理Paimon表管理Paimon Catalog

数据摄入

Paimon连接器可以用于数据摄入YAML作业开发,作为目标端写入。

语法结构

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse

配置项

参数

说明

是否必填

数据类型

默认值

备注

type

连接器类型。

STRING

固定值为paimon

name

目标端名称。

STRING

Sink的名称。

catalog.properties.metastore

Paimon Catalog的类型。

STRING

filesystem

取值如下:

  • filesystem(默认值)

  • dlf-paimon

catalog.properties.*

创建Paimon Catalog的参数。

STRING

详情请参见管理Paimon Catalog

table.properties.*

创建Paimon table的参数。

STRING

详情请参见Paimon table options

catalog.properties.warehouse

文件存储的根目录。

STRING

仅在catalog.properties.metastore设置为 filesystem时生效。

commit.user

提交数据文件时的用户名。

STRING

说明

建议为不同的作业设置不同的用户名,方便在出现提交冲突时定位冲突的作业。

partition.key

每个分区表的分区字段。

STRING

不同的表使用;分割,不同的字段使用,分割,表与字段使用:分割。例如:testdb.table1:id1,id2;testdb.table2:name

使用示例

使用Paimon作为数据摄入目标端:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: ${mysql.source.table}
  server-id: 8601-8604

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse

常见问题