流存储Fluss

本文为您介绍如何使用流存储Fluss连接器。

背景信息

阿里云流存储Fluss采用“湖流一体”架构,实现与Apache Paimon等数据湖格式的无缝对接,在确保数据高时效性的基础上,显著降低实时数仓的构建与运维成本。通过统一的数据流与湖仓协同机制,Fluss助力企业打破数据孤岛,加速湖上数据价值的深度挖掘与高效共享。

类别

详情

支持类型

源表、维表、结果表,数据摄入目标端

运行模式

流模式和批模式

数据格式

暂不支持

API种类

SQL,Datastream和数据摄入YAML

是否支持更新或删除结果表数据

SQL

Fluss连接器可以在SQL作业中使用,作为源表,维表或者结果表。详情见:引擎对接 - Flink

数据摄入

Fluss连接器作为数据源可以在数据摄入YAML作业中使用。

语法结构

source:
   type: xxx

sink:
  type: fluss
  name: Fluss Sink
  bootstrap.servers: localhost:9123
  properties.client.security.protocol: sasl
  properties.client.security.sasl.mechanism: PLAIN
  properties.client.security.sasl.username: developer
  properties.client.security.sasl.password: developer-pass
  

pipeline:
  schema.change.behavior: IGNORE

表结构变更

当前不支持表结构变更的同步。在使用时,需确保上游无 Schema 变更,或显式设置 schema.change.behavior: IGNORE。Fluss 数据摄入连接器仅会创建新表,不会修改已有表的表结构。

当上游发生 Schema 变更且配置了 schema.change.behavior: IGNORE 时,Fluss 数据摄入连接器的行为如下:

  • 上游新增列:连接器将忽略该新增列,不进行同步。

  • 上游删除列:连接器会将该列的值置为 null

  • 上游重命名列:连接器会将原列的值置为 null,且不会同步新列。

  • 上游更改列类型:连接器将报错并拒绝处理。

配置项

参数

说明

是否必填

数据类型

默认值

备注

type

sink类型。

STRING

固定值为fluss。

name

sink名称。

STRING

无。

bootstrap.servers

Fluss Server地址

STRING

格式为host:port,host:port,host:port,以英文逗号(,)分割。

bucket.key

分桶键

String

指定每个 Fluss 表的数据分布策略。表之间用 ';' 分隔,分桶键之间用 ',' 分隔。格式:database1.table1:key1,key2;database1.table2:key3。 数据将根据分桶键的哈希值分配到各个桶中(分桶键必须是主键的子集,且不包含主键表的分区键)。 若表有主键但未指定分桶键,则分桶键默认为主键(不含分区键);若表无主键且未指定分桶键,则数据将随机分配到各个桶中。

bucket.keybucket.num值在创建表后就不再改变。

bucket.num

optional

String

每个 Fluss 表的桶数量。表之间用 ';' 分隔。格式:database1.table1:4;database1.table2:8。如果一张表未配置桶数量,默认会采用Server端的配置项default.bucket.number。

bucket.keybucket.num值在创建表后就不再改变。

properties.table.*

optional

String

将 Fluss table 支持的参数传递给 pipeline,参考,See Fluss table options.

properties.client.*

optional

String

将 Fluss client 支持的参数传递给 pipeline,See Fluss client options.

类型映射

数据摄入类型映射如下表所示。

CDC字段类型

Fluss 字段类型

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(p, s)

DECIMAL(p, s)

BOOLEAN

BOOLEAN

DATE

DATE

TIME

TIME

TIMESTAMP

TIMESTAMP

TIMESTAMP_LTZ

TIMESTAMP_LTZ

CHAR(n)

CHAR(n)

VARCHAR(n)

STRING

BINARY(n)

暂不支持

VARBINARY(N)