管理表

更新时间:
复制为 MD 格式

本文将介绍如何创建Fluss表。

使用限制

仅实时计算引擎VVR 11.2及以上版本进行管理。

创建主键表

  1. 登录实时计算控制台

  2. 单击流计算Flink目标工作空间操作列下的控制台

  3. 在左侧导航栏,单击数据开发 > 数据查询 > 新建临时查询脚本

  4. 编写SQL并运行。

CREATE TABLE `my-catalog`.`my_db`.`my_pk_table` (
  shop_id BIGINT,
  user_id BIGINT,
  num_orders INT,
  total_amount INT,
  PRIMARY KEY (shop_id, user_id) NOT ENFORCED
) WITH (
  'bucket.num' = '4'
);
说明

建表时必须规划分桶数量。

日志表支持后期调整分桶配置,但主键表的分桶数一旦确立,便无法修改。因此,建议在创建表时直接根据数据规模设定分桶,计算公式如下:

分桶数 = 单分区总数据量 ÷ 目标桶大小

创建日志表

CREATE TABLE `my-catalog`.`my_db`.`my_log_table` (
  order_id BIGINT,
  item_id BIGINT,
  amount INT,
  address STRING
) WITH (
  'bucket.num' = '8'
);

创建分区表

  • 目前 Fluss 的分区键只支持 String 类型。

  • 对于分区主键表,其分区键(下面例子中的 dt 字段)必须是主键的子集。

分区主键表

分区键为 dt,主键为 [dt, shot_id, user_id]。

CREATE TABLE `my-catalog`.`my_db`.`my_part_pk_table` (
  dt STRING,
  shop_id BIGINT,
  user_id BIGINT,
  num_orders INT,
  total_amount INT,
  PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
  'bucket.num' = '4'
);

分区日志表

创建了一张分区日志表,其分区键为 dt。

CREATE TABLE `my-catalog`.`my_db`.`my_part_log_table` (
  dt STRING,
  shop_id BIGINT,
  user_id BIGINT,
  num_orders INT,
  total_amount INT
) PARTITIONED BY (dt) WITH (
  'bucket.num' = '4'
);

新建分区

ALTER TABLE my-catalog.my_db.my_part_pk_table 
ADD PARTITION (dt = '2025-03-05');

创建自动分区表

自动分区表在分区表的基础上,加入了自动分区的逻辑,Fluss 会根据你配置的自动分区策略为你提前创建好分区。

自动分区主键表

分区键为 dt,自动分区创建间隔为天(day)。

CREATE TABLE `my-catalog`.`my_db`.`my_auto_part_pk_table` (
  dt STRING,
  shop_id BIGINT,
  user_id BIGINT,
  num_orders INT,
  total_amount INT,
  PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
  'bucket.num' = '4'
  'table.auto-partition.enabled' = 'true',
  'table.auto-partition.time-unit' = 'day'
);

自动分区日志表

分区键为 dt,自动分区创建间隔为天(day)。

CREATE TABLE `my-catalog`.`my_db`.`my_auto_part_log_table` (
  dt STRING,
  shop_id BIGINT,
  user_id BIGINT,
  num_orders INT,
  total_amount INT
) PARTITIONED BY (dt) WITH (
  'bucket.num' = '4'
  'table.auto-partition.enabled' = 'true',
  'table.auto-partition.time-unit' = 'day'
);

更新表

新增列

说明

该功能要求 Flink 实时计算引擎(VVR)版本为 11.5 及以上,同时 Fluss 版本需为 0.8-ali-3.0 及以上。为避免影响读写操作,建议先将 Flink 作业升级到 11.5 及以上版本,然后再将 Fluss 集群升级到 0.8-ali-3.0。

Fluss 支持通过添加新列来演进表的结构,支持添加任意数据类型的列,包括复杂类型(如 ROWMAP 和 ARRAY)。

这是一个轻量级、仅涉及元数据的操作,具有以下优势:

  • 零数据重写:添加列时无需重写或迁移现有的数据文件。

  • 即时执行:无论表的规模如何,该操作都能在毫秒级别完成。

  • 高可用性:在整个表结构演进过程中,表始终保持在线且完全可用,不会对正在运行的客户端造成任何中断。

目前,此功能具有以下限制:

  • 列顺序:新增列将始终追加到现有列列表的末尾。

  • 可为空性:为了确保与现有数据的兼容性,只能向表中添加可为空的列。

  • 嵌套字段:目前不支持在现有的嵌套 ROW 类型中添加字段。此类操作属于“更新列类型”的范畴,将在未来的版本中提供支持。

您可以使用 ALTER TABLE 语句添加单个列或多个列。

-- Add a single column at the end of the table
ALTER TABLE `my-catalog`.`my_db`.`my_pk_table` ADD user_email STRING COMMENT 'User email address';

-- Add multiple columns at the end of the table
ALTER TABLE `my-catalog`.`my_db`.`my_pk_table` ADD (
    user_email STRING COMMENT 'User email address',
    order_quantity INT
);

复制表(Schema 复制)

Fluss 支持通过create table like语法创建一张 schema,分区信息和表参数相同的表。

该能力仍是创建Fluss表,需在数据开发 > 数据查询 > 新建临时查询脚本中使用。

-- 有一张 datagen 的临时表
CREATE TEMPORARY TABLE datagen (
 user_id BIGINT,
 item_id BIGINT,
 behavior STRING,
 dt STRING,
 hh STRING
) WITH (
 'connector' = 'datagen',
 'rows-per-second' = '10'
);
-- 创建一张去掉 like option 的 Fluss 表
CREATE TABLE my-catalog.my_db.my_like_db LIKE datagen (EXCLUDING OPTIONS);

删除表

DROP TABLE `my-catalog`.`my_db`.`my_db`;