Create External Dynamic Table

本文将会介绍External Dynamic TableCreate命令。

使用限制

  • Hologres V4.0及以上版本的实例支持创建External Dynamic Table。

  • 仅支持创建Paimon格式的External Dynamic Table。

  • 如果实例是计算组型,External Dynamic Table只能使用Init Warehouse的资源执行刷新,暂不支持使用其他Warehouse资源刷新。

  • 创建External Dynamic Table之前,必须先创建External DatabaseExternal Schema,详见CREATE EXTERNAL DATABASECREATE EXTERNAL SCHEMA

  • 使用增量刷新时,当Source表是Paimon PK表时,该Source表必须在数据导入前已开启Changelog。

语法说明

External Dynamic Table的语法如下:

CREATE EXTERNAL DYNAMIC TABLE {external_db}.{external_schema}.{table_name} 
(
  [column_name1,]
  ...
)
[LOGICAL PARTITION BY LIST(column_name1)]
WITH (
  -- dynamic table相关属性
  freshness = '<num> {minutes | hours}',
  [auto_refresh_enable = {true | false},] -- 非必填
  [auto_refresh_mode = {'full' | 'incremental' | 'auto'},] -- 非必填
 
  -- paimon表相关属性
  ["bucket" = '<num> ',]
  ["bucket-key" = '<column_name1>',]

  -- dynamic table分区表属性
  [auto_refresh_partition_active_time = '<num> {minutes | hours | days}',] -- 非必填
  [partition_key_time_format = {'YYYYMMDDHH24' | 'YYYY-MM-DD-HH24' | 'YYYY-MM-DD_HH24' | 'YYYYMMDD' | 'YYYY-MM-DD' | 'YYYYMM' | 'YYYY-MM' | 'YYYY'},] --非必填

  -- dynamic table 计算资源属性
  [computing_resource = {'local' | 'serverless'},] -- 非必填
  [refresh_guc_hg_experimental_serverless_computing_required_cores=xxx,]--非必填,serverless计算资源规格
) AS 
<query>; -- query的定义

Dynamic Table属性

External Dynamic Table的参数设置同Dynamic Table一致,详情使用可以参考文档CREATE DYNAMIC TABLE。重点参数如下:

说明:

  • 如果External Dynamic Tablebase表是Hologres内表,CREATE EXTERNAL DYNAMIC TABLE时必须在内表所在的DB中执行,不能跨DB,且这些内表必须都是同一个DB中的表。

  • 如果External Dynamic Tablebase表是Paimon表,CREATE EXTERNAL DYNAMIC TABLE必须在非External Database 下执行。

参数说明:

参数名

描述

是否必填

默认值

freshness

数据的新鲜度,单位为minutes | hours,最小值为1mins。详细使用见文档CREATE DYNAMIC TABLE

auto_refresh_mode

刷新模式。取值如下:

  • auto:自动模式。如果Query支持增量刷新,则优先执行增量刷新,否则退化为全量刷新。

  • incremental:增量刷新,每次只刷新增量数据。使用详情请参见增量刷新

  • full:全量刷新,每次都以全量的方式刷新表数据。使用详情请参见全量刷新

auto

auto_refresh_enable

开启或关闭自动刷新。取值为:truefalse。详细使用文档CREATE DYNAMIC TABLE

true

computing_resource

刷新的计算资源。取值为:

  • serverless(默认):默认使用Serverless资源,如果不满足Serverless的执行条件,将会默认退化为Local资源。Serverless使用详情请参见Serverless Computing使用指南

  • local:本实例资源。

说明:如果实例是计算组型实例,External Dynamic Table目前只能使用Init Warehouse的资源执行刷新,暂不支持使用其他Warehouse资源刷新。

serverless

LOGICAL PARTITION BY LIST(<partition_key>)

创建Dynamic Table为逻辑分区表,还需要为分区表设置auto_refresh_partition_active_timepartition_key_time_format两个参数,才能使用。

auto_refresh_partition_active_time

仅在External Dynamic Table为逻辑分区时生效。表示分区的刷新时间范围,取值单位包括minutes | hours | days。详细使用见文档CREATE DYNAMIC TABLE

partition_key_time_format

仅在External Dynamic Table为逻辑分区时生效。表示分区的格式。详细的分区格式见CREATE DYNAMIC TABLE

Paimon属性

创建External Dynamic Table时,会自动创建一张Paimon表,可以根据业务需求给Paimon表设置相关的参数,来实现更好的性能。当前支持Paimon表类型:

  • Merge On Read(默认类型)

  • Copy On Write

  • Merge On Write

  • Merge Read Optimized

支持的完整Paimon参数见文档,重点关注的参数说明如下:

