本文将介绍如何创建Fluss表。
使用限制
仅实时计算引擎VVR 11.2及以上版本进行管理。
创建主键表
登录实时计算控制台。
单击流计算Flink目标工作空间操作列下的控制台。
在左侧导航栏,单击。
编写SQL并运行。
CREATE TABLE `my-catalog`.`my_db`.`my_pk_table` (
shop_id BIGINT,
user_id BIGINT,
num_orders INT,
total_amount INT,
PRIMARY KEY (shop_id, user_id) NOT ENFORCED
) WITH (
'bucket.num' = '4'
);建表时必须规划分桶数量。
日志表支持后期调整分桶配置,但主键表的分桶数一旦确立,便无法修改。因此,建议在创建表时直接根据数据规模设定分桶,计算公式如下:
分桶数 = 单分区总数据量 ÷ 目标桶大小
创建日志表
CREATE TABLE `my-catalog`.`my_db`.`my_log_table` (
order_id BIGINT,
item_id BIGINT,
amount INT,
address STRING
) WITH (
'bucket.num' = '8'
);创建分区表
目前 Fluss 的分区键只支持 String 类型。
对于分区主键表,其分区键(下面例子中的 dt 字段)必须是主键的子集。
分区主键表
分区键为 dt,主键为 [dt, shot_id, user_id]。
CREATE TABLE `my-catalog`.`my_db`.`my_part_pk_table` (
dt STRING,
shop_id BIGINT,
user_id BIGINT,
num_orders INT,
total_amount INT,
PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
'bucket.num' = '4'
);分区日志表
创建了一张分区日志表,其分区键为 dt。
CREATE TABLE `my-catalog`.`my_db`.`my_part_log_table` (
dt STRING,
shop_id BIGINT,
user_id BIGINT,
num_orders INT,
total_amount INT
) PARTITIONED BY (dt) WITH (
'bucket.num' = '4'
);新建分区
ALTER TABLE my-catalog.my_db.my_part_pk_table
ADD PARTITION (dt = '2025-03-05');创建自动分区表
自动分区表在分区表的基础上,加入了自动分区的逻辑,Fluss 会根据你配置的自动分区策略为你提前创建好分区。
自动分区主键表
分区键为 dt,自动分区创建间隔为天(day)。
CREATE TABLE `my-catalog`.`my_db`.`my_auto_part_pk_table` (
dt STRING,
shop_id BIGINT,
user_id BIGINT,
num_orders INT,
total_amount INT,
PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
'bucket.num' = '4'
'table.auto-partition.enabled' = 'true',
'table.auto-partition.time-unit' = 'day'
);自动分区日志表
分区键为 dt,自动分区创建间隔为天(day)。
CREATE TABLE `my-catalog`.`my_db`.`my_auto_part_log_table` (
dt STRING,
shop_id BIGINT,
user_id BIGINT,
num_orders INT,
total_amount INT
) PARTITIONED BY (dt) WITH (
'bucket.num' = '4'
'table.auto-partition.enabled' = 'true',
'table.auto-partition.time-unit' = 'day'
);更新表
新增列
该功能要求 Flink 实时计算引擎(VVR)版本为 11.5 及以上,同时 Fluss 版本需为 0.8-ali-3.0 及以上。为避免影响读写操作,建议先将 Flink 作业升级到 11.5 及以上版本,然后再将 Fluss 集群升级到 0.8-ali-3.0。
Fluss 支持通过添加新列来演进表的结构,支持添加任意数据类型的列,包括复杂类型(如 ROW、MAP 和 ARRAY)。
这是一个轻量级、仅涉及元数据的操作,具有以下优势:
零数据重写:添加列时无需重写或迁移现有的数据文件。
即时执行:无论表的规模如何,该操作都能在毫秒级别完成。
高可用性:在整个表结构演进过程中,表始终保持在线且完全可用,不会对正在运行的客户端造成任何中断。
目前,此功能具有以下限制:
列顺序:新增列将始终追加到现有列列表的末尾。
可为空性:为了确保与现有数据的兼容性,只能向表中添加可为空的列。
嵌套字段:目前不支持在现有的嵌套
ROW类型中添加字段。此类操作属于“更新列类型”的范畴,将在未来的版本中提供支持。
您可以使用 ALTER TABLE 语句添加单个列或多个列。
-- Add a single column at the end of the table
ALTER TABLE `my-catalog`.`my_db`.`my_pk_table` ADD user_email STRING COMMENT 'User email address';
-- Add multiple columns at the end of the table
ALTER TABLE `my-catalog`.`my_db`.`my_pk_table` ADD (
user_email STRING COMMENT 'User email address',
order_quantity INT
);复制表(Schema 复制)
Fluss 支持通过create table like语法创建一张 schema,分区信息和表参数相同的表。
该能力仍是创建Fluss表,需在中使用。
-- 有一张 datagen 的临时表
CREATE TEMPORARY TABLE datagen (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10'
);
-- 创建一张去掉 like option 的 Fluss 表
CREATE TABLE my-catalog.my_db.my_like_db LIKE datagen (EXCLUDING OPTIONS);删除表
DROP TABLE `my-catalog`.`my_db`.`my_db`;