流存储Fluss

本文为您介绍如何使用流存储Fluss连接器。

背景信息

阿里云流存储Fluss采用“湖流一体”架构,实现与Apache Paimon等数据湖格式的无缝对接,在确保数据高时效性的基础上,显著降低实时数仓的构建与运维成本。通过统一的数据流与湖仓协同机制,Fluss助力企业打破数据孤岛,加速湖上数据价值的深度挖掘与高效共享。

类别

详情

支持类型

源表、维表、结果表,数据摄入目标端

运行模式

流模式和批模式

数据格式

暂不支持

API种类

SQL,Datastream和数据摄入YAML

是否支持更新或删除结果表数据

SQL

Fluss连接器可以在SQL作业中使用,作为源表,维表或者结果表。详情请参见:引擎对接 - Flink

特色功能

功能

详情

全增量一体化消费

支持全量、增量及全增量一体化消费。

多流写入的宽表Merge与局部更新

只更新修改部分列的数据,而非整行更新。

Delta Join(多流Join新范式)

新型的 Join 方案,能够在保持双流 Join 语义的同时,将数据更新行为下沉到Fluss表中完成,从而显著降低Flink的资源消耗,提升作业的稳定性和执行效率。

湖流一体

将实时流式存储(Fluss)与基于 Paimon 的数据湖(由 DLF 统一管理元数据)进行深度集成,实现“一份数据,两种视图”,即在同一个逻辑表中,同时提供低延迟的实时数据访问和高吞吐的历史数据分析能力。

管理分区

支持分区表创建、读写数据,并可自动创建和清理分区。

前提条件

  • 对目标表有读写权限,参考授权访问Fluss集群

  • 连接阿里云流存储Fluss集群时,Flink工作空间和Fluss集群需要在同一VPC内。

数据摄入

Fluss连接器作为数据下游可以在数据摄入YAML作业中使用。

语法结构

source:
   type: xxx

sink:
  type: fluss
  name: Fluss Sink
  bootstrap.servers: localhost:9123
  properties.client.security.protocol: sasl
  properties.client.security.sasl.mechanism: PLAIN
  properties.client.security.sasl.username: developer
  properties.client.security.sasl.password: developer-pass
  

pipeline:
  schema.change.behavior: IGNORE

表结构变更

当前不支持表结构变更的同步。在使用时,需确保上游无 Schema 变更,或显式设置 schema.change.behavior: IGNORE。Fluss 数据摄入连接器仅会创建新表,不会修改已有表的表结构。

当上游发生 Schema 变更且配置了 schema.change.behavior: IGNORE 时,Fluss 数据摄入连接器的行为如下:

  • 上游新增列:连接器将忽略该新增列,不进行同步。

  • 上游删除列:连接器会将该列的值置为 null

  • 上游重命名列:连接器会将原列的值置为 null,且不会同步新列。

  • 上游更改列类型:连接器将报错并拒绝处理。

配置项

参数

说明

是否必填

数据类型

默认值

备注

type

sink类型。

STRING

固定值为fluss。

name

sink名称。

STRING

无。

bootstrap.servers

Fluss Server地址

STRING

格式为host:port,host:port,host:port,以英文逗号(,)分隔。

bucket.key

分桶键

String

指定每个 Fluss 表的数据分布策略。表之间用';'分隔,分桶键之间用','分隔。

格式:database1.table1:key1,key2;database1.table2:key3

数据将根据分桶键的哈希值分配到各个桶中(分桶键必须是主键的子集,且不包含主键表的分区键)。 若表有主键但未指定分桶键,则分桶键默认为主键(不含分区键);若表无主键且未指定分桶键,则数据将随机分配到各个桶中。

bucket.keybucket.num值在创建表后就不再改变。

bucket.num

optional

String

每个 Fluss 表的桶数量。表之间用';'分隔。

格式:database1.table1:4;database1.table2:8

如果一张表未配置桶数量,默认会采用Server端的配置项default.bucket.number

bucket.keybucket.num值在创建表后就不再改变。

properties.table.*

optional

String

将 Fluss table 支持的参数传递给 pipeline,详情请参见Fluss table options

properties.client.*

optional

String

将 Fluss client 支持的参数传递给 pipeline,详情请参见Fluss client options

类型映射

数据摄入类型映射如下表所示。

CDC字段类型

Fluss 字段类型

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(p, s)

DECIMAL(p, s)

BOOLEAN

BOOLEAN

DATE

DATE

TIME

TIME

TIMESTAMP

TIMESTAMP

TIMESTAMP_LTZ

TIMESTAMP_LTZ

CHAR(n)

CHAR(n)

VARCHAR(n)

STRING

BINARY(n)

暂不支持

VARBINARY(N)