说明:

  • 不支持给External Dynamic Table设置主键。当External Dynamic Table是增量刷新时,引擎会自动推导主键。全量刷新模式不会推导主键。

  • Paimon表的分区键不能等于主键,所以当External Dynamic Table是分区表时,SQL中必须GROUP BY多个Key。

参数说明:

参数名称

描述

示例

默认值

file_format

Paimon表的格式,当前仅支持ORC、Parquet格式。

"file_format" = 'orc'

orc

bucket

BucketPaimon表读写操作的最小单元。定义该属性后,非分区表的所有数据以及分区表每个分区的数据,都会被划分到不同的分桶中,以便同一作业使用多个并发同时读写Paimon表,加快读写效率。详情,请参见Data Distribution

"bucket" = '1'

1

bucket-key

用于指定数据的分桶方式。

"bucket-key"='id'

changelog-producer

Paimon表需要将数据的增删与更新改写为完整的变更数据(类似于数据库的Binlog),才能让下游进行流式消费。通过设置该参数,Paimon将以不同的方式产生变更数据。详情,请参见Changelog Producer

"changelog-producer"='input'

deletion-vectors.enabled

是否开启MergeOnWrite模式。

"deletion-vectors.enabled"='true'

false

使用示例

示例1:读内表增量回写到Paimon

本次示例使用TPCH-100G数据集来演示如何将Hologres的数据,经过External Dynamic Table加工后回写到Paimon。步骤如下:

步骤1:准备一张Hologres内表

Hologres的内部DB准备一张内表,本次示例使用TPCH-100G数据集中的LINEITEM,详细导入见文档一键导入公共数据集

步骤2:创建External DatabaseExternal schema

说明:本次示例使用的是DLF 2.0

需要在Hologres中创建External DatabaseExternal Schema,用于高效读取Paimon数据。

--holo中创建EXTERNAL DATABASE
DROP EXTERNAL DATABASE ext_db_dlf;
CREATE EXTERNAL DATABASE ext_db_dlf
WITH 
  metastore_type 'dlf-rest' 
  catalog_type 'paimon' 
  dlf_catalog '<paimon_catalog_id>';  --修改为自己在DLF中创建的Paimon Catalog的ID

--创建EXTERNAL SCHEMA 
 CREATE EXTERNAL SCHEMA IF NOT EXISTS ext_db_dlf.github_events;

步骤3:创建External Dynamic Table增量刷新至Paimon

创建External Dynamic Table,使用增量刷新模型,对Hologres内表的数据进行加工并自动回写到Paimon。

---在external database上创建一张external dynamic table,source是holo内表
DROP EXTERNAL DYNAMIC TABLE ext_db_dlf.github_events.paimon_dt_lineitem;

