流式数据湖仓Paimon连接器推荐配合Paimon Catalog使用,本文为您介绍如何使用流式数据湖仓Paimon连接器。
背景信息
Apache Paimon是一种流批统一的湖存储格式,支持高吞吐的写入和低延迟的查询。目前阿里云开源大数据平台E-MapReduce常见的计算引擎(例如Flink、Spark、Hive或Trino)都与Paimon有着较为完善的集成度。您可以借助Apache Paimon快速地在HDFS或者云端OSS上构建自己的数据湖存储服务,并接入上述计算引擎实现数据湖的分析,详情请参见Apache Paimon。
| 类别 | 详情 | 
| 支持类型 | 源表、维表和结果表,数据摄入目标端 | 
| 运行模式 | 流模式和批模式 | 
| 数据格式 | 暂不支持 | 
| 特有监控指标 | 暂无 | 
| API种类 | SQL,数据摄入YAML作业 | 
| 是否支持更新或删除结果表数据 | 是 | 
特色功能
目前Apache Paimon提供以下核心能力:
- 基于HDFS或者对象存储构建低成本的轻量级数据湖存储服务。 
- 支持在流模式与批模式下读写大规模数据集。 
- 支持分钟级到秒级数据新鲜度的批查询和OLAP查询。 
- 支持消费与产生增量数据,可作为传统的离线数仓和新型的流式数仓的各级存储。 
- 支持预聚合数据,降低存储成本与下游计算压力。 
- 支持回溯历史版本的数据。 
- 支持高效的数据过滤。 
- 支持表结构变更。 
使用限制
- 仅Flink计算引擎VVR 6.0.6及以上版本支持Paimon连接器。 
- Paimon与VVR版本对应关系详情如下表所示。 - Paimon社区版本 - 实时计算Flink版引擎版本(VVR ) - 1.1 - 11.1 - 1.0 - 8.0.11 - 0.9 - 8.0.7、8.0.8、8.0.9、8.0.10 - 0.8 - 8.0.6 - 0.7 - 8.0.5 - 0.6 - 8.0.4 - 0.6 - 8.0.3 
SQL
Paimon连接器可以在SQL作业中使用,作为源表或者结果表。
语法结构
- 如果您在Paimon Catalog中创建Paimon表,则无需指定 - connector参数,此时创建Paimon表的语法结构如下。- CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.paimon_table ( id BIGINT, data STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( ... );说明- 如果您已在Paimon Catalog中创建了Paimon表,后续无需再次创建表即可直接使用。 
- 如果您在其他Catalog中创建Paimon临时表,则需要指定connector参数与Paimon表的存储路径path,此时创建Paimon表的语法结构如下。 - CREATE TEMPORARY TABLE paimon_table ( id BIGINT, data STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'paimon', 'path' = '<path-to-paimon-table-files>', 'auto-create' = 'true', -- 如果指定路径不存在Paimon表数据文件,则会自动创建文件。 ... );
WITH参数
| 参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 | 
| connector | 表类型。 | String | 否 | 无 | 
 | 
| path | 表存储路径。 | String | 否 | 无 | 
 | 
| auto-create | 创建Paimon临时表时,若指定路径不存在Paimon表文件,是否自动创建文件。 | Boolean | 否 | false | 参数取值如下: 
 | 
| bucket | 每个分区的分桶数。 | Integer | 否 | 1 | 写入Paimon表的数据将按 说明  建议每个Bucket的数据量在5 GB以下。 | 
| bucket-key | 分桶关键列。 | String | 否 | 无 | 指定将写入Paimon表的数据按哪些列的值打散至不同的Bucket中。 列名之间用英文逗号(,)分隔,例如 说明  
 | 
| changelog-producer | 增量数据产生机制。 | String | 否 | none | Paimon可以为任意输入数据流产生完整的增量数据(所有的update_after数据都有对应的update_before数据),方便下游消费者。增量数据产生机制的可选值如下: 
 关于增量数据产生机制的选择,详情请参见增量数据产生机制。 | 
| full-compaction.delta-commits | Full Compaction最大间隔。 | Integer | 否 | 无 | 该参数指定了每commit snapshot多少次之后,一定会进行一次Full Compaction。 | 
| lookup.cache-max-memory-size | Paimon维表的内存缓存大小。 | String | 否 | 256 MB | 该参数值会同时影响维表缓存大小和lookup changelog-producer的缓存大小,两个机制的缓存大小都由该参数配置。 | 
| merge-engine | 相同primary key数据的合并机制。 | String | 否 | deduplicate | 参数取值如下: 
 关于数据合并机制的具体分析,详情请参见数据合并机制。 | 
| partial-update.ignore-delete | 是否忽略delete(-D)类型的消息。 | Boolean | 否 | false | 参数取值如下: 
 说明  
 | 
| ignore-delete | 是否忽略delete(-D)类型的消息。 | Boolean | 否 | false | 参数取值同partial-update.ignore-delete。 说明  
 | 
| partition.default-name | 分区默认名称。 | String | 否 | __DEFAULT_PARTITION__ | 如果分区列的值为null或空字符串,将会采用该默认名称作为分区名。 | 
| partition.expiration-check-interval | 多久检查一次分区过期。 | String | 否 | 1h | 详情请参见如何设置分区自动过期? | 
| partition.expiration-time | 分区的过期时长。 | String | 否 | 无 | 当一个分区的存活时长超过该值时,该分区将会过期,默认永不过期。 一个分区的存活时长由该分区的分区值计算而来,详情请参见如何设置分区自动过期? | 
| partition.timestamp-formatter | 将时间字符串转换为时间戳的格式串。 | String | 否 | 无 | 设置从分区值提取分区存活时长的格式,详情请参见如何设置分区自动过期? | 
| partition.timestamp-pattern | 将分区值转换为时间字符串的格式串。 | String | 否 | 无 | 设置从分区值提取分区存活时长的格式,详情请参见如何设置分区自动过期? | 
| scan.bounded.watermark | 当Paimon源表产生的数据的watermark超过该值时,Paimon源表将会结束产生数据。 | Long | 否 | 无 | 无。 | 
| scan.mode | 指定Paimon源表的消费位点。 | String | 否 | default | 详情请参见如何设置Paimon源表的消费位点? | 
| scan.snapshot-id | 指定Paimon源表从哪个snapshot开始消费。 | Integer | 否 | 无 | 详情请参见如何设置Paimon源表的消费位点? | 
| scan.timestamp-millis | 指定Paimon源表从哪个时间点开始消费。 | Integer | 否 | 无 | 详情请参见如何设置Paimon源表的消费位点? | 
| snapshot.num-retained.max | 至多保留几个最新Snapshot不过期。 | Integer | 否 | 2147483647 | 只要满足该配置或snapshot.time-retained其中之一,并同时满足snapshot.num-retained.min,就会触发Snapshot过期。 | 
| snapshot.num-retained.min | 至少保留几个最新Snapshot不过期。 | Integer | 否 | 10 | 无。 | 
| snapshot.time-retained | Snapshot产生多久以后会过期。 | String | 否 | 1h | 只要满足该配置或snapshot.num-retained.max其中之一,并同时满足snapshot.num-retained.min,就会触发snapshot过期。 | 
| write-mode | Paimon表的写入模式。 | String | 否 | change-log | 参数取值如下: 
 关于写入模式的具体介绍,详情请参见写入模式。 | 
| scan.infer-parallelism | 是否自动推断Paimon源表的并发度。 | Boolean | 否 | true | 参数取值如下: 
 | 
| scan.parallelism | Paimon源表的并发度。 | Integer | 否 | 无 | 说明  在作业页签中,资源模式为专家模式时,该参数不生效。 | 
| sink.parallelism | Paimon结果表的并发度。 | Integer | 否 | 无 | 说明  在作业页签中,资源模式为专家模式时,该参数不生效。 | 
| sink.clustering.by-columns | 指定写入Paimon结果表的聚类列。 | String | 否 | 无 | 对于Paimon Append Only表(非主键表),在批作业中配置该参数可以启用聚类写入功能,使数据在特定列上按大小范围聚集分布,从而提升该表的查询速度。 多个列名请使用英文逗号(,)进行分隔,例如 聚类详情请参见Apache Paimon官方文档。 | 
| sink.delete-strategy | 设定校验策略,确保系统能正确处理回撤(-D/-U)类型消息。  | Enum | 否 | NONE | 校验策略取值及Sink算子应当正确处理回撤消息的行为如下: 
 说明  
 | 
更多配置项详情请参见Apache Paimon官方文档。
特色功能详解
数据新鲜度与一致性保证
Paimon结果表使用两阶段提交协议,在每次Flink作业的checkpoint期间提交写入的数据,因此数据新鲜度即为Flink作业的checkpoint间隔。每次提交将会产生至多两个snapshot。
当两个Flink作业同时写入一张Paimon表时,如果两个作业的数据没有写入同一个分桶,则能保证serializable级别的一致性。如果两个作业的数据写入了同一个分桶,则只能保证snapshot isolation级别的一致性。也就是说,表中的数据可能混合了两个作业的结果,但不会有数据丢失。
数据合并机制
当Paimon结果表收到多条具有相同primary key的数据时,为了保持primary key的唯一性,Paimon结果表会将这些数据合并成一条数据。通过指定merge-engine参数,您可以指定数据合并的具体行为。数据合并机制详情如下表所示。
| 合并机制 | 详情 | 
| 去重(Deduplicate) | 去重机制(deduplicate)是默认的数据合并机制。对于多条具有相同primary key的数据,Paimon结果表仅会保留最新一条数据,并丢弃其它具有primary key的数据。 说明  如果最新一条数据是一条delete消息,所有具有该primary key的数据都将被丢弃。 | 
| 部分更新(Partial Update) | 通过指定部分更新机制(partial-update),您可以通过多条消息对数据进行逐步更新,并最终得到完整的数据。具体来说,具有相同primary key的新数据将会覆盖原来的数据,但值为null的列不会进行覆盖。 例如,假设Paimon结果表按顺序收到了以下三条数据: 
 第一列是primary key,则最终结果为<1, 25.2, 10, 'This is a book'>。 说明  
 | 
| 预聚合(Aggregation) | 部分场景下,可能只关心聚合后的值。预聚合机制(aggregation)将具有相同primary key的数据根据您指定的聚合函数进行聚合。对于不属于primary key的每一列,都需要通过 price列将会根据max函数进行聚合,而sales列将会根据sum函数进行聚合。给定两条输入数据 <1, 23.0, 15>和 <1, 30.2, 20>,最终结果为<1, 30.2, 35>。当前支持的聚合函数与对应的数据类型如下: 
 说明  
 | 
增量数据产生机制
通过changelog-producer参数设置相应的增量数据产生机制,Paimon可以为任意输入数据流产生完整的增量数据(所有的update_after数据都有对应的update_before数据)。以下列举了所有的增量数据产生机制,更加详细的介绍请参见Apache Paimon官方文档。
| 机制 | 详情 | 
| None | 设置 例如,假设下游消费者需要计算某一列的总和,如果消费者只看到了最新数据5,它无法断定该如何更新总和。因为如果之前的数据是4,它应该将总和增加1;如果之前的数据是6,它应该将总和减去1。此类消费者对update_before较为敏感,建议不要将增量数据产生机制配置为None,但是其他增量数据产生机制会带来性能损耗。 说明  如果您的下游是数据库之类的对update_before数据不敏感的消费者,则可以将增量数据产生机制配置为None。因此,建议您根据实际需要配置增量数据产生机制。 | 
| Input | 设置 因此,只有当输入数据流本身是完整的增量数据时(例如CDC数据),才能使用这一增量数据产生机制。 | 
| Lookup | 设置 与下文的Full Compaction机制相比,Lookup机制产生增量数据的时效性更好,但总体来看耗费的资源更多。 推荐在对增量数据的新鲜度有较高要求(例如分钟级)的情况下使用。 | 
| Full Compaction | 设置 与上文的Lookup机制相比,Full Compaction机制产生增量数据的时效性更差,但它利用了数据的full compaction过程,不产生额外计算,因此总体来看耗费的资源更少。 推荐在对增量数据的新鲜度要求不高(例如小时级)的情况下使用。 | 
写入模式
Paimon表目前支持的写入模式如下。
| 模式 | 详情 | 
| Change-log | change-log写入模式是Paimon表的默认写入模式。该写入模式支持根据primary key对数据进行插入、删除与更新,您也可以在该写入模式下使用上文提到的数据合并机制与增量数据产生机制。 | 
| Append-only | append-only写入模式仅支持数据的插入,且不支持primary key。该模式比change-log模式更加高效,可在对数据新鲜度要求一般的场景下(例如分钟级新鲜度)作为消息队列的替代品。 关于append-only写入模式的详细介绍,请参见Apache Paimon官方文档。在使用append-only写入模式时,需要注意以下两点: 
 | 
作为CTAS和CDAS的目标端
Paimon表支持实时同步单表或整库级别的数据,在同步过程之中如果上游的表结构发生了变更也会实时同步到Paimon表中。详见管理Paimon表和管理Paimon Catalog。
数据摄入
Paimon连接器可以用于数据摄入YAML作业开发,作为目标端写入。
语法结构
sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse配置项
| 参数 | 说明 | 是否必填 | 数据类型 | 默认值 | 备注 | 
| type | 连接器类型。 | 是 | STRING | 无 | 固定值为 | 
| name | 目标端名称。 | 否 | STRING | 无 | Sink的名称。 | 
| catalog.properties.metastore | Paimon Catalog的类型。 | 否 | STRING | filesystem | 取值如下: 
 | 
| catalog.properties.* | 创建Paimon Catalog的参数。 | 否 | STRING | 无 | 详情请参见管理Paimon Catalog。 | 
| table.properties.* | 创建Paimon table的参数。 | 否 | STRING | 无 | 详情请参见Paimon table options。 | 
| catalog.properties.warehouse | 文件存储的根目录。 | 否 | STRING | 无 | 仅在 | 
| commit.user | 提交数据文件时的用户名。 | 否 | STRING | 无 | 说明  建议为不同的作业设置不同的用户名,方便在出现提交冲突时定位冲突的作业。 | 
| partition.key | 每个分区表的分区字段。 | 否 | STRING | 无 | 不同的表使用 | 
使用示例
使用Paimon作为数据摄入目标端时,根据Paimon Catalog的类型不同可以参考下面的示例进行配置。
- Paimon Catalog为filesystem,写入阿里云OSS的配置示例: - source: type: mysql name: MySQL Source hostname: ${secret_values.mysql.hostname} port: ${mysql.port} username: ${secret_values.mysql.username} password: ${secret_values.mysql.password} tables: ${mysql.source.table} server-id: 8601-8604 sink: type: paimon name: Paimon Sink catalog.properties.metastore: filesystem catalog.properties.warehouse: oss://default/test catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com catalog.properties.fs.oss.accessKeyId: xxxxxxxx catalog.properties.fs.oss.accessKeySecret: xxxxxxxx- 其中,catalog.properties前缀的参数含义请参见创建Paimon Filesystem Catalog。 
- Paimon Catalog为rest,写入阿里云DLF2.5的配置示例: - source: type: mysql name: MySQL Source hostname: ${secret_values.mysql.hostname} port: ${mysql.port} username: ${secret_values.mysql.username} password: ${secret_values.mysql.password} tables: ${mysql.source.table} server-id: 8601-8604 sink: type: paimon name: Paimon Sink sink: type: paimon name: Paimon Sink catalog.properties.metastore: rest catalog.properties.uri: dlf_uri catalog.properties.warehouse: your_warehouse catalog.properties.token.provider: dlf- 其中,catalog.properties前缀的参数含义请参见Flink CDC对接Paimon Catalog配置参数。