CREATE ETL语句用于在流引擎中创建ETL任务。
引擎与版本
CREATE ETL仅适用于流引擎。要求3.1.8及以上版本。
您可以通过控制台查看并升级小版本。
语法
create_etl_statement ::= CREATE ETL [IF NOT EXISTS] etl_name
[WITH etl_properties]
AS INSERT INTO [[catalog_name.]db_name.]table_name column_list
select_statement
etl_properties ::= '(' property_definition (',' property_definition)* ')'
property_definition ::= property_name '=' property_value
column_list ::= '(' column_name (',' column_name)* ')'
使用说明
ETL名称(etl_name)
必填参数。ETL名称的设置需遵循以下规则:
可包含数字、大写英文字符、小写英文字符、半角句号(.)、中划线(-)和下划线(_)。
不能以半角句号(.)或中划线(-)开头。
长度为1~255字符。
ETL属性(etl_properties)
您可以通过WITH
关键字添加以下ETL属性:
设置时,属性名前后需添加反引号(`),属性值前后需添加单引号(')。例如`parallelism` = '2'
。
属性 | 数据类型 | 说明 | 默认值 |
parallelism | INTEGER | 任务并行度。 | 1 |
sink.ignore-update-before | BOOLEAN | Sink时是否忽略-U。 | false |
sink.ignore-delete | BOOLEAN | Sink时是否忽略-D。 | false |
sink.null-mode | STRING | Sink时是否写入Null值,取值如下:
| NO_OP |
udf.xxxx | STRING | 配置UDF,需先上传UDF jar。参数格式如下: | 无 |
stream.xxx | ANY | 流引擎作业参数,例如: | 无 |
指定结果表
参数 | 是否必填 | 说明 |
catalog_name | 否 | 结果表的Catalog。 |
db_name | 否 | 结果表所在数据库。 |
table_name | 是 | 结果表的名称。 |
column_name | 是 | 结果表的列名。 |
SQL查询语句(select_statement)
用于筛选数据的SQL语句,例如 SELECT p1, c1 FROM `lindorm_table`.`default`.`source` WHERE c1 > 10;
。
示例
假设宽表引擎中的源表source
和结果表sink
的结构如下:
-- 源表source
CREATE TABLE source(p1 INT, c1 DOUBLE, PRIMARY KEY(p1));
-- 结果表1:sink
CREATE TABLE sink(p1 INT, c1 DOUBLE, PRIMARY KEY(p1));
示例一:创建ETL filter1,将源表
source
中符合条件的数据插入到结果表sink
中。CREATE ETL IF NOT EXISTS filter1 AS INSERT INTO `lindorm_table`.`default`.`sink` (p1, c1) SELECT p1, c1 FROM `lindorm_table`.`default`.`source` WHERE c1 > 10;
示例二:创建ETL filter2,将源表
source
中符合条件的数据插入到结果表sink
中,同时添加属性。CREATE ETL IF NOT EXISTS filter2 WITH ( `parallelism` = '2', `stream.execution.checkpointing.interval` = '30000' ) AS INSERT INTO `lindorm_table`.`default`.`sink` (p1, c1) SELECT p1, c1 FROM `lindorm_table`.`default`.`source` WHERE c1 > 10;