本文为您介绍如何使用流存储Fluss连接器。
背景信息
阿里云流存储Fluss版采用“湖流一体”架构,实现与Apache Paimon等数据湖格式的无缝对接,在确保数据高时效性的基础上,显著降低实时数仓的构建与运维成本。通过统一的数据流与湖仓协同机制,Fluss助力企业打破数据孤岛,加速湖上数据价值的深度挖掘与高效共享。
类别 | 详情 |
支持类型 | 源表、维表、结果表,数据摄入目标端 |
运行模式 | 流模式和批模式 |
数据格式 | 暂不支持 |
API种类 | SQL,Datastream和数据摄入YAML |
是否支持更新或删除结果表数据 | 是 |
SQL
Fluss连接器可以在SQL作业中使用,作为源表,维表或者结果表。详情见:引擎对接 - Flink
数据摄入
Fluss连接器作为数据源可以在数据摄入YAML作业中使用。
语法结构
source:
type: xxx
sink:
type: fluss
name: Fluss Sink
bootstrap.servers: localhost:9123
properties.client.security.protocol: sasl
properties.client.security.sasl.mechanism: PLAIN
properties.client.security.sasl.username: developer
properties.client.security.sasl.password: developer-pass
pipeline:
schema.change.behavior: IGNORE表结构变更
当前不支持表结构变更的同步。在使用时,需确保上游无 Schema 变更,或显式设置 schema.change.behavior: IGNORE。Fluss 数据摄入连接器仅会创建新表,不会修改已有表的表结构。
当上游发生 Schema 变更且配置了 schema.change.behavior: IGNORE 时,Fluss 数据摄入连接器的行为如下:
上游新增列:连接器将忽略该新增列,不进行同步。
上游删除列:连接器会将该列的值置为
null。上游重命名列:连接器会将原列的值置为
null,且不会同步新列。上游更改列类型:连接器将报错并拒绝处理。
配置项
参数 | 说明 | 是否必填 | 数据类型 | 默认值 | 备注 |
type | sink类型。 | 是 | STRING | 无 | 固定值为fluss。 |
name | sink名称。 | 否 | STRING | 无 | 无。 |
bootstrap.servers | Fluss Server地址 | 是 | STRING | 无 | 格式为host:port,host:port,host:port,以英文逗号(,)分割。 |
bucket.key | 分桶键 | 否 | String | 无 | 指定每个 Fluss 表的数据分布策略。表之间用 ';' 分隔,分桶键之间用 ',' 分隔。格式:database1.table1:key1,key2;database1.table2:key3。 数据将根据分桶键的哈希值分配到各个桶中(分桶键必须是主键的子集,且不包含主键表的分区键)。 若表有主键但未指定分桶键,则分桶键默认为主键(不含分区键);若表无主键且未指定分桶键,则数据将随机分配到各个桶中。 bucket.key和bucket.num值在创建表后就不再改变。 |
bucket.num | optional | 否 | String | 无 | 每个 Fluss 表的桶数量。表之间用 ';' 分隔。格式:database1.table1:4;database1.table2:8。如果一张表未配置桶数量,默认会采用Server端的配置项default.bucket.number。 bucket.key和bucket.num值在创建表后就不再改变。 |
properties.table.* | optional | 否 | String | 无 | 将 Fluss table 支持的参数传递给 pipeline,参考,See Fluss table options. |
properties.client.* | optional | 否 | String | 无 | 将 Fluss client 支持的参数传递给 pipeline,See Fluss client options. |
类型映射
数据摄入类型映射如下表所示。
CDC字段类型 | Fluss 字段类型 |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(p, s) | DECIMAL(p, s) |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
TIMESTAMP_LTZ | TIMESTAMP_LTZ |
CHAR(n) | CHAR(n) |
VARCHAR(n) | STRING |
BINARY(n) | 暂不支持 |
VARBINARY(N) |