CREATE EXTERNAL DYNAMIC TABLE ext_db_dlf.github_events.paimon_dt_lineitem 
WITH 
 (freshness = '5 minutes',
  auto_refresh_mode = 'incremental',
  auto_refresh_enable = 'true',
  refresh_guc_hg_external_table_dml_use_distribution_property = 'off'--有bug,需要先用这个绕过
 ) 
  AS 
  SELECT
        l_returnflag,
        l_linestatus,
        SUM(l_quantity) AS sum_qty,
        SUM(l_extendedprice) AS sum_base_price,
        SUM(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
        SUM(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
        AVG(l_quantity) AS avg_qty,
        AVG(l_extendedprice) AS avg_price,
        AVG(l_discount) AS avg_disc,
        COUNT(*) AS count_order
FROM
        hologres_dataset_tpch_100g.LINEITEM
WHERE
        l_shipdate <= DATE '1998-12-01' - INTERVAL '120' DAY
GROUP BY
        l_returnflag,
        l_linestatus;

External Dynamic Table的刷新执行完成后,可以在Paimon中查看创建表以及相关schema:

image.png

步骤4:查询Paimon表数据

External Dynamic Table加工完成的数据,可以通过external table的方式直接查询。示例sql如下:

SELECT * FROM ext_db_dlf.github_events.paimon_dt_lineitem;

结果输出如下:

 l_returnflag | l_linestatus |    sum_qty    |  sum_base_price   |   sum_disc_price    |      sum_charge       |  avg_qty  |  avg_price   | avg_disc | count_order 
--------------+--------------+---------------+-------------------+---------------------+-----------------------+-----------+--------------+----------+-------------
 A            | F            | 3775127758.00 |  5660776097194.45 |  5377736398183.9374 |  5592847429515.927026 | 25.499370 | 38236.116984 | 0.050002 |   148047881
 N            | F            |   98553062.00 |   147771098385.98 |   140384965965.0348 |   145999793032.775829 | 25.501557 | 38237.199389 | 0.049985 |     3864590
 N            | O            | 7269911583.00 | 10901214476134.28 | 10356163586785.0119 | 10770418891237.393182 | 25.499873 | 38236.997134 | 0.049998 |   285095988
 R            | F            | 3775724970.00 |  5661603032745.34 |  5378513563915.4097 |  5593662252666.916161 | 25.500066 | 38236.697258 | 0.050001 |   148067261
(4 rows)

示例2:读Paimon外表增量写回到Paimon

本示例将会使用GitHub公共事件作为示例数据,介绍如何将Paimon的数据通过External Dynamic Table增量加工至Paimon。

步骤1:准备一张Paimon

Hologres中,通过external table的方式创建一张Paimon表,用于接收Flink实时写入的数据。其DDL如下:

说明:本次示例使用的DLF版本是2.0.

CREATE EXTERNAL TABLE ext_db_dlf.github_events.gh_event_ods(
id TEXT,
created_at INT8,
type TEXT,
actor_id TEXT,
actor_login TEXT,
repo_id TEXT,
repo_name TEXT,
org TEXT,
org_login TEXT) 
WITH (
table_format = 'paimon',
file_format = 'orc',
bucket_keys = 'id',
change_log_producer = 'input');

步骤2:使用FlinkPaimon表实时写入数据。

准备一张Paimon表,并使用Flink写入公共数据集,详情参考OpenLake:Flink+Paimon+Hologres一体化实时湖仓分析,Flink中的示例代码如下:

-- 创建一张临时表来实时摄取sls中的数据
CREATE TEMPORARY TABLE sls_input (
    id STRING, -- 每个事件的唯一ID。
    created_at BIGINT, -- 事件时间,单位秒。
    type STRING, -- GitHub事件类型,如:ForkEvent, WatchEvent, IssuesEvent, CommitCommentEvent等。
    actor_id STRING, -- GitHub用户ID。
    actor_login STRING, -- GitHub用户名。
    repo_id STRING, -- GitHub仓库ID。
    repo_name STRING, -- GitHub仓库名,如:apache/flink, apache/spark, alibaba/fastjson等。
    org STRING, -- GitHub组织ID。
    org_login STRING -- GitHub组织名,如: apache,google,alibaba等。
) 
WITH (
    'connector' = 'sls', -- 实时采集的GitHub事件存放在阿里云SLS中
    'project' = 'github-events-hangzhou', -- 存放公开数据的SLS项目。需改为您的实际地域信息
    'endPoint' = 'https://cn-hangzhou-intranet.log.aliyuncs.com', -- 公开数据仅限VVP通过私网地址访问。本示例以杭州为例,您需要修改为您的实际地域信息。
    'logStore' = 'realtime-github-events',           -- 存放公开数据的SLS logStore
    'accessId' =  '<ACCESS_ID>',        -- 访问公开数据集的只读AK    
    'accessKey' = '<ACCESS_KEY>'   -- 访问公开数据集的只读secret
);

-- 将临时表数据写入Hologres中创建的Paimon格式的External Table
INSERT INTO hm_paimon_catalog.github_events.gh_event_ods
SELECT * FROM sls_input 
WHERE id IS NOT NULL 
AND created_at IS NOT NULL 
AND TO_TIMESTAMP(created_at * 1000) >= DATE_ADD(CURRENT_DATE,-1);

步骤3:创建External Dynamic Table

Hologres中,创建External Dynamic Table,设置刷新模式为增量刷新,对源表(Paimon表)进行加工。示例代码如下:

--增量刷新
CREATE EXTERNAL DYNAMIC TABLE ext_db_dlf.github_events.dt_gh_event_ods_incremental
WITH 
 (freshness = '3 minutes',
  auto_refresh_mode = 'incremental',
  auto_refresh_enable = 'true'
 ) 
  AS 
  SELECT
    actor_login,
    COUNT(*) AS events
FROM
    ext_db_dlf.github_events.gh_event_ods
GROUP BY
    actor_login;

External Dynamic Table刷新完成后,可以在Paimon中看到表已经创建成功,表详情如下:

image.png

步骤4:查询Paimon表数据

External Dynamic Table加工完成的数据,可以通过External Table的方式直接查询。示例sql如下:

SELECT * FROM ext_db_dlf.github_events.dt_gh_event_ods_incremental ORDER BY events DESC LIMIT 5;

结果输出如下:

     actor_login     | events 
---------------------+--------
 github-actions[bot] |  38666
 dependabot[bot]     |   3034
 swa-runner-app[bot] |   2908
 pull[bot]           |   2683
 renovate[bot]       |   2602
(5 rows)