本文为您介绍如何创建Dynamic Table。
使用限制
Dynamic Table的使用限制请参见Dynamic Table支持范围和限制。
语法
建表语法
Dynamic Table的建表语法如下:
CREATE DYNAMIC TABLE [IF NOT EXISTS] <schema.tablename>(
[col_name],
[col_name]
) [PARTITION BY LIST (col_name)]
WITH (
[refresh_mode='[full|incremental]',]
[auto_refresh_enable='[true|false',]
--增量刷新专用参数:
[incremental_auto_refresh_schd_start_time='[immediate|<timestamptz>]',]
[incremental_auto_refresh_interval='[<num> {minute|minutes|hour|hours]',]
[incremental_guc_hg_computing_resource='[ local | serverless]',]
[incremental_guc_hg_experimental_serverless_computing_required_cores='<num>',]
--全量刷新专用参数:
[full_auto_refresh_schd_start_time='[immediate|<timestamptz>]',]
[full_auto_refresh_interval='[<num> {minute|minutes|hour|hours]',]
[full_guc_hg_computing_resource='[ local | serverless]',]--hg_full_refresh_computing_resource默认serverless,可以db级别设置,用户可以不设置
[full_guc_hg_experimental_serverless_computing_required_cores='<num>',]
--共用参数,允许设置guc:
[refresh_guc_<guc>='xxx]',]
-- Dynamic Table的通用属性:
[orientation = '[column | row | row,column]',]
[table_group = '[tableGroupName]',]
[distribution_key = 'columnName[,...]]',]
[clustering_key = '[columnName{:asc]} [,...]]',]
[event_time_column = '[columnName [,...]]',]
[bitmap_columns = '[columnName [,...]]',]
[dictionary_encoding_columns = '[columnName [,...]]',]
[time_to_live_in_seconds = '<non_negative_literal>',]
[storage_mode = '[hot | cold]']
)
AS
<query> --query的定义
参数说明
刷新模式与资源
参数分类 | 参数名 | 描述 | 是否必填 | 默认值 |
参数分类 | 参数名 | 描述 | 是否必填 | 默认值 |
共用刷新参数 | refresh_mode | 指定数据的刷新模式,支持full、incremental两种刷新模式。 若不设置该参数,则表示不进行刷新。 | 否 | 无 |
auto_refresh_enable | 是否开启自动刷新。取值为:
| 否 | false | |
refresh_guc_<guc> | 支持对Refresh设置GUC参数,支持的GUC请参见GUC参数。 例如GUC参数中 | 否 | 无 | |
增量刷新(incremental) | incremental_auto_refresh_schd_start_time | 增量刷新的开始时间。取值为:
| 否 | immediate |
incremental_auto_refresh_interval | 增量刷新的时间间隔,单位有minute、minutes、hour和hours。
| 否 | 无 | |
incremental_guc_hg_computing_resource | 指定执行增量刷新的计算资源,取值为:
支持使用 | 否 | serverless | |
incremental_guc_hg_experimental_serverless_computing_required_cores | 如果使用Serverless资源执行刷新,则需要设置刷新的计算资源量。 不同规格的实例可使用的Serverless资源有一定的限制,详情请参见Serverless Computing使用指南。 | 否 | 无 | |
全量刷新(full) | full_auto_refresh_schd_start_time | 全量刷新的开始时间。取值为:
| 否 | immediate |
full_auto_refresh_interval | 全量刷新的时间间隔,单位有minute、minutes、hour和hours。
| 否 | 无 | |
full_guc_hg_computing_resource | 指定执行全量刷新的计算资源,取值为:
支持使用 | 否 | serverless | |
full_guc_hg_experimental_serverless_computing_required_cores | 如果使用Serverless执行刷新,则需要设置刷新的计算资源量。 不同规格的实例可使用的Serverless资源有一定的限制,详情请参见Serverless Computing使用指南。 | 否 | 无 |
表属性参数
参数 | 描述 | 是否必填 | 默认值 | |
full | incremental |
参数 | 描述 | 是否必填 | 默认值 | |
full | incremental | |||
col_name | Dynamic Table的字段名。 可以显式指定Dynamic Table的列名,但是不能指定列的属性和数据类型,引擎会自动推导。 若指定了列的属性和数据类型,会导致引擎推导不正确。 | 否 | Query列名 | Query列名 |
orientation | 指定Dynamic Table的存储模式,取值如下:
| 否 | column | column |
table_group | 指定Dynamic Table所在的Table Group,默认为当前数据库下的Default Table Group,详情请参见Table Group与Shard Count操作指南。 | 否 | 默认Table Group名称 | 默认Table Group名称 |
distribution_key | 指定Dynamic Table的Distribution Key,详情请参见分布键Distribution Key。 | 否 | 无 | 无 |
clustering_key | 指定Dynamic Table的Clustering Key,详情请参见聚簇索引Clustering Key。 | 否 | 允许设置,有默认推导值。 | 允许设置,有默认推导值。 |
event_time_column | 指定Dynamic Table的event_time_column,详情请参见Event Time Column(Segment Key)。 | 否 | 无 | 无 |
bitmap_columns | 指定Dynamic Table的bitmap_columns,详情请参见位图索引Bitmap。 | 否 | TEXT类型字段 | TEXT类型字段 |
dictionary_encoding_columns | 指定Dynamic Table的dictionary_encoding_columns,详情请参见字典编码Dictionary Encoding。 | 否 | TEXT类型字段 | TEXT类型字段 |
time_to_live_in_seconds | 指定Dynamic Table数据的生命周期。 | 否 | 永久 | 永久 |
storage_mode | Dynamic Table的存储模式,取值如下:
存储模式详情请参见数据分层存储。 | 否 | hot | hot |
PARTITION BY LIST | 是否为分区表,支持创建Dynamic Table分区表,使用方式与普通分区表相同,不同分区子表可以设置不同的刷新模式,来满足业务不同场景的时效性需求。 | 否 | 非分区表 | 非分区表 |
Query
Dynamic Table中数据生成的Query,设置的刷新模式不同,支持的Query类型和基表类型也不同,详情请参见Dynamic Table支持范围和限制。
全量刷新
全量刷新会将Query中的数据以全量的方式写入Dynamic Table。相比于增量刷新,全量刷新的优势在于:
支持更多的基表类型。
支持更丰富的Query类型、算子支持等。
全量刷新相比增量刷新,处理的数据量更多,消耗的资源可能更多,因此更推荐的应用场景包括:定期报表查看、定期回刷数据等。
更多关于全量刷新的信息请参见全量刷新。
增量刷新
增量刷新会感知基表数据的变化,然后将Query中的数据以增量的方式写入到Dynamic Table。相比于全量刷新,增量刷新处理的数据量更少,处理时效性会更高。在实际应用中,如果有分钟级近实时数据查询的需求,更推荐使用增量刷新模式,但在使用增量Dynamic Table时需要注意如下几点:
增量刷新Dynamic Table需要为基表开启Binlog,如果是维表JOIN,维表无需开启Binlog。
基表开启Binlog后,会有一定存储开销,您可参考查看表存储明细查询Binlog占用存储。
开启增量刷新后,系统会在后台生成一张状态表,用于记录中间聚合结果,关于状态表技术原理请参见增量刷新(Incremental)。状态表会存储中间聚合数据,因此会占用一定的存储,查看存储请参见查看Dynamic Table表结构和血缘。
当前Hologres增量刷新支持新增数据刷新以及全增量一体刷新,来满足业务的不同诉求,详情使用如下:
增量刷新
对于已经存在的基表,开启Binlog并为其创建增量Dynamic Table后,基表中已经存在的数据不会被同步到Dynamic Table中,只有新增的数据会被同步到Dynamic Table,因此可能会存在数据一致性问题。
当前增量刷新支持的Query和算子请参见支持的功能。但需注意,如果建表有多表JOIN语法,支持的JOIN语法如下:
维表JON
维表JOIN的语义是:对每条数据,只会关联当时维表的最新版本数据,即JOIN行为只发生在处理时间(Processing Time)。如果JOIN行为发生后,维表中的数据发生了变化(新增、更新或删除),则已关联的维表数据不会同步更新。示例SQL如下:
维表的JOIN操作与参与表的数据量无关,只要在SQL中采用维表的JOIN语义即可。
CREATE TABLE users ( user_id INT, user_name TEXT, PRIMARY KEY (user_id) ); INSERT INTO users VALUES(1, 'hologres'); CREATE TABLE orders ( order_id INT, user_id INT, PRIMARY KEY (order_id) ) WITH (binlog_level = 'replica'); INSERT INTO orders VALUES(1, 1); CREATE DYNAMIC TABLE dt WITH (refresh_mode = 'incremental') AS SELECT order_id, orders.user_id, user_name -- FOR SYSTEM_TIME AS OF PROCTIME()用于标识users做为维表 FROM orders LEFT JOIN users FOR SYSTEM_TIME AS OF PROCTIME() ON orders.user_id = users.user_id; -- 刷新之后可以看到一条关联数据 REFRESH TABLE dt; SELECT * FROM dt; order_id | user_id | user_name ----------+---------+----------- 1 | 1 | hologres (1 row) UPDATE users SET user_name = 'dynamic table' WHERE user_id = 1; INSERT INTO orders VALUES(4, 1); -- 刷新之后可以看到两条关联数据,维表更新只对新增数据生效,无法订正已关联数据 REFRESH TABLE dt; SELECT * FROM dt;
返回结果如下:
order_id | user_id | user_name ----------+---------+--------------- 1 | 1 | hologres 4 | 1 | dynamic table (2 rows)
双流JOIN(多表JOIN)(Beta)
双流JOIN即多表JOIN,语义与OLAP查询相同,基于HASH JOIN实现,支持INNER JOIN、LEFT/RIGHT/FULL OUTER JOIN四种JOIN类型,从V3.0.26版本开始支持,在使用之前请先升级实例版本,同时需要执行如下GUC开启双流JOIN。
-- Session级别开启 SET hg_experimental_incremental_dynamic_table_enable_hash_join TO ON; --DB级别开启,新建连接生效 ALTER database <db_name> SET hg_experimental_incremental_dynamic_table_enable_hash_join TO ON;
示例SQL语法如下:
CREATE TABLE users ( user_id INT, user_name TEXT, PRIMARY KEY (user_id) ) WITH (binlog_level = 'replica'); INSERT INTO users VALUES(1, 'hologres'); CREATE TABLE orders ( order_id INT, user_id INT, PRIMARY KEY (order_id) ) WITH (binlog_level = 'replica'); INSERT INTO orders VALUES(1, 1); CREATE DYNAMIC TABLE dt WITH (refresh_mode = 'incremental') AS SELECT order_id, orders.user_id, user_name FROM orders LEFT JOIN users ON orders.user_id = users.user_id; -- 刷新之后可以看到一条关联数据 REFRESH TABLE dt; SELECT * FROM dt; order_id | user_id | user_name ----------+---------+----------- 1 | 1 | hologres (1 row) UPDATE users SET user_name = 'dynamic table' WHERE user_id = 1; INSERT INTO orders VALUES(4, 1); -- 刷新之后可以看到两条关联数据,维表更新对所有数据生效,可以订正已关联数据 REFRESH TABLE dt; SELECT * FROM dt;
返回结果如下:
order_id | user_id | user_name ----------+---------+--------------- 1 | 1 | dynamic table 4 | 1 | dynamic table (2 rows)
全增量一体刷新
当前增量Dynamic Table也支持全增量一体消费,即首先消费基表的全量数据,再消费基表新增的数据,该功能需要通过GUC来控制:
请在第一次Refresh前执行该GUC,否则全量数据将无法刷新到Dynamic Table中。
-- 打开全增量数据一体消费的GUC
SET hg_experimental_enable_hybrid_incremental_mode = true;
示例如下:
--全增量一体化增量Dynamic table demo
--准备基表,并开启binlog插入数据
CREATE TABLE base_sales(
day TEXT NOT NULL,
hour INT,
user_id BIGINT,
ts TIMESTAMPTZ,
amount FLOAT,
pk TEXT NOT NULL PRIMARY KEY
);
-- 为基表导入数据
INSERT INTO base_sales VALUES ('2024-08-29',1,222222,'2024-08-29 16:41:19.141528+08',5,'ddd');
-- 为基表打开Binlog
ALTER TABLE base_sales SET (binlog_level = replica);
-- 再为基表导入增量数据
INSERT INTO base_sales VALUES ('2024-08-29',2,3333,'2024-08-29 17:44:19.141528+08',100,'aaaaa');
-- 创建自动刷新的增量Dynamic Table,并开启全增量数据一体消费的GUC
SET hg_experimental_enable_hybrid_incremental_mode = true;
CREATE DYNAMIC TABLE sales_incremental
WITH (
refresh_mode='incremental',
auto_refresh_enable='true',
incremental_auto_refresh_schd_start_time = 'immediate',
incremental_auto_refresh_interval = '3 minutes',
incremental_guc_hg_experimental_enable_hybrid_incremental_mode= 'true')
AS
SELECT day, hour, SUM(amount), COUNT(1)
FROM base_sales
GROUP BY day, hour;
对比数据一致性:
查询基表
SELECT day, hour, SUM(amount), COUNT(1) FROM base_sales GROUP BY day, hour;
返回结果:
day hour sum count 2024-08-29 2 100 1 2024-08-29 1 5 1
查询Dynamic Table
SELECT * FROM sales_incremental;
返回结果:
day hour sum count 2024-08-29 1 5 1 2024-08-29 2 100 1
使用示例
示例1:创建全量刷新Dynamic Table并自动开始执行刷新
执行下述操作前,请先参考一键导入公共数据集将tpch_10g公共数据集的数据导入至Hologres。
--创建 “test” Schema
CREATE SCHEMA test;
--创建单表全量刷新的dynamic table,并立即开始刷新,每1小时刷新一次。
CREATE DYNAMIC TABLE test.thch_q1_full
WITH (
refresh_mode='full',
auto_refresh_enable='true',
full_auto_refresh_interval='1 hours',
full_guc_hg_computing_resource='serverless',
full_guc_hg_experimental_serverless_computing_required_cores='32'
)
AS
SELECT
l_returnflag,
l_linestatus,
SUM(l_quantity) AS sum_qty,
SUM(l_extendedprice) AS sum_base_price,
SUM(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
SUM(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
AVG(l_quantity) AS avg_qty,
AVG(l_extendedprice) AS avg_price,
AVG(l_discount) AS avg_disc,
COUNT(*) AS count_order
FROM
hologres_dataset_tpch_10g.lineitem
WHERE
l_shipdate <= DATE '1998-12-01' - INTERVAL '120' DAY
GROUP BY
l_returnflag,
l_linestatus;
示例2:创建增量刷新的Dynamic Table并指定开始刷新时间
执行下述操作前,请先参考一键导入公共数据集将tpch_10g公共数据集的数据导入至Hologres。
创建增量Dynamic Table示例如下:
在创建增量Dynamic Table之前,需要为基表开启Binlog(维表不需要开启)。
--为基表开binlog:
BEGIN;
CALL set_table_property('hologres_dataset_tpch_10g.lineitem', 'binlog.level', 'replica');
COMMIT;
--创建单表赠量刷新的dynamic table,并指定开始刷新时间,每3min刷新一次
CREATE DYNAMIC TABLE public.tpch_q1_incremental
WITH (
refresh_mode='incremental',
auto_refresh_enable='true',
incremental_auto_refresh_schd_start_time='2024-09-15 23:50:0',
incremental_auto_refresh_interval='3 minutes',
incremental_guc_hg_computing_resource='serverless',
incremental_guc_hg_experimental_serverless_computing_required_cores='30'
) AS SELECT
l_returnflag,
l_linestatus,
COUNT(*) AS count_order
FROM
hologres_dataset_tpch_10g.lineitem
WHERE
l_shipdate <= DATE '1998-12-01' - INTERVAL '120' DAY
GROUP BY
l_returnflag,
l_linestatus
;
示例3:创建多表JOIN的全量刷新Dynamic Table
--创建query为多表join的dynamic table,全量刷新模式,每3小时刷新一次。
CREATE DYNAMIC TABLE dt_q_full
WITH (
refresh_mode='full',
auto_refresh_enable='true',
full_auto_refresh_schd_start_time='immediate',
full_auto_refresh_interval='3 hours',
full_guc_hg_computing_resource='serverless',
full_guc_hg_experimental_serverless_computing_required_cores='64'
)
AS
SELECT
o_orderpriority,
COUNT(*) AS order_count
FROM
hologres_dataset_tpch_10g.orders
WHERE
o_orderdate >= DATE '1996-07-01'
AND o_orderdate < DATE '1996-07-01' + INTERVAL '3' MONTH
AND EXISTS (
SELECT
*
FROM
hologres_dataset_tpch_10g.lineitem
WHERE
l_orderkey = o_orderkey
AND l_commitdate < l_receiptdate
)
GROUP BY
o_orderpriority;
示例4:创建维表JOIN的增量刷新Dynamic Table
创建维表JOIN的增量Dynamic Table示例如下:
在创建增量Dynamic Table之前,需要为基表开启Binlog(维表不需要开启)。
维表JOIN的语义是:对每条数据,只会关联当时维表的最新版本数据,即JOIN行为只发生在处理时间(Processing Time)。如果JOIN行为发生后,维表中的数据发生了变化(新增、更新或删除),则已关联的维表数据不会被同步变化。SQL示例如下:
--明细表
BEGIN;
CREATE TABLE public.sale_detail(
app_id TEXT,
uid TEXT,
product TEXT,
gmv BIGINT,
order_time TIMESTAMPTZ
);
--为基表开binlog,维表不需要开启
CALL set_table_property('public.sale_detail', 'binlog.level', 'replica');
COMMIT;
--属性表
CREATE TABLE public.user_info(
uid TEXT,
province TEXT,
city TEXT
);
CREATE DYNAMIC TABLE public.dt_sales_incremental
WITH (
refresh_mode='incremental',
auto_refresh_enable='true',
incremental_auto_refresh_schd_start_time='2024-09-15 00:00:00',
incremental_auto_refresh_interval='5 minutes',
incremental_guc_hg_computing_resource='serverless',
incremental_guc_hg_experimental_serverless_computing_required_cores='128')
AS
SELECT
sale_detail.app_id,
sale_detail.uid,
product,
SUM(sale_detail.gmv) AS sum_gmv,
sale_detail.order_time,
user_info.province,
user_info.city
FROM public.sale_detail
INNER JOIN public.user_info FOR SYSTEM_TIME AS OF PROCTIME()
ON sale_detail.uid =user_info.uid
GROUP BY sale_detail.app_id,sale_detail.uid,sale_detail.product,sale_detail.order_time,user_info.province,user_info.city;
示例5:创建分区Dynamic Table
以实时交易大屏为例,业务通常会有近实时查看当天数据,同时又需要修正历史数据的需求。在这种场景下,我们通常使用Dynamic Table增量刷新和全量刷新来实现。做法如下:
创建分区表作为基表,最新分区采用实时/近实时写入方式,历史分区偶尔有数据修正的操作。
创建Dynamic Table,将其作为分区父表,最新的分区使用增量刷新模式,满足业务的近实时数据分析需求。
将历史分区切换成全量刷新模式,如果源表的历史分区进行过数据修正/回刷,则Dynamic Table的历史分区也可以使用全量刷新模式进行一次回刷,同时建议结合Serverless提升回刷速度。
示例如下:
准备基表和数据。
基表为分区表,最新分区采用实时数据写入方式。
-- 创建分区源表 CREATE TABLE base_sales( uid INT, opreate_time TIMESTAMPTZ, amount FLOAT, tt TEXT NOT NULL, ds TEXT, PRIMARY KEY(ds) ) PARTITION BY LIST (ds) ; --历史分区 CREATE TABLE base_sales_20240615 PARTITION OF base_sales FOR VALUES IN ('20240615'); INSERT INTO base_sales_20240615 VALUES (2,'2024-06-15 16:18:25.387466+08','111','2','20240615'); --最新分区,一般是实时写入 CREATE TABLE base_sales_20240616 PARTITION OF base_sales FOR VALUES IN ('20240616'); INSERT INTO base_sales_20240616 VALUES (1,'2024-06-16 16:08:25.387466+08','2','1','20240616');
创建Dynamic Table分区父表,父表只设置Query的定义,不设置刷新模式。
--创建extension CREATE EXTENSION roaringbitmap; CREATE DYNAMIC TABLE partition_dt_base_sales PARTITION BY LIST (ds) as SELECT public.RB_BUILD_AGG(uid), opreate_time, amount, tt, ds, COUNT(1) FROM base_sales GROUP BY opreate_time ,amount,tt,ds;
创建子表,并为子表设置刷新模式。
您可手动创建Dynamic Table分区子表,也可以使用DataWorks数据开发动态创建Dynamic Table分区子表。最新分区创建为增量刷新模式,历史分区设置为全量刷新模式。
-- 为基表打开Binlog ALTER TABLE base_sales SET (binlog_level = replica); -- 假设历史Dynamic Table分区子表如下: CREATE DYNAMIC TABLE partition_dt_base_sales_20240615 PARTITION OF partition_dt_base_sales FOR VALUES IN ('20240615') WITH ( refresh_mode='incremental', auto_refresh_enable='true', full_auto_refresh_schd_start_time='immediate', full_auto_refresh_interval='30 minutes' ); -- 创建新的Dynamic Table分区子表,并指定最新分区的刷新模式为增量刷新,建表成功后立即开始启动刷新,刷新间隔为30分钟,并使用本实例资源刷新。 CREATE DYNAMIC TABLE partition_dt_base_sales_20240616 PARTITION OF partition_dt_base_sales FOR VALUES IN ('20240616') WITH ( refresh_mode='incremental', auto_refresh_enable='true', full_auto_refresh_schd_start_time='immediate', full_auto_refresh_interval='30 minutes' ); --将历史分区转换为全量刷新模式 ALTER DYNAMIC TABLE partition_dt_base_sales_20240615 SET (refresh_mode = 'full'); --如果历史分区需要数据订正,可以执行一次refresh,建议配合serverless来执行。 SET hg_computing_resource = 'serverless'; REFRESH DYNAMIC TABLE partition_dt_base_sales_20240615;
下一步:管理Dynamic Table
Dynamic Table创建成功后,您可以执行如下操作:
查看Dynamic Table DDL、血缘关系等。详情请参见查看Dynamic Table表结构和血缘。
修改Dynamic Table相关属性,详情请参见ALTER DYNAMIC TABLE。
- 本页导读 (1)
- 使用限制
- 语法
- 建表语法
- 参数说明
- 全量刷新
- 增量刷新
- 增量刷新
- 全增量一体刷新
- 使用示例
- 示例1:创建全量刷新Dynamic Table并自动开始执行刷新
- 示例2:创建增量刷新的Dynamic Table并指定开始刷新时间
- 示例3:创建多表JOIN的全量刷新Dynamic Table
- 示例4:创建维表JOIN的增量刷新Dynamic Table
- 示例5:创建分区Dynamic Table
- 下一步:管理Dynamic Table